[IOMultiplexer] do not request events from closed descriptors
[NeonServV5.git] / src / IOHandler.c
1 /* IOHandler.c - IOMultiplexer
2  * Copyright (C) 2012  Philipp Kreil (pk910)
3  * 
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  * 
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  * 
14  * You should have received a copy of the GNU General Public License 
15  * along with this program. If not, see <http://www.gnu.org/licenses/>. 
16  */
17 #include "IOHandler.h"
18 #include "IOEngine.h"
19 #include "IOHandler_SSL.h"
20 #include <errno.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #ifdef WIN32
24 #define _WIN32_WINNT 0x501
25 #include <windows.h>
26 #include <winsock2.h>
27 #include <ws2tcpip.h>
28 #else
29 #include <stdarg.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <netdb.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #endif
37
38 #define MAXLOG 1024
39 iohandler_log_callback *iolog_backend = NULL;
40
41 struct IODescriptor *first_descriptor = NULL;
42 struct IODescriptor *timer_priority = NULL;
43
44 #ifdef HAVE_PTHREAD_H
45 static pthread_mutex_t io_thread_sync;
46 static pthread_mutex_t io_poll_sync;
47 #endif
48
49 void iohandler_log(enum IOLogType type, char *text, ...) {
50     va_list arg_list;
51     char logBuf[MAXLOG+1];
52     int pos;
53     logBuf[0] = '\0';
54     va_start(arg_list, text);
55     pos = vsnprintf(logBuf, MAXLOG - 1, text, arg_list);
56     va_end(arg_list);
57     if (pos < 0 || pos > (MAXLOG - 1)) pos = MAXLOG - 1;
58     logBuf[pos] = '\n';
59     logBuf[pos+1] = '\0';
60     
61     if(iolog_backend)
62         iolog_backend(type, logBuf);
63 }
64
65 /* IO Engines */
66 extern struct IOEngine engine_select; /* select system call (should always be useable) */
67 extern struct IOEngine engine_kevent;
68 extern struct IOEngine engine_epoll;
69 extern struct IOEngine engine_win32;
70
71 struct IOEngine *engine = NULL;
72
73 static void iohandler_init_engine() {
74     if(engine) return;
75     IOTHREAD_MUTEX_INIT(io_thread_sync);
76     IOTHREAD_MUTEX_INIT(io_poll_sync);
77     
78     //try other engines
79     if(!engine && engine_kevent.init && engine_kevent.init())
80         engine = &engine_kevent;
81     if(!engine && engine_epoll.init && engine_epoll.init())
82         engine = &engine_epoll;
83     if(!engine && engine_win32.init && engine_win32.init())
84         engine = &engine_win32;
85     
86     if (!engine) {
87         if(engine_select.init())
88             engine = &engine_select;
89         else {
90             iohandler_log(IOLOG_FATAL, "found no useable IO engine");
91             return;
92         }
93     }
94     iohandler_log(IOLOG_DEBUG, "using %s IO engine", engine->name);
95     iohandler_ssl_init();
96 }
97
98 static void iohandler_append(struct IODescriptor *descriptor) {
99     IOSYNCHRONIZE(io_thread_sync);
100     struct timeval *timeout = ((descriptor->timeout.tv_sec || descriptor->timeout.tv_usec) ? &descriptor->timeout : NULL);
101     if(timeout) {
102         struct IODescriptor *iofd;
103         int set_priority = 1;
104         descriptor->timeout = *timeout;
105         if(timer_priority)
106             iofd = timer_priority;
107         else
108             iofd = first_descriptor;
109         if(iofd) {
110             for(;;iofd = iofd->next) {
111                 if(timeval_is_smaler(timeout, (&iofd->timeout))) {
112                     descriptor->prev = iofd->prev;
113                     descriptor->next = iofd;
114                     if(iofd->prev)
115                         iofd->prev->next = descriptor;
116                     iofd->prev = descriptor;
117                     if(set_priority)
118                         timer_priority = descriptor;
119                     break;
120                 }
121                 if(iofd == timer_priority)
122                     set_priority = 0;
123                 if(iofd->next == NULL) {
124                     descriptor->next = NULL;
125                     descriptor->prev = iofd;
126                     iofd->next = descriptor;
127                     if(set_priority)
128                         timer_priority = descriptor;
129                     break;
130                 }
131             }
132         } else {
133             descriptor->prev = NULL;
134             descriptor->next = NULL;
135             first_descriptor = descriptor;
136             timer_priority = descriptor;
137         }
138         
139     } else {
140         descriptor->prev = NULL;
141         descriptor->next = first_descriptor;
142         if(first_descriptor)
143             first_descriptor->prev = descriptor;
144         first_descriptor = descriptor;
145     }
146     IODESYNCHRONIZE(io_thread_sync);
147 }
148
149 static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) {
150     //remove IODescriptor from the list
151     IOSYNCHRONIZE(io_thread_sync);
152     if(descriptor->prev)
153         descriptor->prev->next = descriptor->next;
154     else
155         first_descriptor = descriptor->next;
156     if(descriptor->next)
157         descriptor->next->prev = descriptor->prev;
158     if(descriptor == timer_priority)
159         timer_priority = descriptor->next;
160     
161     if(engine_remove)
162         engine->remove(descriptor);
163     if(descriptor->readbuf.buffer)
164         free(descriptor->readbuf.buffer);
165     if(descriptor->writebuf.buffer)
166         free(descriptor->writebuf.buffer);
167     iohandler_log(IOLOG_DEBUG, "removed IODescriptor (%d) of type `%s`", descriptor->fd, iohandler_iotype_name(descriptor->type));
168     free(descriptor);
169     IODESYNCHRONIZE(io_thread_sync);
170 }
171
172 struct IODescriptor *iohandler_add(int sockfd, enum IOType type, struct timeval *timeout, iohandler_callback *callback) {
173     //just add a new IODescriptor
174     struct IODescriptor *descriptor = calloc(1, sizeof(*descriptor));
175     if(!descriptor) {
176         iohandler_log(IOLOG_ERROR, "could not allocate memory for IODescriptor in %s:%d", __FILE__, __LINE__);
177         return NULL;
178     }
179     descriptor->fd = (type == IOTYPE_STDIN ? fileno(stdin) : sockfd);
180     descriptor->type = type;
181     descriptor->state = (type == IOTYPE_STDIN ? IO_CONNECTED : IO_CLOSED);
182     descriptor->callback = callback;
183     if(timeout)
184         descriptor->timeout = *timeout;
185     if(type != IOTYPE_TIMER) {
186         descriptor->readbuf.buffer = malloc(IO_READ_BUFLEN + 2);
187         descriptor->readbuf.bufpos = 0;
188         descriptor->readbuf.buflen = IO_READ_BUFLEN;
189         descriptor->writebuf.buffer = malloc(IO_READ_BUFLEN + 2);
190         descriptor->writebuf.bufpos = 0;
191         descriptor->writebuf.buflen = IO_READ_BUFLEN;
192     }
193     
194     if(!engine) {
195         iohandler_init_engine();
196         if(!engine) {
197             return NULL;
198         }
199     }
200     engine->add(descriptor);
201     
202     //add IODescriptor to the list
203     iohandler_append(descriptor);
204     
205     iohandler_log(IOLOG_DEBUG, "added custom socket descriptor (%d) as type `%s`", sockfd, iohandler_iotype_name(type));
206     return descriptor;
207 }
208
209 void iohandler_set_timeout(struct IODescriptor *descriptor, struct timeval *timeout) {
210     if(descriptor->prev)
211         descriptor->prev->next = descriptor->next;
212     else
213         first_descriptor = descriptor->next;
214     if(descriptor->next)
215         descriptor->next->prev = descriptor->prev;
216     if(descriptor == timer_priority)
217         timer_priority = descriptor->next;
218     if(timeout) 
219         descriptor->timeout = *timeout;
220     else {
221         descriptor->timeout.tv_sec = 0;
222         descriptor->timeout.tv_usec = 0;
223     }
224     iohandler_append(descriptor);
225 }
226
227 static void iohandler_increase_iobuf(struct IOBuffer *iobuf, size_t required) {
228     if(iobuf->buflen >= required) return;
229     char *new_buf = realloc(iobuf->buffer, required + 2);
230     if(new_buf) {
231         iobuf->buffer = new_buf;
232         iobuf->buflen = required;
233     }
234 }
235
236 struct IODescriptor *iohandler_timer(struct timeval timeout, iohandler_callback *callback) {
237     struct IODescriptor *descriptor;
238     descriptor = iohandler_add(-1, IOTYPE_TIMER, &timeout, callback);
239     if(!descriptor) {
240         iohandler_log(IOLOG_ERROR, "could not allocate memory for IODescriptor in %s:%d", __FILE__, __LINE__);
241         return NULL;
242     }
243     iohandler_log(IOLOG_DEBUG, "added timer descriptor (sec: %d; usec: %d)", timeout.tv_sec, timeout.tv_usec);
244     return descriptor;
245 }
246
247 struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, int ssl, const char *bindhost, iohandler_callback *callback) {
248     //non-blocking connect
249     int sockfd, result;
250     struct addrinfo hints, *res;
251     struct sockaddr_in *ip4 = NULL;
252     struct sockaddr_in6 *ip6 = NULL;
253     size_t dstaddrlen;
254     struct sockaddr *dstaddr = NULL;
255     struct IODescriptor *descriptor;
256     
257     if(!engine) {
258         iohandler_init_engine();
259         if(!engine) return NULL;
260     }
261     memset (&hints, 0, sizeof (hints));
262     hints.ai_family = PF_UNSPEC;
263     hints.ai_socktype = SOCK_STREAM;
264     hints.ai_flags |= AI_CANONNAME;
265     if ((result = getaddrinfo (hostname, NULL, &hints, &res))) {
266         iohandler_log(IOLOG_ERROR, "could not resolve %s to an IP address (%d)", hostname, result);
267         return NULL;
268     }
269     while (res) {
270         switch (res->ai_family) {
271         case AF_INET:
272             ip4 = (struct sockaddr_in *) res->ai_addr;
273             break;
274         case AF_INET6:
275             ip6 = (struct sockaddr_in6 *) res->ai_addr;
276             break;
277         }
278         res = res->ai_next;
279         freeaddrinfo(res);
280     }
281     
282     if(ip6) {
283         sockfd = socket(AF_INET6, SOCK_STREAM, 0);
284         if(sockfd == -1) {
285             iohandler_log(IOLOG_ERROR, "could not create socket in %s:%d", __FILE__, __LINE__);
286             return NULL;
287         }
288         
289         ip6->sin6_family = AF_INET6;
290         ip6->sin6_port = htons(port);
291         
292         struct sockaddr_in6 *ip6vhost = NULL;
293         if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) {
294             while (res) {
295                 switch (res->ai_family) {
296                 case AF_INET6:
297                     ip6vhost = (struct sockaddr_in6 *) res->ai_addr;
298                     break;
299                 }
300                 res = res->ai_next;
301                 freeaddrinfo(res);
302             }
303         }
304         if(ip6vhost) {
305             ip6vhost->sin6_family = AF_INET6;
306             ip6vhost->sin6_port = htons(0);
307             bind(sockfd, (struct sockaddr*)ip6vhost, sizeof(*ip6vhost));
308         }
309         dstaddr = (struct sockaddr*)ip6;
310         dstaddrlen = sizeof(*ip6);
311     } else if(ip4) {
312         sockfd = socket(AF_INET, SOCK_STREAM, 0);
313         if(sockfd == -1) {
314             iohandler_log(IOLOG_ERROR, "could not create socket in %s:%d", __FILE__, __LINE__);
315             return NULL;
316         }
317         
318         ip4->sin_family = AF_INET;
319         ip4->sin_port = htons(port);
320         
321         struct sockaddr_in *ip4vhost = NULL;
322         if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) {
323             while (res) {
324                 switch (res->ai_family) {
325                 case AF_INET:
326                     ip4vhost = (struct sockaddr_in *) res->ai_addr;
327                     break;
328                 }
329                 res = res->ai_next;
330                 freeaddrinfo(res);
331             }
332         }
333         if(ip4vhost) {
334             ip4vhost->sin_family = AF_INET;
335             ip4vhost->sin_port = htons(0);
336             bind(sockfd, (struct sockaddr*)ip4vhost, sizeof(*ip4vhost));
337         }
338         dstaddr = (struct sockaddr*)ip4;
339         dstaddrlen = sizeof(*ip4);
340     } else
341         return NULL;
342     //make sockfd unblocking
343 #if defined(F_GETFL)
344     {
345         int flags;
346         flags = fcntl(sockfd, F_GETFL);
347         fcntl(sockfd, F_SETFL, flags|O_NONBLOCK);
348         flags = fcntl(sockfd, F_GETFD);
349         fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC);
350     }
351 #else
352     /* I hope you're using the Win32 backend or something else that
353      * automatically marks the file descriptor non-blocking...
354      */
355 #endif
356     descriptor = iohandler_add(sockfd, IOTYPE_CLIENT, NULL, callback);
357     if(!descriptor) {
358         close(sockfd);
359         return NULL;
360     }
361     connect(sockfd, dstaddr, dstaddrlen); //returns EINPROGRESS here (nonblocking)
362     descriptor->state = IO_CONNECTING;
363     descriptor->ssl = (ssl ? 1 : 0);
364     descriptor->read_lines = 1;
365     engine->update(descriptor);
366     iohandler_log(IOLOG_DEBUG, "added client socket (%d) connecting to %s:%d", sockfd, hostname, port);
367     return descriptor;
368 }
369
370 struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, iohandler_callback *callback) {
371     int sockfd;
372     struct addrinfo hints, *res;
373     struct sockaddr_in *ip4 = NULL;
374     struct sockaddr_in6 *ip6 = NULL;
375     struct IODescriptor *descriptor;
376     unsigned int opt;
377     
378     if(!engine) {
379         iohandler_init_engine();
380         if(!engine) return NULL;
381     }
382     memset (&hints, 0, sizeof (hints));
383     hints.ai_family = PF_UNSPEC;
384     hints.ai_socktype = SOCK_STREAM;
385     hints.ai_flags |= AI_CANONNAME;
386     if (getaddrinfo (hostname, NULL, &hints, &res)) {
387         return NULL;
388     }
389     while (res) {
390         switch (res->ai_family) {
391         case AF_INET:
392             ip4 = (struct sockaddr_in *) res->ai_addr;
393             break;
394         case AF_INET6:
395             ip6 = (struct sockaddr_in6 *) res->ai_addr;
396             break;
397         }
398         res = res->ai_next;
399         freeaddrinfo(res);
400     }
401     
402     if(ip6) {
403         sockfd = socket(AF_INET6, SOCK_STREAM, 0);
404         if(sockfd == -1) return NULL;
405         
406         opt = 1;
407         setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt));
408         
409         ip6->sin6_family = AF_INET6;
410         ip6->sin6_port = htons(port);
411         
412         bind(sockfd, (struct sockaddr*)ip6, sizeof(*ip6));
413     } else if(ip4) {
414         sockfd = socket(AF_INET, SOCK_STREAM, 0);
415         if(sockfd == -1) return NULL;
416         
417         opt = 1;
418         setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt));
419         
420         ip4->sin_family = AF_INET;
421         ip4->sin_port = htons(port);
422         
423         bind(sockfd, (struct sockaddr*)ip4, sizeof(*ip4));
424     } else
425         return NULL;
426     //make sockfd unblocking
427 #if defined(F_GETFL)
428     {
429         int flags;
430         flags = fcntl(sockfd, F_GETFL);
431         fcntl(sockfd, F_SETFL, flags|O_NONBLOCK);
432         flags = fcntl(sockfd, F_GETFD);
433         fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC);
434     }
435 #else
436     /* I hope you're using the Win32 backend or something else that
437      * automatically marks the file descriptor non-blocking...
438      */
439 #endif
440     descriptor = iohandler_add(sockfd, IOTYPE_SERVER, NULL, callback);
441     if(!descriptor) {
442         close(sockfd);
443         return NULL;
444     }
445     listen(sockfd, 1);
446     descriptor->state = IO_LISTENING;
447     engine->update(descriptor);
448     iohandler_log(IOLOG_DEBUG, "added server socket (%d) listening on %s:%d", sockfd, hostname, port);
449     return descriptor;
450 }
451
452 void iohandler_write(struct IODescriptor *iofd, const char *line) {
453     size_t linelen = strlen(line);
454     iohandler_send(iofd, line, linelen);
455 }
456
457 void iohandler_send(struct IODescriptor *iofd, const char *data, size_t datalen) {
458     if(iofd->type == IOTYPE_TIMER || iofd->state == IO_CLOSED) {
459         iohandler_log(IOLOG_ERROR, "could not write to socket (%s)", (iofd->type == IOTYPE_TIMER ? "IOTYPE_TIMER" : "IO_CLOSED"));
460         return;
461     }
462     iohandler_log(IOLOG_DEBUG, "add %d to writebuf (fd: %d): %s", datalen, iofd->fd, data);
463     if(iofd->writebuf.buflen < iofd->writebuf.bufpos + datalen) {
464         iohandler_log(IOLOG_DEBUG, "increase writebuf (curr: %d) to %d (+%d bytes)", iofd->writebuf.buflen, iofd->writebuf.bufpos + datalen, (iofd->writebuf.bufpos + datalen - iofd->writebuf.buflen));
465         iohandler_increase_iobuf(&iofd->writebuf, iofd->writebuf.bufpos + datalen);
466         if(iofd->writebuf.buflen < iofd->writebuf.bufpos + datalen) {
467             iohandler_log(IOLOG_ERROR, "increase writebuf (curr: %d) to %d (+%d bytes) FAILED", iofd->writebuf.buflen, iofd->writebuf.bufpos + datalen, (iofd->writebuf.bufpos + datalen - iofd->writebuf.buflen));
468             return;
469         }
470     }
471     memcpy(iofd->writebuf.buffer + iofd->writebuf.bufpos, data, datalen);
472     iofd->writebuf.bufpos += datalen;
473     engine->update(iofd);
474 }
475
476 void iohandler_printf(struct IODescriptor *iofd, const char *text, ...) {
477     va_list arg_list;
478     char sendBuf[IO_LINE_LEN];
479     int pos;
480     sendBuf[0] = '\0';
481     va_start(arg_list, text);
482     pos = vsnprintf(sendBuf, IO_LINE_LEN - 2, text, arg_list);
483     va_end(arg_list);
484     if (pos < 0 || pos > (IO_LINE_LEN - 2)) pos = IO_LINE_LEN - 2;
485     sendBuf[pos] = '\n';
486     sendBuf[pos+1] = '\0';
487     iohandler_send(iofd, sendBuf, pos+1);
488 }
489
490 void iohandler_try_write(struct IODescriptor *iofd) {
491     if(!iofd->writebuf.bufpos) return;
492     iohandler_log(IOLOG_DEBUG, "write writebuf (%d bytes) to socket (fd: %d)", iofd->writebuf.bufpos, iofd->fd);
493     int res;
494     if(iofd->ssl_active)
495         res = iohandler_ssl_write(iofd, iofd->writebuf.buffer, iofd->writebuf.bufpos);
496     else
497         res = send(iofd->fd, iofd->writebuf.buffer, iofd->writebuf.bufpos, 0);
498     if(res < 0) {
499         if (errno != EAGAIN) {
500             iohandler_log(IOLOG_ERROR, "could not write to socket (fd: %d): %d - %s", iofd->fd, errno, strerror(errno));
501         }
502     } else {
503         iofd->writebuf.bufpos -= res;
504         if(iofd->state != IO_CLOSED)
505             engine->update(iofd);
506     }
507 }
508
509 void iohandler_close(struct IODescriptor *iofd) {
510     int engine_remove = 1;
511     iofd->state = IO_CLOSED;
512     if(iofd->writebuf.bufpos) {
513         //try to send everything before closing
514 #if defined(F_GETFL)
515         {
516             int flags;
517             flags = fcntl(iofd->fd, F_GETFL);
518             fcntl(iofd->fd, F_SETFL, flags & ~O_NONBLOCK);
519             flags = fcntl(iofd->fd, F_GETFD);
520             fcntl(iofd->fd, F_SETFD, flags|FD_CLOEXEC);
521         }
522 #else
523         engine_remove = 0;
524         engine->remove(iofd);
525 #endif
526         iohandler_try_write(iofd);
527     }
528     //close IODescriptor
529     if(iofd->ssl)
530         iohandler_ssl_disconnect(iofd);
531     if(iofd->type == IOTYPE_SERVER || iofd->type == IOTYPE_CLIENT || iofd->type == IOTYPE_STDIN)
532         close(iofd->fd);
533     iohandler_remove(iofd, engine_remove);
534 }
535
536 void iohandler_update(struct IODescriptor *iofd) {
537     iohandler_log(IOLOG_DEBUG, "external call to iohandler_update (fd: %d)", iofd->fd);
538     engine->update(iofd);
539 }
540
541 static void iohandler_trigger_event(struct IOEvent *event) {
542     if(!event->iofd->callback) return;
543     iohandler_log(IOLOG_DEBUG, "triggering event (%s) for %s (fd: %d)", iohandler_ioeventtype_name(event->type), iohandler_iotype_name(event->iofd->type), event->iofd->fd);
544     event->iofd->callback(event);
545 }
546
547 void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
548     struct IOEvent callback_event;
549     callback_event.type = IOEVENT_IGNORE;
550     callback_event.iofd = iofd;
551     switch(iofd->state) {
552         case IO_SSLWAIT:
553             if(!readable && !writeable) {
554                 callback_event.type = IOEVENT_SSLFAILED;
555                 iofd->state = IO_CLOSED;
556                 engine->update(iofd);
557             } else {
558                 iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_client_handshake for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd);
559                 iohandler_ssl_client_handshake(iofd);
560             }
561             break;
562         case IO_CLOSED:
563             if(iofd->type == IOTYPE_TIMER)
564                 callback_event.type = IOEVENT_TIMEOUT;
565             break;
566         case IO_LISTENING:
567             if(readable) {
568                 callback_event.data.accept_fd = accept(iofd->fd, NULL, 0);
569                 if(callback_event.data.accept_fd < 0) {
570                     iohandler_log(IOLOG_ERROR, "could not accept client (server fd: %d): %d - %s", iofd->fd, errno, strerror(errno));
571                 } else
572                     callback_event.type = IOEVENT_ACCEPT;
573             }
574             break;
575         case IO_CONNECTING:
576             if(readable) { //could not connect
577                 callback_event.type = IOEVENT_NOTCONNECTED;
578                 //socklen_t arglen;
579                 //arglen = sizeof(callback_event.data.errid);
580                 //if (getsockopt(iofd->fd, SOL_SOCKET, SO_ERROR, &callback_event.data.errid, &arglen) < 0)
581                 //    callback_event.data.errid = errno;
582                 iofd->state = IO_CLOSED;
583                                 engine->update(iofd);
584             } else if(writeable) {
585                 if(iofd->ssl && !iofd->ssl_active) {
586                     iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_connect for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd);
587                     iohandler_ssl_connect(iofd);
588                     return;
589                 }
590                 callback_event.type = IOEVENT_CONNECTED;
591                 iofd->state = IO_CONNECTED;
592                 engine->update(iofd);
593             }
594             break;
595         case IO_CONNECTED:
596             if(readable) {
597                 if(iofd->read_lines) {
598                     int bytes;
599                     
600                     if(iofd->ssl_active)
601                         bytes = iohandler_ssl_read(iofd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos);
602                     else {
603                         if(iofd->type == IOTYPE_STDIN)
604                             #ifdef WIN32
605                             bytes = readable;
606                             #else
607                             bytes = read(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos);
608                             #endif
609                         else
610                             bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0);
611                     }
612                     if(bytes <= 0) {
613                         if (errno != EAGAIN) {
614                             iofd->state = IO_CLOSED;
615                                                         engine->update(iofd);
616                             callback_event.type = IOEVENT_CLOSED;
617                             callback_event.data.errid = errno;
618                         }
619                     } else {
620                         int i, used_bytes = 0;
621                         iohandler_log(IOLOG_DEBUG, "received %d bytes (fd: %d). readbuf position: %d", bytes, iofd->fd, iofd->readbuf.bufpos);
622                         iofd->readbuf.bufpos += bytes;
623                         callback_event.type = IOEVENT_RECV;
624                         for(i = 0; i < iofd->readbuf.bufpos; i++) {
625                             if(iofd->readbuf.buffer[i] == '\r' && iofd->readbuf.buffer[i+1] == '\n')
626                                 iofd->readbuf.buffer[i] = 0;
627                             else if(iofd->readbuf.buffer[i] == '\n' || iofd->readbuf.buffer[i] == '\r') {
628                                 iofd->readbuf.buffer[i] = 0;
629                                 callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes;
630                                 iohandler_log(IOLOG_DEBUG, "parsed line (%d bytes): %s", i - used_bytes, iofd->readbuf.buffer + used_bytes);
631                                 used_bytes = i+1;
632                                 iohandler_trigger_event(&callback_event);
633                             } else if(i + 1 - used_bytes >= IO_LINE_LEN) { //512 max
634                                 iofd->readbuf.buffer[i] = 0;
635                                 callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes;
636                                 iohandler_log(IOLOG_DEBUG, "parsed and stripped line (%d bytes): %s", i - used_bytes, iofd->readbuf.buffer + used_bytes);
637                                 for(; i < iofd->readbuf.bufpos; i++) { //skip the rest of the line
638                                     if(iofd->readbuf.buffer[i] == '\n' || (iofd->readbuf.buffer[i] == '\r' && iofd->readbuf.buffer[i+1] != '\n')) {
639                                         break;
640                                     }
641                                 }
642                                 used_bytes = i+1;
643                                 iohandler_trigger_event(&callback_event);
644                             }
645                         }
646                         if(used_bytes) {
647                             if(used_bytes == iofd->readbuf.bufpos) {
648                                 iofd->readbuf.bufpos = 0;
649                                 iohandler_log(IOLOG_DEBUG, "readbuf fully processed (set buffer position to 0)");
650                             } else {
651                                 iohandler_log(IOLOG_DEBUG, "readbuf rest: %d bytes (used %d bytes)", iofd->readbuf.bufpos - used_bytes, used_bytes);
652                                 memmove(iofd->readbuf.buffer, iofd->readbuf.buffer + used_bytes, iofd->readbuf.bufpos - used_bytes);
653                                 iofd->readbuf.bufpos -= used_bytes;
654                             }
655                         }
656                         callback_event.type = IOEVENT_IGNORE;
657                     }
658                 } else
659                     callback_event.type = IOEVENT_READABLE;
660             }
661             if(writeable) {
662                 iohandler_try_write(iofd);
663             }
664             break;
665     }
666     if(callback_event.type == IOEVENT_IGNORE && !readable && !writeable) 
667         callback_event.type = IOEVENT_TIMEOUT;
668     if(callback_event.type != IOEVENT_IGNORE)
669         iohandler_trigger_event(&callback_event);
670 }
671
672 void iohandler_poll() {
673     if(engine) {
674         IOSYNCHRONIZE(io_poll_sync); //quite senceless multithread support... better support will follow
675         struct timeval timeout;
676         timeout.tv_sec = IO_MAX_TIMEOUT;
677         timeout.tv_usec = 0;
678         engine->loop(&timeout);
679         IODESYNCHRONIZE(io_poll_sync);
680     }
681 }
682
683 //debugging functions
684 char *iohandler_iotype_name(enum IOType type) {
685     switch(type) {
686         case IOTYPE_UNKNOWN:
687             return "IOTYPE_UNKNOWN";
688         case IOTYPE_SERVER:
689             return "IOTYPE_SERVER";
690         case IOTYPE_CLIENT:
691             return "IOTYPE_CLIENT";
692         case IOTYPE_STDIN:
693             return "IOTYPE_STDIN";
694         case IOTYPE_TIMER:
695             return "IOTYPE_TIMER";
696         default:
697             return "(UNDEFINED)";
698     }
699 }
700
701 char *iohandler_iostatus_name(enum IOStatus status) {
702     switch(status) {
703         case IO_CLOSED:
704             return "IO_CLOSED";
705         case IO_LISTENING:
706             return "IO_LISTENING";
707         case IO_CONNECTING:
708             return "IO_CONNECTING";
709         case IO_CONNECTED:
710             return "IO_CONNECTED";
711         case IO_SSLWAIT:
712             return "IO_SSLWAIT";
713         default:
714             return "(UNDEFINED)";
715     }
716 }
717
718 char *iohandler_ioeventtype_name(enum IOEventType type) {
719     switch(type) {
720         case IOEVENT_IGNORE:
721             return "IOEVENT_IGNORE";
722         case IOEVENT_READABLE:
723             return "IOEVENT_READABLE";
724         case IOEVENT_RECV:
725             return "IOEVENT_RECV";
726         case IOEVENT_CONNECTED:
727             return "IOEVENT_CONNECTED";
728         case IOEVENT_NOTCONNECTED:
729             return "IOEVENT_NOTCONNECTED";
730         case IOEVENT_CLOSED:
731             return "IOEVENT_CLOSED";
732         case IOEVENT_ACCEPT:
733             return "IOEVENT_ACCEPT";
734         case IOEVENT_TIMEOUT:
735             return "IOEVENT_TIMEOUT";
736         default:
737             return "(UNDEFINED)";
738     }
739 }