X-Git-Url: http://git.pk910.de/?a=blobdiff_plain;f=src%2FClientSocket.c;h=c49577724393e3de6b1443841072ff8e1ed46105;hb=c32e8254ec4ed0d77757e32f8aa5aabcdb494057;hp=647a63396bff25ebca2c4f54b1782edc47fea9f0;hpb=55831bf424312a6908ca07a904f288fba0919a9a;p=NeonServV5.git diff --git a/src/ClientSocket.c b/src/ClientSocket.c index 647a633..c495777 100644 --- a/src/ClientSocket.c +++ b/src/ClientSocket.c @@ -32,6 +32,13 @@ struct socket_list { #ifdef HAVE_THREADS static pthread_mutex_t synchronized; +static pthread_mutex_t synchronized_recv; + +struct ParseOrder { + unsigned long tid; + struct ParseOrder *next; +}; +struct ParseOrder *parse_order = NULL; #endif //the magic list :P @@ -40,6 +47,7 @@ static char buffer[BUF_SIZ]; void init_sockets() { THREAD_MUTEX_INIT(synchronized); + THREAD_MUTEX_INIT(synchronized_recv); sockets = malloc(sizeof(*sockets)); if (!sockets) @@ -367,10 +375,53 @@ int write_socket(struct ClientSocket *client, char* msg, int len) { return write_socket_force(client, msg, len); } +#if HAVE_THREADS +static void clientsocket_start_of_recv(unsigned long tid) { + SYNCHRONIZE(whohandler_sync); + struct ParseOrder *entry, *last; + for(last = parse_order; last; last = last->next) { + if(last->next == NULL) + break; + } + entry = malloc(sizeof(*entry)); + entry->tid = tid; + entry->next = NULL; + if(last) + last->next = entry; + else + parse_order = entry; + DESYNCHRONIZE(whohandler_sync); +} + +static void clientsocket_end_of_recv(unsigned long tid) { + SYNCHRONIZE(whohandler_sync); + struct ParseOrder *entry, *last = NULL; + for(entry = parse_order; entry; entry = entry->next) { + if(entry->tid == tid) { + if(last) + last->next = entry->next; + else + parse_order = entry->next; + free(entry); + break; + } else + last = entry; + } + DESYNCHRONIZE(whohandler_sync); +} + +int clientsocket_parseorder_top(unsigned long tid) { + if(parse_order && parse_order->tid == tid) + return 1; + else + return 0; +} +#endif + void socket_loop(int timeout_seconds) { if(sockets == NULL) return; int is_synchronized = 1; - SYNCHRONIZE(synchronized); + SYNCHRONIZE(synchronized_recv); fd_set fds; struct timeval timeout; struct ClientSocket *sock, *next; @@ -387,7 +438,7 @@ void socket_loop(int timeout_seconds) { timeout.tv_usec = 0; ret = select(ret + 1, &fds, NULL, NULL, &timeout); if(ret == 0) { - DEDESYNCHRONIZE(synchronized); + DESYNCHRONIZE(synchronized_recv); return; } for (sock = sockets->data; sock; sock = next) { @@ -424,8 +475,31 @@ void socket_loop(int timeout_seconds) { sock->flags |= SOCKET_FLAG_QUITTED; } else { sock->traffic_in += bytes; + #ifdef HAVE_THREADS + char linesbuf[BUF_SIZ*2]; + strcpy(linesbuf, sock->buffer); + int used = 0; + for(i = 0; i < sock->bufferpos; i++) { + if(sock->buffer[i] == '\n') { + used = i+1; + } + } + if(used == sock->bufferpos + 1) { + //used all bytes so just reset the bufferpos + sock->bufferpos = 0; + } else { + for(i = 0; i < sock->bufferpos - used; i++) { + sock->buffer[i] = sock->buffer[i+used]; + } + sock->bufferpos -= used; + } is_synchronized = 0; - DESYNCHRONIZE(synchronized); + unsigned long tid = syscall(SYS_gettid); + clientsocket_start_of_recv(tid); + DESYNCHRONIZE(synchronized_recv); + parse_lines(sock, linesbuf, used); + clientsocket_end_of_recv(tid); + #else int used = parse_lines(sock, sock->buffer, sock->bufferpos); if(used == sock->bufferpos + 1) { //used all bytes so just reset the bufferpos @@ -436,6 +510,9 @@ void socket_loop(int timeout_seconds) { } sock->bufferpos -= used; } + is_synchronized = 0; + DESYNCHRONIZE(synchronized_recv); + #endif #ifdef HAVE_THREADS FD_ZERO(&fds); //zero out all other pending sockets here (we have other threads receiving from them) #endif @@ -451,7 +528,7 @@ void socket_loop(int timeout_seconds) { } } if(is_synchronized) { - DESYNCHRONIZE(synchronized); + DESYNCHRONIZE(synchronized_recv); } }