-void iohandler_update(struct IODescriptor *iofd) {
- iohandler_log(IOLOG_DEBUG, "external call to iohandler_update (fd: %d)", iofd->fd);
- engine->update(iofd);
-}
-
-static void iohandler_trigger_event(struct IOEvent *event) {
- if(!event->iofd->callback) return;
- iohandler_log(IOLOG_DEBUG, "triggering event (%s) for %s (fd: %d)", iohandler_ioeventtype_name(event->type), iohandler_iotype_name(event->iofd->type), event->iofd->fd);
- event->iofd->callback(event);
-}
-
-void iohandler_events(struct IODescriptor *iofd, int readable, int writeable) {
- struct IOEvent callback_event;
- callback_event.type = IOEVENT_IGNORE;
- callback_event.iofd = iofd;
- switch(iofd->state) {
- case IO_SSLWAIT:
- if(!readable && !writeable) {
- callback_event.type = IOEVENT_SSLFAILED;
- iofd->state = IO_CLOSED;
- engine->update(iofd);
- } else {
- iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_client_handshake for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd);
- iohandler_ssl_client_handshake(iofd);
- }
- break;
- case IO_CLOSED:
- if(iofd->type == IOTYPE_TIMER)
- callback_event.type = IOEVENT_TIMEOUT;
- break;
- case IO_LISTENING:
- if(readable) {
- callback_event.data.accept_fd = accept(iofd->fd, NULL, 0);
- if(callback_event.data.accept_fd < 0) {
- iohandler_log(IOLOG_ERROR, "could not accept client (server fd: %d): %d - %s", iofd->fd, errno, strerror(errno));
- } else
- callback_event.type = IOEVENT_ACCEPT;
- }
- break;
- case IO_CONNECTING:
- if(readable) { //could not connect
- callback_event.type = IOEVENT_NOTCONNECTED;
- //socklen_t arglen;
- //arglen = sizeof(callback_event.data.errid);
- //if (getsockopt(iofd->fd, SOL_SOCKET, SO_ERROR, &callback_event.data.errid, &arglen) < 0)
- // callback_event.data.errid = errno;
- iofd->state = IO_CLOSED;
- engine->update(iofd);
- } else if(writeable) {
- if(iofd->ssl && !iofd->ssl_active) {
- iohandler_log(IOLOG_DEBUG, "triggering iohandler_ssl_connect for %s (fd: %d)", iohandler_iotype_name(iofd->type), iofd->fd);
- iohandler_ssl_connect(iofd);
- return;
- }
- callback_event.type = IOEVENT_CONNECTED;
- iofd->state = IO_CONNECTED;
- engine->update(iofd);
- }
- break;
- case IO_CONNECTED:
- if(readable) {
- if(iofd->read_lines) {
- int bytes;
-
- if(iofd->ssl_active)
- bytes = iohandler_ssl_read(iofd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos);
- else {
- if(iofd->type == IOTYPE_STDIN)
- #ifdef WIN32
- bytes = readable;
- #else
- bytes = read(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos);
- #endif
- else
- bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0);
- }
- if(bytes <= 0) {
- if (errno != EAGAIN || errno != EWOULDBLOCK) {
- iofd->state = IO_CLOSED;
- engine->update(iofd);
- callback_event.type = IOEVENT_CLOSED;
- callback_event.data.errid = errno;
- }
- } else {
- int i, used_bytes = 0;
- iohandler_log(IOLOG_DEBUG, "received %d bytes (fd: %d). readbuf position: %d", bytes, iofd->fd, iofd->readbuf.bufpos);
- iofd->readbuf.bufpos += bytes;
- callback_event.type = IOEVENT_RECV;
- for(i = 0; i < iofd->readbuf.bufpos; i++) {
- if(iofd->readbuf.buffer[i] == '\r' && iofd->readbuf.buffer[i+1] == '\n')
- iofd->readbuf.buffer[i] = 0;
- else if(iofd->readbuf.buffer[i] == '\n' || iofd->readbuf.buffer[i] == '\r') {
- iofd->readbuf.buffer[i] = 0;
- callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes;
- iohandler_log(IOLOG_DEBUG, "parsed line (%d bytes): %s", i - used_bytes, iofd->readbuf.buffer + used_bytes);
- used_bytes = i+1;
- iohandler_trigger_event(&callback_event);
- } else if(i + 1 - used_bytes >= IO_LINE_LEN) { //512 max
- iofd->readbuf.buffer[i] = 0;
- callback_event.data.recv_str = iofd->readbuf.buffer + used_bytes;
- iohandler_log(IOLOG_DEBUG, "parsed and stripped line (%d bytes): %s", i - used_bytes, iofd->readbuf.buffer + used_bytes);
- for(; i < iofd->readbuf.bufpos; i++) { //skip the rest of the line
- if(iofd->readbuf.buffer[i] == '\n' || (iofd->readbuf.buffer[i] == '\r' && iofd->readbuf.buffer[i+1] != '\n')) {
- break;
- }
- }
- used_bytes = i+1;
- iohandler_trigger_event(&callback_event);
- }
- }
- if(used_bytes) {
- if(used_bytes == iofd->readbuf.bufpos) {
- iofd->readbuf.bufpos = 0;
- iohandler_log(IOLOG_DEBUG, "readbuf fully processed (set buffer position to 0)");
- } else {
- iohandler_log(IOLOG_DEBUG, "readbuf rest: %d bytes (used %d bytes)", iofd->readbuf.bufpos - used_bytes, used_bytes);
- memmove(iofd->readbuf.buffer, iofd->readbuf.buffer + used_bytes, iofd->readbuf.bufpos - used_bytes);
- iofd->readbuf.bufpos -= used_bytes;
- }
- }
- callback_event.type = IOEVENT_IGNORE;
- }
- } else
- callback_event.type = IOEVENT_READABLE;
- }
- if(writeable) {
- int bytes;
- bytes = iohandler_try_write(iofd);
- if(bytes < 0) {
- iofd->state = IO_CLOSED;
- engine->update(iofd);
- callback_event.type = IOEVENT_CLOSED;
- callback_event.data.errid = errno;
- }
- }
- break;
- }
- if(callback_event.type == IOEVENT_IGNORE && !readable && !writeable)
- callback_event.type = IOEVENT_TIMEOUT;
- if(callback_event.type != IOEVENT_IGNORE)
- iohandler_trigger_event(&callback_event);
-}
-
-void iohandler_poll() {
- struct timeval timeout;
- timeout.tv_sec = IO_MAX_TIMEOUT;
- timeout.tv_usec = 0;
- iohandler_poll_timeout(timeout);
-}
-
-void iohandler_poll_timeout(struct timeval timeout) {
- if(engine) {
- IOSYNCHRONIZE(io_poll_sync); //quite senceless multithread support... better support will follow
- engine->loop(&timeout);
- IODESYNCHRONIZE(io_poll_sync);
- }
-}
-
-//debugging functions
-char *iohandler_iotype_name(enum IOType type) {
- switch(type) {
- case IOTYPE_UNKNOWN:
- return "IOTYPE_UNKNOWN";
- case IOTYPE_SERVER:
- return "IOTYPE_SERVER";
- case IOTYPE_CLIENT:
- return "IOTYPE_CLIENT";
- case IOTYPE_STDIN:
- return "IOTYPE_STDIN";
- case IOTYPE_TIMER:
- return "IOTYPE_TIMER";
- default:
- return "(UNDEFINED)";
- }
-}
-
-char *iohandler_iostatus_name(enum IOStatus status) {
- switch(status) {
- case IO_CLOSED:
- return "IO_CLOSED";
- case IO_LISTENING:
- return "IO_LISTENING";
- case IO_CONNECTING:
- return "IO_CONNECTING";
- case IO_CONNECTED:
- return "IO_CONNECTED";
- case IO_SSLWAIT:
- return "IO_SSLWAIT";
- default:
- return "(UNDEFINED)";
+static void iohandler_worker(void *tptr) {
+ struct IOHandlerThread *thread = tptr;
+
+ #ifdef HAVE_PTHREAD_H
+ if(!thread->main) {
+ thread->id = pthread_self_tid();
+ }
+ #endif
+
+ while(!thread->shutdown) { // endless loop
+ if(thread->main && iohandler_treads != iohandler_running) {
+ IOSYNCHRONIZE(iothread_sync);
+ #ifdef HAVE_PTHREAD_H
+ int i;
+ if(iohandler_treads > iohandler_running) {
+ for(i = 0; i < (iohandler_treads - iohandler_running); i++)
+ iohandler_start_worker();
+ }
+ if(iohandler_treads < iohandler_running) {
+ struct IOHandlerThread *cthread;
+ for(i = 0; i < (iohandler_running - iohandler_treads); i++) {
+ for(cthread = threads; cthread; cthread = cthread->next) {
+ if(cthread->main)
+ continue;
+ cthread->shutdown = 1;
+ iolog_trigger(IOLOG_ERROR, "Thread %d marked for shutdown.", cthread->id);
+ }
+ if(cthread)
+ iohandler_running--;
+ }
+ }
+ #endif
+ if(iohandler_treads == 0) {
+ #ifdef HAVE_PTHREAD_H
+ struct IOHandlerThread *cthread;
+ for(cthread = threads; cthread; cthread = cthread->next) {
+ if(cthread->main)
+ continue;
+ cthread->shutdown = 1;
+ pthread_join(cthread->thread, NULL);
+ }
+ #endif
+ thread->shutdown = 1;
+ IODESYNCHRONIZE(iothread_sync);
+ break;
+ }
+ IODESYNCHRONIZE(iothread_sync);
+ }
+ if(!thread->run) {
+ usleep(500000); // 500ms
+ continue;
+ }
+
+ // iohandler calls
+ iogc_exec();
+ iodns_poll();
+
+ }
+ IOSYNCHRONIZE(iothread_sync);
+ if(thread == threads) {
+ threads = thread->next;
+ } else {
+ struct IOHandlerThread *cthread;
+ for(cthread = threads; cthread; cthread = cthread->next) {
+ if(cthread->next == thread) {
+ cthread->next = thread->next;
+ break;
+ }
+ }
+ }
+ iolog_trigger(IOLOG_DEBUG, "Thread %d stopped.", thread->id);
+ free(thread);
+ IODESYNCHRONIZE(iothread_sync);
+}
+
+void iohandler_run() {
+ if(!iohandler_running)
+ return;
+ iohandler_running = 1;
+
+ struct IOHandlerThread *mainthread = calloc(1, sizeof(*mainthread));
+ if(!mainthread) {
+ iolog_trigger(IOLOG_ERROR, "could not allocate memory for IOHandlerThread in %s:%d", __FILE__, __LINE__);
+ return;