X-Git-Url: http://git.pk910.de/?a=blobdiff_plain;f=src%2Fioset.c;h=33622af1c55e39f19a298a2702444163d875b074;hb=80d9ed728be4b02ac483f3339cbb184f6602d15b;hp=444579e9c46d7fb84d58eb75a65f047096a3f1f6;hpb=c4ab180a14a56a57317675480f2aae18a16f3d7d;p=srvx.git diff --git a/src/ioset.c b/src/ioset.c index 444579e..33622af 100644 --- a/src/ioset.c +++ b/src/ioset.c @@ -31,6 +31,55 @@ #include #endif +#ifdef WITH_IOSET_WIN32 + +# undef errno +# define errno WSAGetLastError() +# undef EINPROGRESS +# define EINPROGRESS WSAEINPROGRESS +# undef EHOSTUNREACH +# define EHOSTUNREACH WSAEHOSTUNREACH +# undef ECONNREFUSED +# define ECONNREFUSED WSAECONNREFUSED +# undef EAGAIN +# define EAGAIN WSAEWOULDBLOCK +# define strerror wsa_strerror + +static const char * +wsa_strerror(int wsa_err) +{ + switch (wsa_err) + { + case WSAEINTR: return "Operation interrupted"; + case WSAEBADF: return "Bad file descriptor"; + case WSAEACCES: return "Permission denied"; + case WSAEFAULT: return "Invalid address"; + case WSAEINVAL: return "Invalid parameter"; + case WSAEMFILE: return "Too many open files"; + case WSAEWOULDBLOCK: return "Try again later"; + case WSAEINPROGRESS: return "Operation in progress"; + case WSAEALREADY: return "Operation already in progress"; + case WSAENOTSOCK: return "Not a socket"; + case WSAEDESTADDRREQ: return "Destination address required"; + case WSAEMSGSIZE: return "Invalid message size"; + case WSAEPROTOTYPE: return "Invalid protocol type for socket"; + case WSAENOPROTOOPT: return "Invalid protocol option"; + case WSAEPROTONOSUPPORT: return "Protocol not supported"; + case WSAEOPNOTSUPP: return "Operation not supported"; + case WSAEADDRINUSE: return "Address already in use"; + case WSAEADDRNOTAVAIL: return "Address not available"; + case WSAENETDOWN: return "Network down"; + case WSAENETUNREACH: return "Network unreachable"; + case WSAENETRESET: return "Network reset"; + case WSAECONNABORTED: return "Connection aborted"; + case WSAECONNRESET: return "Connection reset by peer"; + case WSAECONNREFUSED: return "Connection refused"; + } + return "unknown error"; +} + +#endif /* WITH_IOSET_WIN32 */ + #define IS_EOL(CH) ((CH) == '\n') extern int uplink_connect(void); @@ -82,10 +131,10 @@ ioq_grow(struct ioq *ioq) { ioq->get = 0; ioq->buf = new_buf; ioq->size = new_size; - return new_size - ioq->put; + return new_size - ioq->put - 1; } -extern struct io_engine io_engine_kqueue; +extern struct io_engine io_engine_kevent; extern struct io_engine io_engine_epoll; extern struct io_engine io_engine_win32; extern struct io_engine io_engine_select; @@ -96,14 +145,14 @@ ioset_init(void) if (engine) /* someone beat us to it */ return; -#if WITH_IOSET_KQUEUE - if (!engine && io_engine_kqueue.init()) - engine = &io_engine_kqueue; +#if WITH_IOSET_KEVENT + if (!engine && io_engine_kevent.init()) + engine = &io_engine_kevent; #endif #if WITH_IOSET_EPOLL if (!engine && io_engine_epoll.init()) - engine = &io_engine_epoll; + engine = &io_engine_epoll; #endif #if WITH_IOSET_WIN32 @@ -114,9 +163,9 @@ ioset_init(void) if (engine) { /* we found one that works */ } else if (io_engine_select.init()) - engine = &io_engine_select; + engine = &io_engine_select; else - log_module(MAIN_LOG, LOG_FATAL, "No usable I/O engine found."); + log_module(MAIN_LOG, LOG_FATAL, "No usable I/O engine found."); log_module(MAIN_LOG, LOG_DEBUG, "Using %s I/O engine.", engine->name); } @@ -142,10 +191,17 @@ ioset_add(int fd) { res->fd = fd; ioq_init(&res->send, 1024); ioq_init(&res->recv, 1024); +#if defined(F_GETFL) flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags|O_NONBLOCK); flags = fcntl(fd, F_GETFD); fcntl(fd, F_SETFD, flags|FD_CLOEXEC); +#else + /* I hope you're using the Win32 backend or something else that + * automatically marks the file descriptor non-blocking... + */ + (void)flags; +#endif engine->add(res); return res; } @@ -159,36 +215,36 @@ struct io_fd *ioset_listen(struct sockaddr *local, unsigned int sa_size, void *d fd = socket(local ? local->sa_family : PF_INET, SOCK_STREAM, 0); if (fd < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to create listening socket: %s", strerror(errno)); - return NULL; + log_module(MAIN_LOG, LOG_ERROR, "Unable to create listening socket: %s", strerror(errno)); + return NULL; } if (local && sa_size) { - res = bind(fd, local, sa_size); - if (res < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to bind listening socket %d: %s", fd, strerror(errno)); - close(fd); - return NULL; - } - opt = 1; res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); if (res < 0) { log_module(MAIN_LOG, LOG_WARNING, "Unable to mark listener address as re-usable: %s", strerror(errno)); } + + res = bind(fd, local, sa_size); + if (res < 0) { + log_module(MAIN_LOG, LOG_ERROR, "Unable to bind listening socket %d: %s", fd, strerror(errno)); + close(fd); + return NULL; + } } res = listen(fd, 1); if (res < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to listen on socket %d: %s", fd, strerror(errno)); - close(fd); - return NULL; + log_module(MAIN_LOG, LOG_ERROR, "Unable to listen on socket %d: %s", fd, strerror(errno)); + close(fd); + return NULL; } io_fd = ioset_add(fd); if (!io_fd) { - close(fd); - return NULL; + close(fd); + return NULL; } io_fd->state = IO_LISTENING; io_fd->data = data; @@ -211,8 +267,9 @@ ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, un hints.ai_family = local ? local->sa_family : 0; hints.ai_socktype = SOCK_STREAM; snprintf(portnum, sizeof(portnum), "%u", port); - if (getaddrinfo(peer, portnum, &hints, &ai)) { - log_module(MAIN_LOG, LOG_ERROR, "getaddrinfo(%s, %s) failed.", peer, portnum); + res = getaddrinfo(peer, portnum, &hints, &ai); + if (res != 0) { + log_module(MAIN_LOG, LOG_ERROR, "getaddrinfo(%s, %s) failed: %s.", peer, portnum, gai_strerror(res)); return NULL; } @@ -259,7 +316,6 @@ ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, un case EHOSTUNREACH: case ECONNREFUSED: ioset_close(io_fd, 1); - engine->update(io_fd); return NULL; } } @@ -284,13 +340,10 @@ ioset_try_write(struct io_fd *fd) { unsigned int req; req = ioq_get_avail(&fd->send); - res = write(fd->fd, fd->send.buf+fd->send.get, req); + res = send(fd->fd, fd->send.buf+fd->send.get, req, 0); if (res < 0) { - switch (errno) { - case EAGAIN: - break; - default: - log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); + if (errno != EAGAIN) { + log_module(MAIN_LOG, LOG_ERROR, "send() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); } } else { fd->send.get += res; @@ -303,15 +356,29 @@ ioset_try_write(struct io_fd *fd) { void ioset_close(struct io_fd *fdp, int os_close) { if (!fdp) - return; + return; if (active_fd == fdp) active_fd = NULL; if (fdp->destroy_cb) fdp->destroy_cb(fdp); +#if defined(HAVE_WSAEVENTSELECT) + /* This is one huge kludge. Sorry! */ + if (fdp->send.get != fdp->send.put && (os_close & 2)) { + engine->remove(fdp, 0); + ioset_try_write(fdp); + /* it may need to send the beginning of the buffer now.. */ + if (fdp->send.get != fdp->send.put) + ioset_try_write(fdp); + } + free(fdp->send.buf); + free(fdp->recv.buf); + if (os_close & 1) + closesocket(fdp->fd); +#else if (fdp->send.get != fdp->send.put && (os_close & 2)) { int flags; - flags = fcntl(fdp->fd, F_GETFL); + flags = fcntl(fdp->fd, F_GETFL); fcntl(fdp->fd, F_SETFL, flags&~O_NONBLOCK); ioset_try_write(fdp); /* it may need to send the beginning of the buffer now.. */ @@ -322,7 +389,8 @@ ioset_close(struct io_fd *fdp, int os_close) { free(fdp->recv.buf); if (os_close & 1) close(fdp->fd); - engine->remove(fdp); + engine->remove(fdp, os_close & 1); +#endif free(fdp); } @@ -335,8 +403,8 @@ ioset_accept(struct io_fd *listener) fd = accept(listener->fd, NULL, 0); if (fd < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to accept new connection on listener %d: %s", listener->fd, strerror(errno)); - return; + log_module(MAIN_LOG, LOG_ERROR, "Unable to accept new connection on listener %d: %s", listener->fd, strerror(errno)); + return; } new_fd = ioset_add(fd); @@ -371,17 +439,14 @@ ioset_find_line_length(struct io_fd *fd) { static void ioset_buffered_read(struct io_fd *fd) { - int put_avail, nbr, fdnum; + int put_avail, nbr; if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv); - nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail); + nbr = recv(fd->fd, fd->recv.buf + fd->recv.put, put_avail, 0); if (nbr < 0) { - switch (errno) { - case EAGAIN: - break; - default: - log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); + if (errno != EAGAIN) { + log_module(MAIN_LOG, LOG_ERROR, "Unexpected recv() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */ fd->state = IO_CLOSED; fd->readable_cb(fd); @@ -409,7 +474,6 @@ ioset_buffered_read(struct io_fd *fd) { fd->recv.put += nbr; if (fd->recv.put == fd->recv.size) fd->recv.put = 0; - fdnum = fd->fd; while (fd->line_len > 0) { struct io_fd *old_active; int died = 0; @@ -431,18 +495,21 @@ ioset_buffered_read(struct io_fd *fd) { int ioset_line_read(struct io_fd *fd, char *dest, int max) { - int avail, done; - if ((fd->state == IO_CLOSED) && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0))) + int line_len; + int avail; + int done; + + line_len = fd->line_len; + if ((fd->state == IO_CLOSED) && (!ioq_get_avail(&fd->recv) || (line_len < 0))) return 0; - if (fd->line_len < 0) + if (line_len < 0) return -1; - if (fd->line_len < max) - max = fd->line_len; + if (line_len < max) + max = line_len; avail = ioq_get_avail(&fd->recv); if (max > avail) { memcpy(dest, fd->recv.buf + fd->recv.get, avail); - fd->recv.get += avail; - assert(fd->recv.get == fd->recv.size); + assert(fd->recv.get + avail == fd->recv.size); fd->recv.get = 0; done = avail; } else { @@ -452,16 +519,16 @@ ioset_line_read(struct io_fd *fd, char *dest, int max) { fd->recv.get += max - done; if (fd->recv.get == fd->recv.size) fd->recv.get = 0; - dest[max] = 0; + dest[max - 1] = 0; ioset_find_line_length(fd); - return max; + return line_len; } void ioset_events(struct io_fd *fd, int readable, int writable) { if (!fd || (!readable && !writable)) - return; + return; active_fd = fd; switch (fd->state) { case IO_CLOSED: @@ -510,7 +577,7 @@ void ioset_run(void) { extern struct io_fd *socket_io_fd; struct timeval timeout; - time_t wakey; + unsigned long wakey; while (!quit_services) { while (!socket_io_fd) @@ -518,14 +585,14 @@ ioset_run(void) { /* How long to sleep? (fill in select_timeout) */ wakey = timeq_next(); - if ((wakey - now) < 0) + if (wakey < now) timeout.tv_sec = 0; else timeout.tv_sec = wakey - now; timeout.tv_usec = 0; - if (engine->loop(&timeout)) - continue; + if (engine->loop(&timeout)) + continue; /* Call any timeq events we need to call. */ timeq_run();