do not filter access levels if not wanted (show database errors)
[NeonServV5.git] / src / IOHandler.c
index 3c5bc3d9787c73a24c1672fbd3dbaae59f0c6bca..db9a5998d2f5dc486ad39fa69eb0d050d50cfca0 100644 (file)
@@ -26,6 +26,7 @@
 #include <winsock2.h>
 #include <ws2tcpip.h>
 #else
+#include <signal.h>
 #include <stdarg.h>
 #include <stdlib.h>
 #include <string.h>
 #include <netinet/tcp.h>
 #endif
 
+#ifndef EWOULDBLOCK
+#define EWOULDBLOCK EAGAIN
+#endif
+
 #define MAXLOG 1024
 iohandler_log_callback *iolog_backend = NULL;
 
 struct IODescriptor *first_descriptor = NULL;
 struct IODescriptor *timer_priority = NULL;
 
+#ifdef HAVE_PTHREAD_H
+static pthread_mutex_t io_thread_sync;
+static pthread_mutex_t io_poll_sync;
+#endif
+
 void iohandler_log(enum IOLogType type, char *text, ...) {
     va_list arg_list;
     char logBuf[MAXLOG+1];
@@ -61,16 +71,22 @@ void iohandler_log(enum IOLogType type, char *text, ...) {
 extern struct IOEngine engine_select; /* select system call (should always be useable) */
 extern struct IOEngine engine_kevent;
 extern struct IOEngine engine_epoll;
+extern struct IOEngine engine_win32;
 
 struct IOEngine *engine = NULL;
 
 static void iohandler_init_engine() {
     if(engine) return;
+    IOTHREAD_MUTEX_INIT(io_thread_sync);
+    IOTHREAD_MUTEX_INIT(io_poll_sync);
+    
     //try other engines
     if(!engine && engine_kevent.init && engine_kevent.init())
         engine = &engine_kevent;
     if(!engine && engine_epoll.init && engine_epoll.init())
         engine = &engine_epoll;
+    if(!engine && engine_win32.init && engine_win32.init())
+        engine = &engine_win32;
     
     if (!engine) {
         if(engine_select.init())
@@ -82,17 +98,10 @@ static void iohandler_init_engine() {
     }
     iohandler_log(IOLOG_DEBUG, "using %s IO engine", engine->name);
     iohandler_ssl_init();
-    #ifdef WIN32
-    WSADATA wsadata;
-    // Start Windows Sockets.
-    if (WSAStartup(MAKEWORD(2, 0), &wsadata)) {
-        iohandler_log(IOLOG_FATAL, "Unable to start Windows Sockets");
-        return;
-    }
-    #endif
 }
 
 static void iohandler_append(struct IODescriptor *descriptor) {
+    IOSYNCHRONIZE(io_thread_sync);
     struct timeval *timeout = ((descriptor->timeout.tv_sec || descriptor->timeout.tv_usec) ? &descriptor->timeout : NULL);
     if(timeout) {
         struct IODescriptor *iofd;
@@ -107,9 +116,9 @@ static void iohandler_append(struct IODescriptor *descriptor) {
                 if(timeval_is_smaler(timeout, (&iofd->timeout))) {
                     descriptor->prev = iofd->prev;
                     descriptor->next = iofd;
-                    iofd->prev = descriptor;
                     if(iofd->prev)
                         iofd->prev->next = descriptor;
+                    iofd->prev = descriptor;
                     if(set_priority)
                         timer_priority = descriptor;
                     break;
@@ -139,10 +148,12 @@ static void iohandler_append(struct IODescriptor *descriptor) {
             first_descriptor->prev = descriptor;
         first_descriptor = descriptor;
     }
+    IODESYNCHRONIZE(io_thread_sync);
 }
 
 static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) {
     //remove IODescriptor from the list
+    IOSYNCHRONIZE(io_thread_sync);
     if(descriptor->prev)
         descriptor->prev->next = descriptor->next;
     else
@@ -160,6 +171,7 @@ static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove)
         free(descriptor->writebuf.buffer);
     iohandler_log(IOLOG_DEBUG, "removed IODescriptor (%d) of type `%s`", descriptor->fd, iohandler_iotype_name(descriptor->type));
     free(descriptor);
+    IODESYNCHRONIZE(io_thread_sync);
 }
 
 struct IODescriptor *iohandler_add(int sockfd, enum IOType type, struct timeval *timeout, iohandler_callback *callback) {
@@ -332,8 +344,19 @@ struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port,
         dstaddrlen = sizeof(*ip4);
     } else
         return NULL;
+    //prevent SIGPIPE
+    #ifndef WIN32
+    #if defined(SO_NOSIGPIPE)
+    {
+        int set = 1;
+        setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
+    }
+    #else
+    signal(SIGPIPE, SIG_IGN);
+    #endif
+    #endif
     //make sockfd unblocking
-#if defined(F_GETFL)
+    #if defined(F_GETFL)
     {
         int flags;
         flags = fcntl(sockfd, F_GETFL);
@@ -341,11 +364,11 @@ struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port,
         flags = fcntl(sockfd, F_GETFD);
         fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC);
     }
-#else
+    #else
     /* I hope you're using the Win32 backend or something else that
      * automatically marks the file descriptor non-blocking...
      */
-#endif
+    #endif
     descriptor = iohandler_add(sockfd, IOTYPE_CLIENT, NULL, callback);
     if(!descriptor) {
         close(sockfd);
@@ -416,8 +439,19 @@ struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, i
         bind(sockfd, (struct sockaddr*)ip4, sizeof(*ip4));
     } else
         return NULL;
+    //prevent SIGPIPE
+    #ifndef WIN32
+    #if defined(SO_NOSIGPIPE)
+    {
+        int set = 1;
+        setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
+    }
+    #else
+    signal(SIGPIPE, SIG_IGN);
+    #endif
+    #endif
     //make sockfd unblocking
-#if defined(F_GETFL)
+    #if defined(F_GETFL)
     {
         int flags;
         flags = fcntl(sockfd, F_GETFL);
@@ -425,11 +459,11 @@ struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, i
         flags = fcntl(sockfd, F_GETFD);
         fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC);
     }
-#else
+    #else
     /* I hope you're using the Win32 backend or something else that
      * automatically marks the file descriptor non-blocking...
      */
-#endif
+    #endif
     descriptor = iohandler_add(sockfd, IOTYPE_SERVER, NULL, callback);
     if(!descriptor) {
         close(sockfd);
@@ -480,8 +514,8 @@ void iohandler_printf(struct IODescriptor *iofd, const char *text, ...) {
     iohandler_send(iofd, sendBuf, pos+1);
 }
 
-void iohandler_try_write(struct IODescriptor *iofd) {
-    if(!iofd->writebuf.bufpos) return;
+static int iohandler_try_write(struct IODescriptor *iofd) {
+    if(!iofd->writebuf.bufpos) return 0;
     iohandler_log(IOLOG_DEBUG, "write writebuf (%d bytes) to socket (fd: %d)", iofd->writebuf.bufpos, iofd->fd);
     int res;
     if(iofd->ssl_active)
@@ -489,14 +523,16 @@ void iohandler_try_write(struct IODescriptor *iofd) {
     else
         res = send(iofd->fd, iofd->writebuf.buffer, iofd->writebuf.bufpos, 0);
     if(res < 0) {
-        if (errno != EAGAIN) {
+        if (errno != EAGAIN && errno != EWOULDBLOCK)
             iohandler_log(IOLOG_ERROR, "could not write to socket (fd: %d): %d - %s", iofd->fd, errno, strerror(errno));
-        }
+        else
+            res = 0;
     } else {
         iofd->writebuf.bufpos -= res;
         if(iofd->state != IO_CLOSED)
             engine->update(iofd);
     }
+    return res;
 }
 
 void iohandler_close(struct IODescriptor *iofd) {
@@ -546,6 +582,7 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
             if(!readable && !writeable) {
                 callback_event.type = IOEVENT_SSLFAILED;
                 iofd->state = IO_CLOSED;
+                engine->update(iofd);
             } else {
                 iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_client_handshake for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd);
                 iohandler_ssl_client_handshake(iofd);
@@ -602,7 +639,7 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
                             bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0);
                     }
                     if(bytes <= 0) {
-                        if (errno != EAGAIN) {
+                        if (errno != EAGAIN || errno != EWOULDBLOCK) {
                             iofd->state = IO_CLOSED;
                                                        engine->update(iofd);
                             callback_event.type = IOEVENT_CLOSED;
@@ -651,7 +688,14 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
                     callback_event.type = IOEVENT_READABLE;
             }
             if(writeable) {
-                iohandler_try_write(iofd);
+                int bytes;
+                bytes = iohandler_try_write(iofd);
+                if(bytes < 0) {
+                    iofd->state = IO_CLOSED;
+                    engine->update(iofd);
+                    callback_event.type = IOEVENT_CLOSED;
+                    callback_event.data.errid = errno;
+                }
             }
             break;
     }
@@ -662,11 +706,17 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
 }
 
 void iohandler_poll() {
+    struct timeval timeout;
+    timeout.tv_sec = IO_MAX_TIMEOUT;
+    timeout.tv_usec = 0;
+    iohandler_poll_timeout(timeout);
+}
+
+void iohandler_poll_timeout(struct timeval timeout) {
     if(engine) {
-        struct timeval timeout;
-        timeout.tv_sec = IO_MAX_TIMEOUT;
-        timeout.tv_usec = 0;
+        IOSYNCHRONIZE(io_poll_sync); //quite senceless multithread support... better support will follow
         engine->loop(&timeout);
+        IODESYNCHRONIZE(io_poll_sync);
     }
 }