X-Git-Url: http://git.pk910.de/?p=IOMultiplexer.git;a=blobdiff_plain;f=src%2FIOHandler.c;h=f6522950f4302f194b2b6307dca4445c0696c37b;hp=06e2bff70825241b2a8f17fc8c2242dad79f37ad;hb=e18ef9b450a0704ff5c55545f70e3f1ec215a5e6;hpb=5ad65e65d2234b02151ea7ca17e3fa9893a9b165 diff --git a/src/IOHandler.c b/src/IOHandler.c index 06e2bff..f652295 100644 --- a/src/IOHandler.c +++ b/src/IOHandler.c @@ -15,7 +15,9 @@ * along with this program. If not, see . */ #include "IOHandler.h" +#include "IODNSHandler.h" #include "IOEngine.h" +#include "IODNSEngine.h" #include "IOHandler_SSL.h" #include #include @@ -44,14 +46,44 @@ #define MAXLOG 1024 iohandler_log_callback *iolog_backend = NULL; -struct IODescriptor *first_descriptor = NULL; -struct IODescriptor *timer_priority = NULL; +struct IOLowlevelDescriptor *first_descriptor = NULL; +struct IOLowlevelDescriptor *timer_priority = NULL; +struct IODescriptorStartup *first_iostartup = NULL; #ifdef HAVE_PTHREAD_H static pthread_mutex_t io_thread_sync; static pthread_mutex_t io_poll_sync; #endif +#define IOSTARTUP_SRCLOOKUP 0x01 +#define IOSTARTUP_DSTLOOKUP 0x02 +#define IOSTARTUP_LOOKUPS 0x03 + +struct IODescriptorStartup { + struct IODescriptor *iofd; + struct IODNSQuery *iodns; /* dns queries for src and dst address */ + struct timeval timeout; + + unsigned int flags : 6; + unsigned int port : 16; + unsigned int iptype : 8; + unsigned int listening : 1; + unsigned int ssl : 1; + + char *srchost; + struct IODNSResult *srcaddr; + struct IODNSResult *use_srcaddr; + + char *dsthost; + struct IODNSResult *dstaddr; + struct IODNSResult *use_dstaddr; + + char *ssl_certfile; + char *ssl_keyfile; + + struct IODescriptorStartup *next, *prev; +}; + void iohandler_log(enum IOLogType type, char *text, ...) { va_list arg_list; char logBuf[MAXLOG+1]; @@ -111,165 +143,260 @@ static void iohandler_init_engine() { iohandler_ssl_init(); } -static void iohandler_append(struct IODescriptor *descriptor) { +static void iohandler_initialize_iobuf(struct IODescriptor *iofd) { + if(!iofd->readbuf.buffer) { + iofd->readbuf.buffer = malloc(IO_READ_BUFLEN + 2); + iofd->readbuf.bufpos = 0; + iofd->readbuf.buflen = IO_READ_BUFLEN; + iofd->writebuf.buffer = malloc(IO_READ_BUFLEN + 2); + iofd->writebuf.bufpos = 0; + iofd->writebuf.buflen = IO_READ_BUFLEN; + } +} + +static void iohandler_trigger_event(struct IOEvent *event) { + if(!event->iofd->callback) return; + iohandler_log(IOLOG_DEBUG, "triggering event (%s) for %s (fd: %d)", iohandler_ioeventtype_name(event->type), iohandler_iotype_name(event->iofd->type), event->iofd->fd); + event->iofd->callback(event); +} + +static void iohandler_increase_iobuf(struct IOBuffer *iobuf, size_t required) { + if(iobuf->buflen >= required) return; + char *new_buf = realloc(iobuf->buffer, required + 2); + if(new_buf) { + iobuf->buffer = new_buf; + iobuf->buflen = required; + } +} + +static void iohandler_append_iostartup(struct IODescriptorStartup *iostartup) { + iostartup->prev = NULL; + iostartup->next = first_iostartup; + first_iostartup = iostartup; +} + +static void iohandler_remove_iostartup(struct IODescriptorStartup *iostartup) { + if(iostartup->prev) + iostartup->prev->next = iostartup->next; + else + first_iostartup = iostartup->next; + if(iostartup->next) + iostartup->next->prev = iostartup->prev; + if(iostartup->iodns) + iodns_abort(iostartup->iodns); + if(iostartup->srchost) + free(iostartup->srchost); + if(iostartup->dsthost) + free(iostartup->dsthost); + if(iostartup->srcaddr) + iodns_free_result(iostartup->srcaddr); + if(iostartup->dstaddr) + iodns_free_result(iostartup->dstaddr); + if(iostartup->iofd) { + if(iostartup->iofd->readbuf.buffer) + free(iostartup->iofd->readbuf.buffer); + if(iostartup->iofd->writebuf.buffer) + free(iostartup->iofd->writebuf.buffer); + free(iostartup->iofd); + } + free(iostartup); +} + +static void iohandler_append(struct IOLowlevelDescriptor *iold) { IOSYNCHRONIZE(io_thread_sync); - struct timeval *timeout = ((descriptor->timeout.tv_sec || descriptor->timeout.tv_usec) ? &descriptor->timeout : NULL); + struct timeval *timeout = ((iold->timeout.tv_sec || iold->timeout.tv_usec) ? &iold->timeout : NULL); if(timeout) { - struct IODescriptor *iofd; + struct IOLowlevelDescriptor *iold2; int set_priority = 1; - descriptor->timeout = *timeout; if(timer_priority) - iofd = timer_priority; + iold2 = timer_priority; else - iofd = first_descriptor; - if(iofd) { - for(;;iofd = iofd->next) { - if(timeval_is_smaler(timeout, (&iofd->timeout))) { - descriptor->prev = iofd->prev; - descriptor->next = iofd; - if(iofd->prev) - iofd->prev->next = descriptor; - iofd->prev = descriptor; + iold2 = first_descriptor; + if(iold2) { + for(;;iold2 = iold2->next) { + if(timeval_is_smaler(timeout, (&iold2->timeout))) { + iold->prev = iold2->prev; + iold->next = iold2; + if(iold2->prev) + iold2->prev->next = iold; + iold2->prev = iold; if(set_priority) - timer_priority = descriptor; + timer_priority = iold; break; } - if(iofd == timer_priority) + if(iold2 == timer_priority) set_priority = 0; - if(iofd->next == NULL) { - descriptor->next = NULL; - descriptor->prev = iofd; - iofd->next = descriptor; + if(iold2->next == NULL) { + iold->next = NULL; + iold->prev = iold2; + iold2->next = iold; if(set_priority) - timer_priority = descriptor; + timer_priority = iold; break; } } } else { - descriptor->prev = NULL; - descriptor->next = NULL; - first_descriptor = descriptor; - timer_priority = descriptor; + iold->prev = NULL; + iold->next = NULL; + first_descriptor = iold; + timer_priority = iold; } - } else { - descriptor->prev = NULL; - descriptor->next = first_descriptor; + iold->prev = NULL; + iold->next = first_descriptor; if(first_descriptor) - first_descriptor->prev = descriptor; - first_descriptor = descriptor; + first_descriptor->prev = iold; + first_descriptor = iold; } IODESYNCHRONIZE(io_thread_sync); } -static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) { - //remove IODescriptor from the list +static void iohandler_remove(struct IOLowlevelDescriptor *iold) { + //remove IOLowlevelDescriptor from the list IOSYNCHRONIZE(io_thread_sync); - if(descriptor->prev) - descriptor->prev->next = descriptor->next; + if(iold->prev) + iold->prev->next = iold->next; else - first_descriptor = descriptor->next; - if(descriptor->next) - descriptor->next->prev = descriptor->prev; - if(descriptor == timer_priority) - timer_priority = descriptor->next; - - if(engine_remove) - engine->remove(descriptor); - if(descriptor->readbuf.buffer) - free(descriptor->readbuf.buffer); - if(descriptor->writebuf.buffer) - free(descriptor->writebuf.buffer); - iohandler_log(IOLOG_DEBUG, "removed IODescriptor (%d) of type `%s`", descriptor->fd, iohandler_iotype_name(descriptor->type)); - free(descriptor); + first_descriptor = iold->next; + if(iold->next) + iold->next->prev = iold->prev; + if(iold == timer_priority) + timer_priority = iold->next; + + struct IODescriptor *iofd = IOLOWLEVEL_GET_IOFD(iold); + if(iofd) { + if(iofd->readbuf.buffer) + free(iofd->readbuf.buffer); + if(iofd->writebuf.buffer) + free(iofd->writebuf.buffer); + iohandler_log(IOLOG_DEBUG, "removed IODescriptor (%d) of type `%s`", iold->fd, iohandler_iotype_name(iofd->type)); + free(iofd); + } + + free(iold); IODESYNCHRONIZE(io_thread_sync); } -struct IODescriptor *iohandler_add(int sockfd, enum IOType type, struct timeval *timeout, iohandler_callback *callback) { + +struct IODescriptor *iohandler_add_iofd(int sockfd, enum IOType type, struct timeval *timeout, iohandler_callback *callback) { //just add a new IODescriptor - struct IODescriptor *descriptor = calloc(1, sizeof(*descriptor)); - if(!descriptor) { + if(!engine) { + iohandler_init_engine(); + if(!engine) { + return NULL; + } + } + + struct IODescriptor *iofd = calloc(1, sizeof(*iofd)); + if(!iofd) { iohandler_log(IOLOG_ERROR, "could not allocate memory for IODescriptor in %s:%d", __FILE__, __LINE__); return NULL; } - descriptor->fd = (type == IOTYPE_STDIN ? fileno(stdin) : sockfd); - descriptor->type = type; - descriptor->state = (type == IOTYPE_STDIN ? IO_CONNECTED : IO_CLOSED); - descriptor->callback = callback; - if(timeout) { - descriptor->timeout = *timeout; - if(descriptor->timeout.tv_usec > 1000000) { - descriptor->timeout.tv_usec -= 1000000; - descriptor->timeout.tv_sec++; - } - } - if(type != IOTYPE_TIMER) { - descriptor->readbuf.buffer = malloc(IO_READ_BUFLEN + 2); - descriptor->readbuf.bufpos = 0; - descriptor->readbuf.buflen = IO_READ_BUFLEN; - descriptor->writebuf.buffer = malloc(IO_READ_BUFLEN + 2); - descriptor->writebuf.bufpos = 0; - descriptor->writebuf.buflen = IO_READ_BUFLEN; + + struct IOLowlevelDescriptor *iold = calloc(1, sizeof(*iofd)); + if(!iold) { + iohandler_log(IOLOG_ERROR, "could not allocate memory for IOLowlevelDescriptor in %s:%d", __FILE__, __LINE__); + free(iofd); + return NULL; } - if(!engine) { - iohandler_init_engine(); - if(!engine) { - return NULL; + iold->fd = (type == IOTYPE_STDIN ? fileno(stdin) : sockfd); + iold->flags = IOFLAGS_HAVE_IOFD | IOFLAGS_WANT_READ; + iold->data.iofd = iofd; + + iofd->fd.iold = iold; + iofd->flags = IOFDFLAGS_HAVE_IOLD; + iofd->type = type; + iofd->state = (type == IOTYPE_STDIN ? IO_CONNECTED : IO_CLOSED); + iofd->callback = callback; + + if(timeout) { + iold->timeout = *timeout; + if(iold->timeout.tv_usec > 1000000) { + iold->timeout.tv_usec -= 1000000; + iold->timeout.tv_sec++; } } - engine->add(descriptor); - //add IODescriptor to the list - iohandler_append(descriptor); + if(type != IOTYPE_TIMER) + iohandler_initialize_iobuf(iofd); + + + engine->add(iold); + + //add IOLowlevelDescriptor to the list + iohandler_append(iold); iohandler_log(IOLOG_DEBUG, "added custom socket descriptor (%d) as type `%s`", sockfd, iohandler_iotype_name(type)); - return descriptor; + return iofd; } -void iohandler_set_timeout(struct IODescriptor *descriptor, struct timeval *timeout) { - if(descriptor->prev) - descriptor->prev->next = descriptor->next; - else - first_descriptor = descriptor->next; - if(descriptor->next) - descriptor->next->prev = descriptor->prev; - if(descriptor == timer_priority) - timer_priority = descriptor->next; - if(timeout && timeout->tv_sec == 0 && descriptor->constant_timeout) { - descriptor->timeout.tv_usec += (descriptor->constant_timeout % 1000) * 1000; - descriptor->timeout.tv_sec += (descriptor->constant_timeout / 1000); - if(descriptor->timeout.tv_usec > 1000000) { - descriptor->timeout.tv_sec += (descriptor->timeout.tv_usec / 1000000); - descriptor->timeout.tv_usec %= 1000000; - } - } else if(timeout) { - descriptor->timeout = *timeout; - if(descriptor->timeout.tv_usec > 1000000) { - descriptor->timeout.tv_usec -= 1000000; - descriptor->timeout.tv_sec++; - } - } else { - descriptor->timeout.tv_sec = 0; - descriptor->timeout.tv_usec = 0; - } - iohandler_append(descriptor); +struct IOLowlevelDescriptor *iohandler_lowlevel_add(int fd, int want_read, int want_write, iolowlevel_callback *callback) { + } -static void iohandler_increase_iobuf(struct IOBuffer *iobuf, size_t required) { - if(iobuf->buflen >= required) return; - char *new_buf = realloc(iobuf->buffer, required + 2); - if(new_buf) { - iobuf->buffer = new_buf; - iobuf->buflen = required; +void iohandler_lowlevel_update(struct IOLowlevelDescriptor *iolow, int want_read, int want_write) { + +} + +void iohandler_lowlevel_del(struct IOLowlevelDescriptor *iolow) { + +} + +void iohandler_set_timeout(struct IODescriptor *iofd, struct timeval *timeout) { + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); + if(iold) { + if(!timeout && !(iold->flags & IOFLAGS_HAVE_TIMEOUT)) + return; //nothing to do + + if(iold->prev) + iold->prev->next = iold->next; + else + first_descriptor = iold->next; + if(iold->next) + iold->next->prev = iold->prev; + if(iold == timer_priority) + timer_priority = iold->next; + + if(timeout && timeout->tv_sec == 0 && iofd->constant_timeout) { // autoreload timer + iold->timeout.tv_usec += (iofd->constant_timeout % 1000) * 1000; + iold->timeout.tv_sec += (iofd->constant_timeout / 1000); + if(iold->timeout.tv_usec > 1000000) { + iold->timeout.tv_sec += (iold->timeout.tv_usec / 1000000); + iold->timeout.tv_usec %= 1000000; + } + iold->flags |= IOFLAGS_HAVE_TIMEOUT; + } else if(timeout) { // normal timer + iold->timeout = *timeout; + if(iold->timeout.tv_usec > 1000000) { + iold->timeout.tv_usec -= 1000000; + iold->timeout.tv_sec++; + } + iold->flags |= IOFLAGS_HAVE_TIMEOUT; + } else + iold->flags &= ~IOFLAGS_HAVE_TIMEOUT; + iohandler_append(iold); + } else { + struct IODescriptorStartup *iostartup = iofd->fd.iostartup; + + if(timeout) { + iostartup->timeout = *timeout; + if(iostartup->timeout.tv_usec > 1000000) { + iostartup->timeout.tv_usec -= 1000000; + iostartup->timeout.tv_sec++; + } + } else { + iostartup->timeout.tv_sec = 0; + iostartup->timeout.tv_usec = 0; + } } } struct IODescriptor *iohandler_timer(struct timeval timeout, iohandler_callback *callback) { struct IODescriptor *descriptor; - descriptor = iohandler_add(-1, IOTYPE_TIMER, &timeout, callback); + descriptor = iohandler_add_iofd(-1, IOTYPE_TIMER, &timeout, callback); if(!descriptor) { - iohandler_log(IOLOG_ERROR, "could not allocate memory for IODescriptor in %s:%d", __FILE__, __LINE__); + iohandler_log(IOLOG_ERROR, "could not create IODescriptor in %s:%d", __FILE__, __LINE__); return NULL; } iohandler_log(IOLOG_DEBUG, "added timer descriptor (sec: %d; usec: %d)", timeout.tv_sec, timeout.tv_usec); @@ -286,7 +413,7 @@ struct IODescriptor *iohandler_constant_timer(int msec, iohandler_callback *call timeout.tv_sec += (timeout.tv_usec / 1000000); timeout.tv_usec %= 1000000; } - descriptor = iohandler_add(-1, IOTYPE_TIMER, &timeout, callback); + descriptor = iohandler_add_iofd(-1, IOTYPE_TIMER, &timeout, callback); if(!descriptor) { iohandler_log(IOLOG_ERROR, "could not allocate memory for IODescriptor in %s:%d", __FILE__, __LINE__); return NULL; @@ -296,142 +423,273 @@ struct IODescriptor *iohandler_constant_timer(int msec, iohandler_callback *call return descriptor; } -struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, int ssl, const char *bindhost, iohandler_callback *callback) { - return iohandler_connect_flags(hostname, port, ssl, bindhost, callback, IOHANDLER_CONNECT_IPV4 | IOHANDLER_CONNECT_IPV6); -} - -struct IODescriptor *iohandler_connect_flags(const char *hostname, unsigned int port, int ssl, const char *bindhost, iohandler_callback *callback, int flags) { - //non-blocking connect - int sockfd, result; - struct addrinfo hints, *res; - struct sockaddr_in *ip4 = NULL; - struct sockaddr_in6 *ip6 = NULL; - size_t dstaddrlen; - struct sockaddr *dstaddr = NULL; - struct IODescriptor *descriptor; +static void iohandler_process_iostartup(struct IODescriptorStartup *iostartup) { + struct IODescriptor *iofd = iostartup->iofd; + struct IOLowlevelDescriptor *iold; + struct IOEvent callback_event; + struct sockaddr_in ip4; + struct sockaddr_in6 ip6; + int addr_family, sockfd, opt; - if(!engine) { - iohandler_init_engine(); - if(!engine) return NULL; + //open socket + if(iostartup->use_srcaddr) + iostartup->iptype = iostartup->use_srcaddr->type; + else if(iostartup->use_dstaddr) + iostartup->iptype = iostartup->use_dstaddr->type; + + addr_family = ((iostartup->iptype & IODNS_RECORD_AAAA) ? AF_INET6 : AF_INET); + + sockfd = socket(addr_family, SOCK_STREAM, 0); + if(!sockfd) { + callback_event.type = IOEVENT_SOCKET_ERROR; + goto iohandler_process_iostartup_fail; } - memset (&hints, 0, sizeof (hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags |= AI_CANONNAME; - if ((result = getaddrinfo (hostname, NULL, &hints, &res))) { - iohandler_log(IOLOG_ERROR, "could not resolve %s to an IP address (%d)", hostname, result); - return NULL; + + //prevent SIGPIPE + #ifndef WIN32 + #if defined(SO_NOSIGPIPE) + opt = 1; + setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&opt, sizeof(opt)); + #else + signal(SIGPIPE, SIG_IGN); + #endif + #endif + //make sockfd unblocking + #if defined(F_GETFL) + opt = fcntl(sockfd, F_GETFL); + fcntl(sockfd, F_SETFL, opt|O_NONBLOCK); + opt = fcntl(sockfd, F_GETFD); + fcntl(sockfd, F_SETFD, opt|FD_CLOEXEC); + #endif + + if(iostartup->listening) { + opt = 1; + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)); + } else { + if((iostartup->iptype & IODNS_RECORD_AAAA)) { + struct sockaddr_in6 *ip6ptr = iostartup->use_srcaddr->address; + ip6ptr->sin6_family = addr_family; + ip6ptr->sin6_port = htons(iostartup->port); + } else { + struct sockaddr_in *ip4ptr = iostartup->use_srcaddr->address; + ip4ptr->sin_family = addr_family; + ip4ptr->sin_port = htons(iostartup->port); + } } - while (res) { - switch (res->ai_family) { - case AF_INET: - ip4 = (struct sockaddr_in *) res->ai_addr; + + // Bind Socket to IP + if(iostartup->listening || iostartup->use_srcaddr) { + if((iostartup->iptype & IODNS_RECORD_AAAA)) { + memset(&ip6, 0, sizeof(ip6)); + if(iostartup->use_srcaddr) { + struct sockaddr_in6 *ip6ptr = iostartup->use_srcaddr->address; + ip6.sin6_addr = ip6ptr->sin6_addr; + } else + inet_pton(AF_INET6, "::1", &ip6.sin6_addr); + ip6.sin6_family = addr_family; + ip6.sin6_port = htons((iostartup->listening ? iostartup->port : 0)); + opt = bind(sockfd, (struct sockaddr*)&ip6, sizeof(ip6)); + } else { + memset(&ip6, 0, sizeof(ip6)); + if(iostartup->use_srcaddr) { + struct sockaddr_in4 *ip4ptr = iostartup->use_srcaddr->address; + ip6.sin6_addr = ip4ptr->sin_addr; + } else + ip4.sin_addr = INADDR_ANY; + ip4.sin_family = addr_family; + ip4.sin_port = htons((iostartup->listening ? iostartup->port : 0)); + opt = bind(sockfd, (struct sockaddr*)&ip4, sizeof(ip4)); + } + if(!opt) { + callback_event.type = IOEVENT_BIND_ERROR; + goto iohandler_process_iostartup_fail; + } + } + + iold = calloc(1, sizeof(*iold)); + iold->fd = sockfd; + iold->flags = IOFLAGS_HAVE_IOFD | IOFLAGS_WANT_READ; + iold->data.iofd = iofd; + + if(iostartup->listening) { + listen(sockfd, 1); + iofd->state = IO_LISTENING; + } else { + connect(sockfd, iostartup->use_dstaddr->address, iostartup->use_dstaddr->addresslen); + iofd->state = IO_CONNECTING; + } + + iofd->fd.iold = iold; + iofd->flags = IOFDFLAGS_HAVE_IOLD; + + iostartup->iofd = NULL; + iohandler_remove_iostartup(iostartup); + return; + + iohandler_process_iostartup_fail: + if(sockfd) + close(sockfd); + callback_event.iofd = iostartup->iofd; + iostartup->iofd->flags |= IOFDFLAGS_FREE_LOCK; + iohandler_trigger_event(&callback_event); + iostartup->iofd->flags &= ~IOFDFLAGS_FREE_LOCK; + if(iostartup->iofd->flags & IOFDFLAGS_WANT_FREE) + iohandler_remove_iostartup(iostartup); + else + iohandler_close(iostartup->iofd); +} + +static IODNS_CALLBACK(iohandler_dns_callback) { + struct IODNSQuery *query = event->query; + struct IODescriptorStartup *iostartup = query->data; + int trigger_dns_fail = 0; + iostartup->iodns = NULL; + + switch((iostartup->flags & IOSTARTUP_LOOKUPS)) { + case IOSTARTUP_SRCLOOKUP: + if(event->type == IODNSEVENT_FAILED) { + trigger_dns_fail = 1; break; - case AF_INET6: - ip6 = (struct sockaddr_in6 *) res->ai_addr; + } + if(iostartup->dsthost) { + iostartup->iodns = iodns_getaddrinfo(iostartup->dsthost, iostartup->iptype, iohandler_dns_callback); + iostartup->flags |= IOSTARTUP_DSTLOOKUP; + iostartup->iodns->data = iostartup; + } + break; + case IOSTARTUP_DSTLOOKUP: + if(event->type == IODNSEVENT_FAILED) { + trigger_dns_fail = 1; break; } - res = res->ai_next; - freeaddrinfo(res); + break; } - if(ip6 && (flags & IOHANDLER_CONNECT_IPV6)) { - sockfd = socket(AF_INET6, SOCK_STREAM, 0); - if(sockfd == -1) { - iohandler_log(IOLOG_ERROR, "could not create socket in %s:%d", __FILE__, __LINE__); - return NULL; + if(iostartup->iodns) + return; + + if(iostartup->iptype & IODNS_RECORD_AAAA) { + struct IODNSResult *result; + if(iostartup->srcaddr) { + for(result = iostartup->srcaddr; result; result = result->next) { + if(result->type == IODNS_RECORD_AAAA) { + iostartup->use_srcaddr = result; + break; + } + } } - - ip6->sin6_family = AF_INET6; - ip6->sin6_port = htons(port); - - struct sockaddr_in6 *ip6vhost = NULL; - if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) { - while (res) { - switch (res->ai_family) { - case AF_INET6: - ip6vhost = (struct sockaddr_in6 *) res->ai_addr; + if(!iostartup->srcaddr || iostartup->use_srcaddr) { + for(result = iostartup->dstaddr; result; result = result->next) { + if(result->type == IODNS_RECORD_AAAA) { + iostartup->use_dstaddr = result; break; } - res = res->ai_next; - freeaddrinfo(res); } } - if(ip6vhost) { - ip6vhost->sin6_family = AF_INET6; - ip6vhost->sin6_port = htons(0); - bind(sockfd, (struct sockaddr*)ip6vhost, sizeof(*ip6vhost)); + if((iostartup->srcaddr && !iostartup->use_srcaddr) || (iostartup->dstaddr && !iostartup->use_dstaddr)) { + iostartup->use_srcaddr = NULL; + iostartup->use_dstaddr = NULL; } - dstaddr = (struct sockaddr*)ip6; - dstaddrlen = sizeof(*ip6); - } else if(ip4 && (flags & IOHANDLER_CONNECT_IPV4)) { - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if(sockfd == -1) { - iohandler_log(IOLOG_ERROR, "could not create socket in %s:%d", __FILE__, __LINE__); - return NULL; + } + if((iostartup->iptype & IODNS_RECORD_A) && !(iostartup->use_dstaddr || iostartup->use_srcaddr)) { + struct IODNSResult *result; + if(iostartup->srcaddr) { + for(result = iostartup->srcaddr; result; result = result->next) { + if(result->type == IODNS_RECORD_A) { + iostartup->use_srcaddr = result; + break; + } + } } - - ip4->sin_family = AF_INET; - ip4->sin_port = htons(port); - - struct sockaddr_in *ip4vhost = NULL; - if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) { - while (res) { - switch (res->ai_family) { - case AF_INET: - ip4vhost = (struct sockaddr_in *) res->ai_addr; + if(!iostartup->srcaddr || iostartup->use_srcaddr) { + for(result = iostartup->srcaddr; result; result = result->next) { + if(result->type == IODNS_RECORD_A) { + iostartup->use_dstaddr = result; break; } - res = res->ai_next; - freeaddrinfo(res); } } - if(ip4vhost) { - ip4vhost->sin_family = AF_INET; - ip4vhost->sin_port = htons(0); - bind(sockfd, (struct sockaddr*)ip4vhost, sizeof(*ip4vhost)); + if((iostartup->srcaddr && !iostartup->use_srcaddr) || (iostartup->dstaddr && !iostartup->use_dstaddr)) { + iostartup->use_srcaddr = NULL; + iostartup->use_dstaddr = NULL; } - dstaddr = (struct sockaddr*)ip4; - dstaddrlen = sizeof(*ip4); - } else - return NULL; - //prevent SIGPIPE - #ifndef WIN32 - #if defined(SO_NOSIGPIPE) - { - int set = 1; - setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); } - #else - signal(SIGPIPE, SIG_IGN); - #endif - #endif - //make sockfd unblocking - #if defined(F_GETFL) - { - int fcntl_flags; - fcntl_flags = fcntl(sockfd, F_GETFL); - fcntl(sockfd, F_SETFL, fcntl_flags|O_NONBLOCK); - fcntl_flags = fcntl(sockfd, F_GETFD); - fcntl(sockfd, F_SETFD, fcntl_flags|FD_CLOEXEC); + if(!(iostartup->use_dstaddr || iostartup->use_srcaddr)) + trigger_dns_fail = 1; + + if(trigger_dns_fail) { + struct IOEvent callback_event; + callback_event.type = IOEVENT_DNS_FAILED; + callback_event.iofd = iostartup->iofd; + iostartup->iofd->flags |= IOFDFLAGS_FREE_LOCK; + iohandler_trigger_event(&callback_event); + iostartup->iofd->flags &= ~IOFDFLAGS_FREE_LOCK; + if(iostartup->iofd->flags & IOFDFLAGS_WANT_FREE) + iohandler_remove_iostartup(iostartup); + else + iohandler_close(iostartup->iofd); + return; } - #else - /* I hope you're using the Win32 backend or something else that - * automatically marks the file descriptor non-blocking... - */ - #endif - descriptor = iohandler_add(sockfd, IOTYPE_CLIENT, NULL, callback); - if(!descriptor) { - close(sockfd); + + iohandler_process_iostartup(iostartup); +} + +struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, int ssl, const char *bindhost, iohandler_callback *callback) { + return iohandler_connect_flags(hostname, port, ssl, bindhost, callback, IOHANDLER_CONNECT_IPV4 | IOHANDLER_CONNECT_IPV6); +} + +struct IODescriptor *iohandler_connect_flags(const char *hostname, unsigned int port, int ssl, const char *bindhost, iohandler_callback *callback, int flags) { + if(!engine) { + iohandler_init_engine(); + if(!engine) return NULL; + } + + //non-blocking connect + struct IODescriptorStartup *iostartup; + struct IODescriptor *iofd; + + iofd = calloc(1, sizeof(*iofd)); + if(!iofd) { + return NULL; } - connect(sockfd, dstaddr, dstaddrlen); //returns EINPROGRESS here (nonblocking) - descriptor->state = IO_CONNECTING; - descriptor->ssl = (ssl ? 1 : 0); - descriptor->read_lines = 1; - engine->update(descriptor); - iohandler_log(IOLOG_DEBUG, "added client socket (%d) connecting to %s:%d", sockfd, hostname, port); - return descriptor; + iostartup = calloc(1, sizeof(*iostartup)); + if(!iostartup) { + + return NULL; + } + + iostartup->iofd = iofd; + iofd->fd.iostartup = iostartup; + + iofd->type = IOTYPE_CLIENT; + iofd->state = IO_PENDING; + iofd->callback = callback; + + iostartup->srchost = (bindhost ? strdup(bindhost) : NULL); + iostartup->dsthost = strdup(hostname); + iostartup->port = port; + iostartup->iptype = ((flags & IOHANDLER_CONNECT_IPV6) ? IODNS_RECORD_AAAA : 0) | ((flags & IOHANDLER_CONNECT_IPV4) ? IODNS_RECORD_A : 0); + iostartup->ssl = (ssl == 0 ? 0 : 1); + + //DNS Requests + if(bindhost) { + iostartup->iodns = iodns_getaddrinfo(bindhost, iostartup->iptype, iohandler_dns_callback); + iostartup->flags |= IOSTARTUP_SRCLOOKUP; + } else { + iostartup->iodns = iodns_getaddrinfo(hostname, iostartup->iptype, iohandler_dns_callback); + iostartup->flags |= IOSTARTUP_DSTLOOKUP; + } + iostartup->iodns->data = iostartup; + + iohandler_append_iostartup(iostartup); + + //Some Flags for later use + iofd->read_lines = 1; + + iohandler_log(IOLOG_DEBUG, "added client socket connecting to %s:%d (DNS LOOKUP)", hostname, port); + + return iofd; } struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, iohandler_callback *callback) { @@ -439,96 +697,50 @@ struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, i } struct IODescriptor *iohandler_listen_flags(const char *hostname, unsigned int port, iohandler_callback *callback, int flags) { - int sockfd; - struct addrinfo hints, *res; - struct sockaddr_in *ip4 = NULL; - struct sockaddr_in6 *ip6 = NULL; - struct IODescriptor *descriptor; - unsigned int opt; - if(!engine) { iohandler_init_engine(); if(!engine) return NULL; } - memset (&hints, 0, sizeof (hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags |= AI_CANONNAME; - if (getaddrinfo (hostname, NULL, &hints, &res)) { - return NULL; - } - while (res) { - switch (res->ai_family) { - case AF_INET: - ip4 = (struct sockaddr_in *) res->ai_addr; - break; - case AF_INET6: - ip6 = (struct sockaddr_in6 *) res->ai_addr; - break; - } - res = res->ai_next; - freeaddrinfo(res); - } - if(ip6 && (flags & IOHANDLER_LISTEN_IPV6)) { - sockfd = socket(AF_INET6, SOCK_STREAM, 0); - if(sockfd == -1) return NULL; - - opt = 1; - setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); - - ip6->sin6_family = AF_INET6; - ip6->sin6_port = htons(port); - - bind(sockfd, (struct sockaddr*)ip6, sizeof(*ip6)); - } else if(ip4 && (flags & IOHANDLER_LISTEN_IPV4)) { - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if(sockfd == -1) return NULL; - - opt = 1; - setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); - - ip4->sin_family = AF_INET; - ip4->sin_port = htons(port); + //non-blocking listening socket + struct IODescriptorStartup *iostartup; + struct IODescriptor *iofd; + + iofd = calloc(1, sizeof(*iofd)); + if(!iofd) { - bind(sockfd, (struct sockaddr*)ip4, sizeof(*ip4)); - } else return NULL; - //prevent SIGPIPE - #ifndef WIN32 - #if defined(SO_NOSIGPIPE) - { - int set = 1; - setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); - } - #else - signal(SIGPIPE, SIG_IGN); - #endif - #endif - //make sockfd unblocking - #if defined(F_GETFL) - { - int flag; - flag = fcntl(sockfd, F_GETFL); - fcntl(sockfd, F_SETFL, flag|O_NONBLOCK); - flag = fcntl(sockfd, F_GETFD); - fcntl(sockfd, F_SETFD, flag|FD_CLOEXEC); } - #else - /* I hope you're using the Win32 backend or something else that - * automatically marks the file descriptor non-blocking... - */ - #endif - descriptor = iohandler_add(sockfd, IOTYPE_SERVER, NULL, callback); - if(!descriptor) { - close(sockfd); + iostartup = calloc(1, sizeof(*iostartup)); + if(!iostartup) { + return NULL; } - listen(sockfd, 1); - descriptor->state = IO_LISTENING; - engine->update(descriptor); - iohandler_log(IOLOG_DEBUG, "added server socket (%d) listening on %s:%d", sockfd, hostname, port); - return descriptor; + + iostartup->iofd = iofd; + iofd->fd.iostartup = iostartup; + + iofd->type = IOTYPE_SERVER; + iofd->state = IO_PENDING; + iofd->callback = callback; + + iostartup->srchost = (hostname ? strdup(hostname) : NULL); + iostartup->port = port; + iostartup->iptype = ((flags & IOHANDLER_LISTEN_IPV6) ? IODNS_RECORD_AAAA : 0) | ((flags & IOHANDLER_LISTEN_IPV4) ? IODNS_RECORD_A : 0); + iostartup->ssl = 0; + iostartup->listening = 1; + + //DNS Requests + if(hostname) { + iostartup->iodns = iodns_getaddrinfo(hostname, iostartup->iptype, iohandler_dns_callback); + iostartup->flags |= IOSTARTUP_SRCLOOKUP; + iostartup->iodns->data = iostartup; + iohandler_append_iostartup(iostartup); + } else + iohandler_process_iostartup(iostartup); + + iohandler_log(IOLOG_DEBUG, "added server socket listening on %s:%d", hostname, port); + return iofd; } struct IODescriptor *iohandler_listen_ssl(const char *hostname, unsigned int port, const char *certfile, const char *keyfile, iohandler_callback *callback) { @@ -536,14 +748,50 @@ struct IODescriptor *iohandler_listen_ssl(const char *hostname, unsigned int por } struct IODescriptor *iohandler_listen_ssl_flags(const char *hostname, unsigned int port, const char *certfile, const char *keyfile, iohandler_callback *callback, int flags) { - struct IODescriptor *descriptor = iohandler_listen_flags(hostname, port, callback, flags); - if(!descriptor) + if(!engine) { + iohandler_init_engine(); + if(!engine) return NULL; + } + + //non-blocking listening socket + struct IODescriptorStartup *iostartup; + struct IODescriptor *iofd; + + iofd = calloc(1, sizeof(*iofd)); + if(!iofd) { + return NULL; - //SSL Server Socket - iohandler_ssl_listen(descriptor, certfile, keyfile); - if(descriptor->sslnode) - descriptor->ssl = 1; - return descriptor; + } + iostartup = calloc(1, sizeof(*iostartup)); + if(!iostartup) { + + return NULL; + } + + iostartup->iofd = iofd; + iofd->fd.iostartup = iostartup; + + iofd->type = IOTYPE_SERVER; + iofd->state = IO_PENDING; + iofd->callback = callback; + + iostartup->srchost = (hostname ? strdup(hostname) : NULL); + iostartup->port = port; + iostartup->iptype = ((flags & IOHANDLER_LISTEN_IPV6) ? IODNS_RECORD_AAAA : 0) | ((flags & IOHANDLER_LISTEN_IPV4) ? IODNS_RECORD_A : 0); + iostartup->ssl = 1; + iostartup->ssl_certfile = strdup(certfile); + iostartup->ssl_keyfile = strdup(keyfile); + iostartup->listening = 1; + + //DNS Requests + if(hostname) { + iostartup->iodns = iodns_getaddrinfo(hostname, iostartup->iptype, iohandler_dns_callback); + iohandler_append_iostartup(iostartup); + } else + iohandler_process_iostartup(iostartup); + + iohandler_log(IOLOG_DEBUG, "added SSL server socket listening on %s:%d", hostname, port); + return iofd; } void iohandler_write(struct IODescriptor *iofd, const char *line) { @@ -567,7 +815,11 @@ void iohandler_send(struct IODescriptor *iofd, const char *data, size_t datalen) } memcpy(iofd->writebuf.buffer + iofd->writebuf.bufpos, data, datalen); iofd->writebuf.bufpos += datalen; - engine->update(iofd); + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); + if(iold) { + iold->flags |= IOFLAGS_WANT_WRITE; + engine->update(iold); + } } void iohandler_printf(struct IODescriptor *iofd, const char *text, ...) { @@ -585,78 +837,94 @@ void iohandler_printf(struct IODescriptor *iofd, const char *text, ...) { } static int iohandler_try_write(struct IODescriptor *iofd) { - if(!iofd->writebuf.bufpos) return 0; + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); + if(!iofd->writebuf.bufpos || !iold) return 0; iohandler_log(IOLOG_DEBUG, "write writebuf (%d bytes) to socket (fd: %d)", iofd->writebuf.bufpos, iofd->fd); int res; if(iofd->ssl_active) res = iohandler_ssl_write(iofd, iofd->writebuf.buffer, iofd->writebuf.bufpos); else - res = send(iofd->fd, iofd->writebuf.buffer, iofd->writebuf.bufpos, 0); + res = send(iofd->fd.iold->fd, iofd->writebuf.buffer, iofd->writebuf.bufpos, 0); if(res < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) - iohandler_log(IOLOG_ERROR, "could not write to socket (fd: %d): %d - %s", iofd->fd, errno, strerror(errno)); + iohandler_log(IOLOG_ERROR, "could not write to socket (fd: %d): %d - %s", iofd->fd.iold->fd, errno, strerror(errno)); else res = 0; } else { iofd->writebuf.bufpos -= res; - if(iofd->state != IO_CLOSED) - engine->update(iofd); + if(iofd->writebuf.bufpos) + iold->flags |= IOFLAGS_WANT_WRITE; + else + iold->flags &= ~IOFLAGS_WANT_WRITE; + engine->update(iold); } return res; } void iohandler_close(struct IODescriptor *iofd) { + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); int engine_remove = 1; iofd->state = IO_CLOSED; - if(iofd->writebuf.bufpos) { - //try to send everything before closing + if(iold) { + if(iofd->writebuf.bufpos) { + //try to send everything before closing #if defined(F_GETFL) - { - int flags; - flags = fcntl(iofd->fd, F_GETFL); - fcntl(iofd->fd, F_SETFL, flags & ~O_NONBLOCK); - flags = fcntl(iofd->fd, F_GETFD); - fcntl(iofd->fd, F_SETFD, flags|FD_CLOEXEC); - } + { + int flags; + flags = fcntl(iofd->fd, F_GETFL); + fcntl(iofd->fd, F_SETFL, flags & ~O_NONBLOCK); + flags = fcntl(iofd->fd, F_GETFD); + fcntl(iofd->fd, F_SETFD, flags|FD_CLOEXEC); + } #else - engine_remove = 0; - engine->remove(iofd); + engine_remove = 0; + engine->remove(iofd); #endif - iohandler_try_write(iofd); + iohandler_try_write(iofd); + } + if(iofd->ssl) + iohandler_ssl_disconnect(iofd); + if((iofd->type == IOTYPE_SERVER || iofd->type == IOTYPE_CLIENT || iofd->type == IOTYPE_STDIN)) + close(iold->fd); + if(engine_remove) + engine->remove(iold); + if(iofd->flags & IOFDFLAGS_FREE_LOCK) + iofd->flags |= IOFDFLAGS_WANT_FREE; + else + iohandler_remove(iold); + } else { + if(iofd->flags & IOFDFLAGS_FREE_LOCK) + iofd->flags |= IOFDFLAGS_WANT_FREE; + else + iohandler_remove_iostartup(iofd->fd.iostartup); } - //close IODescriptor - if(iofd->ssl) - iohandler_ssl_disconnect(iofd); - if(iofd->type == IOTYPE_SERVER || iofd->type == IOTYPE_CLIENT || iofd->type == IOTYPE_STDIN) - close(iofd->fd); - iohandler_remove(iofd, engine_remove); } void iohandler_update(struct IODescriptor *iofd) { - iohandler_log(IOLOG_DEBUG, "external call to iohandler_update (fd: %d)", iofd->fd); - engine->update(iofd); -} - -static void iohandler_trigger_event(struct IOEvent *event) { - if(!event->iofd->callback) return; - iohandler_log(IOLOG_DEBUG, "triggering event (%s) for %s (fd: %d)", iohandler_ioeventtype_name(event->type), iohandler_iotype_name(event->iofd->type), event->iofd->fd); - event->iofd->callback(event); + iohandler_log(IOLOG_DEBUG, "external call to iohandler_update"); + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); + if(iold) + engine->update(iold); } void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { + struct IOLowlevelDescriptor *iold = IODESCRIPTOR_GET_IOLD(iofd); struct IOEvent callback_event; callback_event.type = IOEVENT_IGNORE; callback_event.iofd = iofd; + int remove_iofd = 0; + if(!iold) + return; switch(iofd->state) { case IO_SSLWAIT: if(!readable && !writeable) { - if(!iofd->ssl_server_hs) { + if(!(iofd->flags & IOFDFLAGS_SSL_SERVER_HS)) { callback_event.type = IOEVENT_SSLFAILED; iofd->state = IO_CLOSED; - engine->update(iofd); - } else - iohandler_close(iofd); - } else if(iofd->ssl_server_hs) { + engine->update(iold); + } + remove_iofd = 1; + } else if((iofd->flags & IOFDFLAGS_SSL_SERVER_HS)) { iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_server_handshake for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd); iohandler_ssl_server_handshake(iofd); } else { @@ -665,19 +933,25 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { } break; case IO_CLOSED: - if(iofd->type == IOTYPE_TIMER) + if(iofd->type == IOTYPE_TIMER) // Timers will be removed by IOEngine callback_event.type = IOEVENT_TIMEOUT; + else + remove_iofd = 1; break; case IO_LISTENING: if(readable) { - callback_event.data.accept_fd = accept(iofd->fd, NULL, 0); - if(callback_event.data.accept_fd < 0) { + int accept_fd = accept(iold->fd, NULL, 0); + if(accept_fd < 0) { iohandler_log(IOLOG_ERROR, "could not accept client (server fd: %d): %d - %s", iofd->fd, errno, strerror(errno)); - } else if(iofd->ssl) { - struct IODescriptor *client_iofd = iohandler_add(callback_event.data.accept_fd, IOTYPE_CLIENT, NULL, NULL); - iohandler_ssl_client_accepted(iofd, client_iofd); - } else - callback_event.type = IOEVENT_ACCEPT; + } else { + struct IODescriptor *client_iofd = iohandler_add(accept_fd, IOTYPE_CLIENT, NULL, NULL); + if(iofd->ssl) + iohandler_ssl_client_accepted(iofd, client_iofd); + else { + callback_event.type = IOEVENT_ACCEPT; + callback_event.data.accept_iofd = client_iofd; + } + } } break; case IO_CONNECTING: @@ -688,14 +962,15 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { //if (getsockopt(iofd->fd, SOL_SOCKET, SO_ERROR, &callback_event.data.errid, &arglen) < 0) // callback_event.data.errid = errno; iofd->state = IO_CLOSED; - engine->update(iofd); + engine->update(iold); + remove_iofd = 1; } else if(writeable) { - if(iofd->ssl && !iofd->ssl_active) { + if(iofd->ssl && !(iofd->flags & IOFDFLAGS_SSL_ACTIVE)) { iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_connect for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd); iohandler_ssl_connect(iofd); return; } - if(iofd->ssl && iofd->ssl_server_hs) { + if(iofd->ssl && (iofd->flags & IOFDFLAGS_SSL_SERVER_HS)) { callback_event.type = IOEVENT_SSLACCEPT; callback_event.iofd = iofd->data; callback_event.data.accept_iofd = iofd; @@ -704,7 +979,7 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { else callback_event.type = IOEVENT_CONNECTED; iofd->state = IO_CONNECTED; - engine->update(iofd); + engine->update(iold); } break; case IO_CONNECTED: @@ -712,28 +987,29 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { if(iofd->read_lines) { int bytes; - if(iofd->ssl_active) + if((iofd->flags & IOFDFLAGS_SSL_ACTIVE)) bytes = iohandler_ssl_read(iofd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos); else { if(iofd->type == IOTYPE_STDIN) #ifdef WIN32 bytes = readable; #else - bytes = read(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos); + bytes = read(iold->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos); #endif else - bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0); + bytes = recv(iold->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0); } if(bytes <= 0) { if (errno != EAGAIN || errno != EWOULDBLOCK) { iofd->state = IO_CLOSED; - engine->update(iofd); + engine->update(iold); callback_event.type = IOEVENT_CLOSED; callback_event.data.errid = errno; + remove_iofd = 1; } } else { int i, used_bytes = 0; - iohandler_log(IOLOG_DEBUG, "received %d bytes (fd: %d). readbuf position: %d", bytes, iofd->fd, iofd->readbuf.bufpos); + iohandler_log(IOLOG_DEBUG, "received %d bytes (fd: %d). readbuf position: %d", bytes, iold->fd, iofd->readbuf.bufpos); iofd->readbuf.bufpos += bytes; callback_event.type = IOEVENT_RECV; for(i = 0; i < iofd->readbuf.bufpos; i++) { @@ -744,7 +1020,11 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes; iohandler_log(IOLOG_DEBUG, "parsed line (%d bytes): %s", i - used_bytes, iofd->readbuf.buffer + used_bytes); used_bytes = i+1; + iofd->flags |= IOFDFLAGS_FREE_LOCK; iohandler_trigger_event(&callback_event); + iofd->flags &= ~IOFDFLAGS_FREE_LOCK; + if(iofd->flags & IOFDFLAGS_WANT_FREE) + break; } else if(i + 1 - used_bytes >= IO_LINE_LEN) { //512 max iofd->readbuf.buffer[i] = 0; callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes; @@ -755,7 +1035,11 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { } } used_bytes = i+1; + iofd->flags |= IOFDFLAGS_FREE_LOCK; iohandler_trigger_event(&callback_event); + iofd->flags &= ~IOFDFLAGS_FREE_LOCK; + if(iofd->flags & IOFDFLAGS_WANT_FREE) + break; } } if(used_bytes) { @@ -778,17 +1062,27 @@ void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) { bytes = iohandler_try_write(iofd); if(bytes < 0) { iofd->state = IO_CLOSED; - engine->update(iofd); + engine->update(iold); callback_event.type = IOEVENT_CLOSED; callback_event.data.errid = errno; + remove_iofd = 1; } } break; + case IO_PENDING: + /* nothing to do for pending descriptors */ + break; } + iofd->flags |= IOFDFLAGS_FREE_LOCK; if(callback_event.type == IOEVENT_IGNORE && !readable && !writeable) callback_event.type = IOEVENT_TIMEOUT; if(callback_event.type != IOEVENT_IGNORE) iohandler_trigger_event(&callback_event); + iofd->flags &= ~IOFDFLAGS_FREE_LOCK; + if(iofd->flags & IOFDFLAGS_WANT_FREE) + iohandler_remove(iold); + else if(remove_iofd) + iohandler_close(iofd); } void iohandler_poll() { @@ -799,6 +1093,7 @@ void iohandler_poll() { } void iohandler_poll_timeout(struct timeval timeout) { + iodns_poll(); if(engine) { IOSYNCHRONIZE(io_poll_sync); //quite senceless multithread support... better support will follow engine->loop(&timeout);