X-Git-Url: http://git.pk910.de/?a=blobdiff_plain;f=src%2Fioset.c;h=33622af1c55e39f19a298a2702444163d875b074;hb=HEAD;hp=4e8caf137d280e53311d1df31ec5cae223c75cf0;hpb=d2048f43a8d603f4d758c685f88eceaaebb2b606;p=srvx.git diff --git a/src/ioset.c b/src/ioset.c index 4e8caf1..33622af 100644 --- a/src/ioset.c +++ b/src/ioset.c @@ -1,11 +1,12 @@ /* ioset.h - srvx event loop - * Copyright 2002-2004 srvx Development Team + * Copyright 2002-2004, 2006 srvx Development Team * - * This program is free software; you can redistribute it and/or modify + * This file is part of srvx. + * + * srvx is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. Important limitations are - * listed in the COPYING file that accompanies this software. + * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -13,10 +14,11 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program; if not, email srvx-maintainers@srvx.net. + * along with srvx; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ -#include "ioset.h" +#include "ioset-impl.h" #include "log.h" #include "timeq.h" #include "saxdb.h" @@ -25,27 +27,67 @@ #ifdef HAVE_FCNTL_H #include #endif -#ifdef HAVE_SYS_SELECT_H -#include -#endif #ifdef HAVE_SYS_SOCKET_H #include #endif -#ifndef IOSET_DEBUG -#define IOSET_DEBUG 0 -#endif +#ifdef WITH_IOSET_WIN32 + +# undef errno +# define errno WSAGetLastError() +# undef EINPROGRESS +# define EINPROGRESS WSAEINPROGRESS +# undef EHOSTUNREACH +# define EHOSTUNREACH WSAEHOSTUNREACH +# undef ECONNREFUSED +# define ECONNREFUSED WSAECONNREFUSED +# undef EAGAIN +# define EAGAIN WSAEWOULDBLOCK +# define strerror wsa_strerror + +static const char * +wsa_strerror(int wsa_err) +{ + switch (wsa_err) + { + case WSAEINTR: return "Operation interrupted"; + case WSAEBADF: return "Bad file descriptor"; + case WSAEACCES: return "Permission denied"; + case WSAEFAULT: return "Invalid address"; + case WSAEINVAL: return "Invalid parameter"; + case WSAEMFILE: return "Too many open files"; + case WSAEWOULDBLOCK: return "Try again later"; + case WSAEINPROGRESS: return "Operation in progress"; + case WSAEALREADY: return "Operation already in progress"; + case WSAENOTSOCK: return "Not a socket"; + case WSAEDESTADDRREQ: return "Destination address required"; + case WSAEMSGSIZE: return "Invalid message size"; + case WSAEPROTOTYPE: return "Invalid protocol type for socket"; + case WSAENOPROTOOPT: return "Invalid protocol option"; + case WSAEPROTONOSUPPORT: return "Protocol not supported"; + case WSAEOPNOTSUPP: return "Operation not supported"; + case WSAEADDRINUSE: return "Address already in use"; + case WSAEADDRNOTAVAIL: return "Address not available"; + case WSAENETDOWN: return "Network down"; + case WSAENETUNREACH: return "Network unreachable"; + case WSAENETRESET: return "Network reset"; + case WSAECONNABORTED: return "Connection aborted"; + case WSAECONNRESET: return "Connection reset by peer"; + case WSAECONNREFUSED: return "Connection refused"; + } + return "unknown error"; +} + +#endif /* WITH_IOSET_WIN32 */ #define IS_EOL(CH) ((CH) == '\n') extern int uplink_connect(void); -static int clock_skew; +int clock_skew; int do_write_dbs; int do_reopen; - -static struct io_fd **fds; -static unsigned int fds_size; -static fd_set read_fds, write_fds; +static struct io_engine *engine; +static struct io_fd *active_fd; static void ioq_init(struct ioq *ioq, int size) { @@ -89,12 +131,47 @@ ioq_grow(struct ioq *ioq) { ioq->get = 0; ioq->buf = new_buf; ioq->size = new_size; - return new_size - ioq->put; + return new_size - ioq->put - 1; +} + +extern struct io_engine io_engine_kevent; +extern struct io_engine io_engine_epoll; +extern struct io_engine io_engine_win32; +extern struct io_engine io_engine_select; + +void +ioset_init(void) +{ + if (engine) /* someone beat us to it */ + return; + +#if WITH_IOSET_KEVENT + if (!engine && io_engine_kevent.init()) + engine = &io_engine_kevent; +#endif + +#if WITH_IOSET_EPOLL + if (!engine && io_engine_epoll.init()) + engine = &io_engine_epoll; +#endif + +#if WITH_IOSET_WIN32 + if (!engine && io_engine_win32.init()) + engine = &io_engine_win32; +#endif + + if (engine) { + /* we found one that works */ + } else if (io_engine_select.init()) + engine = &io_engine_select; + else + log_module(MAIN_LOG, LOG_FATAL, "No usable I/O engine found."); + log_module(MAIN_LOG, LOG_DEBUG, "Using %s I/O engine.", engine->name); } void ioset_cleanup(void) { - free(fds); + engine->cleanup(); } struct io_fd * @@ -106,39 +183,100 @@ ioset_add(int fd) { log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd); return 0; } + if (!engine) + ioset_init(); res = calloc(1, sizeof(*res)); if (!res) return 0; res->fd = fd; ioq_init(&res->send, 1024); ioq_init(&res->recv, 1024); - if ((unsigned)fd >= fds_size) { - unsigned int old_size = fds_size; - fds_size = fd + 8; - fds = realloc(fds, fds_size*sizeof(*fds)); - memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds)); - } - fds[fd] = res; +#if defined(F_GETFL) flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags|O_NONBLOCK); + flags = fcntl(fd, F_GETFD); + fcntl(fd, F_SETFD, flags|FD_CLOEXEC); +#else + /* I hope you're using the Win32 backend or something else that + * automatically marks the file descriptor non-blocking... + */ + (void)flags; +#endif + engine->add(res); return res; } +struct io_fd *ioset_listen(struct sockaddr *local, unsigned int sa_size, void *data, void (*accept_cb)(struct io_fd *listener, struct io_fd *new_connect)) +{ + struct io_fd *io_fd; + unsigned int opt; + int res; + int fd; + + fd = socket(local ? local->sa_family : PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + log_module(MAIN_LOG, LOG_ERROR, "Unable to create listening socket: %s", strerror(errno)); + return NULL; + } + + if (local && sa_size) { + opt = 1; + res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); + if (res < 0) { + log_module(MAIN_LOG, LOG_WARNING, "Unable to mark listener address as re-usable: %s", strerror(errno)); + } + + res = bind(fd, local, sa_size); + if (res < 0) { + log_module(MAIN_LOG, LOG_ERROR, "Unable to bind listening socket %d: %s", fd, strerror(errno)); + close(fd); + return NULL; + } + } + + res = listen(fd, 1); + if (res < 0) { + log_module(MAIN_LOG, LOG_ERROR, "Unable to listen on socket %d: %s", fd, strerror(errno)); + close(fd); + return NULL; + } + + io_fd = ioset_add(fd); + if (!io_fd) { + close(fd); + return NULL; + } + io_fd->state = IO_LISTENING; + io_fd->data = data; + io_fd->accept_cb = accept_cb; + engine->update(io_fd); + return io_fd; +} + struct io_fd * ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) { - int fd, res; + struct addrinfo hints; + struct addrinfo *ai; struct io_fd *io_fd; - struct sockaddr_in sin; - unsigned long ip; - - if (!getipbyname(peer, &ip)) { - log_module(MAIN_LOG, LOG_ERROR, "getipbyname(%s) failed.", peer); + struct io_fd *old_active; + int res; + int fd; + char portnum[10]; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = local ? local->sa_family : 0; + hints.ai_socktype = SOCK_STREAM; + snprintf(portnum, sizeof(portnum), "%u", port); + res = getaddrinfo(peer, portnum, &hints, &ai); + if (res != 0) { + log_module(MAIN_LOG, LOG_ERROR, "getaddrinfo(%s, %s) failed: %s.", peer, portnum, gai_strerror(res)); return NULL; } - sin.sin_addr.s_addr = ip; + if (local) { if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) { log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno)); + freeaddrinfo(ai); return NULL; } if (bind(fd, local, sa_size) < 0) { @@ -147,72 +285,86 @@ ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, un } else { if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno)); + freeaddrinfo(ai); return NULL; } } - sin.sin_family = AF_INET; - sin.sin_port = htons(port); + if (blocking) { - res = connect(fd, (struct sockaddr*)&sin, sizeof(sin)); + res = connect(fd, ai->ai_addr, ai->ai_addrlen); io_fd = ioset_add(fd); } else { io_fd = ioset_add(fd); - res = connect(fd, (struct sockaddr*)&sin, sizeof(sin)); + res = connect(fd, ai->ai_addr, ai->ai_addrlen); } + freeaddrinfo(ai); if (!io_fd) { close(fd); return NULL; } + io_fd->state = IO_CONNECTING; io_fd->data = data; io_fd->connect_cb = connect_cb; if (res < 0) { switch (errno) { case EINPROGRESS: /* only if !blocking */ + engine->update(io_fd); return io_fd; default: log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno)); /* then fall through */ case EHOSTUNREACH: case ECONNREFUSED: - ioset_close(io_fd->fd, 1); + ioset_close(io_fd, 1); return NULL; } } + io_fd->state = IO_CONNECTED; + old_active = active_fd; if (connect_cb) connect_cb(io_fd, ((res < 0) ? errno : 0)); + if (active_fd) + engine->update(io_fd); + if (old_active != io_fd) + active_fd = old_active; return io_fd; } +void ioset_update(struct io_fd *fd) { + engine->update(fd); +} + static void ioset_try_write(struct io_fd *fd) { int res; - unsigned int req = ioq_get_avail(&fd->send); - res = write(fd->fd, fd->send.buf+fd->send.get, req); + unsigned int req; + + req = ioq_get_avail(&fd->send); + res = send(fd->fd, fd->send.buf+fd->send.get, req, 0); if (res < 0) { - switch (errno) { - case EAGAIN: - break; - default: - log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); + if (errno != EAGAIN) { + log_module(MAIN_LOG, LOG_ERROR, "send() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); } } else { fd->send.get += res; if (fd->send.get == fd->send.size) fd->send.get = 0; + engine->update(fd); } } void -ioset_close(int fd, int os_close) { - struct io_fd *fdp; - if (!(fdp = fds[fd])) +ioset_close(struct io_fd *fdp, int os_close) { + if (!fdp) return; - fds[fd] = NULL; + if (active_fd == fdp) + active_fd = NULL; if (fdp->destroy_cb) fdp->destroy_cb(fdp); - if (fdp->send.get != fdp->send.put) { - int flags = fcntl(fd, F_GETFL); - fcntl(fd, F_SETFL, flags&~O_NONBLOCK); +#if defined(HAVE_WSAEVENTSELECT) + /* This is one huge kludge. Sorry! */ + if (fdp->send.get != fdp->send.put && (os_close & 2)) { + engine->remove(fdp, 0); ioset_try_write(fdp); /* it may need to send the beginning of the buffer now.. */ if (fdp->send.get != fdp->send.put) @@ -220,11 +372,54 @@ ioset_close(int fd, int os_close) { } free(fdp->send.buf); free(fdp->recv.buf); - if (os_close) - close(fd); + if (os_close & 1) + closesocket(fdp->fd); +#else + if (fdp->send.get != fdp->send.put && (os_close & 2)) { + int flags; + + flags = fcntl(fdp->fd, F_GETFL); + fcntl(fdp->fd, F_SETFL, flags&~O_NONBLOCK); + ioset_try_write(fdp); + /* it may need to send the beginning of the buffer now.. */ + if (fdp->send.get != fdp->send.put) + ioset_try_write(fdp); + } + free(fdp->send.buf); + free(fdp->recv.buf); + if (os_close & 1) + close(fdp->fd); + engine->remove(fdp, os_close & 1); +#endif free(fdp); - FD_CLR(fd, &read_fds); - FD_CLR(fd, &write_fds); +} + +static void +ioset_accept(struct io_fd *listener) +{ + struct io_fd *old_active; + struct io_fd *new_fd; + int fd; + + fd = accept(listener->fd, NULL, 0); + if (fd < 0) { + log_module(MAIN_LOG, LOG_ERROR, "Unable to accept new connection on listener %d: %s", listener->fd, strerror(errno)); + return; + } + + new_fd = ioset_add(fd); + new_fd->state = IO_CONNECTED; + old_active = active_fd; + active_fd = new_fd; + listener->accept_cb(listener, new_fd); + assert(active_fd == NULL || active_fd == new_fd); + if (active_fd == new_fd) { + if (new_fd->send.get != new_fd->send.put) + ioset_try_write(new_fd); + else + engine->update(new_fd); + } + active_fd = old_active; } static int @@ -244,26 +439,25 @@ ioset_find_line_length(struct io_fd *fd) { static void ioset_buffered_read(struct io_fd *fd) { - int put_avail, nbr, fdnum; - + int put_avail, nbr; + if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv); - nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail); + nbr = recv(fd->fd, fd->recv.buf + fd->recv.put, put_avail, 0); if (nbr < 0) { - switch (errno) { - case EAGAIN: - break; - default: - log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); + if (errno != EAGAIN) { + log_module(MAIN_LOG, LOG_ERROR, "Unexpected recv() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */ - fd->eof = 1; - fd->wants_reads = 0; + fd->state = IO_CLOSED; fd->readable_cb(fd); + if (active_fd == fd) + engine->update(fd); } } else if (nbr == 0) { - fd->eof = 1; - fd->wants_reads = 0; + fd->state = IO_CLOSED; fd->readable_cb(fd); + if (active_fd == fd) + engine->update(fd); } else { if (fd->line_len == 0) { unsigned int pos; @@ -280,30 +474,42 @@ ioset_buffered_read(struct io_fd *fd) { fd->recv.put += nbr; if (fd->recv.put == fd->recv.size) fd->recv.put = 0; - fdnum = fd->fd; - while (fd->wants_reads && (fd->line_len > 0)) { + while (fd->line_len > 0) { + struct io_fd *old_active; + int died = 0; + + old_active = active_fd; + active_fd = fd; fd->readable_cb(fd); - if (!fds[fdnum]) - break; /* make sure they didn't close on us */ - ioset_find_line_length(fd); + if (active_fd) + ioset_find_line_length(fd); + else + died = 1; + if (old_active != fd) + active_fd = old_active; + if (died) + break; } } } int ioset_line_read(struct io_fd *fd, char *dest, int max) { - int avail, done; - if (fd->eof && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0))) + int line_len; + int avail; + int done; + + line_len = fd->line_len; + if ((fd->state == IO_CLOSED) && (!ioq_get_avail(&fd->recv) || (line_len < 0))) return 0; - if (fd->line_len < 0) + if (line_len < 0) return -1; - if (fd->line_len < max) - max = fd->line_len; + if (line_len < max) + max = line_len; avail = ioq_get_avail(&fd->recv); if (max > avail) { memcpy(dest, fd->recv.buf + fd->recv.get, avail); - fd->recv.get += avail; - assert(fd->recv.get == fd->recv.size); + assert(fd->recv.get + avail == fd->recv.size); fd->recv.get = 0; done = avail; } else { @@ -313,46 +519,65 @@ ioset_line_read(struct io_fd *fd, char *dest, int max) { fd->recv.get += max - done; if (fd->recv.get == fd->recv.size) fd->recv.get = 0; - dest[max] = 0; + dest[max - 1] = 0; ioset_find_line_length(fd); - return max; + return line_len; } -#if 1 -#define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0 -#else -static void -debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) { - static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" }; - char buf[MAXLEN]; - int pos, ii, flags; - struct timeval now; - - for (pos=ii=0; iitv_sec, select_timeout->tv_usec); - } else { - log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf); +void +ioset_events(struct io_fd *fd, int readable, int writable) +{ + if (!fd || (!readable && !writable)) + return; + active_fd = fd; + switch (fd->state) { + case IO_CLOSED: + break; + case IO_LISTENING: + if (active_fd && readable) + ioset_accept(fd); + break; + case IO_CONNECTING: + assert(active_fd == NULL || active_fd == fd); + if (active_fd && readable) { + socklen_t arglen; + int rc; + arglen = sizeof(rc); + if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) + rc = errno; + fd->state = IO_CLOSED; + if (fd->connect_cb) + fd->connect_cb(fd, rc); + } else if (active_fd && writable) { + fd->state = IO_CONNECTED; + if (fd->connect_cb) + fd->connect_cb(fd, 0); + } + if (active_fd != fd) + break; + engine->update(fd); + /* and fall through */ + case IO_CONNECTED: + assert(active_fd == NULL || active_fd == fd); + if (active_fd && readable) { + if (fd->line_reads) + ioset_buffered_read(fd); + else + fd->readable_cb(fd); + } + + assert(active_fd == NULL || active_fd == fd); + if (active_fd && writable) + ioset_try_write(fd); + break; } } -#endif void ioset_run(void) { extern struct io_fd *socket_io_fd; - struct timeval select_timeout; - unsigned int nn; - int select_result, max_fd; - time_t wakey; - struct io_fd *fd; + struct timeval timeout; + unsigned long wakey; while (!quit_services) { while (!socket_io_fd) @@ -360,64 +585,14 @@ ioset_run(void) { /* How long to sleep? (fill in select_timeout) */ wakey = timeq_next(); - if ((wakey - now) < 0) - select_timeout.tv_sec = 0; + if (wakey < now) + timeout.tv_sec = 0; else - select_timeout.tv_sec = wakey - now; - select_timeout.tv_usec = 0; - - /* Set up read_fds and write_fds fdsets. */ - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); - max_fd = 0; - for (nn=0; nnwants_reads) - FD_SET(nn, &read_fds); - if ((fd->send.get != fd->send.put) || !fd->connected) - FD_SET(nn, &write_fds); - } + timeout.tv_sec = wakey - now; + timeout.tv_usec = 0; - /* Check for activity, update time. */ - debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout); - select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout); - debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout); - now = time(NULL) + clock_skew; - if (select_result < 0) { - if (errno != EINTR) { - log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno)); - close_socket(); - } + if (engine->loop(&timeout)) continue; - } - - /* Call back anybody that has connect or read activity and wants to know. */ - for (nn=0; nnline_reads) - ioset_buffered_read(fd); - else - fd->readable_cb(fd); - } - if (FD_ISSET(nn, &write_fds) && !fd->connected) { - int rc, arglen = sizeof(rc); - if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) - rc = errno; - fd->connected = 1; - if (fd->connect_cb) - fd->connect_cb(fd, rc); - } - /* Note: check whether write FD is still set, since the - * connect_cb() might close the FD, making us dereference - * a free()'d pointer for the fd. - */ - if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put)) - ioset_try_write(fd); - } /* Call any timeq events we need to call. */ timeq_run(); @@ -449,6 +624,21 @@ ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) { fd->send.put += nbw; if (fd->send.put == fd->send.size) fd->send.put = 0; + engine->update(fd); +} + +int +ioset_printf(struct io_fd *fd, const char *fmt, ...) { + char tmpbuf[MAXLEN]; + va_list ap; + int res; + + va_start(ap, fmt); + res = vsnprintf(tmpbuf, sizeof(tmpbuf), fmt, ap); + va_end(ap); + if (res > 0 && (size_t)res <= sizeof(tmpbuf)) + ioset_write(fd, tmpbuf, res); + return res; } void