[IOMultiplexerV2] completed c-ares DNS Backend & added multithreading to IODNSEngine_...
authorpk910 <philipp@zoelle1.de>
Fri, 14 Mar 2014 22:36:58 +0000 (23:36 +0100)
committerpk910 <philipp@zoelle1.de>
Fri, 14 Mar 2014 23:07:44 +0000 (00:07 +0100)
configure.ac
src/IOHandler/IODNSEngine_cares.c
src/IOHandler/IODNSEngine_default.c
src/IOHandler/IOHandler_config.h

index a1a544f71adbce14841c647188b1a5a1a937a16b..09c3ada2ca367ab76f34f0206db1845063099b68 100644 (file)
@@ -33,7 +33,7 @@ CFLAGS="$CFLAGS -D_GNU_SOURCE"
 
 AC_FUNC_MALLOC
 AC_CHECK_FUNCS([usleep select socket inet_pton inet_ntop])
-AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h])
+AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h ares.h])
 
 AC_CHECK_LIB(ws2_32, main, [ LIBS="$LIBS -lws2_32" ], [])
 AC_CHECK_LIB(ssl, SSL_read, [
index 447d8c1ccb6af408196c8faaa66c53334f5ef5ee..fd5cf48bd8a0bcc152e66eabcb5dfc3ed7b79bd1 100644 (file)
 #include <winsock2.h>
 #elif defined HAVE_SYS_SELECT_H
 #include <sys/select.h>
+#include <netdb.h>
+#include <arpa/inet.h>
 #endif
 
+#include "compat/inet.h"
+
 struct dnsengine_cares_socket {
        struct _IOSocket *iosock;
        int want_read : 1;
        int want_write : 1;
 };
 
+struct dnsengine_cares_query {
+       int query_count;
+       int query_successful;
+       struct _IODNSQuery *iodns;
+};
+
 static IOTIMER_CALLBACK(dnsengine_cares_timer_callback);
 
 static ares_channel dnsengine_cares_channel;
@@ -57,7 +67,7 @@ static int dnsengine_cares_init() {
                iolog_trigger(IOLOG_ERROR, "Failed to initialize c-ares in %s:%d", __FILE__, __LINE__);
         return 0;
     }
-    return 0; /* backend not completed */
+    return 1;
 }
 
 static void dnsengine_cares_update_sockets() {
@@ -120,6 +130,7 @@ static void dnsengine_cares_update_sockets() {
                        //set up iosock
                        iosock->socket_flags |= IOSOCKETFLAG_PARENT_DNSENGINE | IOSOCKETFLAG_OVERRIDE_WANT_RW;
                        iosock->fd = ares_socks[i];
+                       dnsengine_cares_sockets[sockid].iosock = iosock;
                        dnsengine_cares_sockets[sockid].want_read = 0;
                        dnsengine_cares_sockets[sockid].want_write = 0;
                        
@@ -200,13 +211,128 @@ static void dnsengine_cares_stop() {
 }
 
 
+static void dnsengine_cares_callback(void *arg, int status, int timeouts, struct hostent *host) {
+       struct dnsengine_cares_query *query = arg;
+       struct _IODNSQuery *iodns = query->iodns;
+       query->query_count--;
+       if(iodns) {
+               if(!(iodns->flags & IODNSFLAG_RUNNING)) {
+                       // query stopped
+                       query->iodns = NULL;
+                       iodns = NULL;
+                       iodns_free_result(iodns->result);
+                       _free_dnsquery(iodns);
+               }
+               if(iodns && status == ARES_SUCCESS) {
+                       if((iodns->type & IODNS_FORWARD)) {
+                               char **h_addr;
+                               for(h_addr = host->h_addr_list; *h_addr; h_addr++) {
+                                       struct IODNSResult *dnsresult = malloc(sizeof(*dnsresult));
+                                       if(!dnsresult) {
+                                               iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for IODNSResult in %s:%d", __FILE__, __LINE__);
+                                               goto dnsengine_cares_callback_finally;
+                                       }
+                                       
+                                       int sockaddrlen;
+                                       if(host->h_addrtype == AF_INET) {
+                                               dnsresult->type = IODNS_RECORD_A;
+                                               sockaddrlen = sizeof(struct sockaddr_in);
+                                       } else {
+                                               dnsresult->type = IODNS_RECORD_AAAA;
+                                               sockaddrlen = sizeof(struct sockaddr_in6);
+                                       }
+                                       dnsresult->result.addr.addresslen = sockaddrlen;
+                                       dnsresult->result.addr.address = malloc(sockaddrlen);
+                                       if(!dnsresult->result.addr.address) {
+                                               iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for sockaddr in %s:%d", __FILE__, __LINE__);
+                                               goto dnsengine_cares_callback_finally;
+                                       }
+                                       void *target = (host->h_addrtype == AF_INET ? ((void *) &((struct sockaddr_in *)dnsresult->result.addr.address)->sin_addr) : ((void *) &((struct sockaddr_in6 *)dnsresult->result.addr.address)->sin6_addr));
+                                       memcpy(target, *h_addr, host->h_length);
+                                       
+                                       if(host->h_addrtype == AF_INET) {
+                                               char str[INET_ADDRSTRLEN];
+                                               inet_ntop( AF_INET, &((struct sockaddr_in *)dnsresult->result.addr.address)->sin_addr, str, INET_ADDRSTRLEN );
+                                               iolog_trigger(IOLOG_DEBUG, "Resolved %s to (A): %s", iodns->request.host, str);
+                                       } else {
+                                               char str[INET6_ADDRSTRLEN];
+                                               inet_ntop( AF_INET6, &((struct sockaddr_in6 *)dnsresult->result.addr.address)->sin6_addr, str, INET6_ADDRSTRLEN );
+                                               iolog_trigger(IOLOG_DEBUG, "Resolved %s to (AAAA): %s", iodns->request.host, str);
+                                       }
+                                       
+                                       dnsresult->next = iodns->result;
+                                       iodns->result = dnsresult;
+                               }
+                               
+                       } else if((iodns->type & IODNS_REVERSE)) {
+                               struct IODNSResult *dnsresult = malloc(sizeof(*dnsresult));
+                               if(!dnsresult) {
+                                       iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for IODNSResult in %s:%d", __FILE__, __LINE__);
+                                       goto dnsengine_cares_callback_finally;
+                               }
+                               
+                               dnsresult->type = IODNS_RECORD_PTR;
+                               dnsresult->result.host = strdup(host->h_name);
+                               if(!dnsresult->result.host) {
+                                       iolog_trigger(IOLOG_ERROR, "Failed to duplicate h_name string for IODNSResult in %s:%d", __FILE__, __LINE__);
+                                       goto dnsengine_cares_callback_finally;
+                               }
+                               
+                               dnsresult->next = iodns->result;
+                               iodns->result = dnsresult;
+                       }
+                       
+                       query->query_successful++;
+               }
+       }
+       dnsengine_cares_callback_finally:
+       if(query->query_count <= 0) {
+               if(iodns) {
+                       iodns->flags &= ~(IODNSFLAG_PROCESSING | IODNSFLAG_RUNNING);
+                       iodns_event_callback(iodns, (query->query_successful ? IODNSEVENT_SUCCESS : IODNSEVENT_FAILED));
+               }
+               free(query);
+       }
+}
 
 static void dnsengine_cares_add(struct _IODNSQuery *iodns) {
-    /* TODO */
+       struct dnsengine_cares_query *query = malloc(sizeof(*query));
+       if(!query) {
+               iolog_trigger(IOLOG_ERROR, "Failed to allocate memory for dnsengine_cares_query in %s:%d", __FILE__, __LINE__);
+               iodns_event_callback(iodns, IODNSEVENT_FAILED);
+               return;
+       }
+       iodns->query = query;
+       query->query_count = 0;
+       query->query_successful = 0;
+       query->iodns = iodns;
+       iodns->flags |= IODNSFLAG_PROCESSING;
+       if((iodns->type & IODNS_FORWARD)) {
+               if((iodns->type & IODNS_RECORD_A)) {
+                       query->query_count++;
+                       ares_gethostbyname(dnsengine_cares_channel, iodns->request.host, AF_INET, dnsengine_cares_callback, query);
+               }
+               if((iodns->type & IODNS_RECORD_AAAA)) {
+                       query->query_count++;
+                       ares_gethostbyname(dnsengine_cares_channel, iodns->request.host, AF_INET6, dnsengine_cares_callback, query);
+               }
+       } else if((iodns->type & IODNS_REVERSE)) {
+               query->query_count++;
+               struct sockaddr *addr = iodns->request.addr.address;
+               if(addr->sa_family == AF_INET) {
+                       struct sockaddr_in *addr4 = (struct sockaddr_in *) iodns->request.addr.address;
+                       ares_gethostbyaddr(dnsengine_cares_channel, &addr4->sin_addr, sizeof(addr4->sin_addr), addr->sa_family, dnsengine_cares_callback, query);
+               } else {
+                       struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)iodns->request.addr.address;
+                       ares_gethostbyaddr(dnsengine_cares_channel, &addr6->sin6_addr, sizeof(addr6->sin6_addr), addr->sa_family, dnsengine_cares_callback, query);
+               }
+       }
+       dnsengine_cares_update_timeout();
+       dnsengine_cares_update_sockets();
 }
 
 static void dnsengine_cares_remove(struct _IODNSQuery *iodns) {
-    /* TODO */
+       /* empty */
 }
 
 static void dnsengine_cares_loop() {
index 926a7d8c729549447cbdaffe72e88853bae1554b..da10fac8a42af0b84e0423e8f714f052b547b132 100644 (file)
 
 
 #ifdef IODNS_USE_THREADS
-static pthread_t iodns_thread;
-static int iodns_thread_running = 1;
+#define IODNS_MAX_THREAD 10
+#define IODNS_INC_THREAD_BY_LOAD 5 /* add another thread when there are more than IODNS_INC_THREAD_BY_LOAD querys per thread */
+static pthread_t *iodns_thread[IODNS_MAX_THREAD];
+static int iodns_threads_wanted = 1;
+static int iodns_threads_running = 0;
 
 static pthread_cond_t iodns_cond;
 static pthread_mutex_t iodns_sync, iodns_sync2;
@@ -50,8 +53,13 @@ static void iodns_process_queries();
 #ifdef IODNS_USE_THREADS
 static void *dnsengine_worker_main(void *arg) {
        struct _IODNSQuery *query;
-       while(iodns_thread_running) {
+       while(1) {
                IOSYNCHRONIZE(iodns_sync);
+               if(iodns_threads_wanted < iodns_threads_running) {
+                       iodns_threads_running--;
+                       break;
+               }
+               
                for(query = iodnsquery_first; query; query = query->next) {
                        if((query->flags & IODNSFLAG_RUNNING))
                                break;
@@ -60,11 +68,38 @@ static void *dnsengine_worker_main(void *arg) {
                if(!query)
                        pthread_cond_wait(&iodns_cond, &iodns_sync2);
                
-               if(iodns_thread_running)
-                       iodns_process_queries();
+               if(iodns_threads_wanted < iodns_threads_running) {
+                       iodns_threads_running--;
+                       break;
+               }
+               
+               iodns_process_queries();
        }
        return NULL;
 }
+
+static int dnsengine_default_start_worker() {
+       if(iodns_threads_wanted >= IODNS_MAX_THREAD-1)
+               return 0;
+       int i;
+       for(i = 0; i < IODNS_MAX_THREAD; i++) {
+               if(!iodns_thread[i]) 
+                       break;
+       }
+       if(i >= IODNS_MAX_THREAD)
+               return 0;
+       iodns_thread[i] = malloc(sizeof(**iodns_thread));
+       if(!iodns_thread[i])
+               return 0;
+       iodns_threads_wanted++;
+       if(pthread_create(iodns_thread[i], NULL, dnsengine_worker_main, NULL)) {
+               iodns_threads_wanted--;
+               iolog_trigger(IOLOG_ERROR, "could not create pthread in %s:%d (Returned: %i)", __FILE__, __LINE__, thread_err);
+               return 0;
+       }
+       iodns_threads_running++;
+       return 1;
+}
 #endif
 
 static int dnsengine_default_init() {
@@ -74,14 +109,9 @@ static int dnsengine_default_init() {
        IOTHREAD_MUTEX_INIT(iodns_sync);
        IOTHREAD_MUTEX_INIT(iodns_sync2);
        
-       iodns_thread_running = 1;
-       
-       int thread_err;
-       thread_err = pthread_create(&iodns_thread, NULL, dnsengine_worker_main, NULL);
-       if(thread_err) {
-               iolog_trigger(IOLOG_ERROR, "could not create pthread in %s:%d (Returned: %i)", __FILE__, __LINE__, thread_err);
+       if(!dnsengine_default_start_worker()) {
                iodns_loop_blocking = 1;
-               iodns_thread_running = 0;
+               iodns_threads_running = 0;
        }
        #else
        iodns_loop_blocking = 1;
@@ -91,12 +121,19 @@ static int dnsengine_default_init() {
 
 static void dnsengine_default_stop() {
        #ifdef IODNS_USE_THREADS
+       int i;
        if(iodns_thread_running) {
-               iodns_thread_running = 0;
+               iodns_threads_wanted = 0;
                IOSYNCHRONIZE(iodns_sync2);
-               pthread_cond_signal(&iodns_cond);
+               pthread_cond_broadcast(&iodns_cond);
                IODESYNCHRONIZE(iodns_sync2);
-               pthread_join(iodns_thread, NULL);
+               for(i = 0; i < IODNS_MAX_THREAD; i++) {
+                       if(iodns_thread[i]) {
+                               pthread_join(*iodns_thread[i], NULL);
+                               free(iodns_thread[i]);
+                               iodns_thread[i] = NULL;
+                       }
+               }
        }
        #endif
 }
@@ -107,6 +144,18 @@ static void dnsengine_default_add(struct _IODNSQuery *iodns) {
                IOSYNCHRONIZE(iodns_sync2);
                pthread_cond_signal(&iodns_cond);
                IODESYNCHRONIZE(iodns_sync2);
+               
+               int querycount = 0;
+               for(iodns = iodnsquery_first; iodns; iodns = iodns->next) {
+                       if(!(iodns->flags & IODNSFLAG_RUNNING))
+                               continue;
+                       if((iodns->flags & IODNSFLAG_PROCESSING))
+                               continue;
+                       querycount++;
+               }
+               if(querycount / iodns_threads_wanted > IODNS_INC_THREAD_BY_LOAD) {
+                       dnsengine_default_start_worker();
+               }
        }
        #endif
 }
@@ -134,6 +183,7 @@ static void iodns_process_queries() {
                        continue;
                if((iodns->flags & IODNSFLAG_PROCESSING))
                        continue;
+               iodns->flags |= IODNSFLAG_PROCESSING;
                
                IODESYNCHRONIZE(iodns_sync);
                
index bffedc9c8bb6e64e5b39fbf61da54bca3a195845..7e83d8c295d08335bf8916acec3720f5bd4ab2ca 100644 (file)
@@ -17,7 +17,6 @@
 
 /* required configure script checks
  AC_FUNC_MALLOC
- AC_FUNC_CALLOC
  AC_CHECK_FUNCS([usleep select socket inet_pton inet_ntop])
  AC_CHECK_HEADERS([fcntl.h sys/socket.h sys/select.h sys/time.h sys/types.h unistd.h windows.h winsock2.h errno.h sys/epoll.h sys/event.h])
  
      LIBS="$LIBS -lpthread"
    ])
  ])
+ AC_CHECK_LIB(cares, ares_init, [
+   AC_CHECK_HEADERS(ares.h, [
+     LIBS="$LIBS -lcares"
+   ])
+ ])
 */
 // configure config file
 #include "../../config.h"