From: pk910 Date: Fri, 13 Jan 2012 16:34:58 +0000 (+0100) Subject: added experimental multi thread support X-Git-Tag: v5.3~61 X-Git-Url: http://git.pk910.de/?p=NeonServV5.git;a=commitdiff_plain;h=55831bf424312a6908ca07a904f288fba0919a9a added experimental multi thread support --- diff --git a/configure.ac b/configure.ac index 27747ca..9dc6e4f 100644 --- a/configure.ac +++ b/configure.ac @@ -45,6 +45,13 @@ if test x"$do_have_ssl" = xyes; then AC_DEFINE([HAVE_SSL], 1, [Define if you are using SSL]) fi +AC_CHECK_LIB(pthread, pthread_create, [ + AC_CHECK_HEADERS(pthread.h, [ + LIBS="$LIBS -lpthread" + AC_DEFINE([HAVE_THREADS], 1, [Define if you have Threads]) + ]) +]) + AC_ARG_ENABLE([debug], [AS_HELP_STRING([--enable-debug], [debug mode (compile using -O0 -Wall -Wshadow -Werror)])], [CFLAGS='-g -O0 -Wall -Wshadow -Werror'], diff --git a/src/BanNode.c b/src/BanNode.c index 7b98d85..7bec1c0 100644 --- a/src/BanNode.c +++ b/src/BanNode.c @@ -23,22 +23,28 @@ struct BanNode* addChannelBan(struct ChanNode *chan, char *mask) { struct BanNode *ban = malloc(sizeof(*ban)); ban->chan = chan; ban->mask = strdup(mask); + SYNCHRONIZE(cache_sync); ban->next = chan->bans; chan->bans = ban; + DESYNCHRONIZE(cache_sync); return ban; } struct BanNode* getMatchingChannelBan(struct ChanNode *chan, char *mask) { + SYNCHRONIZE(cache_sync); struct BanNode *cban; for(cban = chan->bans; cban; cban = cban->next) { if(!match(cban->mask, mask)) { + DESYNCHRONIZE(cache_sync); return cban; } } + DESYNCHRONIZE(cache_sync); return NULL; } void removeChannelBanMask(struct ChanNode *chan, char *mask) { + SYNCHRONIZE(cache_sync); struct BanNode *cban, *last = NULL; for(cban = chan->bans; cban; cban = cban->next) { if(!strcmp(cban->mask, mask)) { @@ -52,9 +58,11 @@ void removeChannelBanMask(struct ChanNode *chan, char *mask) { } else last = cban; } + DESYNCHRONIZE(cache_sync); } void removeChannelBan(struct BanNode *ban) { + SYNCHRONIZE(cache_sync); struct BanNode *cban, *last = NULL; struct ChanNode *chan = ban->chan; for(cban = chan->bans; cban; cban = cban->next) { @@ -69,9 +77,11 @@ void removeChannelBan(struct BanNode *ban) { } else last = cban; } + DESYNCHRONIZE(cache_sync); } void removeChannelBans(struct ChanNode *chan) { + SYNCHRONIZE(cache_sync); struct BanNode *ban, *next; for(ban = chan->bans; ban; ban = next) { next = ban->next; @@ -79,4 +89,5 @@ void removeChannelBans(struct ChanNode *chan) { free(ban); } chan->bans = NULL; + DESYNCHRONIZE(cache_sync); } diff --git a/src/ChanNode.c b/src/ChanNode.c index 9641df3..977b9b4 100644 --- a/src/ChanNode.c +++ b/src/ChanNode.c @@ -24,8 +24,12 @@ #include "IRCEvents.h" static struct ChanNode **chanList; +#ifdef HAVE_THREADS +pthread_mutex_t cache_sync; +#endif void init_ChanNode() { + THREAD_MUTEX_INIT(cache_sync); /* len pos chars 26 0 a-z @@ -40,6 +44,7 @@ void init_ChanNode() { } void free_ChanNode() { + SYNCHRONIZE(cache_sync); //kamikaze free all channels and chanusers int i; struct ChanNode *chan, *next; @@ -55,6 +60,7 @@ void free_ChanNode() { } } free(chanList); + DESYNCHRONIZE(cache_sync); } int is_valid_chan(const char *name) { @@ -105,8 +111,9 @@ struct ChanNode* getAllChans(struct ChanNode *last) { cindex++; if(cindex >= CHANNEL_LIST_SIZE) return NULL; return chanList[cindex]; - } else + } else { return last->next; + } } struct ChanNode* getChanByName(const char *name) { //case insensitive @@ -143,8 +150,10 @@ struct ChanNode* addChannel(const char *name) { chan->modes = createModeNode(chan); chan->trigger = NULL; + SYNCHRONIZE(cache_sync); chan->next = chanList[chanListIndex]; chanList[chanListIndex] = chan; + DESYNCHRONIZE(cache_sync); return chan; } @@ -186,6 +195,7 @@ int getChanBanCount() { void delChannel(struct ChanNode* chan, int freeChan) { int chanListIndex = get_chanlist_entry(chan->name[1]); if(chanListIndex == -1) return; + SYNCHRONIZE(cache_sync); struct ChanNode *cchan, *last_chan = NULL; for(cchan = chanList[chanListIndex]; cchan; cchan = cchan->next) { if(cchan == chan) { @@ -209,6 +219,7 @@ void delChannel(struct ChanNode* chan, int freeChan) { freeChanNode(chan); else chan->next = NULL; + DESYNCHRONIZE(cache_sync); } void freeChanNode(struct ChanNode* chan) { @@ -238,6 +249,7 @@ int checkChannelVisibility(struct ChanNode* chan) { } } //free the channel... + SYNCHRONIZE(cache_sync); for(chanuser = getChannelUsers(chan, NULL); chanuser; chanuser = next) { next = getChannelUsers(chan, chanuser); //remove the channel from the user's channel-list @@ -250,5 +262,6 @@ int checkChannelVisibility(struct ChanNode* chan) { } chan->user = NULL; delChannel(chan, 1); + DESYNCHRONIZE(cache_sync); return 0; } diff --git a/src/ChanNode.h b/src/ChanNode.h index d0bb4a5..13cea48 100644 --- a/src/ChanNode.h +++ b/src/ChanNode.h @@ -23,6 +23,10 @@ struct trigger_cache; struct ModeNode; struct NeonSpamSettings; +#ifdef HAVE_THREADS +extern pthread_mutex_t cache_sync; +#endif + #define CHANFLAG_RECEIVED_USERLIST 0x01 #define CHANFLAG_REQUESTED_CHANINFO 0x02 #define CHANFLAG_CHAN_REGISTERED 0x04 diff --git a/src/ChanUser.c b/src/ChanUser.c index 7041fa6..9ac0691 100644 --- a/src/ChanUser.c +++ b/src/ChanUser.c @@ -33,6 +33,8 @@ struct ChanUser* addChanUser(struct ChanNode *chan, struct UserNode *user) { chanuser->changeTime = 0; chanuser->spamnode = NULL; + SYNCHRONIZE(cache_sync); + chanuser->next_user = chan->user; chan->user = chanuser; chan->usercount++; @@ -40,6 +42,8 @@ struct ChanUser* addChanUser(struct ChanNode *chan, struct UserNode *user) { chanuser->next_chan = user->channel; user->channel = chanuser; + DESYNCHRONIZE(cache_sync); + return chanuser; } @@ -57,8 +61,10 @@ struct ChanUser* addInvisibleChanUser(struct ChanNode *chan, struct UserNode *us chanuser->changeTime = 0; chanuser->spamnode = NULL; + SYNCHRONIZE(cache_sync); chanuser->next_user = chan->user; chan->user = chanuser; + DESYNCHRONIZE(cache_sync); chan->usercount++; return chanuser; @@ -97,6 +103,7 @@ struct ChanUser* getUserChannels(struct UserNode *user, struct ChanUser *last) { } void delChanUser(struct ChanUser *chanuser, int do_freeChanUser) { + SYNCHRONIZE(cache_sync); struct ChanUser *cchanuser, *last; //remove it from the user's channel-list if(!(chanuser->flags & CHANUSERFLAG_INVISIBLE)) { @@ -133,9 +140,11 @@ void delChanUser(struct ChanUser *chanuser, int do_freeChanUser) { chanuser->next_chan = NULL; chanuser->next_user = NULL; } + DESYNCHRONIZE(cache_sync); } void removeChanUserFromLists(struct ChanUser *chanuser, int remove_from_userlist, int remove_from_channellist, int do_freeChanUser) { + SYNCHRONIZE(cache_sync); struct ChanUser *cchanuser, *last; if(remove_from_userlist) { //remove it from the channel's user-list @@ -172,6 +181,7 @@ void removeChanUserFromLists(struct ChanUser *chanuser, int remove_from_userlist if(do_freeChanUser) { freeChanUser(chanuser); } + DESYNCHRONIZE(cache_sync); } void freeChanUser(struct ChanUser *chanuser) { diff --git a/src/ClientSocket.c b/src/ClientSocket.c index 2897d9d..647a633 100644 --- a/src/ClientSocket.c +++ b/src/ClientSocket.c @@ -30,11 +30,17 @@ struct socket_list { unsigned count; }; +#ifdef HAVE_THREADS +static pthread_mutex_t synchronized; +#endif + //the magic list :P static struct socket_list *sockets = NULL; static char buffer[BUF_SIZ]; -static void init_sockets() { +void init_sockets() { + THREAD_MUTEX_INIT(synchronized); + sockets = malloc(sizeof(*sockets)); if (!sockets) { @@ -46,7 +52,6 @@ static void init_sockets() { } struct ClientSocket* create_socket(char *host, int port, char *bindto, char *pass, char *nick, char *ident, char *realname) { - if(sockets == NULL) init_sockets(); struct ClientSocket *client = malloc(sizeof(*client)); if (!client) { @@ -73,13 +78,24 @@ struct ClientSocket* create_socket(char *host, int port, char *bindto, char *pas client->whoqueue_last = NULL; client->handleinfo_first = NULL; client->handleinfo_last = NULL; + SYNCHRONIZE(synchronized); client->next = sockets->data; sockets->data = client; + DESYNCHRONIZE(synchronized); return client; } -#ifndef WIN32 +static int _connect_socket(struct ClientSocket *client); + int connect_socket(struct ClientSocket *client) { + SYNCHRONIZE(synchronized); + int ret = _connect_socket(client); + DESYNCHRONIZE(synchronized); + return ret; +} + +#ifndef WIN32 +static int _connect_socket(struct ClientSocket *client) { if((client->flags & SOCKET_FLAG_CONNECTED)) return 1; int sock; @@ -206,7 +222,7 @@ int connect_socket(struct ClientSocket *client) { return 1; } #else -int connect_socket(struct ClientSocket *client) { +static int connect_socket(struct ClientSocket *client) { if((client->flags & SOCKET_FLAG_CONNECTED)) return 1; struct hostent *host; struct sockaddr_in addr; @@ -288,6 +304,7 @@ int disconnect_socket(struct ClientSocket *client) { } static void destroy_socket(struct ClientSocket *client, int free_socket) { + SYNCHRONIZE(synchronized); if((client->flags & SOCKET_FLAG_CONNECTED)) { close(client->sock); bot_disconnect(client); @@ -324,9 +341,11 @@ static void destroy_socket(struct ClientSocket *client, int free_socket) { client->flags &= ~SOCKET_FLAG_FAST_JUMP; connect_socket(client); } + DESYNCHRONIZE(synchronized); } int write_socket_force(struct ClientSocket *client, char* msg, int len) { + SYNCHRONIZE(synchronized); printf("[send %d] %s", len, msg); if(!(client->flags & SOCKET_FLAG_HAVE_SSL) || ssl_write(client, msg, len) == -2) { #ifdef WIN32 @@ -336,6 +355,7 @@ int write_socket_force(struct ClientSocket *client, char* msg, int len) { #endif } client->traffic_out += len; + DESYNCHRONIZE(synchronized); return 1; } @@ -349,6 +369,8 @@ int write_socket(struct ClientSocket *client, char* msg, int len) { void socket_loop(int timeout_seconds) { if(sockets == NULL) return; + int is_synchronized = 1; + SYNCHRONIZE(synchronized); fd_set fds; struct timeval timeout; struct ClientSocket *sock, *next; @@ -364,7 +386,10 @@ void socket_loop(int timeout_seconds) { timeout.tv_sec = timeout_seconds; timeout.tv_usec = 0; ret = select(ret + 1, &fds, NULL, NULL, &timeout); - if(ret == 0) return; + if(ret == 0) { + DEDESYNCHRONIZE(synchronized); + return; + } for (sock = sockets->data; sock; sock = next) { next = sock->next; if((sock->flags & (SOCKET_FLAG_CONNECTED | SOCKET_FLAG_QUITTED)) == SOCKET_FLAG_CONNECTED && FD_ISSET(sock->sock, &fds)) { @@ -399,6 +424,8 @@ void socket_loop(int timeout_seconds) { sock->flags |= SOCKET_FLAG_QUITTED; } else { sock->traffic_in += bytes; + is_synchronized = 0; + DESYNCHRONIZE(synchronized); int used = parse_lines(sock, sock->buffer, sock->bufferpos); if(used == sock->bufferpos + 1) { //used all bytes so just reset the bufferpos @@ -409,6 +436,9 @@ void socket_loop(int timeout_seconds) { } sock->bufferpos -= used; } + #ifdef HAVE_THREADS + FD_ZERO(&fds); //zero out all other pending sockets here (we have other threads receiving from them) + #endif } } else if((sock->flags & (SOCKET_FLAG_CONNECTED | SOCKET_FLAG_RECONNECT)) == SOCKET_FLAG_RECONNECT) { if(time(0) - sock->connection_time >= SOCKET_RECONNECT_TIME) { @@ -420,6 +450,9 @@ void socket_loop(int timeout_seconds) { destroy_socket(sock, (sock->flags & SOCKET_FLAG_DEAD)); } } + if(is_synchronized) { + DESYNCHRONIZE(synchronized); + } } void diff --git a/src/ClientSocket.h b/src/ClientSocket.h index 80dcfe1..4073a23 100644 --- a/src/ClientSocket.h +++ b/src/ClientSocket.h @@ -86,6 +86,7 @@ int write_socket(struct ClientSocket *client, char* msg, int len); void socket_loop(int timeout_seconds); void putsock(struct ClientSocket *client, const char *text, ...) PRINTF_LIKE(2, 3); struct ClientSocket* getBots(int flags, struct ClientSocket* last_bot); +void init_sockets(); void free_sockets(); #endif \ No newline at end of file diff --git a/src/DBHelper.c b/src/DBHelper.c index cf7105b..4f3dd75 100644 --- a/src/DBHelper.c +++ b/src/DBHelper.c @@ -26,6 +26,7 @@ #include "HandleInfoHandler.h" void _loadUserSettings(struct UserNode *user) { + SYNCHRONIZE(cache_sync); MYSQL_RES *res; MYSQL_ROW row; printf_mysql_query("SELECT `user_lang`, `user_reply_privmsg`, `user_god`, `user_id` FROM `users` WHERE `user_user` = '%s'", escape_string(user->auth)); @@ -42,6 +43,7 @@ void _loadUserSettings(struct UserNode *user) { } else user->language = get_default_language(); user->flags |= USERFLAG_LOADED_SETTINGS; + DESYNCHRONIZE(cache_sync); } int isGodMode(struct UserNode *user) { @@ -129,6 +131,7 @@ int checkChannelAccess(struct UserNode *user, struct ChanNode *chan, char *chann } void _loadChannelSettings(struct ChanNode *chan) { + SYNCHRONIZE(cache_sync); MYSQL_RES *res; MYSQL_ROW row; printf_mysql_query("SELECT `channel_id` FROM `channels` WHERE `channel_name` = '%s'", escape_string(chan->name)); @@ -138,6 +141,7 @@ void _loadChannelSettings(struct ChanNode *chan) { chan->channel_id = atoi(row[0]); } chan->flags |= CHANFLAG_REQUESTED_CHANINFO; + DESYNCHRONIZE(cache_sync); } //TODO: fix performance: we should cache the user access diff --git a/src/HandleInfoHandler.c b/src/HandleInfoHandler.c index c194935..99d1965 100644 --- a/src/HandleInfoHandler.c +++ b/src/HandleInfoHandler.c @@ -18,6 +18,7 @@ #include "HandleInfoHandler.h" #include "ClientSocket.h" #include "UserNode.h" +#include "ChanNode.h" #include "IRCEvents.h" #include "tools.h" @@ -40,17 +41,20 @@ static struct HandleInfoQueueEntry* addHandleInfoQueueEntry(struct ClientSocket perror("malloc() failed"); return NULL; } + SYNCHRONIZE(cache_sync); entry->next = NULL; if(client->handleinfo_last) client->handleinfo_last->next = entry; else client->handleinfo_first = entry; client->handleinfo_last = entry; + DESYNCHRONIZE(cache_sync); return entry; } static struct HandleInfoQueueEntry* getNextHandleInfoQueueEntry(struct ClientSocket *client, int freeEntry) { if(!client->handleinfo_first) return NULL; + SYNCHRONIZE(cache_sync); struct HandleInfoQueueEntry *entry = client->handleinfo_first; if(freeEntry) { client->handleinfo_first = entry->next; @@ -58,11 +62,13 @@ static struct HandleInfoQueueEntry* getNextHandleInfoQueueEntry(struct ClientSoc client->handleinfo_last = NULL; } } + DESYNCHRONIZE(cache_sync); return entry; } void clear_handleinfoqueue(struct ClientSocket *client) { if(!client->handleinfo_first) return; + SYNCHRONIZE(cache_sync); struct HandleInfoQueueEntry *entry, *next; for(entry = client->handleinfo_first; entry; entry = next) { next = entry->next; @@ -70,6 +76,7 @@ void clear_handleinfoqueue(struct ClientSocket *client) { } client->handleinfo_last = NULL; client->handleinfo_first = NULL; + DESYNCHRONIZE(cache_sync); } void lookup_authname(char *auth, authlookup_callback_t callback, void *data) { diff --git a/src/UserNode.c b/src/UserNode.c index 21b618d..f90e72f 100644 --- a/src/UserNode.c +++ b/src/UserNode.c @@ -15,6 +15,7 @@ * along with this program. If not, see . */ #include "UserNode.h" +#include "ChanNode.h" #include "ChanUser.h" #include "tools.h" #include "IRCEvents.h" @@ -205,8 +206,10 @@ struct UserNode* addUser(const char *nick) { user->flags = 0; user->channel = NULL; user->last_who = 0; + SYNCHRONIZE(cache_sync); user->next = userList[userListIndex]; userList[userListIndex] = user; + DESYNCHRONIZE(cache_sync); return user; } @@ -269,8 +272,10 @@ struct UserNode* createTempUser(const char *nick) { user->flags &= ~USERFLAG_ISAUTHED; //remove authed flag (security reasons) strcpy(user->nick, nick); if(!already_on_list) { + SYNCHRONIZE(cache_sync); user->next = userList[TEMPUSER_LIST_INDEX]; userList[TEMPUSER_LIST_INDEX] = user; + DESYNCHRONIZE(cache_sync); } return user; } @@ -361,8 +366,10 @@ struct UserNode* createTempUserMask(const char *mask) { } } if(!already_on_list) { + SYNCHRONIZE(cache_sync); user->next = userList[TEMPUSER_LIST_INDEX]; userList[TEMPUSER_LIST_INDEX] = user; + DESYNCHRONIZE(cache_sync); } return user; } @@ -376,6 +383,7 @@ int renameUser(struct UserNode* user, const char *new_nick) { } //delUser(user, 0); //EPIC FAIL! This deletes the user from the channel Userlist -.- //manually remove the user from the old userList + SYNCHRONIZE(cache_sync); int userListIndex = get_nicklist_entry(user->nick[0]); if(userListIndex != -1) { struct UserNode *cuser, *last_user = NULL; @@ -394,12 +402,14 @@ int renameUser(struct UserNode* user, const char *new_nick) { strcpy(user->nick, new_nick); user->next = userList[userListIndex]; userList[userListIndex] = user; + DESYNCHRONIZE(cache_sync); return 1; } void delUser(struct UserNode* user, int freeUser) { int userListIndex = ((user->flags & USERFLAG_ISTMPUSER) ? TEMPUSER_LIST_INDEX : get_nicklist_entry(user->nick[0])); if(userListIndex == -1) return; + SYNCHRONIZE(cache_sync); event_freeuser(user); struct UserNode *cuser, *last_user = NULL; for(cuser = userList[userListIndex]; cuser; cuser = cuser->next) { @@ -429,9 +439,11 @@ void delUser(struct UserNode* user, int freeUser) { free(user); } else user->next = NULL; + DESYNCHRONIZE(cache_sync); } void clearTempUsers() { + SYNCHRONIZE(cache_sync); int userListIndex = TEMPUSER_LIST_INDEX; struct UserNode *cuser, *next; time_t now = time(0); @@ -441,4 +453,5 @@ void clearTempUsers() { delUser(cuser, 1); } } + DESYNCHRONIZE(cache_sync); } diff --git a/src/WHOHandler.c b/src/WHOHandler.c index 61f2ed2..a8a55d9 100644 --- a/src/WHOHandler.c +++ b/src/WHOHandler.c @@ -46,6 +46,7 @@ static struct WHOQueueEntry* addWHOQueueEntry(struct ClientSocket *client) { perror("malloc() failed"); return NULL; } + SYNCHRONIZE(cache_sync); entry->next = NULL; if(client->whoqueue_last) { client->whoqueue_last->next = entry; @@ -53,11 +54,13 @@ static struct WHOQueueEntry* addWHOQueueEntry(struct ClientSocket *client) { client->whoqueue_first = entry; } client->whoqueue_last = entry; + DESYNCHRONIZE(cache_sync); return entry; } static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, int freeEntry) { if(!client->whoqueue_first) return NULL; + SYNCHRONIZE(cache_sync); struct WHOQueueEntry *entry = client->whoqueue_first; if(freeEntry) { client->whoqueue_first = entry->next; @@ -65,11 +68,13 @@ static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, i client->whoqueue_last = NULL; } } + DESYNCHRONIZE(cache_sync); return entry; } void clear_whoqueue(struct ClientSocket *client) { if(!client->whoqueue_first) return; + SYNCHRONIZE(cache_sync); struct WHOQueueEntry *entry, *next; for(entry = client->whoqueue_first; entry; entry = next) { next = entry->next; @@ -77,6 +82,7 @@ void clear_whoqueue(struct ClientSocket *client) { } client->whoqueue_last = NULL; client->whoqueue_first = NULL; + DESYNCHRONIZE(cache_sync); } void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *data) { diff --git a/src/main.c b/src/main.c index 876dc70..60b7ec6 100644 --- a/src/main.c +++ b/src/main.c @@ -92,6 +92,24 @@ static int load_mysql_config() { return 1; } +#ifdef HAVE_THREADS +void * thread_main(void *arg) { + time_t socket_wait; + while(running) { + socket_wait = time(0) + SOCKET_SELECT_TIME; + do { + socket_loop(SOCKET_SELECT_TIME); + } while(time(0) < socket_wait); + timeq_tick(); + loop_bots(); + clearTempUsers(); + destroyEvents(); + queue_loop(); + } + return NULL; +} +#endif + int main(int argc, char *argv[]) { main: @@ -114,6 +132,7 @@ main: statistics_enabled = get_int_field("statistics.enable"); queue_init(); + init_timeq(); init_lang(); ssl_init(); init_parser(); @@ -133,8 +152,21 @@ main: if(!update_minutes) update_minutes = 2; timeq_add(update_minutes * 60 + 10, main_statistics, NULL); - time_t socket_wait; + int worker_threads = get_int_field("General.worker_threads"); + if(!worker_threads) worker_threads = 1; + running = 1; + #ifdef HAVE_THREADS + pthread_t tid[worker_threads]; + int tid_id = 0; + for(tid_id = 0; tid_id < worker_threads; tid_id++) { + pthread_create(&tid[tid_id], NULL, thread_main, NULL); + } + for(tid_id = 0; tid_id < worker_threads; tid_id++) { + pthread_join(tid[tid_id], NULL); + } + #else + time_t socket_wait; while(running) { socket_wait = time(0) + SOCKET_SELECT_TIME; do { @@ -146,6 +178,7 @@ main: destroyEvents(); queue_loop(); } + #endif cleanup(); if(hard_restart) { /* Append a NULL to the end of argv[]. */ diff --git a/src/main.h b/src/main.h index bc2ff72..879f46c 100644 --- a/src/main.h +++ b/src/main.h @@ -42,6 +42,21 @@ #include #include #include +#ifdef HAVE_THREADS +#define THREAD_MUTEX_INIT(var) { \ + pthread_mutexattr_t mutex_attr; \ + pthread_mutexattr_init(&mutex_attr);\ + pthread_mutexattr_setkind_np(&mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);\ + pthread_mutex_init(&var, mutex_attr); \ +} +#define SYNCHRONIZE(var) pthread_mutex_lock(&var) +#define DESYNCHRONIZE(var) pthread_mutex_unlock(&var) +#include +#else +#define THREAD_MUTEX_INIT(var) +#define SYNCHRONIZE(var) +#define DESYNCHRONIZE(var) +#endif #if __GNUC__ #define PRINTF_LIKE(M,N) __attribute__((format (printf, M, N))) diff --git a/src/mysqlConn.c b/src/mysqlConn.c index b82d317..ea3c2b8 100644 --- a/src/mysqlConn.c +++ b/src/mysqlConn.c @@ -18,6 +18,14 @@ #include "mysqlConn.h" #define DATABASE_VERSION "14" +struct mysql_conn_struct { + unsigned long tid; + MYSQL *mysql_conn = NULL; + struct used_result *used_results; + struct escaped_string *escaped_strings; + struct mysql_conn_struct *next; +}; + struct used_result { MYSQL_RES *result; struct used_result *next; @@ -28,13 +36,18 @@ struct escaped_string { struct escaped_string *next; }; -MYSQL *mysql_conn = NULL; -static struct used_result *used_results; -static struct escaped_string *escaped_strings; +struct mysql_conn_struct *get_mysql_conn_struct(); + +struct mysql_conn_struct *mysql_conns = NULL; static int mysql_serverport; static char *mysql_host, *mysql_user, *mysql_pass, *mysql_base; +#ifdef HAVE_THREADS +static pthread_mutex_t synchronized; +#endif + void check_mysql() { + MYSQL *mysql_conn = get_mysql_conn(); int errid; if((errid = mysql_ping(mysql_conn))) { if(mysql_errno(mysql_conn) == CR_SERVER_GONE_ERROR) { @@ -49,46 +62,49 @@ void check_mysql() { } MYSQL_RES *mysql_use() { - MYSQL_RES *res = mysql_store_result(mysql_conn); + struct mysql_conn_struct *mysql_conn = get_mysql_conn_struct(); + MYSQL_RES *res = mysql_store_result(mysql_conn->mysql_conn); struct used_result *result = malloc(sizeof(*result)); if (!result) { mysql_free_result(res); return NULL; } result->result = res; - result->next = used_results; - used_results = result; + result->next = mysql_conn->used_results; + mysql_conn->used_results = result; return res; } void mysql_free() { + struct mysql_conn_struct *mysql_conn = get_mysql_conn_struct(); + if(!mysql_conn) return; struct used_result *result, *next_result; - for(result = used_results; result; result = next_result) { + for(result = mysql_conn->used_results; result; result = next_result) { next_result = result->next; mysql_free_result(result->result); free(result); } - used_results = NULL; + mysql_conn->used_results = NULL; struct escaped_string *escaped, *next_escaped; - for(escaped = escaped_strings; escaped; escaped = next_escaped) { + for(escaped = mysql_conn->escaped_strings; escaped; escaped = next_escaped) { next_escaped = escaped->next; free(escaped->string); free(escaped); } - escaped_strings = NULL; + mysql_conn->escaped_strings = NULL; } void init_mysql(char *host, int port, char *user, char *pass, char *base) { + THREAD_MUTEX_INIT(synchronized); mysql_host = strdup(host); mysql_serverport = port; mysql_user = strdup(user); mysql_pass = strdup(pass); mysql_base = strdup(base); - mysql_conn = mysql_init(NULL); - if (!mysql_real_connect(mysql_conn, mysql_host, mysql_user, mysql_pass, mysql_base, mysql_serverport, NULL, 0)) { - //error - show_mysql_error(); - } + + + MYSQL *mysql_conn = get_mysql_conn(); + //check database version... int version = 0; if(!mysql_query(mysql_conn, "SELECT `database_version` FROM `version`")) { @@ -176,15 +192,23 @@ void init_mysql(char *host, int port, char *user, char *pass, char *base) { } void free_mysql() { - mysql_close(mysql_conn); + struct mysql_conn_struct *mysql_conn, *next; + for(mysql_conn = mysql_conns; mysql_conn; mysql_conn = next) { + next = mysql_conn->next; + mysql_close(mysql_conn->mysql_conn); + free(mysql_conn); + } + mysql_conns = NULL; } void show_mysql_error() { + MYSQL *mysql_conn = get_mysql_conn(); //show mysql_error() printf("MySQL Error: %s\n", mysql_error(mysql_conn)); } void printf_mysql_query(const char *text, ...) { + MYSQL *mysql_conn = get_mysql_conn(); va_list arg_list; char queryBuf[MYSQLMAXLEN]; int pos; @@ -204,6 +228,7 @@ void printf_mysql_query(const char *text, ...) { } void printf_long_mysql_query(int len, const char *text, ...) { + MYSQL *mysql_conn = get_mysql_conn(); va_list arg_list; char queryBuf[len]; int pos; @@ -223,14 +248,45 @@ void printf_long_mysql_query(int len, const char *text, ...) { } char* escape_string(const char *str) { + struct mysql_conn_struct *mysql_conn = get_mysql_conn_struct(); struct escaped_string *escapedstr = malloc(sizeof(*escapedstr)); if (!escapedstr) { return NULL; } char escaped[strlen(str)*2+1]; - mysql_real_escape_string(mysql_conn, escaped, str, strlen(str)); + mysql_real_escape_string(mysql_conn->mysql_conn, escaped, str, strlen(str)); escapedstr->string = strdup(escaped); - escapedstr->next = escaped_strings; - escaped_strings = escapedstr; + escapedstr->next = mysql_conn->escaped_strings; + mysql_conn->escaped_strings = escapedstr; return escapedstr->string; } + +struct mysql_conn_struct *get_mysql_conn_struct() { + SYNCHRONIZE(synchronized); + struct mysql_conn_struct *mysql_conn; + unsigned long tid = syscall(SYS_gettid); + for(mysql_conn = mysql_conns; mysql_conn; mysql_conn = mysql_conn->next) { + if(mysql_conn->tid == tid) { + DESYNCHRONIZE(synchronized); + return mysql_conn; + } + } + mysql_conn = malloc(*mysql_conn); + mysql_conn->mysql_conn = mysql_init(NULL); + mysql_conn->tid = tid; + mysql_conn->used_results = NULL; + mysql_conn->escaped_strings = NULL; + mysql_conn->next = mysql_conns; + mysql_conns = mysql_conn; + if (!mysql_real_connect(mysql_conn->mysql_conn, mysql_host, mysql_user, mysql_pass, mysql_base, mysql_serverport, NULL, 0)) { + //error + show_mysql_error(); + } + DESYNCHRONIZE(synchronized); + return mysql_conn; +} + +MYSQL *get_mysql_conn() { + struct mysql_conn_struct *mysql_conn = get_mysql_conn_struct(); + return mysql_conn->mysql_conn; +} diff --git a/src/mysqlConn.h b/src/mysqlConn.h index c5152d2..3af13c2 100644 --- a/src/mysqlConn.h +++ b/src/mysqlConn.h @@ -23,8 +23,6 @@ #define MYSQLMAXLEN 1024 -extern MYSQL *mysql_conn; - void check_mysql(); MYSQL_RES *mysql_use(); void mysql_free(); @@ -34,5 +32,6 @@ void show_mysql_error(); void printf_mysql_query(const char *text, ...) PRINTF_LIKE(1, 2); void printf_long_mysql_query(int len, const char *text, ...) PRINTF_LIKE(2, 3); char* escape_string(const char *str); +MYSQL *get_mysql_conn(); #endif \ No newline at end of file diff --git a/src/timeq.c b/src/timeq.c index 41ed8a0..17b1904 100644 --- a/src/timeq.c +++ b/src/timeq.c @@ -18,8 +18,16 @@ #include "timeq.h" static struct timeq_entry *timeq_events; +#ifdef HAVE_THREADS +static pthread_mutex_t synchronized; +#endif + +void init_timeq() { + THREAD_MUTEX_INIT(synchronized); +} void timeq_tick() { + SYNCHRONIZE(synchronized); struct timeq_entry *entry, *next; time_t now = time(0); for(entry = timeq_events; entry; entry = next) { @@ -32,6 +40,7 @@ void timeq_tick() { } else break; } + DESYNCHRONIZE(synchronized); } struct timeq_entry* timeq_add(int seconds, timeq_callback_t *callback, void *data) { @@ -42,6 +51,7 @@ struct timeq_entry* timeq_add(int seconds, timeq_callback_t *callback, void *dat perror("malloc() failed"); return NULL; } + SYNCHRONIZE(synchronized); entry->execute = now + seconds; entry->callback = callback; entry->data = data; @@ -60,16 +70,20 @@ struct timeq_entry* timeq_add(int seconds, timeq_callback_t *callback, void *dat entry->next = next; prev->next = entry; } + DESYNCHRONIZE(synchronized); return entry; } struct timeq_entry* timeq_add_name(char *name, int seconds, timeq_callback_t *callback, void *data) { + SYNCHRONIZE(synchronized); struct timeq_entry *entry = timeq_add(seconds, callback, data); entry->name = strdup(name); + DESYNCHRONIZE(synchronized); return entry; } int timeq_del(struct timeq_entry* entry) { + SYNCHRONIZE(synchronized); struct timeq_entry *centry, *last = NULL; for(centry = timeq_events; centry; centry = centry->next) { if(centry == entry) { @@ -80,15 +94,18 @@ int timeq_del(struct timeq_entry* entry) { if(centry->name) free(centry->name); free(centry); + DESYNCHRONIZE(synchronized); return 1; } else { last = centry; } } + DESYNCHRONIZE(synchronized); return 0; } int timeq_del_name(char *name) { + SYNCHRONIZE(synchronized); struct timeq_entry *centry, *last = NULL; for(centry = timeq_events; centry; centry = centry->next) { if(centry->name && !stricmp(centry->name, name)) { @@ -98,20 +115,25 @@ int timeq_del_name(char *name) { timeq_events = centry->next; free(centry->name); free(centry); + DESYNCHRONIZE(synchronized); return 1; } else { last = centry; } } + DESYNCHRONIZE(synchronized); return 0; } int timeq_name_exists(char *name) { + SYNCHRONIZE(synchronized); struct timeq_entry *centry; for(centry = timeq_events; centry; centry = centry->next) { if(centry->name && !stricmp(centry->name, name)) { + DESYNCHRONIZE(synchronized); return 1; } } + DESYNCHRONIZE(synchronized); return 0; } diff --git a/src/timeq.h b/src/timeq.h index 7e10633..762df29 100644 --- a/src/timeq.h +++ b/src/timeq.h @@ -31,6 +31,7 @@ struct timeq_entry { struct timeq_entry *next; }; +void init_timeq(); void timeq_tick(); struct timeq_entry* timeq_add(int seconds, timeq_callback_t *callback, void *data); struct timeq_entry* timeq_add_name(char *name, int seconds, timeq_callback_t *callback, void *data);