1 /* ioset.h - srvx event loop
2 * Copyright 2002-2004 srvx Development Team
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version. Important limitations are
8 * listed in the COPYING file that accompanies this software.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, email srvx-maintainers@srvx.net.
28 #ifdef HAVE_SYS_SELECT_H
29 #include <sys/select.h>
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
/* End-of-line test used by the line scanners in this file: only '\n' matches. */
39 #define IS_EOL(CH) ((CH) == '\n')
/* Defined elsewhere; the main loop calls it repeatedly while socket_io_fd is NULL. */
41 extern int uplink_connect(void);
/* Offset added to time(NULL) when the main loop refreshes `now`;
 * assigned by ioset_set_time(). */
42 static int clock_skew;
/* Table of registered descriptors, indexed by OS file descriptor number.
 * A NULL slot means nothing is registered for that descriptor. */
46 static struct io_fd **fds;
/* Number of slots in `fds` (used as the realloc element count and loop bound). */
47 static unsigned int fds_size;
/* Descriptor sets filled with FD_SET() in the main loop and handed to select();
 * ioset_close() removes a descriptor from both with FD_CLR(). */
48 static fd_set read_fds, write_fds;
/* Prepare an I/O queue: allocate a `size`-byte backing buffer and reset both
 * cursors to 0.  Equal get/put cursors are what the rest of the file treats
 * as "no data queued".
 * NOTE(review): the malloc() result is not checked on the lines visible here,
 * and the assignment of ioq->size is not part of this excerpt -- confirm both
 * against the complete file. */
