From d1112e33f9cd36560650f28a77d8d0b9716a1cea Mon Sep 17 00:00:00 2001 From: pk910 Date: Fri, 14 Mar 2014 23:36:58 +0100 Subject: [PATCH] [IOMultiplexerV2] completed c-ares DNS Backend & added multithreading to IODNSEngine_default --- configure.ac | 2 +- src/IOHandler/IODNSEngine_cares.c | 132 +++++++++++++++++++++++++++- src/IOHandler/IODNSEngine_default.c | 80 +++++++++++++---- src/IOHandler/IOHandler_config.h | 6 +- 4 files changed, 200 insertions(+), 20 deletions(-) diff --git a/configure.ac b/configure.ac index a1a544f..09c3ada 100644 --- a/configure.ac +++ b/configure.ac @@ -33,7 +33,7 @@ CFLAGS="$CFLAGS -D_GNU_SOURCE" AC_FUNC_MALLOC AC_CHECK_FUNCS([usleep select socket inet_pton inet_ntop]) -AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h]) +AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h ares.h]) AC_CHECK_LIB(ws2_32, main, [ LIBS="$LIBS -lws2_32" ], []) AC_CHECK_LIB(ssl, SSL_read, [ diff --git a/src/IOHandler/IODNSEngine_cares.c b/src/IOHandler/IODNSEngine_cares.c index 447d8c1..fd5cf48 100644 --- a/src/IOHandler/IODNSEngine_cares.c +++ b/src/IOHandler/IODNSEngine_cares.c @@ -32,14 +32,24 @@ #include #elif defined HAVE_SYS_SELECT_H #include +#include +#include #endif +#include "compat/inet.h" + struct dnsengine_cares_socket { struct _IOSocket *iosock; int want_read : 1; int want_write : 1; }; +struct dnsengine_cares_query { + int query_count; + int query_successful; + struct _IODNSQuery *iodns; +}; + static IOTIMER_CALLBACK(dnsengine_cares_timer_callback); static ares_channel dnsengine_cares_channel; @@ -57,7 +67,7 @@ static int dnsengine_cares_init() { iolog_trigger(IOLOG_ERROR, "Failed to initialize c-ares in %s:%d", __FILE__, __LINE__); return 0; } - return 0; /* backend not completed */ + return 1; } static void dnsengine_cares_update_sockets() { @@ -120,6 +130,7 @@ static void dnsengine_cares_update_sockets() { //set up iosock iosock->socket_flags |= IOSOCKETFLAG_PARENT_DNSENGINE | IOSOCKETFLAG_OVERRIDE_WANT_RW; iosock->fd = ares_socks[i]; + dnsengine_cares_sockets[sockid].iosock = iosock; dnsengine_cares_sockets[sockid].want_read = 0; dnsengine_cares_sockets[sockid].want_write = 0; @@ -200,13 +211,128 @@ static void dnsengine_cares_stop() { } +static void dnsengine_cares_callback(void *arg, int status, int timeouts, struct hostent *host) { + struct dnsengine_cares_query *query = arg; + struct _IODNSQuery *iodns = query->iodns; + query->query_count--; + if(iodns) { + if(!(iodns->flags & IODNSFLAG_RUNNING)) { + // query stopped + query->iodns = NULL; + iodns = NULL; + iodns_free_result(iodns->result); + _free_dnsquery(iodns); + } + if(iodns && status == ARES_SUCCESS) { + if((iodns->type & IODNS_FORWARD)) { + char **h_addr; + for(h_addr = host->h_addr_list; *h_addr; h_addr++) { + struct IODNSResult *dnsresult = malloc(sizeof(*dnsresult)); + if(!dnsresult) { + iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for IODNSResult in %s:%d", __FILE__, __LINE__); + goto dnsengine_cares_callback_finally; + } + + int sockaddrlen; + if(host->h_addrtype == AF_INET) { + dnsresult->type = IODNS_RECORD_A; + sockaddrlen = sizeof(struct sockaddr_in); + } else { + dnsresult->type = IODNS_RECORD_AAAA; + sockaddrlen = sizeof(struct sockaddr_in6); + } + dnsresult->result.addr.addresslen = sockaddrlen; + dnsresult->result.addr.address = malloc(sockaddrlen); + if(!dnsresult->result.addr.address) { + iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for sockaddr in %s:%d", __FILE__, __LINE__); + goto dnsengine_cares_callback_finally; + } + void *target = (host->h_addrtype == AF_INET ? ((void *) &((struct sockaddr_in *)dnsresult->result.addr.address)->sin_addr) : ((void *) &((struct sockaddr_in6 *)dnsresult->result.addr.address)->sin6_addr)); + memcpy(target, *h_addr, host->h_length); + + if(host->h_addrtype == AF_INET) { + char str[INET_ADDRSTRLEN]; + inet_ntop( AF_INET, &((struct sockaddr_in *)dnsresult->result.addr.address)->sin_addr, str, INET_ADDRSTRLEN ); + iolog_trigger(IOLOG_DEBUG, "Resolved %s to (A): %s", iodns->request.host, str); + } else { + char str[INET6_ADDRSTRLEN]; + inet_ntop( AF_INET6, &((struct sockaddr_in6 *)dnsresult->result.addr.address)->sin6_addr, str, INET6_ADDRSTRLEN ); + iolog_trigger(IOLOG_DEBUG, "Resolved %s to (AAAA): %s", iodns->request.host, str); + } + + dnsresult->next = iodns->result; + iodns->result = dnsresult; + } + + } else if((iodns->type & IODNS_REVERSE)) { + struct IODNSResult *dnsresult = malloc(sizeof(*dnsresult)); + if(!dnsresult) { + iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for IODNSResult in %s:%d", __FILE__, __LINE__); + goto dnsengine_cares_callback_finally; + } + + dnsresult->type = IODNS_RECORD_PTR; + dnsresult->result.host = strdup(host->h_name); + if(!dnsresult->result.host) { + iolog_trigger(IOLOG_ERROR, "Failed to duplicate h_name string for IODNSResult in %s:%d", __FILE__, __LINE__); + goto dnsengine_cares_callback_finally; + } + + dnsresult->next = iodns->result; + iodns->result = dnsresult; + } + + query->query_successful++; + } + } + dnsengine_cares_callback_finally: + if(query->query_count <= 0) { + if(iodns) { + iodns->flags &= ~(IODNSFLAG_PROCESSING | IODNSFLAG_RUNNING); + iodns_event_callback(iodns, (query->query_successful ? IODNSEVENT_SUCCESS : IODNSEVENT_FAILED)); + } + free(query); + } +} static void dnsengine_cares_add(struct _IODNSQuery *iodns) { - /* TODO */ + struct dnsengine_cares_query *query = malloc(sizeof(*query)); + if(!query) { + iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for dnsengine_cares_query in %s:%d", __FILE__, __LINE__); + iodns_event_callback(iodns, IODNSEVENT_FAILED); + return; + } + iodns->query = query; + query->query_count = 0; + query->query_successful = 0; + query->iodns = iodns; + iodns->flags |= IODNSFLAG_PROCESSING; + if((iodns->type & IODNS_FORWARD)) { + if((iodns->type & IODNS_RECORD_A)) { + query->query_count++; + ares_gethostbyname(dnsengine_cares_channel, iodns->request.host, AF_INET, dnsengine_cares_callback, query); + } + if((iodns->type & IODNS_RECORD_AAAA)) { + query->query_count++; + ares_gethostbyname(dnsengine_cares_channel, iodns->request.host, AF_INET6, dnsengine_cares_callback, query); + } + } else if((iodns->type & IODNS_REVERSE)) { + query->query_count++; + struct sockaddr *addr = iodns->request.addr.address; + if(addr->sa_family == AF_INET) { + struct sockaddr_in *addr4 = (struct sockaddr_in *) iodns->request.addr.address; + ares_gethostbyaddr(dnsengine_cares_channel, &addr4->sin_addr, sizeof(addr4->sin_addr), addr->sa_family, dnsengine_cares_callback, query); + } else { + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)iodns->request.addr.address; + ares_gethostbyaddr(dnsengine_cares_channel, &addr6->sin6_addr, sizeof(addr6->sin6_addr), addr->sa_family, dnsengine_cares_callback, query); + } + } + dnsengine_cares_update_timeout(); + dnsengine_cares_update_sockets(); } static void dnsengine_cares_remove(struct _IODNSQuery *iodns) { - /* TODO */ + /* empty */ } static void dnsengine_cares_loop() { diff --git a/src/IOHandler/IODNSEngine_default.c b/src/IOHandler/IODNSEngine_default.c index 926a7d8..da10fac 100644 --- a/src/IOHandler/IODNSEngine_default.c +++ b/src/IOHandler/IODNSEngine_default.c @@ -37,8 +37,11 @@ #ifdef IODNS_USE_THREADS -static pthread_t iodns_thread; -static int iodns_thread_running = 1; +#define IODNS_MAX_THREAD 10 +#define IODNS_INC_THREAD_BY_LOAD 5 /* add another thread when there are more than IODNS_INC_THREAD_BY_LOAD querys per thread */ +static pthread_t *iodns_thread[IODNS_MAX_THREAD]; +static int iodns_threads_wanted = 1; +static int iodns_threads_running = 0; static pthread_cond_t iodns_cond; static pthread_mutex_t iodns_sync, iodns_sync2; @@ -50,8 +53,13 @@ static void iodns_process_queries(); #ifdef IODNS_USE_THREADS static void *dnsengine_worker_main(void *arg) { struct _IODNSQuery *query; - while(iodns_thread_running) { + while(1) { IOSYNCHRONIZE(iodns_sync); + if(iodns_threads_wanted < iodns_threads_running) { + iodns_threads_running--; + break; + } + for(query = iodnsquery_first; query; query = query->next) { if((query->flags & IODNSFLAG_RUNNING)) break; @@ -60,11 +68,38 @@ static void *dnsengine_worker_main(void *arg) { if(!query) pthread_cond_wait(&iodns_cond, &iodns_sync2); - if(iodns_thread_running) - iodns_process_queries(); + if(iodns_threads_wanted < iodns_threads_running) { + iodns_threads_running--; + break; + } + + iodns_process_queries(); } return NULL; } + +static int dnsengine_default_start_worker() { + if(iodns_threads_wanted >= IODNS_MAX_THREAD-1) + return 0; + int i; + for(i = 0; i < IODNS_MAX_THREAD; i++) { + if(!iodns_thread[i]) + break; + } + if(i >= IODNS_MAX_THREAD) + return 0; + iodns_thread[i] = malloc(sizeof(**iodns_thread)); + if(!iodns_thread[i]) + return 0; + iodns_threads_wanted++; + if(pthread_create(iodns_thread[i], NULL, dnsengine_worker_main, NULL)) { + iodns_threads_wanted--; + iolog_trigger(IOLOG_ERROR, "could not create pthread in %s:%d (Returned: %i)", __FILE__, __LINE__, thread_err); + return 0; + } + iodns_threads_running++; + return 1; +} #endif static int dnsengine_default_init() { @@ -74,14 +109,9 @@ static int dnsengine_default_init() { IOTHREAD_MUTEX_INIT(iodns_sync); IOTHREAD_MUTEX_INIT(iodns_sync2); - iodns_thread_running = 1; - - int thread_err; - thread_err = pthread_create(&iodns_thread, NULL, dnsengine_worker_main, NULL); - if(thread_err) { - iolog_trigger(IOLOG_ERROR, "could not create pthread in %s:%d (Returned: %i)", __FILE__, __LINE__, thread_err); + if(!dnsengine_default_start_worker()) { iodns_loop_blocking = 1; - iodns_thread_running = 0; + iodns_threads_running = 0; } #else iodns_loop_blocking = 1; @@ -91,12 +121,19 @@ static int dnsengine_default_init() { static void dnsengine_default_stop() { #ifdef IODNS_USE_THREADS + int i; if(iodns_thread_running) { - iodns_thread_running = 0; + iodns_threads_wanted = 0; IOSYNCHRONIZE(iodns_sync2); - pthread_cond_signal(&iodns_cond); + pthread_cond_broadcast(&iodns_cond); IODESYNCHRONIZE(iodns_sync2); - pthread_join(iodns_thread, NULL); + for(i = 0; i < IODNS_MAX_THREAD; i++) { + if(iodns_thread[i]) { + pthread_join(*iodns_thread[i], NULL); + free(iodns_thread[i]); + iodns_thread[i] = NULL; + } + } } #endif } @@ -107,6 +144,18 @@ static void dnsengine_default_add(struct _IODNSQuery *iodns) { IOSYNCHRONIZE(iodns_sync2); pthread_cond_signal(&iodns_cond); IODESYNCHRONIZE(iodns_sync2); + + int querycount = 0; + for(iodns = iodnsquery_first; iodns; iodns = iodns->next) { + if(!(iodns->flags & IODNSFLAG_RUNNING)) + continue; + if((iodns->flags & IODNSFLAG_PROCESSING)) + continue; + querycount++; + } + if(querycount / iodns_threads_wanted > IODNS_INC_THREAD_BY_LOAD) { + dnsengine_default_start_worker(); + } } #endif } @@ -134,6 +183,7 @@ static void iodns_process_queries() { continue; if((iodns->flags & IODNSFLAG_PROCESSING)) continue; + iodns->flags |= IODNSFLAG_PROCESSING; IODESYNCHRONIZE(iodns_sync); diff --git a/src/IOHandler/IOHandler_config.h b/src/IOHandler/IOHandler_config.h index bffedc9..7e83d8c 100644 --- a/src/IOHandler/IOHandler_config.h +++ b/src/IOHandler/IOHandler_config.h @@ -17,7 +17,6 @@ /* required configure script checks AC_FUNC_MALLOC - AC_FUNC_CALLOC AC_CHECK_FUNCS([usleep select socket inet_pton inet_ntop]) AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h]) @@ -34,6 +33,11 @@ LIBS="$LIBS -lpthread" ]) ]) + AC_CHECK_LIB(cares, ares_init, [ + AC_CHECK_HEADERS(ares.h, [ + LIBS="$LIBS -lcares" + ]) + ]) */ // configure config file #include "../../config.h" -- 2.20.1