X-Git-Url: http://git.pk910.de/?p=NeonServV5.git;a=blobdiff_plain;f=src%2FIOHandler.c;h=db9a5998d2f5dc486ad39fa69eb0d050d50cfca0;hp=3c5bc3d9787c73a24c1672fbd3dbaae59f0c6bca;hb=54ef844b721b720e9cc81dc4737c299a0e84cb59;hpb=59659c9123e0b56c34f5fa9281566f574365deb5 diff --git a/src/IOHandler.c b/src/IOHandler.c index 3c5bc3d..db9a599 100644 --- a/src/IOHandler.c +++ b/src/IOHandler.c @@ -26,6 +26,7 @@ #include #include #else +#include #include #include #include @@ -35,12 +36,21 @@ #include #endif +#ifndef EWOULDBLOCK +#define EWOULDBLOCK EAGAIN +#endif + #define MAXLOG 1024 iohandler_log_callback *iolog_backend = NULL; struct IODescriptor *first_descriptor = NULL; struct IODescriptor *timer_priority = NULL; +#ifdef HAVE_PTHREAD_H +static pthread_mutex_t io_thread_sync; +static pthread_mutex_t io_poll_sync; +#endif + void iohandler_log(enum IOLogType type, char *text, ...) { va_list arg_list; char logBuf[MAXLOG+1]; @@ -61,16 +71,22 @@ void iohandler_log(enum IOLogType type, char *text, ...) { extern struct IOEngine engine_select; /* select system call (should always be useable) */ extern struct IOEngine engine_kevent; extern struct IOEngine engine_epoll; +extern struct IOEngine engine_win32; struct IOEngine *engine = NULL; static void iohandler_init_engine() { if(engine) return; + IOTHREAD_MUTEX_INIT(io_thread_sync); + IOTHREAD_MUTEX_INIT(io_poll_sync); + //try other engines if(!engine && engine_kevent.init && engine_kevent.init()) engine = &engine_kevent; if(!engine && engine_epoll.init && engine_epoll.init()) engine = &engine_epoll; + if(!engine && engine_win32.init && engine_win32.init()) + engine = &engine_win32; if (!engine) { if(engine_select.init()) @@ -82,17 +98,10 @@ static void iohandler_init_engine() { } iohandler_log(IOLOG_DEBUG, "using %s IO engine", engine->name); iohandler_ssl_init(); - #ifdef WIN32 - WSADATA wsadata; - // Start Windows Sockets. - if (WSAStartup(MAKEWORD(2, 0), &wsadata)) { - iohandler_log(IOLOG_FATAL, "Unable to start Windows Sockets"); - return; - } - #endif } static void iohandler_append(struct IODescriptor *descriptor) { + IOSYNCHRONIZE(io_thread_sync); struct timeval *timeout = ((descriptor->timeout.tv_sec || descriptor->timeout.tv_usec) ? &descriptor->timeout : NULL); if(timeout) { struct IODescriptor *iofd; @@ -107,9 +116,9 @@ static void iohandler_append(struct IODescriptor *descriptor) { if(timeval_is_smaler(timeout, (&iofd->timeout))) { descriptor->prev = iofd->prev; descriptor->next = iofd; - iofd->prev = descriptor; if(iofd->prev) iofd->prev->next = descriptor; + iofd->prev = descriptor; if(set_priority) timer_priority = descriptor; break; @@ -139,10 +148,12 @@ static void iohandler_append(struct IODescriptor *descriptor) { first_descriptor->prev = descriptor; first_descriptor = descriptor; } + IODESYNCHRONIZE(io_thread_sync); } static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) { //remove IODescriptor from the list + IOSYNCHRONIZE(io_thread_sync); if(descriptor->prev) descriptor->prev->next = descriptor->next; else @@ -160,6 +171,7 @@ static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) free(descriptor->writebuf.buffer); iohandler_log(IOLOG_DEBUG, "removed IODescriptor (%d) of type `%s`", descriptor->fd, iohandler_iotype_name(descriptor->type)); free(descriptor); + IODESYNCHRONIZE(io_thread_sync); } struct IODescriptor *iohandler_add(int sockfd, enum IOType type, struct timeval *timeout, iohandler_callback *callback) { @@ -332,8 +344,19 @@ struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, dstaddrlen = sizeof(*ip4); } else return NULL; + //prevent SIGPIPE + #ifndef WIN32 + #if defined(SO_NOSIGPIPE) + { + int set = 1; + setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); + } + #else + signal(SIGPIPE, SIG_IGN); + #endif + #endif //make sockfd unblocking -#if defined(F_GETFL) + #if defined(F_GETFL) { int flags; flags = fcntl(sockfd, F_GETFL); @@ -341,11 +364,11 @@ struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, flags = fcntl(sockfd, F_GETFD); fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC); } -#else + #else /* I hope you're using the Win32 backend or something else that * automatically marks the file descriptor non-blocking... */ -#endif + #endif descriptor = iohandler_add(sockfd, IOTYPE_CLIENT, NULL, callback); if(!descriptor) { close(sockfd); @@ -416,8 +439,19 @@ struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, i bind(sockfd, (struct sockaddr*)ip4, sizeof(*ip4)); } else return NULL; + //prevent SIGPIPE + #ifndef WIN32 + #if defined(SO_NOSIGPIPE) + { + int set = 1; + setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); + } + #else + signal(SIGPIPE, SIG_IGN); + #endif + #endif //make sockfd unblocking -#if defined(F_GETFL) + #if defined(F_GETFL) { int flags; flags = fcntl(sockfd, F_GETFL); @@ -425,11 +459,11 @@ struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, i flags = fcntl(sockfd, F_GETFD); fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC); } -#else + #else /* I hope you're using the Win32 backend or something else that * automatically marks the file descriptor non-blocking... */ -#endif + #endif descriptor = iohandler_add(sockfd, IOTYPE_SERVER, NULL, callback); if(!descriptor) { close(sockfd); @@ -480,8 +514,8 @@ void iohandler_printf(struct IODescriptor *iofd, const char *text, ...) { iohandler_send(iofd, sendBuf, pos+1); } -void iohandler_try_write(struct IODescriptor *iofd) { - if(!iofd->writebuf.bufpos) return; +static int iohandler_try_write(struct IODescriptor *iofd) { + if(!iofd->writebuf.bufpos) return 0; iohandler_log(IOLOG_DEBUG, "write writebuf (%d bytes) to socket (fd: %d)", iofd->writebuf.bufpos, iofd->fd); int res; if(iofd->ssl_active) @@ -489,14 +523,16 @@ void iohandler_try_write(struct IODescriptor *iofd) { else res = send(iofd->fd, iofd->writebuf.buffer, iofd->writebuf.bufpos, 0); if(res < 0) { - if (errno != EAGAIN) { + if (errno != EAGAIN && errno != EWOULDBLOCK) iohandler_log(IOLOG_ERROR, "could not write to socket (fd: %d): %d - %s", iofd->fd, errno, strerror(errno)); - } + else + res = 0; } else { iofd->writebuf.bufpos -= res; if(iofd->state != IO_CLOSED) engine->update(iofd); } + return res; } void iohandler_close(struct IODescriptor *iofd) { @@ -546,6 +582,7 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { if(!readable && !writeable) { callback_event.type = IOEVENT_SSLFAILED; iofd->state = IO_CLOSED; + engine->update(iofd); } else { iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_client_handshake for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd); iohandler_ssl_client_handshake(iofd); @@ -602,7 +639,7 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0); } if(bytes <= 0) { - if (errno != EAGAIN) { + if (errno != EAGAIN || errno != EWOULDBLOCK) { iofd->state = IO_CLOSED; engine->update(iofd); callback_event.type = IOEVENT_CLOSED; @@ -651,7 +688,14 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { callback_event.type = IOEVENT_READABLE; } if(writeable) { - iohandler_try_write(iofd); + int bytes; + bytes = iohandler_try_write(iofd); + if(bytes < 0) { + iofd->state = IO_CLOSED; + engine->update(iofd); + callback_event.type = IOEVENT_CLOSED; + callback_event.data.errid = errno; + } } break; } @@ -662,11 +706,17 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { } void iohandler_poll() { + struct timeval timeout; + timeout.tv_sec = IO_MAX_TIMEOUT; + timeout.tv_usec = 0; + iohandler_poll_timeout(timeout); +} + +void iohandler_poll_timeout(struct timeval timeout) { if(engine) { - struct timeval timeout; - timeout.tv_sec = IO_MAX_TIMEOUT; - timeout.tv_usec = 0; + IOSYNCHRONIZE(io_poll_sync); //quite senceless multithread support... better support will follow engine->loop(&timeout); + IODESYNCHRONIZE(io_poll_sync); } }