51 ioq_init(struct ioq *ioq, int size) {
52 ioq->buf = malloc(size);
53 ioq->get = ioq->put = 0;
/* Return how many bytes may be stored contiguously starting at ioq->put.
 *  - put < get:  the free run ends one byte before `get`.
 *  - get == 0:   the free run ends one byte before the end of the buffer, so
 *                that `put` cannot wrap to 0 and land on `get`.
 *  - otherwise:  the free run extends to the end of the buffer.
 * A result of 0 means there is no contiguous room at `put`. */
58 ioq_put_avail(const struct ioq *ioq) {
59 /* Subtract 1 from ioq->get to be sure we don't fill the buffer
60 * and make it look empty even when there's data in it. */
61 if (ioq->put < ioq->get) {
62 return ioq->get - ioq->put - 1;
63 } else if (ioq->get == 0) {
64 return ioq->size - ioq->put - 1;
66 return ioq->size - ioq->put;
/* Return how many bytes can be consumed contiguously starting at ioq->get:
 * up to the end of the buffer when the data wraps (put < get), otherwise up
 * to `put`.  Wrapped data at the start of the buffer is not counted. */
71 ioq_get_avail(const struct ioq *ioq) {
72 return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
/* Return the total number of bytes currently queued, including any part that
 * has wrapped around to the start of the buffer (adds `size` when put < get). */
76 ioq_used(const struct ioq *ioq) {
77 return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
/* Double the queue's capacity and linearise its contents into the new buffer:
 * first the contiguous run starting at `get`, then (if the data wrapped) the
 * run from the start of the old buffer up to `put`.  `put` is then set to the
 * number of bytes copied, and the return value is the free space that follows
 * `put` in the new buffer.
 * NOTE(review): malloc() is not checked on the visible lines, and new_size is
 * an int produced by a left shift, so very large queues could overflow it.
 * NOTE(review): releasing the old buffer and updating ioq->buf / ioq->get /
 * ioq->size happen on lines missing from this excerpt -- verify in the full file. */
81 ioq_grow(struct ioq *ioq) {
82 int new_size = ioq->size << 1;
83 char *new_buf = malloc(new_size);
84 int get_avail = ioq_get_avail(ioq);
85 memcpy(new_buf, ioq->buf + ioq->get, get_avail);
86 if (ioq->put < ioq->get)
87 memcpy(new_buf + get_avail, ioq->buf, ioq->put);
/* Evaluated against the old get/put/size, so this is the total byte count copied above. */
89 ioq->put = ioq_used(ioq);
93 return new_size - ioq->put;
/* Interior of the descriptor-registration routine.  Its header line is not
 * part of this excerpt; the log message below refers to it as ioset_add(),
 * and ioset_connect() calls ioset_add(fd).
 * Visible steps: reject negative descriptors with an error log, allocate a
 * zeroed record, give it 1024-byte send and receive queues, enlarge the `fds`
 * table when `fd` is beyond its current size (zero-filling the new slots),
 * and put the descriptor into non-blocking mode.
 * NOTE(review): the calloc() result is not checked on the visible lines.
 * NOTE(review): realloc() is assigned straight back to `fds`; if it fails the
 * old table pointer is lost and the memset() below dereferences NULL.
 * NOTE(review): the F_GETFL result is used without an error check. */
107 log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
110 res = calloc(1, sizeof(*res));
114 ioq_init(&res->send, 1024);
115 ioq_init(&res->recv, 1024);
116 if ((unsigned)fd >= fds_size) {
117 unsigned int old_size = fds_size;
/* The new value of fds_size is computed on a line not shown in this excerpt. */
119 fds = realloc(fds, fds_size*sizeof(*fds));
120 memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds));
123 flags = fcntl(fd, F_GETFL);
124 fcntl(fd, F_SETFL, flags|O_NONBLOCK);
/* Open a TCP connection to peer:port and register the socket with the event loop.
 *
 * local, sa_size  address (and its size) passed to bind() on the path that
 *                 creates the socket from local->sa_family
 * peer            host resolved with getipbyname()
 * port            destination port, converted with htons()
 * blocking        presumably selects between the two connect()/ioset_add()
 *                 orderings below -- the branch condition is not visible here
 * data            not used on any visible line
 * connect_cb      stored on the io_fd and invoked with the connect result
 *
 * The destination is always a struct sockaddr_in with AF_INET, so only IPv4
 * peers are handled by the visible code.
 * NOTE(review): several branch conditions, returns and closing braces are
 * missing from this excerpt; the comments below describe only what is shown.
 * NOTE(review): the value returned by ioset_add() is dereferenced below with
 * no NULL check visible.
 * NOTE(review): `port` is unsigned int but is logged with %d. */
129 ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
132 struct sockaddr_in sin;
135 if (!getipbyname(peer, &ip)) {
136 log_module(MAIN_LOG, LOG_ERROR, "getipbyname(%s) failed.", peer);
139 sin.sin_addr.s_addr = ip;
/* Socket creation using the caller-supplied local address family, followed by bind(). */
141 if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) {
142 log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno));
/* A bind() failure is only logged; per the message, the OS then picks the local address. */
145 if (bind(fd, local, sa_size) < 0) {
146 log_module(MAIN_LOG, LOG_ERROR, "bind() of socket for %s (fd %d) returned errno %d (%s). Will let operating system choose.", peer, fd, errno, strerror(errno));
/* Alternative socket creation with PF_INET; which path runs is decided on lines not shown. */
149 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
150 log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno));
154 sin.sin_family = AF_INET;
155 sin.sin_port = htons(port);
/* Ordering 1: connect() first, then ioset_add().  ioset_add() is what sets
 * O_NONBLOCK, so here connect() runs on a descriptor it has not yet touched. */
157 res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
158 io_fd = ioset_add(fd);
/* Ordering 2: ioset_add() first, so connect() runs on a non-blocking socket. */
160 io_fd = ioset_add(fd);
161 res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
168 io_fd->connect_cb = connect_cb;
171 case EINPROGRESS: /* only if !blocking */
174 log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno));
175 /* then fall through */
/* Failure path: unregister the descriptor and close the OS socket (os_close = 1). */
178 ioset_close(io_fd->fd, 1);
/* Report the outcome to the caller: errno when connect() failed, otherwise 0. */
183 connect_cb(io_fd, ((res < 0) ? errno : 0));
/* Attempt to flush queued output: write() the contiguous run of the send
 * queue that starts at `get` (data wrapped to the start of the buffer is not
 * included in this call).  Errors are logged.  When `get` reaches the end of
 * the buffer it wraps back to 0.
 * NOTE(review): the handling of write()'s return value, including how `get`
 * is advanced, is on lines missing from this excerpt. */
188 ioset_try_write(struct io_fd *fd) {
190 unsigned int req = ioq_get_avail(&fd->send);
191 res = write(fd->fd, fd->send.buf+fd->send.get, req);
196 log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno));
200 if (fd->send.get == fd->send.size) fd->send.get = 0;
/* Unregister descriptor `fd` from the event loop.
 * Does nothing when no record is registered for `fd`.  Otherwise it calls the
 * record's destroy_cb (if set), makes a best-effort flush of unsent output
 * with the descriptor switched back to blocking mode, closes the OS
 * descriptor when `os_close` is non-zero, and removes `fd` from both select()
 * sets.
 * NOTE(review): `fd` indexes `fds` with no visible check that it is
 * non-negative and below fds_size.
 * NOTE(review): clearing fds[fd] and freeing the record are not visible in
 * this excerpt -- confirm in the full file. */
