Initial import (again)
[srvx.git] / src / ioset.c
1 /* ioset.h - srvx event loop
2  * Copyright 2002-2004 srvx Development Team
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.  Important limitations are
8  * listed in the COPYING file that accompanies this software.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, email srvx-maintainers@srvx.net.
17  */
18
19 #include "ioset.h"
20 #include "log.h"
21 #include "timeq.h"
22 #include "saxdb.h"
23 #include "conf.h"
24
25 #ifdef HAVE_FCNTL_H
26 #include <fcntl.h>
27 #endif
28 #ifdef HAVE_SYS_SELECT_H
29 #include <sys/select.h>
30 #endif
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34
35 #ifndef IOSET_DEBUG
36 #define IOSET_DEBUG 0
37 #endif
38
39 #define IS_EOL(CH) ((CH) == '\n')
40
41 extern int uplink_connect(void);
42 static int clock_skew;
43 int do_write_dbs;
44 int do_reopen;
45
46 static struct io_fd **fds;
47 static unsigned int fds_size;
48 static fd_set read_fds, write_fds;
49
50 static void
51 ioq_init(struct ioq *ioq, int size) {
52     ioq->buf = malloc(size);
53     ioq->get = ioq->put = 0;
54     ioq->size = size;
55 }
56
57 static unsigned int
58 ioq_put_avail(const struct ioq *ioq) {
59     /* Subtract 1 from ioq->get to be sure we don't fill the buffer
60      * and make it look empty even when there's data in it. */
61     if (ioq->put < ioq->get) {
62         return ioq->get - ioq->put - 1;
63     } else if (ioq->get == 0) {
64         return ioq->size - ioq->put - 1;
65     } else {
66         return ioq->size - ioq->put;
67     }
68 }
69
70 static unsigned int
71 ioq_get_avail(const struct ioq *ioq) {
72     return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
73 }
74
75 static unsigned int
76 ioq_used(const struct ioq *ioq) {
77     return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
78 }
79
80 static unsigned int
81 ioq_grow(struct ioq *ioq) {
82     int new_size = ioq->size << 1;
83     char *new_buf = malloc(new_size);
84     int get_avail = ioq_get_avail(ioq);
85     memcpy(new_buf, ioq->buf + ioq->get, get_avail);
86     if (ioq->put < ioq->get)
87         memcpy(new_buf + get_avail, ioq->buf, ioq->put);
88     free(ioq->buf);
89     ioq->put = ioq_used(ioq);
90     ioq->get = 0;
91     ioq->buf = new_buf;
92     ioq->size = new_size;
93     return new_size - ioq->put;
94 }
95
96 void
97 ioset_cleanup(void) {
98     free(fds);
99 }
100
101 struct io_fd *
102 ioset_add(int fd) {
103     struct io_fd *res;
104     int flags;
105
106     if (fd < 0) {
107         log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
108         return 0;
109     }
110     res = calloc(1, sizeof(*res));
111     if (!res)
112         return 0;
113     res->fd = fd;
114     ioq_init(&res->send, 1024);
115     ioq_init(&res->recv, 1024);
116     if ((unsigned)fd >= fds_size) {
117         unsigned int old_size = fds_size;
118         fds_size = fd + 8;
119         fds = realloc(fds, fds_size*sizeof(*fds));
120         memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds));
121     }
122     fds[fd] = res;
123     flags = fcntl(fd, F_GETFL);
124     fcntl(fd, F_SETFL, flags|O_NONBLOCK);
125     return res;
126 }
127
128 struct io_fd *
129 ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
130     int fd, res;
131     struct io_fd *io_fd;
132     struct sockaddr_in sin;
133     unsigned long ip;
134
135     if (!getipbyname(peer, &ip)) {
136         log_module(MAIN_LOG, LOG_ERROR, "getipbyname(%s) failed.", peer);
137         return NULL;
138     }
139     sin.sin_addr.s_addr = ip;
140     if (local) {
141         if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) {
142             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno));
143             return NULL;
144         }
145         if (bind(fd, local, sa_size) < 0) {
146             log_module(MAIN_LOG, LOG_ERROR, "bind() of socket for %s (fd %d) returned errno %d (%s).  Will let operating system choose.", peer, fd, errno, strerror(errno));
147         }
148     } else {
149         if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
150             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno));
151             return NULL;
152         }
153     }
154     sin.sin_family = AF_INET;
155     sin.sin_port = htons(port);
156     if (blocking) {
157         res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
158         io_fd = ioset_add(fd);
159     } else {
160         io_fd = ioset_add(fd);
161         res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
162     }
163     if (!io_fd) {
164         close(fd);
165         return NULL;
166     }
167     io_fd->data = data;
168     io_fd->connect_cb = connect_cb;
169     if (res < 0) {
170         switch (errno) {
171         case EINPROGRESS: /* only if !blocking */
172             return io_fd;
173         default:
174             log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno));
175             /* then fall through */
176         case EHOSTUNREACH:
177         case ECONNREFUSED:
178             ioset_close(io_fd->fd, 1);
179             return NULL;
180         }
181     }
182     if (connect_cb)
183         connect_cb(io_fd, ((res < 0) ? errno : 0));
184     return io_fd;
185 }
186
187 static void
188 ioset_try_write(struct io_fd *fd) {
189     int res;
190     unsigned int req = ioq_get_avail(&fd->send);
191     res = write(fd->fd, fd->send.buf+fd->send.get, req);
192     if (res < 0) {
193         switch (errno) {
194         case EAGAIN: break;
195         default:
196             log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno));
197         }
198     } else {
199         fd->send.get += res;
200         if (fd->send.get == fd->send.size) fd->send.get = 0;
201     }
202 }
203
204 void
205 ioset_close(int fd, int os_close) {
206     struct io_fd *fdp;
207     if (!(fdp = fds[fd])) return;
208     fds[fd] = NULL;
209     if (fdp->destroy_cb) fdp->destroy_cb(fdp);
210     if (fdp->send.get != fdp->send.put) {
211         int flags = fcntl(fd, F_GETFL);
212         fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
213         ioset_try_write(fdp);
214         /* it may need to send the beginning of the buffer now.. */
215         if (fdp->send.get != fdp->send.put) ioset_try_write(fdp);
216     }
217     free(fdp->send.buf);
218     free(fdp->recv.buf);
219     if (os_close) close(fd);
220     free(fdp);
221     FD_CLR(fd, &read_fds);
222     FD_CLR(fd, &write_fds);
223 }
224
225 static int
226 ioset_find_line_length(struct io_fd *fd) {
227     unsigned int pos, max, len;
228     len = 0;
229     max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
230     for (pos = fd->recv.get; pos < max; ++pos, ++len) {
231         if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
232     }
233     if (fd->recv.put < fd->recv.get) {
234         for (pos = 0; pos < fd->recv.put; ++pos, ++len) {
235             if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
236         }
237     }
238     return fd->line_len = 0;
239 }
240
241 static void
242 ioset_buffered_read(struct io_fd *fd) {
243     int put_avail, nbr, fdnum;
244     
245     if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv);
246     nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
247     if (nbr < 0) {
248         switch (errno) {
249         case EAGAIN: break;
250         default:
251             log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno));
252             /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
253             fd->eof = 1;
254             fd->wants_reads = 0;
255             fd->readable_cb(fd);
256         }
257     } else if (nbr == 0) {
258         fd->eof = 1;
259         fd->wants_reads = 0;
260         fd->readable_cb(fd);
261     } else {
262         if (fd->line_len == 0) {
263             unsigned int pos;
264             for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
265                 if (IS_EOL(fd->recv.buf[pos])) {
266                     if (fd->recv.put < fd->recv.get) {
267                         fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
268                     } else {
269                         fd->line_len = pos + 1 - fd->recv.get;
270                     }
271                     break;
272                 }
273             }
274         }
275         fd->recv.put += nbr;
276         if (fd->recv.put == fd->recv.size) fd->recv.put = 0;
277         fdnum = fd->fd;
278         while (fd->wants_reads && (fd->line_len > 0)) {
279             fd->readable_cb(fd);
280             if (!fds[fdnum]) break; /* make sure they didn't close on us */
281             ioset_find_line_length(fd);
282         }
283     }
284 }
285
286 int
287 ioset_line_read(struct io_fd *fd, char *dest, int max) {
288     int avail, done;
289     if (fd->eof && (!ioq_get_avail(&fd->recv) ||  (fd->line_len < 0))) return 0;
290     if (fd->line_len < 0) return -1;
291     if (fd->line_len < max) max = fd->line_len;
292     avail = ioq_get_avail(&fd->recv);
293     if (max > avail) {
294         memcpy(dest, fd->recv.buf + fd->recv.get, avail);
295         fd->recv.get += avail;
296         assert(fd->recv.get == fd->recv.size);
297         fd->recv.get = 0;
298         done = avail;
299     } else {
300         done = 0;
301     }
302     memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
303     fd->recv.get += max - done;
304     if (fd->recv.get == fd->recv.size) fd->recv.get = 0;
305     dest[max] = 0;
306     ioset_find_line_length(fd);
307     return max;
308 }
309
310 #if 1
311 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
312 #else
313 static void
314 debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
315     static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
316     char buf[MAXLEN];
317     int pos, ii, flags;
318     struct timeval now;
319
320     for (pos=ii=0; ii<nfds; ++ii) {
321         flags  = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
322         flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
323         flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
324         if (!flags) continue;
325         pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
326     }
327     gettimeofday(&now, NULL);
328     if (select_timeout) {
329         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (timeout "FMT_TIME_T".%06ld)", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
330     } else {
331         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf);
332     }
333 }
334 #endif
335
336 void
337 ioset_run(void) {
338     extern struct io_fd *socket_io_fd;
339     struct timeval select_timeout;
340     unsigned int nn;
341     int select_result, max_fd;
342     time_t wakey;
343     struct io_fd *fd;
344
345     while (!quit_services) {
346         while (!socket_io_fd) uplink_connect();
347
348         /* How long to sleep? (fill in select_timeout) */
349         wakey = timeq_next();
350         if ((wakey - now) < 0) {
351             select_timeout.tv_sec = 0;
352         } else {
353             select_timeout.tv_sec = wakey - now;
354         }
355         select_timeout.tv_usec = 0;
356
357         /* Set up read_fds and write_fds fdsets. */
358         FD_ZERO(&read_fds);
359         FD_ZERO(&write_fds);
360         max_fd = 0;
361         for (nn=0; nn<fds_size; nn++) {
362             if (!(fd = fds[nn])) continue;
363             max_fd = nn;
364             if (fd->wants_reads) FD_SET(nn, &read_fds);
365             if ((fd->send.get != fd->send.put) || !fd->connected) FD_SET(nn, &write_fds);
366         }
367
368         /* Check for activity, update time. */
369         debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
370         select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
371         debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
372         now = time(NULL) + clock_skew;
373         if (select_result < 0) {
374             if (errno != EINTR) {
375                 log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno));
376                 close_socket();
377             }
378             continue;
379         }
380
381         /* Call back anybody that has connect or read activity and wants to know. */
382         for (nn=0; nn<fds_size; nn++) {
383             if (!(fd = fds[nn])) continue;
384             if (FD_ISSET(nn, &read_fds)) {
385                 if (fd->line_reads) {
386                     ioset_buffered_read(fd);
387                 } else {
388                     fd->readable_cb(fd);
389                 }
390             }
391             if (FD_ISSET(nn, &write_fds) && !fd->connected) {
392                 int rc, arglen = sizeof(rc);
393                 if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) rc = errno;
394                 fd->connected = 1;
395                 if (fd->connect_cb) fd->connect_cb(fd, rc);
396             }
397             /* Note: check whether write FD is still set, since the
398              * connect_cb() might close the FD, making us dereference
399              * a free()'d pointer for the fd.
400              */
401             if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put)) {
402                 ioset_try_write(fd);
403             }
404         }
405
406         /* Call any timeq events we need to call. */
407         timeq_run();
408         if (do_write_dbs) {
409             saxdb_write_all();
410             do_write_dbs = 0;
411         }
412         if (do_reopen) {
413             extern char *services_config;
414             conf_read(services_config);
415             do_reopen = 0;
416         }
417     }
418 }
419
420 void
421 ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
422     unsigned int avail;
423     while (ioq_used(&fd->send) + nbw >= fd->send.size) {
424         ioq_grow(&fd->send);
425     }
426     avail = ioq_put_avail(&fd->send);
427     if (nbw > avail) {
428         memcpy(fd->send.buf + fd->send.put, buf, avail);
429         buf += avail;
430         nbw -= avail;
431         fd->send.put = 0;
432     }
433     memcpy(fd->send.buf + fd->send.put, buf, nbw);
434     fd->send.put += nbw;
435     if (fd->send.put == fd->send.size) fd->send.put = 0;
436 }
437
438 void
439 ioset_set_time(unsigned long new_now) {
440     clock_skew = new_now - time(NULL);
441     now = new_now;
442 }