1 /* ioset.c - srvx event loop
2 * Copyright 2002-2004 srvx Development Team
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version. Important limitations are
8 * listed in the COPYING file that accompanies this software.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, email srvx-maintainers@srvx.net.
 */
28 #ifdef HAVE_SYS_SELECT_H
29 #include <sys/select.h>
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
/* True when CH is the newline character; this is what the line scanner
 * in this file treats as the end of a line. */
39 #define IS_EOL(CH) ((CH) == '\n')
/* Defined elsewhere; not called by any line visible in this excerpt. */
41 extern int uplink_connect(void);
/* Difference between the time base passed to ioset_set_time() and the local
 * clock; it is added to time(NULL) whenever the event loop refreshes `now`. */
42 static int clock_skew;
/* Table of registered descriptors indexed by fd number; slots that were
 * added when the table grew are zero-filled (NULL). */
46 static struct io_fd **fds;
/* Number of slots currently allocated in fds. */
47 static unsigned int fds_size;
/* Descriptor sets handed to select() on every pass of the event loop. */
48 static fd_set read_fds, write_fds;
/* Prepare a queue for use: allocate a buffer of `size` bytes and reset the
 * read (get) and write (put) offsets to 0, which is the empty state.
 * NOTE(review): the malloc() result is not checked in the visible lines. */
51 ioq_init(struct ioq *ioq, int size) {
52 ioq->buf = malloc(size);
53 ioq->get = ioq->put = 0;
/* Return how many bytes can be stored contiguously starting at ioq->put. */
58 ioq_put_avail(const struct ioq *ioq) {
59 /* Subtract 1 from ioq->get to be sure we don't fill the buffer
60 * and make it look empty even when there's data in it. */
/* Writer is behind the reader: room extends to one byte short of get. */
61 if (ioq->put < ioq->get)
62 return ioq->get - ioq->put - 1;
/* Reader is at offset 0: keep the last slot free so put cannot wrap
 * around onto get. */
63 else if (ioq->get == 0)
64 return ioq->size - ioq->put - 1;
/* Remaining case: everything from put to the end of the buffer is free. */
66 return ioq->size - ioq->put;
/* Return how many bytes can be read contiguously starting at ioq->get:
 * up to the end of the buffer when the data wraps (put < get), otherwise
 * up to put. */
70 ioq_get_avail(const struct ioq *ioq) {
71 return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
/* Return the total number of bytes stored in the queue, including the part
 * that has wrapped around to the front of the buffer (put < get). */
75 ioq_used(const struct ioq *ioq) {
76 return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
/* Enlarge the queue to twice its size, copying the stored bytes to the
 * start of the new buffer.  Returns the space left after the stored data.
 * NOTE(review): the malloc() result is not checked in the visible lines, and
 * the statements that install the new buffer are not visible in this excerpt. */
80 ioq_grow(struct ioq *ioq) {
81 int new_size = ioq->size << 1;
82 char *new_buf = malloc(new_size);
83 int get_avail = ioq_get_avail(ioq);
/* First the contiguous run that starts at get... */
84 memcpy(new_buf, ioq->buf + ioq->get, get_avail);
/* ...then, if the data wrapped, the part held at the front of the old buffer. */
85 if (ioq->put < ioq->get)
86 memcpy(new_buf + get_avail, ioq->buf, ioq->put);
/* put becomes the number of stored bytes, computed from the current offsets. */
88 ioq->put = ioq_used(ioq);
92 return new_size - ioq->put;
/* Fragment of the fd registration routine (ioset_add, per the log message);
 * its header is not visible in this excerpt.  The error below reports a call
 * with a negative fd; the guarding condition is not visible. */
106 log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
/* Zero-initialised record for the new descriptor. */
109 res = calloc(1, sizeof(*res));
/* Send and receive queues each start at 1024 bytes. */
113 ioq_init(&res->send, 1024);
114 ioq_init(&res->recv, 1024);
/* The fd does not fit in the table: enlarge it and zero the new slots.
 * NOTE(review): the statement that computes the new fds_size is not visible;
 * realloc()'s result is assigned straight to fds with no visible NULL check. */
115 if ((unsigned)fd >= fds_size) {
116 unsigned int old_size = fds_size;
118 fds = realloc(fds, fds_size*sizeof(*fds));
119 memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds));
/* Switch the descriptor to non-blocking mode, keeping its other flags. */
122 flags = fcntl(fd, F_GETFL);
123 fcntl(fd, F_SETFL, flags|O_NONBLOCK);
/* Open a stream socket to peer:port and register it through ioset_add().
 * A socket of local->sa_family may be bound to `local`; if that bind() fails
 * the error is logged and the operating system is left to choose.
 * connect_cb is stored on the io_fd; the visible direct call passes errno
 * when connect() failed and 0 otherwise.
 * NOTE(review): several branch conditions are not visible in this excerpt,
 * and `data` is not referenced by any visible line. */
128 ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
131 struct sockaddr_in sin;
/* Resolve the peer name to an IPv4 address; a failure is logged. */
134 if (!getipbyname(peer, &ip)) {
135 log_module(MAIN_LOG, LOG_ERROR, "getipbyname(%s) failed.", peer);
138 sin.sin_addr.s_addr = ip;
/* Socket matching the address family of `local`, then bound to it. */
140 if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) {
141 log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno));
144 if (bind(fd, local, sa_size) < 0) {
145 log_module(MAIN_LOG, LOG_ERROR, "bind() of socket for %s (fd %d) returned errno %d (%s). Will let operating system choose.", peer, fd, errno, strerror(errno));
/* Plain PF_INET socket; presumably the path taken when no local address is
 * given (the condition is not visible). */
148 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
149 log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno));
153 sin.sin_family = AF_INET;
154 sin.sin_port = htons(port);
/* Two orderings of connect() and ioset_add() follow.  Registration sets
 * O_NONBLOCK on the descriptor, so connecting first gives a blocking
 * connect() and registering first a non-blocking one; presumably `blocking`
 * selects between them (the condition is not visible). */
156 res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
157 io_fd = ioset_add(fd);
159 io_fd = ioset_add(fd);
160 res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
167 io_fd->connect_cb = connect_cb;
170 case EINPROGRESS: /* only if !blocking */
173 log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno));
174 /* then fall through */
/* Close the descriptor through ioset_close(), passing 1 as os_close. */
177 ioset_close(io_fd->fd, 1);
182 connect_cb(io_fd, ((res < 0) ? errno : 0));
/* Attempt to write the contiguous readable part of fd's send queue to the
 * descriptor with a single write() call. */
