Add IPv6 support.
[srvx.git] / src / ioset.c
1 /* ioset.h - srvx event loop
2  * Copyright 2002-2004 srvx Development Team
3  *
4  * This file is part of srvx.
5  *
6  * srvx is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with srvx; if not, write to the Free Software Foundation,
18  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
19  */
20
21 #include "ioset.h"
22 #include "log.h"
23 #include "timeq.h"
24 #include "saxdb.h"
25 #include "conf.h"
26
27 #ifdef HAVE_FCNTL_H
28 #include <fcntl.h>
29 #endif
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>
32 #endif
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36
37 #ifndef IOSET_DEBUG
38 #define IOSET_DEBUG 0
39 #endif
40
41 #define IS_EOL(CH) ((CH) == '\n')
42
43 extern int uplink_connect(void);
44 static int clock_skew;
45 int do_write_dbs;
46 int do_reopen;
47
48 static struct io_fd **fds;
49 static unsigned int fds_size;
50 static fd_set read_fds, write_fds;
51
52 static void
53 ioq_init(struct ioq *ioq, int size) {
54     ioq->buf = malloc(size);
55     ioq->get = ioq->put = 0;
56     ioq->size = size;
57 }
58
59 static unsigned int
60 ioq_put_avail(const struct ioq *ioq) {
61     /* Subtract 1 from ioq->get to be sure we don't fill the buffer
62      * and make it look empty even when there's data in it. */
63     if (ioq->put < ioq->get)
64         return ioq->get - ioq->put - 1;
65     else if (ioq->get == 0)
66         return ioq->size - ioq->put - 1;
67     else
68         return ioq->size - ioq->put;
69 }
70
71 static unsigned int
72 ioq_get_avail(const struct ioq *ioq) {
73     return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
74 }
75
76 static unsigned int
77 ioq_used(const struct ioq *ioq) {
78     return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
79 }
80
81 static unsigned int
82 ioq_grow(struct ioq *ioq) {
83     int new_size = ioq->size << 1;
84     char *new_buf = malloc(new_size);
85     int get_avail = ioq_get_avail(ioq);
86     memcpy(new_buf, ioq->buf + ioq->get, get_avail);
87     if (ioq->put < ioq->get)
88         memcpy(new_buf + get_avail, ioq->buf, ioq->put);
89     free(ioq->buf);
90     ioq->put = ioq_used(ioq);
91     ioq->get = 0;
92     ioq->buf = new_buf;
93     ioq->size = new_size;
94     return new_size - ioq->put;
95 }
96
97 void
98 ioset_cleanup(void) {
99     free(fds);
100 }
101
102 struct io_fd *
103 ioset_add(int fd) {
104     struct io_fd *res;
105     int flags;
106
107     if (fd < 0) {
108         log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
109         return 0;
110     }
111     res = calloc(1, sizeof(*res));
112     if (!res)
113         return 0;
114     res->fd = fd;
115     ioq_init(&res->send, 1024);
116     ioq_init(&res->recv, 1024);
117     if ((unsigned)fd >= fds_size) {
118         unsigned int old_size = fds_size;
119         fds_size = fd + 8;
120         fds = realloc(fds, fds_size*sizeof(*fds));
121         memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds));
122     }
123     fds[fd] = res;
124     flags = fcntl(fd, F_GETFL);
125     fcntl(fd, F_SETFL, flags|O_NONBLOCK);
126     return res;
127 }
128
129 struct io_fd *
130 ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
131     int fd, res;
132     struct io_fd *io_fd;
133     struct addrinfo hints, *ai;
134     char portnum[10];
135
136     memset(&hints, 0, sizeof(hints));
137     hints.ai_family = local ? local->sa_family : 0;
138     hints.ai_socktype = SOCK_STREAM;
139     snprintf(portnum, sizeof(portnum), "%u", port);
140     if (getaddrinfo(peer, portnum, &hints, &ai)) {
141         log_module(MAIN_LOG, LOG_ERROR, "getaddrinfo(%s, %s) failed.", peer, portnum);
142         return NULL;
143     }
144
145     if (local) {
146         if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) {
147             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno));
148             freeaddrinfo(ai);
149             return NULL;
150         }
151         if (bind(fd, local, sa_size) < 0) {
152             log_module(MAIN_LOG, LOG_ERROR, "bind() of socket for %s (fd %d) returned errno %d (%s).  Will let operating system choose.", peer, fd, errno, strerror(errno));
153         }
154     } else {
155         if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
156             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno));
157             freeaddrinfo(ai);
158             return NULL;
159         }
160     }
161
162     if (blocking) {
163         res = connect(fd, ai->ai_addr, ai->ai_addrlen);
164         io_fd = ioset_add(fd);
165     } else {
166         io_fd = ioset_add(fd);
167         res = connect(fd, ai->ai_addr, ai->ai_addrlen);
168     }
169     freeaddrinfo(ai);
170     if (!io_fd) {
171         close(fd);
172         return NULL;
173     }
174     io_fd->data = data;
175     io_fd->connect_cb = connect_cb;
176     if (res < 0) {
177         switch (errno) {
178         case EINPROGRESS: /* only if !blocking */
179             return io_fd;
180         default:
181             log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno));
182             /* then fall through */
183         case EHOSTUNREACH:
184         case ECONNREFUSED:
185             ioset_close(io_fd->fd, 1);
186             return NULL;
187         }
188     }
189     if (connect_cb)
190         connect_cb(io_fd, ((res < 0) ? errno : 0));
191     return io_fd;
192 }
193
194 static void
195 ioset_try_write(struct io_fd *fd) {
196     int res;
197     unsigned int req = ioq_get_avail(&fd->send);
198     res = write(fd->fd, fd->send.buf+fd->send.get, req);
199     if (res < 0) {
200         switch (errno) {
201         case EAGAIN:
202             break;
203         default:
204             log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno));
205         }
206     } else {
207         fd->send.get += res;
208         if (fd->send.get == fd->send.size)
209             fd->send.get = 0;
210     }
211 }
212
213 void
214 ioset_close(int fd, int os_close) {
215     struct io_fd *fdp;
216     if (!(fdp = fds[fd]))
217         return;
218     fds[fd] = NULL;
219     if (fdp->destroy_cb)
220         fdp->destroy_cb(fdp);
221     if (fdp->send.get != fdp->send.put) {
222         int flags = fcntl(fd, F_GETFL);
223         fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
224         ioset_try_write(fdp);
225         /* it may need to send the beginning of the buffer now.. */
226         if (fdp->send.get != fdp->send.put)
227             ioset_try_write(fdp);
228     }
229     free(fdp->send.buf);
230     free(fdp->recv.buf);
231     if (os_close)
232         close(fd);
233     free(fdp);
234     FD_CLR(fd, &read_fds);
235     FD_CLR(fd, &write_fds);
236 }
237
238 static int
239 ioset_find_line_length(struct io_fd *fd) {
240     unsigned int pos, max, len;
241     len = 0;
242     max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
243     for (pos = fd->recv.get; pos < max; ++pos, ++len)
244         if (IS_EOL(fd->recv.buf[pos]))
245             return fd->line_len = len + 1;
246     if (fd->recv.put < fd->recv.get)
247         for (pos = 0; pos < fd->recv.put; ++pos, ++len)
248             if (IS_EOL(fd->recv.buf[pos]))
249                 return fd->line_len = len + 1;
250     return fd->line_len = 0;
251 }
252
253 static void
254 ioset_buffered_read(struct io_fd *fd) {
255     int put_avail, nbr, fdnum;
256     
257     if (!(put_avail = ioq_put_avail(&fd->recv)))
258         put_avail = ioq_grow(&fd->recv);
259     nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
260     if (nbr < 0) {
261         switch (errno) {
262         case EAGAIN:
263             break;
264         default:
265             log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno));
266             /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
267             fd->eof = 1;
268             fd->wants_reads = 0;
269             fd->readable_cb(fd);
270         }
271     } else if (nbr == 0) {
272         fd->eof = 1;
273         fd->wants_reads = 0;
274         fd->readable_cb(fd);
275     } else {
276         if (fd->line_len == 0) {
277             unsigned int pos;
278             for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
279                 if (IS_EOL(fd->recv.buf[pos])) {
280                     if (fd->recv.put < fd->recv.get)
281                         fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
282                     else
283                         fd->line_len = pos + 1 - fd->recv.get;
284                     break;
285                 }
286             }
287         }
288         fd->recv.put += nbr;
289         if (fd->recv.put == fd->recv.size)
290             fd->recv.put = 0;
291         fdnum = fd->fd;
292         while (fd->wants_reads && (fd->line_len > 0)) {
293             fd->readable_cb(fd);
294             if (!fds[fdnum])
295                 break; /* make sure they didn't close on us */
296             ioset_find_line_length(fd);
297         }
298     }
299 }
300
301 int
302 ioset_line_read(struct io_fd *fd, char *dest, int max) {
303     int avail, done;
304     if (fd->eof && (!ioq_get_avail(&fd->recv) ||  (fd->line_len < 0)))
305         return 0;
306     if (fd->line_len < 0)
307         return -1;
308     if (fd->line_len < max)
309         max = fd->line_len;
310     avail = ioq_get_avail(&fd->recv);
311     if (max > avail) {
312         memcpy(dest, fd->recv.buf + fd->recv.get, avail);
313         fd->recv.get += avail;
314         assert(fd->recv.get == fd->recv.size);
315         fd->recv.get = 0;
316         done = avail;
317     } else {
318         done = 0;
319     }
320     memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
321     fd->recv.get += max - done;
322     if (fd->recv.get == fd->recv.size)
323         fd->recv.get = 0;
324     dest[max] = 0;
325     ioset_find_line_length(fd);
326     return max;
327 }
328
329 #if 1
330 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
331 #else
332 static void
333 debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
334     static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
335     char buf[MAXLEN];
336     int pos, ii, flags;
337     struct timeval now;
338
339     for (pos=ii=0; ii<nfds; ++ii) {
340         flags  = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
341         flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
342         flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
343         if (!flags)
344             continue;
345         pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
346     }
347     gettimeofday(&now, NULL);
348     if (select_timeout) {
349         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (timeout "FMT_TIME_T".%06ld)", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
350     } else {
351         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf);
352     }
353 }
354 #endif
355
356 void
357 ioset_run(void) {
358     extern struct io_fd *socket_io_fd;
359     struct timeval select_timeout;
360     unsigned int nn;
361     int select_result, max_fd;
362     time_t wakey;
363     struct io_fd *fd;
364
365     while (!quit_services) {
366         while (!socket_io_fd)
367             uplink_connect();
368
369         /* How long to sleep? (fill in select_timeout) */
370         wakey = timeq_next();
371         if ((wakey - now) < 0)
372             select_timeout.tv_sec = 0;
373         else
374             select_timeout.tv_sec = wakey - now;
375         select_timeout.tv_usec = 0;
376
377         /* Set up read_fds and write_fds fdsets. */
378         FD_ZERO(&read_fds);
379         FD_ZERO(&write_fds);
380         max_fd = 0;
381         for (nn=0; nn<fds_size; nn++) {
382             if (!(fd = fds[nn]))
383                 continue;
384             max_fd = nn;
385             if (fd->wants_reads)
386                 FD_SET(nn, &read_fds);
387             if ((fd->send.get != fd->send.put) || !fd->connected)
388                 FD_SET(nn, &write_fds);
389         }
390
391         /* Check for activity, update time. */
392         debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
393         select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
394         debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
395         now = time(NULL) + clock_skew;
396         if (select_result < 0) {
397             if (errno != EINTR) {
398                 log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno));
399                 close_socket();
400             }
401             continue;
402         }
403
404         /* Call back anybody that has connect or read activity and wants to know. */
405         for (nn=0; nn<fds_size; nn++) {
406             if (!(fd = fds[nn]))
407                 continue;
408             if (FD_ISSET(nn, &read_fds)) {
409                 if (fd->line_reads)
410                     ioset_buffered_read(fd);
411                 else
412                     fd->readable_cb(fd);
413             }
414             if (FD_ISSET(nn, &write_fds) && !fd->connected) {
415                 socklen_t arglen;
416                 int rc;
417
418                 arglen = sizeof(rc);
419                 if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0)
420                     rc = errno;
421                 fd->connected = 1;
422                 if (fd->connect_cb)
423                     fd->connect_cb(fd, rc);
424             }
425             /* Note: check whether write FD is still set, since the
426              * connect_cb() might close the FD, making us dereference
427              * a free()'d pointer for the fd.
428              */
429             if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put))
430                 ioset_try_write(fd);
431         }
432
433         /* Call any timeq events we need to call. */
434         timeq_run();
435         if (do_write_dbs) {
436             saxdb_write_all();
437             do_write_dbs = 0;
438         }
439         if (do_reopen) {
440             extern char *services_config;
441             conf_read(services_config);
442             do_reopen = 0;
443         }
444     }
445 }
446
447 void
448 ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
449     unsigned int avail;
450     while (ioq_used(&fd->send) + nbw >= fd->send.size)
451         ioq_grow(&fd->send);
452     avail = ioq_put_avail(&fd->send);
453     if (nbw > avail) {
454         memcpy(fd->send.buf + fd->send.put, buf, avail);
455         buf += avail;
456         nbw -= avail;
457         fd->send.put = 0;
458     }
459     memcpy(fd->send.buf + fd->send.put, buf, nbw);
460     fd->send.put += nbw;
461     if (fd->send.put == fd->send.size)
462         fd->send.put = 0;
463 }
464
465 void
466 ioset_set_time(unsigned long new_now) {
467     clock_skew = new_now - time(NULL);
468     now = new_now;
469 }