aafbb59b692822a4463495a0213d4028e1c4c689
[ircu2.10.12-pk.git] / ircd / engine_kqueue.c
1 /*
2  * IRC - Internet Relay Chat, ircd/engine_kqueue.c
3  * Copyright (C) 2001 Kevin L. Mitchell <klmitch@mit.edu>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 1, or (at your option)
8  * any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18  *
19  * $Id$
20  */
21 #include "config.h"
22
23 #include "ircd_events.h"
24
25 #include "ircd.h"
26 #include "ircd_alloc.h"
27 #include "ircd_log.h"
28 #include "s_debug.h"
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <sys/event.h>
34 #include <sys/socket.h>
35 #include <sys/time.h>
36 #include <sys/types.h>
37 #include <time.h>
38 #include <unistd.h>
39
40 #define KQUEUE_ERROR_THRESHOLD  20      /* after 20 kqueue errors, restart */
41 #define ERROR_EXPIRE_TIME       3600    /* expire errors after an hour */
42
43 #define POLLS_PER_KQUEUE        20      /* get 20 kevents per turn */
44
45 static struct Socket** sockList;
46 static int kqueue_max;
47 static int kqueue_id;
48
49 static int errors = 0;
50 static struct Timer clear_error;
51
52 /* decrements the error count once per hour */
53 static void
54 error_clear(struct Event* ev)
55 {
56   if (!--errors) /* remove timer when error count reaches 0 */
57     timer_del(ev_timer(ev));
58 }
59
60 /* initialize the kqueue engine */
61 static int
62 engine_init(int max_sockets)
63 {
64   int i;
65
66   if ((kqueue_id = kqueue()) < 0) { /* initialize... */
67     log_write(LS_SYSTEM, L_WARNING, 0,
68               "kqueue() engine cannot initialize: %m");
69     return 0;
70   }
71
72   /* allocate necessary memory */
73   sockList = (struct Socket**) MyMalloc(sizeof(struct Socket*) * max_sockets);
74
75   /* initialize the data */
76   for (i = 0; i < max_sockets; i++)
77     sockList[i] = 0;
78
79   kqueue_max = max_sockets; /* number of sockets allocated */
80
81   return 1; /* success! */
82 }
83
84 /* add a signel to be watched for */
85 static void
86 engine_signal(struct Signal* sig)
87 {
88   struct kevent sigevent;
89   struct sigaction act;
90
91   assert(0 != signal);
92
93   Debug((DEBUG_ENGINE, "kqueue: Adding filter for signal %d [%p]",
94          sig_signal(sig), sig));
95
96   sigevent.ident = sig_signal(sig); /* set up the kqueue event */
97   sigevent.filter = EVFILT_SIGNAL; /* looking for signals... */
98   sigevent.flags = EV_ADD | EV_ENABLE; /* add and enable it */
99   sigevent.fflags = 0;
100   sigevent.data = 0;
101   sigevent.udata = sig; /* store our user data */
102
103   if (kevent(kqueue_id, &sigevent, 1, 0, 0, 0) < 0) { /* add event */
104     log_write(LS_SYSTEM, L_WARNING, 0, "Unable to trap signal %d",
105               sig_signal(sig));
106     return;
107   }
108
109   act.sa_handler = SIG_IGN; /* ignore the signal */
110   act.sa_flags = 0;
111   sigemptyset(&act.sa_mask);
112   sigaction(sig_signal(sig), &act, 0);
113 }
114
115 /* Figure out what events go with a given state */
116 static unsigned int
117 state_to_events(enum SocketState state, unsigned int events)
118 {
119   switch (state) {
120   case SS_CONNECTING: /* connecting socket */
121     return SOCK_EVENT_WRITABLE;
122     break;
123
124   case SS_LISTENING: /* listening socket */
125   case SS_NOTSOCK: /* our signal socket--just in case */
126     return SOCK_EVENT_READABLE;
127     break;
128
129   case SS_CONNECTED: case SS_DATAGRAM: case SS_CONNECTDG:
130     return events; /* ordinary socket */
131     break;
132   }
133
134   /*NOTREACHED*/
135   return 0;
136 }
137
138 /* Activate kqueue filters as appropriate */
139 static void
140 set_or_clear(struct Socket* sock, unsigned int clear, unsigned int set)
141 {
142   int i = 0;
143   struct kevent chglist[2];
144
145   assert(0 != sock);
146   assert(-1 < s_fd(sock));
147
148   if ((clear ^ set) & SOCK_EVENT_READABLE) { /* readable has changed */
149     chglist[i].ident = s_fd(sock); /* set up the change list */
150     chglist[i].filter = EVFILT_READ; /* readable filter */
151     chglist[i].flags = EV_ADD; /* adding it */
152     chglist[i].fflags = 0;
153     chglist[i].data = 0;
154     chglist[i].udata = 0; /* I love udata, but it can't really be used here */
155
156     if (set & SOCK_EVENT_READABLE) /* it's set */
157       chglist[i].flags |= EV_ENABLE;
158     else /* clear it */
159       chglist[i].flags |= EV_DISABLE;
160
161     i++; /* advance to next element */
162   }
163
164   if ((clear ^ set) & SOCK_EVENT_WRITABLE) { /* writable has changed */
165     chglist[i].ident = s_fd(sock); /* set up the change list */
166     chglist[i].filter = EVFILT_WRITE; /* writable filter */
167     chglist[i].flags = EV_ADD; /* adding it */
168     chglist[i].fflags = 0;
169     chglist[i].data = 0;
170     chglist[i].udata = 0;
171
172     if (set & SOCK_EVENT_WRITABLE) /* it's set */
173       chglist[i].flags |= EV_ENABLE;
174     else /* clear it */
175       chglist[i].flags |= EV_DISABLE;
176
177     i++; /* advance count... */
178   }
179
180   if (kevent(kqueue_id, chglist, i, 0, 0, 0) < 0)
181     event_generate(ET_ERROR, sock, errno); /* report error */
182 }
183
184 /* add a socket to be listened on */
185 static int
186 engine_add(struct Socket* sock)
187 {
188   assert(0 != sock);
189   assert(0 == sockList[s_fd(sock)]);
190
191   /* bounds-check... */
192   if (sock->s_fd >= kqueue_max) {
193     log_write(LS_SYSTEM, L_ERROR, 0,
194               "Attempt to add socket %d (> %d) to event engine", s_fd(sock),
195               kqueue_max);
196     return 0;
197   }
198
199   sockList[s_fd(sock)] = sock; /* add to list */
200
201   Debug((DEBUG_ENGINE, "kqueue: Adding socket %d [%p], state %s, to engine",
202          s_fd(sock), sock, state_to_name(s_state(sock))));
203
204   /* Add socket to queue */
205   set_or_clear(sock, 0, state_to_events(s_state(sock), s_events(sock)));
206
207   return 1; /* success */
208 }
209
210 /* socket switching to new state */
211 static void
212 engine_state(struct Socket* sock, enum SocketState new_state)
213 {
214   assert(0 != sock);
215   assert(sock == sockList[s_fd(sock)]);
216
217   Debug((DEBUG_ENGINE, "kqueue: Changing state for socket %p to %s", sock,
218          state_to_name(new_state)));
219
220   /* set the correct events */
221   set_or_clear(sock,
222                state_to_events(s_state(sock), s_events(sock)), /* old state */
223                state_to_events(new_state, s_events(sock))); /* new state */
224
225 }
226
227 /* socket events changing */
228 static void
229 engine_events(struct Socket* sock, unsigned int new_events)
230 {
231   assert(0 != sock);
232   assert(sock == sockList[s_fd(sock)]);
233
234   Debug((DEBUG_ENGINE, "kqueue: Changing event mask for socket %p to [%s]",
235          sock, sock_flags(new_events)));
236
237   /* set the correct events */
238   set_or_clear(sock,
239                state_to_events(s_state(sock), s_events(sock)), /* old events */
240                state_to_events(s_state(sock), new_events)); /* new events */
241 }
242
243 /* socket going away */
244 static void
245 engine_delete(struct Socket* sock)
246 {
247   struct kevent dellist[2];
248
249   assert(0 != sock);
250   assert(sock == sockList[s_fd(sock)]);
251
252   Debug((DEBUG_ENGINE, "kqueue: Deleting socket %d [%p], state %s",
253          s_fd(sock), sock, state_to_name(s_state(sock))));
254
255   dellist[0].ident = s_fd(sock); /* set up the delete list */
256   dellist[0].filter = EVFILT_READ; /* readable filter */
257   dellist[0].flags = EV_DELETE; /* delete it */
258   dellist[0].fflags = 0;
259   dellist[0].data = 0;
260   dellist[0].udata = 0;
261
262   dellist[1].ident = s_fd(sock);
263   dellist[1].filter = EVFILT_WRITE; /* writable filter */
264   dellist[1].flags = EV_DELETE; /* delete it */
265   dellist[1].fflags = 0;
266   dellist[1].data = 0;
267   dellist[1].udata = 0;
268
269   /* make it all go away */
270   if (kevent(kqueue_id, dellist, 2, 0, 0, 0) < 0)
271     log_write(LS_SOCKET, L_WARNING, 0,
272               "Unable to delete kevent items for socket %d", s_fd(sock));
273
274   sockList[s_fd(sock)] = 0;
275 }
276
277 /* engine event loop */
278 static void
279 engine_loop(struct Generators* gen)
280 {
281   struct kevent events[POLLS_PER_KQUEUE];
282   struct Socket* sock;
283   struct timespec wait;
284   int nevs;
285   int i;
286   int errcode;
287   size_t codesize;
288
289   while (running) {
290     /* set up the sleep time */
291     wait.tv_sec = timer_next(gen) ? (timer_next(gen) - CurrentTime) : -1;
292     wait.tv_nsec = 0;
293
294     Debug((DEBUG_INFO, "kqueue: delay: %Tu (%Tu) %Tu", timer_next(gen),
295            CurrentTime, wait.tv_sec));
296
297     /* check for active events */
298     nevs = kevent(kqueue_id, 0, 0, events, POLLS_PER_KQUEUE,
299                   wait.tv_sec < 0 ? 0 : &wait);
300
301     CurrentTime = time(0); /* set current time... */
302
303     if (nevs < 0) {
304       if (errno != EINTR) { /* ignore kevent interrupts */
305         /* Log the kqueue error */
306         log_write(LS_SOCKET, L_ERROR, 0, "kevent() error: %m");
307         if (!errors++)
308           timer_add(&clear_error, error_clear, 0, TT_PERIODIC,
309                     ERROR_EXPIRE_TIME);
310         else if (errors > KQUEUE_ERROR_THRESHOLD) /* too many errors... */
311           server_restart("too many kevent errors");
312       }
313       /* old code did a sleep(1) here; with usage these days,
314        * that may be too expensive
315        */
316       continue;
317     }
318
319     for (i = 0; i < nevs; i++) {
320       if (events[i].filter == EVFILT_SIGNAL) {
321         /* it's a signal; deal appropriately */
322         event_generate(ET_SIGNAL, events[i].udata, events[i].ident);
323         continue; /* skip socket processing loop */
324       }
325
326       assert(events[i].filter == EVFILT_READ ||
327              events[i].filter == EVFILT_WRITE);
328
329       sock = sockList[events[i].ident];
330       if (!sock) /* slots may become empty while processing events */
331         continue;
332
333       assert(s_fd(sock) == events[i].ident);
334
335       gen_ref_inc(sock); /* can't have it going away on us */
336
337       Debug((DEBUG_ENGINE, "kqueue: Checking socket %p (fd %d) state %s, "
338              "events %s", sock, s_fd(sock), state_to_name(s_state(sock)),
339              sock_flags(s_events(sock))));
340
341       if (s_state(sock) != SS_NOTSOCK) {
342         errcode = 0; /* check for errors on socket */
343         codesize = sizeof(errcode);
344         if (getsockopt(s_fd(sock), SOL_SOCKET, SO_ERROR, &errcode,
345                        &codesize) < 0)
346           errcode = errno; /* work around Solaris implementation */
347
348         if (errcode) { /* an error occurred; generate an event */
349           Debug((DEBUG_ENGINE, "kqueue: Error %d on fd %d, socket %p", errcode,
350                  s_fd(sock), sock));
351           event_generate(ET_ERROR, sock, errcode);
352           gen_ref_dec(sock); /* careful not to leak reference counts */
353           continue;
354         }
355       }
356
357       switch (s_state(sock)) {
358       case SS_CONNECTING:
359         if (events[i].filter == EVFILT_WRITE) { /* connection completed */
360           Debug((DEBUG_ENGINE, "kqueue: Connection completed"));
361           event_generate(ET_CONNECT, sock, 0);
362         }
363         break;
364
365       case SS_LISTENING:
366         if (events[i].filter == EVFILT_READ) { /* connect. to be accept. */
367           Debug((DEBUG_ENGINE, "kqueue: Ready for accept"));
368           event_generate(ET_ACCEPT, sock, 0);
369         }
370         break;
371
372       case SS_NOTSOCK: /* doing nothing socket-specific */
373       case SS_CONNECTED:
374         if (events[i].filter == EVFILT_READ) { /* data on socket */
375           Debug((DEBUG_ENGINE, "kqueue: EOF or data to be read"));
376           event_generate(events[i].flags & EV_EOF ? ET_EOF : ET_READ, sock, 0);
377         }
378         if (events[i].filter == EVFILT_WRITE) { /* socket writable */
379           Debug((DEBUG_ENGINE, "kqueue: Data can be written"));
380           event_generate(ET_WRITE, sock, 0);
381         }
382         break;
383
384       case SS_DATAGRAM: case SS_CONNECTDG:
385         if (events[i].filter == EVFILT_READ) { /* socket readable */
386           Debug((DEBUG_ENGINE, "kqueue: Datagram to be read"));
387           event_generate(ET_READ, sock, 0);
388         }
389         if (events[i].filter == EVFILT_WRITE) { /* socket writable */
390           Debug((DEBUG_ENGINE, "kqueue: Datagram can be written"));
391           event_generate(ET_WRITE, sock, 0);
392         }
393         break;
394       }
395
396       assert(s_fd(sock) == events[i].ident);
397
398       gen_ref_dec(sock); /* we're done with it */
399     }
400
401     timer_run(); /* execute any pending timers */
402   }
403 }
404
405 struct Engine engine_kqueue = {
406   "kqueue()",           /* Engine name */
407   engine_init,          /* Engine initialization function */
408   engine_signal,        /* Engine signal registration function */
409   engine_add,           /* Engine socket registration function */
410   engine_state,         /* Engine socket state change function */
411   engine_events,        /* Engine socket events mask function */
412   engine_delete,        /* Engine socket deletion function */
413   engine_loop           /* Core engine event loop */
414 };