205 ioset_close(int fd, int os_close) {
207 if (!(fdp = fds[fd])) return;
209 if (fdp->destroy_cb) fdp->destroy_cb(fdp);
210 if (fdp->send.get != fdp->send.put) {
/* Drop O_NONBLOCK so the flush attempts below are made in blocking mode. */
211 int flags = fcntl(fd, F_GETFL);
212 fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
213 ioset_try_write(fdp);
214 /* it may need to send the beginning of the buffer now.. */
215 if (fdp->send.get != fdp->send.put) ioset_try_write(fdp);
219 if (os_close) close(fd);
221 FD_CLR(fd, &read_fds);
222 FD_CLR(fd, &write_fds);
/* Scan the receive queue from `get` for the first end-of-line byte.
 * Stores the result in fd->line_len and returns it: the length of the first
 * line including its EOL byte, or 0 when no complete line is buffered.
 * The scan covers the run from `get` to the end of the data (or of the
 * buffer), then the wrapped run from index 0 up to `put`.
 * NOTE(review): the initialisation of `len` is on a line missing from this
 * excerpt; the result is only meaningful if it starts at 0. */
226 ioset_find_line_length(struct io_fd *fd) {
227 unsigned int pos, max, len;
229 max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
230 for (pos = fd->recv.get; pos < max; ++pos, ++len) {
231 if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
/* Data wrapped: keep counting through the part stored at the start of the buffer. */
233 if (fd->recv.put < fd->recv.get) {
234 for (pos = 0; pos < fd->recv.put; ++pos, ++len) {
235 if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
238 return fd->line_len = 0;
/* Pull more bytes from the descriptor into its receive queue and deliver any
 * complete lines.
 * Visible steps: make room (growing the queue when no contiguous space is
 * left at `put`), read() into the queue, handle the error and zero-byte
 * cases, look for an EOL among the newly read bytes when no line length is
 * known yet, wrap `put` at the end of the buffer, then loop while the owner
 * still wants reads and a complete line is available.
 * NOTE(review): the statements that set eof, advance `put`, assign `fdnum`
 * and invoke the owner's callback are on lines missing from this excerpt. */
242 ioset_buffered_read(struct io_fd *fd) {
243 int put_avail, nbr, fdnum;
245 if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv);
246 nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
251 log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno));
252 /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
257 } else if (nbr == 0) {
/* No line length cached: only the bytes just read need to be scanned for an EOL. */
262 if (fd->line_len == 0) {
264 for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
265 if (IS_EOL(fd->recv.buf[pos])) {
/* Line length is measured from `get`; add the buffer size when the data has wrapped. */
266 if (fd->recv.put < fd->recv.get) {
267 fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
269 fd->line_len = pos + 1 - fd->recv.get;
276 if (fd->recv.put == fd->recv.size) fd->recv.put = 0;
278 while (fd->wants_reads && (fd->line_len > 0)) {
280 if (!fds[fdnum]) break; /* make sure they didn't close on us */
281 ioset_find_line_length(fd);
/* Copy the current buffered line (at most `max` bytes) from the receive queue
 * into `dest` and consume it from the queue.
 * Returns 0 when the descriptor is at EOF and either nothing is readable at
 * `get` or line_len is negative; returns -1 when line_len is negative
 * otherwise.  `max` is clamped to the line length.  The copy is done in up to
 * two pieces when the line crosses the end of the buffer, after which the
 * next line's length is recomputed.
 * NOTE(review): the condition guarding the first (pre-wrap) copy, the updates
 * of `done`, and the final return value are on lines missing from this
 * excerpt.  Nothing visible NUL-terminates `dest`. */
287 ioset_line_read(struct io_fd *fd, char *dest, int max) {
289 if (fd->eof && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0))) return 0;
290 if (fd->line_len < 0) return -1;
291 if (fd->line_len < max) max = fd->line_len;
292 avail = ioq_get_avail(&fd->recv);
/* First piece: everything from `get` to the end of the buffer. */
294 memcpy(dest, fd->recv.buf + fd->recv.get, avail);
295 fd->recv.get += avail;
296 assert(fd->recv.get == fd->recv.size);
/* Remaining piece (or the whole line when it does not wrap). */
302 memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
303 fd->recv.get += max - done;
304 if (fd->recv.get == fd->recv.size) fd->recv.get = 0;
306 ioset_find_line_length(fd);
/* No-op variant of debug_fdsets(): expands to (void)0 and evaluates none of
 * its arguments.  Presumably chosen by a preprocessor conditional that is not
 * visible in this excerpt -- the function variant follows. */
311 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
/* Logging variant: emit one LOG_DEBUG line listing every descriptor below
 * `nfds` that is set in at least one of the given sets, each followed by its
 * flags (r = read, w = write, e = except), together with the current time and
 * the select timeout if one was given.  Any of the three set pointers may be
 * NULL.  The parameters read_fds/write_fds shadow the file-scope sets of the
 * same names.
 * NOTE(review): sprintf() appends into `buf` with no bound; the declaration
 * and size of `buf` are not visible here, so confirm it can hold an entry for
 * every descriptor below nfds or switch to snprintf().
 * NOTE(review): if no descriptor is set, nothing visible writes to `buf`
 * before it is logged with %s -- check that it is initialised. */
314 debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
/* Indexed by the bit mask built below: 1 = read, 2 = write, 4 = except. */
315 static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
320 for (pos=ii=0; ii<nfds; ++ii) {
321 flags = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
322 flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
323 flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
324 if (!flags) continue;
325 pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
327 gettimeofday(&now, NULL);
328 if (select_timeout) {
329 log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (timeout "FMT_TIME_T".%06ld)", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
331 log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf);
/* Body of the main event loop.  The enclosing function's header line is not
 * part of this excerpt.
 * Each pass, while quit_services is zero:
 *   1. keep calling uplink_connect() until socket_io_fd is non-NULL;
 *   2. derive the select() timeout from the next timeq wake-up time;
 *   3. mark descriptors that want reads, have unsent output, or are still
 *      connecting;
 *   4. select(), then refresh `now` with the clock skew applied;
 *   5. dispatch read, connect-completion and write activity per descriptor;
 *   6. run timeq events and, on a path whose condition is not visible, re-read
 *      the configuration with conf_read().
 * The "still set" test on write_fds after connect_cb() works because
 * ioset_close() removes a closed descriptor from write_fds with FD_CLR().
 * NOTE(review): `arglen` is declared int but getsockopt() takes a socklen_t *
 * for its length argument; the types differ on some platforms.
 * NOTE(review): clearing the fd sets, computing max_fd and the non-line read
 * path are on lines missing from this excerpt. */
338 extern struct io_fd *socket_io_fd;
339 struct timeval select_timeout;
341 int select_result, max_fd;
345 while (!quit_services) {
346 while (!socket_io_fd) uplink_connect();
348 /* How long to sleep? (fill in select_timeout) */
349 wakey = timeq_next();
/* A wake-up time already in the past yields a zero-second timeout (poll). */
350 if ((wakey - now) < 0) {
351 select_timeout.tv_sec = 0;
353 select_timeout.tv_sec = wakey - now;
355 select_timeout.tv_usec = 0;
357 /* Set up read_fds and write_fds fdsets. */
361 for (nn=0; nn<fds_size; nn++) {
362 if (!(fd = fds[nn])) continue;
364 if (fd->wants_reads) FD_SET(nn, &read_fds);
/* Writability is watched both for pending output and for connect() completion. */
365 if ((fd->send.get != fd->send.put) || !fd->connected) FD_SET(nn, &write_fds);
368 /* Check for activity, update time. */
369 debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
370 select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
371 debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
372 now = time(NULL) + clock_skew;
373 if (select_result < 0) {
/* EINTR is not logged; any other select() failure is. */
374 if (errno != EINTR) {
375 log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno));
381 /* Call back anybody that has connect or read activity and wants to know. */
382 for (nn=0; nn<fds_size; nn++) {
383 if (!(fd = fds[nn])) continue;
384 if (FD_ISSET(nn, &read_fds)) {
385 if (fd->line_reads) {
386 ioset_buffered_read(fd);
/* Writable while not yet connected: the connect() attempt has finished;
 * fetch its result via SO_ERROR and hand it to connect_cb. */
391 if (FD_ISSET(nn, &write_fds) && !fd->connected) {
392 int rc, arglen = sizeof(rc);
393 if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) rc = errno;
395 if (fd->connect_cb) fd->connect_cb(fd, rc);
397 /* Note: check whether write FD is still set, since the
398 * connect_cb() might close the FD, making us dereference
399 * a free()'d pointer for the fd.
401 if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put)) {
406 /* Call any timeq events we need to call. */
413 extern char *services_config;
414 conf_read(services_config);
/* Queue `nbw` bytes from `buf` for sending on `fd`.
 * The loop condition keeps the queue strictly larger than its contents plus
 * the new data.  The bytes are then copied in at `put`, in two pieces when
 * they cross the end of the buffer, and `put` wraps to 0 on reaching the end.
 * NOTE(review): the loop body (presumably a call that enlarges the queue),
 * the condition that chooses the split copy, and the advancing of
 * `put`/`buf`/`nbw` are on lines missing from this excerpt. */
421 ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
423 while (ioq_used(&fd->send) + nbw >= fd->send.size) {
/* Contiguous room at `put`; used as the size of the first piece of a split copy. */
426 avail = ioq_put_avail(&fd->send);
428 memcpy(fd->send.buf + fd->send.put, buf, avail);
433 memcpy(fd->send.buf + fd->send.put, buf, nbw);
435 if (fd->send.put == fd->send.size) fd->send.put = 0;
/* Record the difference between the caller-supplied time and the system
 * clock.  The main loop adds this skew to time(NULL) when it updates `now`,
 * so `now` then follows `new_now` rather than the local clock.
 * NOTE(review): clock_skew is an int, so the difference is narrowed on
 * assignment. */
439 ioset_set_time(unsigned long new_now) {
440 clock_skew = new_now - time(NULL);