improved WHOHandler multi thread stability
authorpk910 <philipp@zoelle1.de>
Sat, 14 Jan 2012 12:24:57 +0000 (13:24 +0100)
committerpk910 <philipp@zoelle1.de>
Sat, 14 Jan 2012 17:16:15 +0000 (18:16 +0100)
src/ChanNode.c
src/ChanNode.h
src/ClientSocket.c
src/IRCParser.c
src/WHOHandler.c
src/WHOHandler.h
src/main.c
src/main.h

index 977b9b4c985eb8d9270e0f788607882a9db81d3a..528d441d59c7c0cd97ea33e06024b1c341d7ed84 100644 (file)
 #include "IRCEvents.h"
 
 static struct ChanNode **chanList;
-#ifdef HAVE_THREADS
-pthread_mutex_t cache_sync;
-#endif
 
 void init_ChanNode() {
-    THREAD_MUTEX_INIT(cache_sync);
     /*
      len pos chars 
      26  0   a-z
index 13cea48e8ae57a31c046573041a269ed755cf7c5..d0bb4a5f06d24c5b05d38c1be7fd1e511451e535 100644 (file)
@@ -23,10 +23,6 @@ struct trigger_cache;
 struct ModeNode;
 struct NeonSpamSettings;
 
-#ifdef HAVE_THREADS
-extern pthread_mutex_t cache_sync;
-#endif
-
 #define CHANFLAG_RECEIVED_USERLIST  0x01
 #define CHANFLAG_REQUESTED_CHANINFO 0x02
 #define CHANFLAG_CHAN_REGISTERED    0x04
index 27896187b19385b48d17fbb1af3145c9c828ba54..83a2e281c8bf8bf91a4cdc6edbbd0dfde6223954 100644 (file)
@@ -447,6 +447,7 @@ void socket_loop(int timeout_seconds) {
                 is_synchronized = 0;
                 DESYNCHRONIZE(synchronized_recv);
                 parse_lines(sock, linesbuf, used);
+                whohandler_end_of_recv(sock); //WHOHandler hack (unlock WHOQueue mutexes)
                 #else
                 int used = parse_lines(sock, sock->buffer, sock->bufferpos);
                 if(used == sock->bufferpos + 1) {
index 750ac7b783c871643de7fe85ecb1b04cf7a491ee..6ad38045641fd1dfc162c257be82b0512593759f 100644 (file)
@@ -57,7 +57,11 @@ int parse_lines(struct ClientSocket *client, char *lines, int len) {
 static void parse_line(struct ClientSocket *client, char *line) {
     int argc = 0;
     char *argv[MAXNUMPARAMS];
+    #ifdef HAVE_THREADS
+    printf("[%lu recv %lu] %s\n", syscall(SYS_gettid), (unsigned long) strlen(line), line);
+    #else
     printf("[recv %lu] %s\n", (unsigned long) strlen(line), line);
+    #endif
     if(line[0] == ':')
         line++;
     else
index a8a55d91c693956cebd57329babecb6ec73fa829..63232f7c7998d0be5451118cecb9cedfb662172b 100644 (file)
 
 struct WHOQueueEntry {
     char type;
+    int whoid;
     struct ChanNode *chan;
     struct UserNode *user;
     struct WHOQueueEntry *next;
     void *callback[MAXCALLBACKS];
     void *data[MAXCALLBACKS];
+    #ifdef HAVE_THREADS
+    unsigned long lock_tid;
+    pthread_mutex_t lock_mutex;
+    int lock_count;
+    #endif
 };
+
+static int checkWHOID(struct ClientSocket *client, int whoid) {
+    struct WHOQueueEntry *entry;
+    for(entry = client->whoqueue_first; entry; entry = entry->next) {
+        if(entry->whoid == whoid)
+            return 1;
+    }
+    return 0;
+}
+
 static struct WHOQueueEntry* addWHOQueueEntry(struct ClientSocket *client) {
+    SYNCHRONIZE(whohandler_sync);
+    int whoid = 0;
+    do {
+        whoid++;
+    } while(checkWHOID(client, whoid) && whoid < 1000);
+    if(whoid == 1000) {
+        DESYNCHRONIZE(whohandler_sync);
+        return NULL;
+    }
     struct WHOQueueEntry *entry = malloc(sizeof(*entry));
     if (!entry)
     {
         perror("malloc() failed");
+        DESYNCHRONIZE(whohandler_sync);
         return NULL;
     }
-    SYNCHRONIZE(cache_sync);
     entry->next = NULL;
+    entry->whoid = whoid;
+    #ifdef HAVE_THREADS
+    entry->lock_tid = 0;
+    THREAD_MUTEX_INIT(entry->lock_mutex);
+    entry->lock_count = 0;
+    #endif
     if(client->whoqueue_last) {
         client->whoqueue_last->next = entry;
     } else {
         client->whoqueue_first = entry;
     }
     client->whoqueue_last = entry;
-    DESYNCHRONIZE(cache_sync);
+    DESYNCHRONIZE(whohandler_sync);
     return entry;
 }
 
-static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, int freeEntry) {
+static struct WHOQueueEntry* getNextWHOQueueEntry(struct ClientSocket *client, int whoid, int freeEntry) {
     if(!client->whoqueue_first) return NULL;
-    SYNCHRONIZE(cache_sync);
-    struct WHOQueueEntry *entry = client->whoqueue_first;
+    SYNCHRONIZE(whohandler_sync);
+    struct WHOQueueEntry *entry;
+    for(entry = client->whoqueue_first; entry; entry = entry->next) {
+        if(entry->whoid == whoid)
+            break;
+    }
+    if(!entry) {
+        DESYNCHRONIZE(whohandler_sync);
+        return NULL;
+    }
     if(freeEntry) {
         client->whoqueue_first = entry->next;
         if(entry == client->whoqueue_last) {
             client->whoqueue_last = NULL;
         }
+        #ifdef HAVE_THREADS
+        entry->lock_tid = -1;
+        if(entry->lock_count) {
+            int i;
+            for(i = 0; i < entry->lock_count; i++) {
+                DESYNCHRONIZE(entry->lock_mutex); //unlock ALL
+            }
+        }
+        #endif
     }
-    DESYNCHRONIZE(cache_sync);
+    DESYNCHRONIZE(whohandler_sync);
     return entry;
 }
 
 void clear_whoqueue(struct ClientSocket *client) {
     if(!client->whoqueue_first) return;
-    SYNCHRONIZE(cache_sync);
+    SYNCHRONIZE(whohandler_sync);
     struct WHOQueueEntry *entry, *next;
     for(entry = client->whoqueue_first; entry; entry = next) {
         next = entry->next;
+        #ifdef HAVE_THREADS
+        entry->lock_tid = -1;
+        if(entry->lock_count) {
+            int i;
+            for(i = 0; i < entry->lock_count; i++) {
+                DESYNCHRONIZE(entry->lock_mutex); //unlock ALL
+            }
+        }
+        #endif
         free(entry);
     }
     client->whoqueue_last = NULL;
     client->whoqueue_first = NULL;
-    DESYNCHRONIZE(cache_sync);
+    DESYNCHRONIZE(whohandler_sync);
+}
+
+#if HAVE_THREADS
+void whohandler_end_of_recv(struct ClientSocket *client) {
+    SYNCHRONIZE(whohandler_sync);
+    unsigned long tid = syscall(SYS_gettid);
+    struct WHOQueueEntry *entry;
+    for(entry = client->whoqueue_first; entry; entry = entry->next) {
+        if(entry->lock_tid == tid) {
+            entry->lock_tid = 0;
+            DESYNCHRONIZE(entry->lock_mutex);
+        }
+    }
+    DESYNCHRONIZE(whohandler_sync);
 }
+#endif
 
 void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *data) {
     struct ClientSocket *bot;
@@ -114,7 +186,7 @@ void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *dat
         entry->data[0] = data;
         for(i = 1; i < MAXCALLBACKS; i++)
             entry->data[i] = NULL;
-        putsock(bot, "WHO %s,%d %%tuihnaf,%d", chan->name, entry->type, entry->type);
+        putsock(bot, "WHO %s,%d %%tuihnaf,%d", chan->name, entry->whoid, entry->whoid);
     } else
         callback(bot, chan, data);
 }
@@ -149,7 +221,7 @@ void _get_userlist_with_invisible(struct ChanNode *chan, userlist_callback_t cal
         entry->data[0] = data;
         for(i = 1; i < MAXCALLBACKS; i++)
             entry->data[i] = NULL;
-        putsock(bot, "WHO %s,%d d%%tuihnaf,%d", chan->name, entry->type, entry->type);
+        putsock(bot, "WHO %s,%d d%%tuihnaf,%d", chan->name, entry->whoid, entry->whoid);
     } else
         callback(bot, chan, data);
 }
@@ -197,18 +269,33 @@ void get_userauth(struct UserNode *user, userauth_callback_t callback, void *dat
     for(i = 1; i < MAXCALLBACKS; i++)
             entry->data[i] = NULL;
     //WHO ".$user->getNick().",".$id." %tuhna,".$id
-    putsock(bot, "WHO %s,%d %%tuhna,%d", user->nick, entry->type, entry->type);
+    putsock(bot, "WHO %s,%d %%tuhna,%d", user->nick, entry->whoid, entry->whoid);
 }
 
+static void _recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc);
 void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc) {
+    _recv_whohandler_354(client, argv, argc);
+}
+
+static void _recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc) {
     int i;
     if(argc < 2) return;
     int type = atoi(argv[1]);
     if(!type) return;
-    if(!(type & WHOQUEUETYPE_ISONQUEUE)) return;
-    struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, 0);
-    if(entry == NULL || (entry->type & WHOQUEUETYPE_CHECKTYPE) != (type & WHOQUEUETYPE_CHECKTYPE)) return;
-    if(type & WHOQUEUETYPE_USERLIST) {
+    struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, type, 0);
+    if(entry == NULL) return;
+    #ifdef HAVE_THREADS
+    unsigned long tid = syscall(SYS_gettid);
+    if(entry->lock_tid != tid) {
+        entry->lock_count++;
+        SYNCHRONIZE(entry->lock_mutex);
+        if(entry->lock_tid == -1) {
+            return; //entry has been destroyed
+        }
+        entry->lock_tid = tid;
+    }
+    #endif
+    if(entry->type & WHOQUEUETYPE_USERLIST) {
         if(argc < 7) return;
         //:OGN2.OnlineGamesNet.net 354 skynet 1 pk910 2001:41d0:2:1d3b::babe skynet H@ pk910
         struct ChanNode *chan = entry->chan;
@@ -267,7 +354,7 @@ void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int
             strcpy(user->auth, argv[7]);
             user->flags |= USERFLAG_ISAUTHED;
         }
-    } else if((type & WHOQUEUETYPE_USERAUTH) && !(entry->type & WHOQUEUETYPE_FOUND)) {
+    } else if((entry->type & WHOQUEUETYPE_USERAUTH) && !(entry->type & WHOQUEUETYPE_FOUND)) {
         //:OGN2.OnlineGamesNet.net 354 Skynet 1 pk910 2001:41d0:2:1d3b::babe Skynet pk910
         entry->type |= WHOQUEUETYPE_FOUND;
         entry->user->last_who = time(0);
@@ -283,16 +370,32 @@ void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int
     }
 }
 
+static void _recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc);
 void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc) {
+    _recv_whohandler_315(client, argv, argc);
+}
+
+static void _recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc) {
     if(argc < 2) return;
     char *typestr = strstr(argv[1], ",");
     if(!typestr) return;
     typestr++;
     int type = atoi(typestr);
-    if(!(type & WHOQUEUETYPE_ISONQUEUE)) return;
-    struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, 1);
-    if(entry == NULL || (entry->type & WHOQUEUETYPE_CHECKTYPE) != (type & WHOQUEUETYPE_CHECKTYPE)) return;
-    if(type & WHOQUEUETYPE_USERLIST) {
+    struct WHOQueueEntry* entry = getNextWHOQueueEntry(client, type, 0);
+    if(entry == NULL) return;
+    #ifdef HAVE_THREADS
+    unsigned long tid = syscall(SYS_gettid);
+    if(entry->lock_tid != tid) {
+        entry->lock_count++;
+        SYNCHRONIZE(entry->lock_mutex);
+        if(entry->lock_tid == -1) {
+            return; //entry has been destroyed
+        }
+        entry->lock_tid = tid;
+    }
+    #endif
+    getNextWHOQueueEntry(client, type, 1);
+    if(entry->type & WHOQUEUETYPE_USERLIST) {
         //:OGN2.OnlineGamesNet.net 315 skynet #pk910,1 :End of /WHO list.
         entry->chan->flags |= CHANFLAG_RECEIVED_USERLIST;
         userlist_callback_t *callback;
@@ -312,7 +415,7 @@ void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int
                 }
             }
         }
-    } else if(type & WHOQUEUETYPE_USERAUTH) {
+    } else if(entry->type & WHOQUEUETYPE_USERAUTH) {
         if(!(entry->type & WHOQUEUETYPE_FOUND)) {
             userauth_callback_t *callback;
             int i;
index 68ad205646b940f0bcc6063c58211129e0c7abc6..cf5340624a608c37e25a0955070e284a821dc185 100644 (file)
@@ -30,6 +30,9 @@ typedef USERLIST_CALLBACK(userlist_callback_t);
 typedef USERAUTH_CALLBACK(userauth_callback_t);
 
 void clear_whoqueue(struct ClientSocket *client);
+#if HAVE_THREADS
+void whohandler_end_of_recv(struct ClientSocket *client);
+#endif
 void recv_whohandler_354(struct ClientSocket *client, char **argv, unsigned int argc);
 void recv_whohandler_315(struct ClientSocket *client, char **argv, unsigned int argc);
 void get_userlist(struct ChanNode *chan, userlist_callback_t callback, void *data);
index ffa830d2a90563b04f67c442a339feb45327596b..107d50eef806d9b8f31f816c9a9bd10ec24f8a27 100644 (file)
@@ -44,6 +44,8 @@ int statistics_enabled;
 TIMEQ_CALLBACK(main_statistics);
 #ifdef HAVE_THREADS
 int running_threads;
+pthread_mutex_t cache_sync;
+pthread_mutex_t whohandler_sync, whohandler_mass_sync;
 #endif
 
 void cleanup() {
@@ -132,6 +134,12 @@ main:
     
     statistics_enabled = get_int_field("statistics.enable");
     
+    #ifdef HAVE_THREADS
+    THREAD_MUTEX_INIT(cache_sync);
+    THREAD_MUTEX_INIT(whohandler_sync);
+    THREAD_MUTEX_INIT(whohandler_mass_sync);
+    #endif
+    
     queue_init();
     init_sockets();
     init_timeq();
index 329aa5d13c60a3c3072e2365985b6b173fa39503..6a2a99f575a38f666adb593a2a1b4d9f38b45684 100644 (file)
     pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);\
     pthread_mutex_init(&var, &mutex_attr); \
 }
+#define THREAD_MUTEX_INIT_TYPE(var, type) { \
+    pthread_mutexattr_t mutex_attr; \
+    pthread_mutexattr_init(&mutex_attr);\
+    pthread_mutexattr_settype(&mutex_attr, type);\
+    pthread_mutex_init(&var, &mutex_attr); \
+}
 #define SYNCHRONIZE(var) pthread_mutex_lock(&var)
+#define SET_SYNCHRONIZE(var) pthread_mutex_trylock(&var)
 #define DESYNCHRONIZE(var) pthread_mutex_unlock(&var)
 #else
 #define THREAD_MUTEX_INIT(var)
@@ -127,6 +134,8 @@ extern time_t start_time;
 extern int statistics_enabled;
 #ifdef HAVE_THREADS
 extern int running_threads;
+extern pthread_mutex_t cache_sync;
+extern pthread_mutex_t whohandler_sync, whohandler_mass_sync;
 #endif
 
 int stricmp (const char *s1, const char *s2);