From eba9bbb5fb73ad2bd0c420e03199a828b6eabc5f Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 14 Jan 2012 13:24:57 +0100 Subject: [PATCH] improved WHOHandler multi thread stability --- src/ChanNode.c | 4 -- src/ChanNode.h | 4 -- src/ClientSocket.c | 1 + src/IRCParser.c | 4 ++ src/WHOHandler.c | 145 ++++++++++++++++++++++++++++++++++++++------- src/WHOHandler.h | 3 + src/main.c | 8 +++ src/main.h | 9 +++ 8 files changed, 149 insertions(+), 29 deletions(-) diff --git a/src/ChanNode.c b/src/ChanNode.c index 977b9b4..528d441 100644 --- a/src/ChanNode.c +++ b/src/ChanNode.c @@ -24,12 +24,8 @@ #include "IRCEvents.h" static struct ChanNode **chanList; -#ifdef HAVE_THREADS -pthread_mutex_t cache_sync; -#endif void init_ChanNode() { - THREAD_MUTEX_INIT(cache_sync); /* len pos chars 26 0 a-z diff --git a/src/ChanNode.h b/src/ChanNode.h index 13cea48..d0bb4a5 100644 --- a/src/ChanNode.h +++ b/src/ChanNode.h @@ -23,10 +23,6 @@ struct trigger_cache; struct ModeNode; struct NeonSpamSettings; -#ifdef HAVE_THREADS -extern pthread_mutex_t cache_sync; -#endif - #define CHANFLAG_RECEIVED_USERLIST 0x01 #define CHANFLAG_REQUESTED_CHANINFO 0x02 #define CHANFLAG_CHAN_REGISTERED 0x04 diff --git a/src/ClientSocket.c b/src/ClientSocket.c index 2789618..83a2e28 100644 --- a/src/ClientSocket.c +++ b/src/ClientSocket.c @@ -447,6 +447,7 @@ void socket_loop(int timeout_seconds) { is_synchronized = 0; DESYNCHRONIZE(synchronized_recv); parse_lines(sock, linesbuf, used); + whohandler_end_of_recv(sock); //WHOHandler hack (unlock WHOQueue mutexes) #else int used = parse_lines(sock, sock->buffer, sock->bufferpos); if(used == sock->bufferpos + 1) { diff --git a/src/IRCParser.c b/src/IRCParser.c index 750ac7b..6ad3804 100644 --- a/src/IRCParser.c +++ b/src/IRCParser.c @@ -57,7 +57,11 @@ int parse_lines(struct ClientSocket *client, char *lines, int len) { static void parse_line(struct ClientSocket *client, char *line) { int argc = 0; char *argv[MAXNUMPARAMS]; + #ifdef HAVE_THREADS + printf("[%lu recv %lu] %s\n", syscall(SYS_gettid), (unsigned long) strlen(line), line); + #else printf("[recv %lu] %s\n", (unsigned long) strlen(line), line); + #endif if(line[0] == ':') line++; else diff --git a/src/WHOHandler.c b/src/WHOHandler.c index a8a55d9..63232f7 100644 --- a/src/WHOHandler.c +++ b/src/WHOHandler.c @@ -33,57 +33,129 @@ struct WHOQueueEntry { char type; + int whoid; struct ChanNode *chan; struct UserNode *user; struct WHOQueueEntry *next; void *callback[MAXCALLBACKS]; void *data[MAXCALLBACKS]; + #ifdef HAVE_THREADS + unsigned long lock_tid; + pthread_mutex_t lock_mutex; + int lock_count; + #endif }; + +static int checkWHOID(struct ClientSocket *client, int whoid) { + struct WHOQueueEntry *entry; + for(entry = client->whoqueue_first; entry; entry = entry->next) { + if(entry->whoid == whoid) + return 1; + } + return 0; +} + static struct WHOQueueEntry* addWHOQueueEntry(struct ClientSocket *client) { + SYNCHRONIZE(whohandler_sync); + int whoid = 0; + do { + whoid++; + } while(checkWHOID(client, whoid) && whoid < 1000); + if(whoid == 1000) { + DESYNCHRONIZE(whohandler_sync); + return NULL; + } struct WHOQueueEntry *entry = malloc(sizeof(*entry)); if (!entry) { perror("malloc() failed"); + DESYNCHRONIZE(whohandler_sync); return NULL; } - SYNCHRONIZE(cache_sync); entry->next = NULL; + entry->whoid = whoid; + #ifdef HAVE_THREADS + entry->lock_tid = 0; + THREAD_MUTEX_INIT(entry->lock_mutex); + entry->lock_count = 0; + #endif if(client->whoqueue_last) { client->whoqueue_last->next = entry; } else { client->whoqueue_first = entry; } client->whoqueue_last = entry; - DESYNCHRONIZE(cache_sync); + DESYNCHRONIZE(whohandler_sync); return entry; } -static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, int freeEntry) { +static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, int whoid, int freeEntry) { if(!client->whoqueue_first) return NULL; - SYNCHRONIZE(cache_sync); - struct WHOQueueEntry *entry = client->whoqueue_first; + SYNCHRONIZE(whohandler_sync); + struct WHOQueueEntry *entry; + for(entry = client->whoqueue_first; entry; entry = entry->next) { + if(entry->whoid == whoid) + break; + } + if(!entry) { + DESYNCHRONIZE(whohandler_sync); + return NULL; + } if(freeEntry) { client->whoqueue_first = entry->next; if(entry == client->whoqueue_last) { client->whoqueue_last = NULL; } + #ifdef HAVE_THREADS + entry->lock_tid = -1; + if(entry->lock_count) { + int i; + for(i = 0; i < entry->lock_count; i++) { + DESYNCHRONIZE(entry->lock_mutex); //unlock ALL + } + } + #endif } - DESYNCHRONIZE(cache_sync); + DESYNCHRONIZE(whohandler_sync); return entry; } void clear_whoqueue(struct ClientSocket *client) { if(!client->whoqueue_first) return; - SYNCHRONIZE(cache_sync); + SYNCHRONIZE(whohandler_sync); struct WHOQueueEntry *entry, *next; for(entry = client->whoqueue_first; entry; entry = next) { next = entry->next; + #ifdef HAVE_THREADS + entry->lock_tid = -1; + if(entry->lock_count) { + int i; + for(i = 0; i < entry->lock_count; i++) { + DESYNCHRONIZE(entry->lock_mutex); //unlock ALL + } + } + #endif free(entry); } client->whoqueue_last = NULL; client->whoqueue_first = NULL; - DESYNCHRONIZE(cache_sync); + DESYNCHRONIZE(whohandler_sync); +} + +#if HAVE_THREADS +void whohandler_end_of_recv(struct ClientSocket *client) { + SYNCHRONIZE(whohandler_sync); + unsigned long tid = syscall(SYS_gettid); + struct WHOQueueEntry *entry; + for(entry = client->whoqueue_first; entry; entry = entry->next) { + if(entry->lock_tid == tid) { + entry->lock_tid = 0; + DESYNCHRONIZE(entry->lock_mutex); + } + } + DESYNCHRONIZE(whohandler_sync); } +#endif void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *data) { struct ClientSocket *bot; @@ -114,7 +186,7 @@ void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *dat entry->data[0] = data; for(i = 1; i < MAXCALLBACKS; i++) entry->data[i] = NULL; - putsock(bot, "WHO %s,%d %%tuihnaf,%d", chan->name, entry->type, entry->type); + putsock(bot, "WHO %s,%d %%tuihnaf,%d", chan->name, entry->whoid, entry->whoid); } else callback(bot, chan, data); } @@ -149,7 +221,7 @@ void _get_userlist_with_invisible(struct ChanNode *chan, userlist_callback_t cal entry->data[0] = data; for(i = 1; i < MAXCALLBACKS; i++) entry->data[i] = NULL; - putsock(bot, "WHO %s,%d d%%tuihnaf,%d", chan->name, entry->type, entry->type); + putsock(bot, "WHO %s,%d d%%tuihnaf,%d", chan->name, entry->whoid, entry->whoid); } else callback(bot, chan, data); } @@ -197,18 +269,33 @@ void get_userauth(struct UserNode *user, userauth_callback_t callback, void *dat for(i = 1; i < MAXCALLBACKS; i++) entry->data[i] = NULL; //WHO ".$user->getNick().",".$id." %tuhna,".$id - putsock(bot, "WHO %s,%d %%tuhna,%d", user->nick, entry->type, entry->type); + putsock(bot, "WHO %s,%d %%tuhna,%d", user->nick, entry->whoid, entry->whoid); } +static void _recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc); void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc) { + _recv_whohandler_354(client, argv, argc); +} + +static void _recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc) { int i; if(argc < 2) return; int type = atoi(argv[1]); if(!type) return; - if(!(type & WHOQUEUETYPE_ISONQUEUE)) return; - struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, 0); - if(entry == NULL || (entry->type & WHOQUEUETYPE_CHECKTYPE) != (type & WHOQUEUETYPE_CHECKTYPE)) return; - if(type & WHOQUEUETYPE_USERLIST) { + struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, type, 0); + if(entry == NULL) return; + #ifdef HAVE_THREADS + unsigned long tid = syscall(SYS_gettid); + if(entry->lock_tid != tid) { + entry->lock_count++; + SYNCHRONIZE(entry->lock_mutex); + if(entry->lock_tid == -1) { + return; //entry has been destroyed + } + entry->lock_tid = tid; + } + #endif + if(entry->type & WHOQUEUETYPE_USERLIST) { if(argc < 7) return; //:OGN2.OnlineGamesNet.net 354 skynet 1 pk910 2001:41d0:2:1d3b::babe skynet H@ pk910 struct ChanNode *chan = entry->chan; @@ -267,7 +354,7 @@ void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int strcpy(user->auth, argv[7]); user->flags |= USERFLAG_ISAUTHED; } - } else if((type & WHOQUEUETYPE_USERAUTH) && !(entry->type & WHOQUEUETYPE_FOUND)) { + } else if((entry->type & WHOQUEUETYPE_USERAUTH) && !(entry->type & WHOQUEUETYPE_FOUND)) { //:OGN2.OnlineGamesNet.net 354 Skynet 1 pk910 2001:41d0:2:1d3b::babe Skynet pk910 entry->type |= WHOQUEUETYPE_FOUND; entry->user->last_who = time(0); @@ -283,16 +370,32 @@ void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int } } +static void _recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc); void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc) { + _recv_whohandler_315(client, argv, argc); +} + +static void _recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc) { if(argc < 2) return; char *typestr = strstr(argv[1], ","); if(!typestr) return; typestr++; int type = atoi(typestr); - if(!(type & WHOQUEUETYPE_ISONQUEUE)) return; - struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, 1); - if(entry == NULL || (entry->type & WHOQUEUETYPE_CHECKTYPE) != (type & WHOQUEUETYPE_CHECKTYPE)) return; - if(type & WHOQUEUETYPE_USERLIST) { + struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, type, 0); + if(entry == NULL) return; + #ifdef HAVE_THREADS + unsigned long tid = syscall(SYS_gettid); + if(entry->lock_tid != tid) { + entry->lock_count++; + SYNCHRONIZE(entry->lock_mutex); + if(entry->lock_tid == -1) { + return; //entry has been destroyed + } + entry->lock_tid = tid; + } + #endif + getNextWHOQueueEntry(client, type, 1); + if(entry->type & WHOQUEUETYPE_USERLIST) { //:OGN2.OnlineGamesNet.net 315 skynet #pk910,1 :End of /WHO list. entry->chan->flags |= CHANFLAG_RECEIVED_USERLIST; userlist_callback_t *callback; @@ -312,7 +415,7 @@ void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int } } } - } else if(type & WHOQUEUETYPE_USERAUTH) { + } else if(entry->type & WHOQUEUETYPE_USERAUTH) { if(!(entry->type & WHOQUEUETYPE_FOUND)) { userauth_callback_t *callback; int i; diff --git a/src/WHOHandler.h b/src/WHOHandler.h index 68ad205..cf53406 100644 --- a/src/WHOHandler.h +++ b/src/WHOHandler.h @@ -30,6 +30,9 @@ typedef USERLIST_CALLBACK(userlist_callback_t); typedef USERAUTH_CALLBACK(userauth_callback_t); void clear_whoqueue(struct ClientSocket *client); +#if HAVE_THREADS +void whohandler_end_of_recv(struct ClientSocket *client); +#endif void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc); void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc); void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *data); diff --git a/src/main.c b/src/main.c index ffa830d..107d50e 100644 --- a/src/main.c +++ b/src/main.c @@ -44,6 +44,8 @@ int statistics_enabled; TIMEQ_CALLBACK(main_statistics); #ifdef HAVE_THREADS int running_threads; +pthread_mutex_t cache_sync; +pthread_mutex_t whohandler_sync, whohandler_mass_sync; #endif void cleanup() { @@ -132,6 +134,12 @@ main: statistics_enabled = get_int_field("statistics.enable"); + #ifdef HAVE_THREADS + THREAD_MUTEX_INIT(cache_sync); + THREAD_MUTEX_INIT(whohandler_sync); + THREAD_MUTEX_INIT(whohandler_mass_sync); + #endif + queue_init(); init_sockets(); init_timeq(); diff --git a/src/main.h b/src/main.h index 329aa5d..6a2a99f 100644 --- a/src/main.h +++ b/src/main.h @@ -52,7 +52,14 @@ pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);\ pthread_mutex_init(&var, &mutex_attr); \ } +#define THREAD_MUTEX_INIT_TYPE(var, type) { \ + pthread_mutexattr_t mutex_attr; \ + pthread_mutexattr_init(&mutex_attr);\ + pthread_mutexattr_settype(&mutex_attr, type);\ + pthread_mutex_init(&var, &mutex_attr); \ +} #define SYNCHRONIZE(var) pthread_mutex_lock(&var) +#define SET_SYNCHRONIZE(var) pthread_mutex_trylock(&var) #define DESYNCHRONIZE(var) pthread_mutex_unlock(&var) #else #define THREAD_MUTEX_INIT(var) @@ -127,6 +134,8 @@ extern time_t start_time; extern int statistics_enabled; #ifdef HAVE_THREADS extern int running_threads; +extern pthread_mutex_t cache_sync; +extern pthread_mutex_t whohandler_sync, whohandler_mass_sync; #endif int stricmp (const char *s1, const char *s2); -- 2.20.1