Indenting cleanups, +sp fix, PING fix
[srvx.git] / src / ioset.c
1 /* ioset.h - srvx event loop
2  * Copyright 2002-2004 srvx Development Team
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.  Important limitations are
8  * listed in the COPYING file that accompanies this software.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, email srvx-maintainers@srvx.net.
17  */
18
19 #include "ioset.h"
20 #include "log.h"
21 #include "timeq.h"
22 #include "saxdb.h"
23 #include "conf.h"
24
25 #ifdef HAVE_FCNTL_H
26 #include <fcntl.h>
27 #endif
28 #ifdef HAVE_SYS_SELECT_H
29 #include <sys/select.h>
30 #endif
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34
35 #ifndef IOSET_DEBUG
36 #define IOSET_DEBUG 0
37 #endif
38
39 #define IS_EOL(CH) ((CH) == '\n')
40
41 extern int uplink_connect(void);
42 static int clock_skew;
43 int do_write_dbs;
44 int do_reopen;
45
46 static struct io_fd **fds;
47 static unsigned int fds_size;
48 static fd_set read_fds, write_fds;
49
50 static void
51 ioq_init(struct ioq *ioq, int size) {
52     ioq->buf = malloc(size);
53     ioq->get = ioq->put = 0;
54     ioq->size = size;
55 }
56
57 static unsigned int
58 ioq_put_avail(const struct ioq *ioq) {
59     /* Subtract 1 from ioq->get to be sure we don't fill the buffer
60      * and make it look empty even when there's data in it. */
61     if (ioq->put < ioq->get)
62         return ioq->get - ioq->put - 1;
63     else if (ioq->get == 0)
64         return ioq->size - ioq->put - 1;
65     else
66         return ioq->size - ioq->put;
67 }
68
69 static unsigned int
70 ioq_get_avail(const struct ioq *ioq) {
71     return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
72 }
73
74 static unsigned int
75 ioq_used(const struct ioq *ioq) {
76     return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
77 }
78
79 static unsigned int
80 ioq_grow(struct ioq *ioq) {
81     int new_size = ioq->size << 1;
82     char *new_buf = malloc(new_size);
83     int get_avail = ioq_get_avail(ioq);
84     memcpy(new_buf, ioq->buf + ioq->get, get_avail);
85     if (ioq->put < ioq->get)
86         memcpy(new_buf + get_avail, ioq->buf, ioq->put);
87     free(ioq->buf);
88     ioq->put = ioq_used(ioq);
89     ioq->get = 0;
90     ioq->buf = new_buf;
91     ioq->size = new_size;
92     return new_size - ioq->put;
93 }
94
95 void
96 ioset_cleanup(void) {
97     free(fds);
98 }
99
100 struct io_fd *
101 ioset_add(int fd) {
102     struct io_fd *res;
103     int flags;
104
105     if (fd < 0) {
106         log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
107         return 0;
108     }
109     res = calloc(1, sizeof(*res));
110     if (!res)
111         return 0;
112     res->fd = fd;
113     ioq_init(&res->send, 1024);
114     ioq_init(&res->recv, 1024);
115     if ((unsigned)fd >= fds_size) {
116         unsigned int old_size = fds_size;
117         fds_size = fd + 8;
118         fds = realloc(fds, fds_size*sizeof(*fds));
119         memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds));
120     }
121     fds[fd] = res;
122     flags = fcntl(fd, F_GETFL);
123     fcntl(fd, F_SETFL, flags|O_NONBLOCK);
124     return res;
125 }
126
127 struct io_fd *
128 ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
129     int fd, res;
130     struct io_fd *io_fd;
131     struct sockaddr_in sin;
132     unsigned long ip;
133
134     if (!getipbyname(peer, &ip)) {
135         log_module(MAIN_LOG, LOG_ERROR, "getipbyname(%s) failed.", peer);
136         return NULL;
137     }
138     sin.sin_addr.s_addr = ip;
139     if (local) {
140         if ((fd = socket(local->sa_family, SOCK_STREAM, 0)) < 0) {
141             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s)", peer, errno, strerror(errno));
142             return NULL;
143         }
144         if (bind(fd, local, sa_size) < 0) {
145             log_module(MAIN_LOG, LOG_ERROR, "bind() of socket for %s (fd %d) returned errno %d (%s).  Will let operating system choose.", peer, fd, errno, strerror(errno));
146         }
147     } else {
148         if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
149             log_module(MAIN_LOG, LOG_ERROR, "socket() for %s returned errno %d (%s).", peer, errno, strerror(errno));
150             return NULL;
151         }
152     }
153     sin.sin_family = AF_INET;
154     sin.sin_port = htons(port);
155     if (blocking) {
156         res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
157         io_fd = ioset_add(fd);
158     } else {
159         io_fd = ioset_add(fd);
160         res = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
161     }
162     if (!io_fd) {
163         close(fd);
164         return NULL;
165     }
166     io_fd->data = data;
167     io_fd->connect_cb = connect_cb;
168     if (res < 0) {
169         switch (errno) {
170         case EINPROGRESS: /* only if !blocking */
171             return io_fd;
172         default:
173             log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno));
174             /* then fall through */
175         case EHOSTUNREACH:
176         case ECONNREFUSED:
177             ioset_close(io_fd->fd, 1);
178             return NULL;
179         }
180     }
181     if (connect_cb)
182         connect_cb(io_fd, ((res < 0) ? errno : 0));
183     return io_fd;
184 }
185
186 static void
187 ioset_try_write(struct io_fd *fd) {
188     int res;
189     unsigned int req = ioq_get_avail(&fd->send);
190     res = write(fd->fd, fd->send.buf+fd->send.get, req);
191     if (res < 0) {
192         switch (errno) {
193         case EAGAIN:
194             break;
195         default:
196             log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno));
197         }
198     } else {
199         fd->send.get += res;
200         if (fd->send.get == fd->send.size)
201             fd->send.get = 0;
202     }
203 }
204
205 void
206 ioset_close(int fd, int os_close) {
207     struct io_fd *fdp;
208     if (!(fdp = fds[fd]))
209         return;
210     fds[fd] = NULL;
211     if (fdp->destroy_cb)
212         fdp->destroy_cb(fdp);
213     if (fdp->send.get != fdp->send.put) {
214         int flags = fcntl(fd, F_GETFL);
215         fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
216         ioset_try_write(fdp);
217         /* it may need to send the beginning of the buffer now.. */
218         if (fdp->send.get != fdp->send.put)
219             ioset_try_write(fdp);
220     }
221     free(fdp->send.buf);
222     free(fdp->recv.buf);
223     if (os_close)
224         close(fd);
225     free(fdp);
226     FD_CLR(fd, &read_fds);
227     FD_CLR(fd, &write_fds);
228 }
229
230 static int
231 ioset_find_line_length(struct io_fd *fd) {
232     unsigned int pos, max, len;
233     len = 0;
234     max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
235     for (pos = fd->recv.get; pos < max; ++pos, ++len)
236         if (IS_EOL(fd->recv.buf[pos]))
237             return fd->line_len = len + 1;
238     if (fd->recv.put < fd->recv.get)
239         for (pos = 0; pos < fd->recv.put; ++pos, ++len)
240             if (IS_EOL(fd->recv.buf[pos]))
241                 return fd->line_len = len + 1;
242     return fd->line_len = 0;
243 }
244
245 static void
246 ioset_buffered_read(struct io_fd *fd) {
247     int put_avail, nbr, fdnum;
248     
249     if (!(put_avail = ioq_put_avail(&fd->recv)))
250         put_avail = ioq_grow(&fd->recv);
251     nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
252     if (nbr < 0) {
253         switch (errno) {
254         case EAGAIN:
255             break;
256         default:
257             log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno));
258             /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
259             fd->eof = 1;
260             fd->wants_reads = 0;
261             fd->readable_cb(fd);
262         }
263     } else if (nbr == 0) {
264         fd->eof = 1;
265         fd->wants_reads = 0;
266         fd->readable_cb(fd);
267     } else {
268         if (fd->line_len == 0) {
269             unsigned int pos;
270             for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
271                 if (IS_EOL(fd->recv.buf[pos])) {
272                     if (fd->recv.put < fd->recv.get)
273                         fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
274                     else
275                         fd->line_len = pos + 1 - fd->recv.get;
276                     break;
277                 }
278             }
279         }
280         fd->recv.put += nbr;
281         if (fd->recv.put == fd->recv.size)
282             fd->recv.put = 0;
283         fdnum = fd->fd;
284         while (fd->wants_reads && (fd->line_len > 0)) {
285             fd->readable_cb(fd);
286             if (!fds[fdnum])
287                 break; /* make sure they didn't close on us */
288             ioset_find_line_length(fd);
289         }
290     }
291 }
292
293 int
294 ioset_line_read(struct io_fd *fd, char *dest, int max) {
295     int avail, done;
296     if (fd->eof && (!ioq_get_avail(&fd->recv) ||  (fd->line_len < 0)))
297         return 0;
298     if (fd->line_len < 0)
299         return -1;
300     if (fd->line_len < max)
301         max = fd->line_len;
302     avail = ioq_get_avail(&fd->recv);
303     if (max > avail) {
304         memcpy(dest, fd->recv.buf + fd->recv.get, avail);
305         fd->recv.get += avail;
306         assert(fd->recv.get == fd->recv.size);
307         fd->recv.get = 0;
308         done = avail;
309     } else {
310         done = 0;
311     }
312     memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
313     fd->recv.get += max - done;
314     if (fd->recv.get == fd->recv.size)
315         fd->recv.get = 0;
316     dest[max] = 0;
317     ioset_find_line_length(fd);
318     return max;
319 }
320
321 #if 1
322 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
323 #else
324 static void
325 debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
326     static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
327     char buf[MAXLEN];
328     int pos, ii, flags;
329     struct timeval now;
330
331     for (pos=ii=0; ii<nfds; ++ii) {
332         flags  = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
333         flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
334         flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
335         if (!flags)
336             continue;
337         pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
338     }
339     gettimeofday(&now, NULL);
340     if (select_timeout) {
341         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (timeout "FMT_TIME_T".%06ld)", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
342     } else {
343         log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf);
344     }
345 }
346 #endif
347
348 void
349 ioset_run(void) {
350     extern struct io_fd *socket_io_fd;
351     struct timeval select_timeout;
352     unsigned int nn;
353     int select_result, max_fd;
354     time_t wakey;
355     struct io_fd *fd;
356
357     while (!quit_services) {
358         while (!socket_io_fd)
359             uplink_connect();
360
361         /* How long to sleep? (fill in select_timeout) */
362         wakey = timeq_next();
363         if ((wakey - now) < 0)
364             select_timeout.tv_sec = 0;
365         else
366             select_timeout.tv_sec = wakey - now;
367         select_timeout.tv_usec = 0;
368
369         /* Set up read_fds and write_fds fdsets. */
370         FD_ZERO(&read_fds);
371         FD_ZERO(&write_fds);
372         max_fd = 0;
373         for (nn=0; nn<fds_size; nn++) {
374             if (!(fd = fds[nn]))
375                 continue;
376             max_fd = nn;
377             if (fd->wants_reads)
378                 FD_SET(nn, &read_fds);
379             if ((fd->send.get != fd->send.put) || !fd->connected)
380                 FD_SET(nn, &write_fds);
381         }
382
383         /* Check for activity, update time. */
384         debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
385         select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
386         debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
387         now = time(NULL) + clock_skew;
388         if (select_result < 0) {
389             if (errno != EINTR) {
390                 log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno));
391                 close_socket();
392             }
393             continue;
394         }
395
396         /* Call back anybody that has connect or read activity and wants to know. */
397         for (nn=0; nn<fds_size; nn++) {
398             if (!(fd = fds[nn]))
399                 continue;
400             if (FD_ISSET(nn, &read_fds)) {
401                 if (fd->line_reads)
402                     ioset_buffered_read(fd);
403                 else
404                     fd->readable_cb(fd);
405             }
406             if (FD_ISSET(nn, &write_fds) && !fd->connected) {
407                 int rc, arglen = sizeof(rc);
408                 if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0)
409                     rc = errno;
410                 fd->connected = 1;
411                 if (fd->connect_cb)
412                     fd->connect_cb(fd, rc);
413             }
414             /* Note: check whether write FD is still set, since the
415              * connect_cb() might close the FD, making us dereference
416              * a free()'d pointer for the fd.
417              */
418             if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put))
419                 ioset_try_write(fd);
420         }
421
422         /* Call any timeq events we need to call. */
423         timeq_run();
424         if (do_write_dbs) {
425             saxdb_write_all();
426             do_write_dbs = 0;
427         }
428         if (do_reopen) {
429             extern char *services_config;
430             conf_read(services_config);
431             do_reopen = 0;
432         }
433     }
434 }
435
436 void
437 ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
438     unsigned int avail;
439     while (ioq_used(&fd->send) + nbw >= fd->send.size)
440         ioq_grow(&fd->send);
441     avail = ioq_put_avail(&fd->send);
442     if (nbw > avail) {
443         memcpy(fd->send.buf + fd->send.put, buf, avail);
444         buf += avail;
445         nbw -= avail;
446         fd->send.put = 0;
447     }
448     memcpy(fd->send.buf + fd->send.put, buf, nbw);
449     fd->send.put += nbw;
450     if (fd->send.put == fd->send.size)
451         fd->send.put = 0;
452 }
453
454 void
455 ioset_set_time(unsigned long new_now) {
456     clock_skew = new_now - time(NULL);
457     now = new_now;
458 }