187 ioset_try_write(struct io_fd *fd) {
/* Only the run from get up to put (or to the end of the buffer when the
 * data wraps) is offered to write(). */
189 unsigned int req = ioq_get_avail(&fd->send);
190 res = write(fd->fd, fd->send.buf+fd->send.get, req);
196 log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno));
/* get has reached the end of the buffer; the action taken is not visible
 * in this excerpt (presumably a wrap back to 0). */
200 if (fd->send.get == fd->send.size)
/* Tear down the io_fd registered for `fd`: call its destroy callback, try to
 * flush any unsent output, and remove the fd from both select() sets.
 * NOTE(review): `os_close` is not referenced by any visible line, and `fd`
 * indexes fds without a visible bounds check. */
206 ioset_close(int fd, int os_close) {
/* Look up the registration; the handling of an empty slot is not visible. */
208 if (!(fdp = fds[fd]))
212 fdp->destroy_cb(fdp);
/* Output is still queued: clear O_NONBLOCK before attempting to write it. */
213 if (fdp->send.get != fdp->send.put) {
214 int flags = fcntl(fd, F_GETFL);
215 fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
216 ioset_try_write(fdp);
217 /* it may need to send the beginning of the buffer now.. */
218 if (fdp->send.get != fdp->send.put)
219 ioset_try_write(fdp);
226 FD_CLR(fd, &read_fds);
227 FD_CLR(fd, &write_fds);
/* Scan fd's receive queue, starting at get, for the first end-of-line
 * character.  Stores the length of that line (EOL included) in fd->line_len
 * and returns it; stores and returns 0 when no EOL is buffered.
 * NOTE(review): the initialisation of `len` is not visible in this excerpt. */
231 ioset_find_line_length(struct io_fd *fd) {
232 unsigned int pos, max, len;
/* First pass: from get to put, or to the end of the buffer if data wraps. */
234 max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
235 for (pos = fd->recv.get; pos < max; ++pos, ++len)
236 if (IS_EOL(fd->recv.buf[pos]))
237 return fd->line_len = len + 1;
/* Second pass, wrapped data only: from the front of the buffer up to put;
 * len keeps counting from the first pass. */
238 if (fd->recv.put < fd->recv.get)
239 for (pos = 0; pos < fd->recv.put; ++pos, ++len)
240 if (IS_EOL(fd->recv.buf[pos]))
241 return fd->line_len = len + 1;
242 return fd->line_len = 0;
/* Read available data from fd->fd into its receive queue, note whether a
 * complete line is now buffered, and loop over buffered lines while the fd
 * still wants reads.
 * NOTE(review): the callback invoked inside the final loop and the bodies of
 * the error/EOF branches are not visible in this excerpt. */
246 ioset_buffered_read(struct io_fd *fd) {
247 int put_avail, nbr, fdnum;
/* No contiguous room at put: enlarge the queue before reading. */
249 if (!(put_avail = ioq_put_avail(&fd->recv)))
250 put_avail = ioq_grow(&fd->recv);
251 nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
257 log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno));
258 /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
/* read() returned 0, which is how end of file is reported. */
263 } else if (nbr == 0) {
/* No line length recorded: look for an EOL among the bytes just read
 * and record the length of the line it terminates, counted from get. */
268 if (fd->line_len == 0) {
270 for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
271 if (IS_EOL(fd->recv.buf[pos])) {
/* When the stored data wraps, add recv.size so the distance from get
 * is counted around the end of the buffer. */
272 if (fd->recv.put < fd->recv.get)
273 fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
275 fd->line_len = pos + 1 - fd->recv.get;
/* put has reached the end of the buffer; the action taken is not visible
 * in this excerpt (presumably a wrap back to 0). */
281 if (fd->recv.put == fd->recv.size)
/* Keep going while the owner wants reads and a complete line is buffered;
 * each pass rescans for the next line. */
284 while (fd->wants_reads && (fd->line_len > 0)) {
287 break; /* make sure they didn't close on us */
288 ioset_find_line_length(fd);
/* Copy buffered data from fd's receive queue into dest, advancing get, and
 * rescan the queue for the next line afterwards.
 * NOTE(review): return statements and several guards are not visible in this
 * excerpt; presumably at most `max` bytes of one line are copied - confirm. */
294 ioset_line_read(struct io_fd *fd, char *dest, int max) {
/* At EOF with nothing contiguous left to read, or with a negative
 * line_len (the result of this branch is not visible). */
296 if (fd->eof && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0)))
298 if (fd->line_len < 0)
/* The buffered line is shorter than the caller's limit. */
300 if (fd->line_len < max)
302 avail = ioq_get_avail(&fd->recv);
/* Copy the contiguous run starting at get.  The assert expects this copy
 * to end exactly at the end of the buffer, so this path is presumably taken
 * only when the data wraps (the guard is not visible). */
304 memcpy(dest, fd->recv.buf + fd->recv.get, avail);
305 fd->recv.get += avail;
306 assert(fd->recv.get == fd->recv.size);
/* Copy the remaining max - done bytes and advance get past them. */
312 memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
313 fd->recv.get += max - done;
314 if (fd->recv.get == fd->recv.size)
/* Recompute line_len for whatever is still buffered. */
317 ioset_find_line_length(fd);
/* No-op form of debug_fdsets(): every call expands to (void)0.
 * NOTE(review): this macro and the function below presumably sit in
 * alternative branches of a preprocessor conditional that is not visible. */
322 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
/* Logging form: writes one LOG_DEBUG line with descriptors below nfds and their
 * r/w/e set membership, plus the current time and, if given, the timeout. */
325 debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
/* Indexed by the bitmask built below: 1 = read, 2 = write, 4 = except. */
326 static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
331 for (pos=ii=0; ii<nfds; ++ii) {
/* A NULL set pointer contributes no flag. */
332 flags = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
333 flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
334 flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
/* NOTE(review): sprintf() appends to buf with no visible bounds check;
 * buf's declaration is not visible in this excerpt. */
337 pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
339 gettimeofday(&now, NULL);
340 if (select_timeout) {
341 log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (timeout "FMT_TIME_T".%06ld)", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
343 log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf);
/* Main event loop: until quit_services is set, wait in select() for
 * descriptor activity or the next timeq deadline, then service reads,
 * pending connects and queued writes.
 * NOTE(review): the function header and a number of statements are not
 * visible in this excerpt; only the visible steps are described. */
350 extern struct io_fd *socket_io_fd;
351 struct timeval select_timeout;
353 int select_result, max_fd;
357 while (!quit_services) {
/* Repeats its (not visible) body for as long as socket_io_fd is NULL. */
358 while (!socket_io_fd)
361 /* How long to sleep? (fill in select_timeout) */
362 wakey = timeq_next();
/* A deadline already in the past means a zero-second wait. */
363 if ((wakey - now) < 0)
364 select_timeout.tv_sec = 0;
366 select_timeout.tv_sec = wakey - now;
367 select_timeout.tv_usec = 0;
369 /* Set up read_fds and write_fds fdsets. */
373 for (nn=0; nn<fds_size; nn++) {
378 FD_SET(nn, &read_fds);
/* Watch for writability while output is queued or while the fd is not
 * yet marked connected. */
379 if ((fd->send.get != fd->send.put) || !fd->connected)
380 FD_SET(nn, &write_fds);
383 /* Check for activity, update time. */
384 debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
385 select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
386 debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
387 now = time(NULL) + clock_skew;
388 if (select_result < 0) {
/* An interrupted select() (EINTR) is not logged. */
389 if (errno != EINTR) {
390 log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno));
396 /* Call back anybody that has connect or read activity and wants to know. */
397 for (nn=0; nn<fds_size; nn++) {
400 if (FD_ISSET(nn, &read_fds)) {
402 ioset_buffered_read(fd);
/* Writable while not yet connected: query SO_ERROR and hand rc to
 * connect_cb.
 * NOTE(review): getsockopt() takes a socklen_t * length argument, while
 * arglen is declared int. */
406 if (FD_ISSET(nn, &write_fds) && !fd->connected) {
407 int rc, arglen = sizeof(rc);
408 if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0)
412 fd->connect_cb(fd, rc);
414 /* Note: check whether write FD is still set, since the
415 * connect_cb() might close the FD, making us dereference
416 * a free()'d pointer for the fd.
418 if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put))
422 /* Call any timeq events we need to call. */
/* Re-read the configuration file named by services_config; the condition
 * that triggers this is not visible in this excerpt. */
429 extern char *services_config;
430 conf_read(services_config);
/* Append nbw bytes from buf to fd's send queue.
 * NOTE(review): the loop body, the guard around the first copy and the
 * put/buf/nbw adjustments between the copies are not visible in this excerpt. */
437 ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
/* Loops while the stored bytes plus nbw would reach the queue size
 * (presumably growing the queue each pass - the body is not visible). */
439 while (ioq_used(&fd->send) + nbw >= fd->send.size)
441 avail = ioq_put_avail(&fd->send);
/* Copy as much as fits contiguously at put... */
443 memcpy(fd->send.buf + fd->send.put, buf, avail);
/* ...and copy nbw bytes at put. */
448 memcpy(fd->send.buf + fd->send.put, buf, nbw);
/* put has reached the end of the buffer; the action taken is not visible
 * in this excerpt (presumably a wrap back to 0). */
450 if (fd->send.put == fd->send.size)
/* Record how far new_now is from the local clock.  The difference is kept
 * in clock_skew, which is added to time(NULL) when `now` is refreshed. */
455 ioset_set_time(unsigned long new_now) {
456 clock_skew = new_now - time(NULL);