Doxyfy msgq.h and msgq.c.
[ircu2.10.12-pk.git] / ircd / msgq.c
1 /*
2  * IRC - Internet Relay Chat, ircd/msgq.c
3  * Copyright (C) 2000 Kevin L. Mitchell <klmitch@mit.edu>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 1, or (at your option)
8  * any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18  *
19  * $Id$
20  */
21 #include "config.h"
22
23 #include "msgq.h"
24 #include "ircd.h"
25 #include "ircd_alloc.h"
26 #include "ircd_defs.h"
27 #include "ircd_features.h"
28 #include "ircd_reply.h"
29 #include "ircd_snprintf.h"
30 #include "numeric.h"
31 #include "send.h"
32 #include "s_debug.h"
33 #include "s_stats.h"
34
35 #include <assert.h>
36 #include <stdarg.h>
37 #include <string.h>
38 #include <sys/types.h>
39 #include <sys/uio.h>    /* struct iovec */
40
41 #define MB_BASE_SHIFT   5 /**< Log2 of smallest message body to allocate. */
42 #define MB_MAX_SHIFT    9 /**< Log2 of largest message body to allocate. */
43
44 /** Buffer for a single message. */
45 struct MsgBuf {
46   struct MsgBuf *next;          /**< next msg in global queue */
47   struct MsgBuf **prev_p;       /**< what points to us in linked list */
48   struct MsgBuf *real;          /**< the actual MsgBuf we're attaching */
49   unsigned int ref;             /**< reference count */
50   unsigned int length;          /**< length of message */
51   unsigned int power;           /**< size of buffer (power of 2) */
52   char msg[1];                  /**< the message */
53 };
54
55 /** Return allocated length of the buffer of \a buf. */
56 #define bufsize(buf)    (1 << (buf)->power)
57
58 /** Message body for a particular destination. */
59 struct Msg {
60   struct Msg *next;             /**< next msg */
61   unsigned int sent;            /**< bytes in msg that have already been sent */
62   struct MsgBuf *msg;           /**< actual message in queue */
63 };
64
65 /** Statistics tracking for message sizes. */
66 struct MsgSizes {
67   unsigned int msgs;            /**< total number of messages */
68   unsigned int sizes[BUFSIZE];  /**< histogram of message sizes */
69 };
70
71 /** Global tracking data for message buffers. */
72 static struct {
73   struct MsgBuf *msglist;       /**< list of in-use MsgBuf's */
74   struct {
75     unsigned int alloc;         /**< number of Msg's allocated */
76     unsigned int used;          /**< number of Msg's in use */
77     struct Msg *free;           /**< freelist of Msg's */
78   } msgs;                       /**< tracking info for Msg structs */
79   size_t tot_bufsize;           /**< total amount of memory in buffers */
80   /** Array of MsgBuf information, one entry for each used bucket size. */
81   struct {
82     unsigned int alloc;         /**< total MsgBuf's of this size */
83     unsigned int used;          /**< number of MsgBuf's of this size in use */
84     struct MsgBuf *free;        /**< list of free MsgBuf's */
85   } msgBufs[MB_MAX_SHIFT - MB_BASE_SHIFT + 1];
86   struct MsgSizes sizes;        /**< histogram of message sizes */
87 } MQData;
88
89 /*
90  * This routine is used to remove a certain amount of data from a given
91  * queue and release the Msg (and MsgBuf) structure if needed
92  */
93 /** Remove some data from a list within a message queue.
94  * @param[in,out] mq Message queue to remove from.
95  * @param[in,out] qlist Particular list within queue to remove from.
96  * @param[in,out] length_p Number of bytes left to remove.
97  */
98 static void
99 msgq_delmsg(struct MsgQ *mq, struct MsgQList *qlist, unsigned int *length_p)
100 {
101   struct Msg *m;
102   unsigned int msglen;
103
104   assert(0 != mq);
105   assert(0 != qlist);
106   assert(0 != qlist->head);
107   assert(0 != length_p);
108
109   m = qlist->head; /* find the msg we're deleting from */
110
111   msglen = m->msg->length - m->sent; /* calculate how much is left */
112
113   if (*length_p >= msglen) { /* deleted it all? */
114     mq->length -= msglen; /* decrement length */
115     mq->count--; /* decrement the message count */
116     *length_p -= msglen;
117
118     msgq_clean(m->msg); /* free up the struct MsgBuf */
119     m->msg = 0; /* don't let it point anywhere nasty, please */
120
121     if (qlist->head == qlist->tail) /* figure out if we emptied the queue */
122       qlist->head = qlist->tail = 0;
123     else
124       qlist->head = m->next; /* just shift the list down some */
125
126     MQData.msgs.used--; /* struct Msg is not in use anymore */
127
128     m->next = MQData.msgs.free; /* throw it onto the free list */
129     MQData.msgs.free = m;
130   } else {
131     mq->length -= *length_p; /* decrement queue length */
132     m->sent += *length_p; /* this much of the message has been sent */
133     *length_p = 0; /* we've dealt with it all */
134   }
135 }
136
137 /** Initialize \a mq.
138  * @param[in] mq MsgQ to initialize.
139  */
140 void
141 msgq_init(struct MsgQ *mq)
142 {
143   assert(0 != mq);
144
145   mq->length = 0;
146   mq->count = 0;
147   mq->queue.head = 0;
148   mq->queue.tail = 0;
149   mq->prio.head = 0;
150   mq->prio.tail = 0;
151 }
152
153 /** Delete bytes from the front of a message queue.
154  * @param[in] mq Queue to drop data from.
155  * @param[in] length Number of bytes to drop.
156  */
157 void
158 msgq_delete(struct MsgQ *mq, unsigned int length)
159 {
160   assert(0 != mq);
161
162   while (length > 0) {
163     if (mq->queue.head && mq->queue.head->sent > 0) /* partial msg on norm q */
164       msgq_delmsg(mq, &mq->queue, &length);
165     else if (mq->prio.head) /* message (partial or complete) on prio queue */
166       msgq_delmsg(mq, &mq->prio, &length);
167     else if (mq->queue.head) /* message on normal queue */
168       msgq_delmsg(mq, &mq->queue, &length);
169     else
170       break;
171   }
172 }
173
174 /** Map data from a message queue to an I/O vector.
175  * @param[in] mq Message queue to send from.
176  * @param[out] iov Output vector.
177  * @param[in] count Number of elements in \a iov.
178  * @param[out] len Number of bytes mapped from \a mq to \a iov.
179  * @return Number of elements filled in \a iov.
180  */
181 int
182 msgq_mapiov(const struct MsgQ *mq, struct iovec *iov, int count,
183             unsigned int *len)
184 {
185   struct Msg *queue;
186   struct Msg *prio;
187   int i = 0;
188
189   assert(0 != mq);
190   assert(0 != iov);
191   assert(0 != count);
192   assert(0 != len);
193
194   if (mq->length <= 0) /* no data to map */
195     return 0;
196
197   if (mq->queue.head && mq->queue.head->sent > 0) { /* partial msg on norm q */
198     iov[i].iov_base = mq->queue.head->msg->msg + mq->queue.head->sent;
199     iov[i].iov_len = mq->queue.head->msg->length - mq->queue.head->sent;
200     *len += iov[i].iov_len;
201
202     queue = mq->queue.head->next; /* where we start later... */
203
204     i++; /* filled an iovec... */
205     if (!--count) /* check for space */
206       return i;
207   } else
208     queue = mq->queue.head; /* start at head of queue */
209
210   if (mq->prio.head && mq->prio.head->sent > 0) { /* partial msg on prio q */
211     iov[i].iov_base = mq->prio.head->msg->msg + mq->prio.head->sent;
212     iov[i].iov_len = mq->prio.head->msg->length - mq->prio.head->sent;
213     *len += iov[i].iov_len;
214
215     prio = mq->prio.head->next; /* where we start later... */
216
217     i++; /* filled an iovec... */
218     if (!--count) /* check for space */
219       return i;
220   } else
221     prio = mq->prio.head; /* start at head of prio */
222
223   for (; prio; prio = prio->next) { /* go through prio queue */
224     iov[i].iov_base = prio->msg->msg; /* store message */
225     iov[i].iov_len = prio->msg->length;
226     *len += iov[i].iov_len;
227
228     i++; /* filled an iovec... */
229     if (!--count) /* check for space */
230       return i;
231   }
232
233   for (; queue; queue = queue->next) { /* go through normal queue */
234     iov[i].iov_base = queue->msg->msg;
235     iov[i].iov_len = queue->msg->length;
236     *len += iov[i].iov_len;
237
238     i++; /* filled an iovec... */
239     if (!--count) /* check for space */
240       return i;
241   }
242
243   return i;
244 }
245
246 /** Allocate a message buffer large enough to hold \a length bytes.
247  * TODO: \a in_mb needs better documentation.
248  * @param[in] in_mb Some other message buffer(?).
249  * @param[in] length Number of bytes of space to reserve in output.
250  * @return Pointer to some usable message buffer.
251  */
252 static struct MsgBuf *
253 msgq_alloc(struct MsgBuf *in_mb, int length)
254 {
255   struct MsgBuf *mb;
256   int power;
257
258   /* Find the power of two size that will accomodate the message */
259   for (power = MB_BASE_SHIFT; power < MB_MAX_SHIFT + 1; power++)
260     if ((length - 1) >> power == 0)
261       break;
262   assert((1 << power) >= length);
263   assert((1 << power) <= 512);
264   length = 1 << power; /* reset the length */
265
266   /* If the message needs a buffer of exactly the existing size, just use it */
267   if (in_mb && in_mb->power == power) {
268     in_mb->real = in_mb; /* real buffer is this buffer */
269     return in_mb;
270   }
271
272   /* Try popping one off the freelist first */
273   if ((mb = MQData.msgBufs[power - MB_BASE_SHIFT].free)) {
274     MQData.msgBufs[power - MB_BASE_SHIFT].free = mb->next;
275   } else if (MQData.tot_bufsize < feature_int(FEAT_BUFFERPOOL)) {
276     /* Allocate another if we won't bust the BUFFERPOOL */
277     Debug((DEBUG_MALLOC, "Allocating MsgBuf of length %d (total size %zu)",
278            length, sizeof(struct MsgBuf) + length));
279     mb = (struct MsgBuf *)MyMalloc(sizeof(struct MsgBuf) + length);
280     MQData.msgBufs[power - MB_BASE_SHIFT].alloc++;
281     mb->power = power; /* remember size */
282     MQData.tot_bufsize += length;
283   }
284
285   if (mb) {
286     MQData.msgBufs[power - MB_BASE_SHIFT].used++; /* how many are we using? */
287
288     mb->real = 0; /* essential initializations */
289     mb->ref = 1;
290
291     if (in_mb) /* remember who's the *real* buffer */
292       in_mb->real = mb;
293   } else if (in_mb) /* just use the input buffer */
294     mb = in_mb->real = in_mb;
295
296   return mb; /* return the buffer */
297 }
298
299 /** Deallocate unused message buffers.
300  */
301 static void
302 msgq_clear_freembs(void)
303 {
304   struct MsgBuf *mb;
305   int i;
306
307   /* Walk through the various size classes */
308   for (i = MB_BASE_SHIFT; i < MB_MAX_SHIFT + 1; i++)
309     /* walk down the free list */
310     while ((mb = MQData.msgBufs[i - MB_BASE_SHIFT].free)) {
311       MQData.msgBufs[i - MB_BASE_SHIFT].free = mb->next; /* shift free list */
312       MQData.msgBufs[i - MB_BASE_SHIFT].alloc--; /* reduce allocation count */
313       MQData.tot_bufsize -= 1 << i; /* reduce total buffer allocation count */
314       MyFree(mb); /* and free the buffer */
315     }
316 }
317
318 /** Format a message buffer for a client from a format string.
319  * @param[in] dest %Client that receives the data (may be NULL).
320  * @param[in] format Format string for message.
321  * @param[in] vl Argument list for \a format.
322  * @return Allocated MsgBuf.
323  */
324 struct MsgBuf *
325 msgq_vmake(struct Client *dest, const char *format, va_list vl)
326 {
327   struct MsgBuf *mb;
328
329   assert(0 != format);
330
331   if (!(mb = msgq_alloc(0, BUFSIZE))) {
332     if (feature_bool(FEAT_HAS_FERGUSON_FLUSHER)) {
333       /*
334        * from "Married With Children" episode were Al bought a REAL toilet
335        * on the black market because he was tired of the wimpy water
336        * conserving toilets they make these days --Bleep
337        */
338       /*
339        * Apparently this doesn't work, the server _has_ to
340        * dump a few clients to handle the load. A fully loaded
341        * server cannot handle a net break without dumping some
342        * clients. If we flush the connections here under a full
343        * load we may end up starving the kernel for mbufs and
344        * crash the machine
345        */
346       /*
347        * attempt to recover from buffer starvation before
348        * bailing this may help servers running out of memory
349        */
350       flush_connections(0);
351       mb = msgq_alloc(0, BUFSIZE);
352     }
353     if (!mb) { /* OK, try clearing the buffer free list */
354       msgq_clear_freembs();
355       mb = msgq_alloc(0, BUFSIZE);
356     }
357     if (!mb) { /* OK, try killing a client */
358       kill_highest_sendq(0); /* Don't kill any server connections */
359       mb = msgq_alloc(0, BUFSIZE);
360     }
361     if (!mb) { /* hmmm... */
362       kill_highest_sendq(1); /* Try killing a server connection now */
363       mb = msgq_alloc(0, BUFSIZE);
364     }
365     if (!mb) /* AIEEEE! */
366       server_panic("Unable to allocate buffers!");
367   }
368
369   mb->next = MQData.msglist; /* initialize the msgbuf */
370   mb->prev_p = &MQData.msglist;
371
372   /* fill the buffer */
373   mb->length = ircd_vsnprintf(dest, mb->msg, bufsize(mb) - 1, format, vl);
374
375   if (mb->length > bufsize(mb) - 2)
376     mb->length = bufsize(mb) - 2;
377
378   mb->msg[mb->length++] = '\r'; /* add \r\n to buffer */
379   mb->msg[mb->length++] = '\n';
380   mb->msg[mb->length] = '\0'; /* not strictly necessary */
381
382   assert(mb->length <= bufsize(mb));
383
384   if (MQData.msglist) /* link it into the list */
385     MQData.msglist->prev_p = &mb->next;
386   MQData.msglist = mb;
387
388   return mb;
389 }
390
391 /** Format a message buffer for a client from a format string.
392  * @param[in] dest %Client that receives the data (may be NULL).
393  * @param[in] format Format string for message.
394  * @return Allocated MsgBuf.
395  */
396 struct MsgBuf *
397 msgq_make(struct Client *dest, const char *format, ...)
398 {
399   va_list vl;
400   struct MsgBuf *mb;
401
402   va_start(vl, format);
403   mb = msgq_vmake(dest, format, vl);
404   va_end(vl);
405
406   return mb;
407 }
408
409 /** Append text to an existing message buffer.
410  * @param[in] dest %Client for whom to format the message.
411  * @param[in] mb Message buffer to append to.
412  * @param[in] format Format string of what to append.
413  */
414 void
415 msgq_append(struct Client *dest, struct MsgBuf *mb, const char *format, ...)
416 {
417   va_list vl;
418
419   assert(0 != mb);
420   assert(0 != format);
421   assert(0 == mb->real);
422
423   assert(2 < mb->length);
424   assert(bufsize(mb) >= mb->length);
425
426   mb->length -= 2; /* back up to before \r\n */
427
428   va_start(vl, format); /* append to the buffer */
429   mb->length += ircd_vsnprintf(dest, mb->msg + mb->length,
430                                bufsize(mb) - mb->length - 1, format, vl);
431   va_end(vl);
432
433   if (mb->length > bufsize(mb) - 2)
434     mb->length = bufsize(mb) - 2;
435
436   mb->msg[mb->length++] = '\r'; /* add \r\n to buffer */
437   mb->msg[mb->length++] = '\n';
438   mb->msg[mb->length] = '\0'; /* not strictly necessary */
439
440   assert(mb->length <= bufsize(mb));
441 }
442
443 /** Decrement the reference count on \a mb, freeing it if needed.
444  * @param[in] mb MsgBuf to release.
445  */
446 void
447 msgq_clean(struct MsgBuf *mb)
448 {
449   assert(0 != mb);
450   assert(0 < mb->ref);
451
452   if (!--mb->ref) { /* deallocate the message */
453     if (mb->prev_p) {
454       *mb->prev_p = mb->next; /* clip it out of active MsgBuf's list */
455       if (mb->next)
456         mb->next->prev_p = mb->prev_p;
457     }
458
459     if (mb->real && mb->real != mb) /* clean up the real buffer */
460       msgq_clean(mb->real);
461
462     mb->next = MQData.msgBufs[mb->power - MB_BASE_SHIFT].free;
463     MQData.msgBufs[mb->power - MB_BASE_SHIFT].free = mb;
464     MQData.msgBufs[mb->power - MB_BASE_SHIFT].used--;
465
466     mb->prev_p = 0;
467   }
468 }
469
470 /** Append a message to a peer's message queue.
471  * @param[in] mq Message queue to append to.
472  * @param[in] mb Message to append.
473  * @param[in] prio If non-zero, use the high-priority (lag-busting) message list; else use the normal list.
474  */
475 void
476 msgq_add(struct MsgQ *mq, struct MsgBuf *mb, int prio)
477 {
478   struct MsgQList *qlist;
479   struct Msg *msg;
480
481   assert(0 != mq);
482   assert(0 != mb);
483   assert(0 < mb->ref);
484   assert(0 < mb->length);
485
486   Debug((DEBUG_SEND, "Adding buffer %p [%.*s] length %u to %s queue", mb,
487          mb->length - 2, mb->msg, mb->length, prio ? "priority" : "normal"));
488
489   qlist = prio ? &mq->prio : &mq->queue;
490
491   if (!(msg = MQData.msgs.free)) { /* do I need to allocate one? */
492     msg = (struct Msg *)MyMalloc(sizeof(struct Msg));
493     MQData.msgs.alloc++; /* we allocated another */
494   } else /* shift the free list */
495     MQData.msgs.free = MQData.msgs.free->next;
496
497   MQData.msgs.used++; /* we're using another */
498
499   msg->next = 0; /* initialize the msg */
500   msg->sent = 0;
501
502   /* Get the real buffer, allocating one if necessary */
503   if (!mb->real) {
504     struct MsgBuf *tmp;
505
506     MQData.sizes.msgs++; /* update histogram counts */
507     MQData.sizes.sizes[mb->length - 1]++;
508
509     tmp = msgq_alloc(mb, mb->length); /* allocate a close-fitting buffer */
510
511     if (tmp != mb) { /* OK, prepare the new "real" buffer */
512       Debug((DEBUG_SEND, "Copying old buffer %p [%.*s] length %u into new "
513              "buffer %p size %u", mb, mb->length - 2, mb->msg, mb->length,
514              tmp, bufsize(tmp)));
515       memcpy(tmp->msg, mb->msg, mb->length + 1); /* copy string over */
516       tmp->length = mb->length;
517
518       tmp->next = mb->next; /* replace it in the list, now */
519       if (tmp->next)
520         tmp->next->prev_p = &tmp->next;
521       tmp->prev_p = mb->prev_p;
522       *tmp->prev_p = tmp;
523
524       mb->next = 0; /* this one's no longer in the list */
525       mb->prev_p = 0;
526     }
527   }
528
529   mb = mb->real; /* work with the real buffer */
530   mb->ref++; /* increment the ref count on the buffer */
531
532   msg->msg = mb; /* point at the real message buffer now */
533
534   if (!qlist->head) /* queue list was empty; head and tail point to msg */
535     qlist->head = qlist->tail = msg;
536   else {
537     assert(0 != qlist->tail);
538
539     qlist->tail->next = msg; /* queue had something in it; add to end */
540     qlist->tail = msg;
541   }
542
543   mq->length += mb->length; /* update the queue length */
544   mq->count++; /* and the queue count */
545 }
546
547 /** Report memory statistics for message buffers.
548  * @param[in] cptr Client requesting information.
549  * @param[out] msg_alloc Receives number of bytes allocated in Msg structs.
550  * @param[out] msgbuf_alloc Receives number of bytes allocated in MsgBuf structs.
551  */
552 void
553 msgq_count_memory(struct Client *cptr, size_t *msg_alloc, size_t *msgbuf_alloc)
554 {
555   int i;
556   size_t total = 0, size;
557
558   assert(0 != cptr);
559   assert(0 != msg_alloc);
560   assert(0 != msgbuf_alloc);
561
562   /* Data for Msg's is simple, so just send it */
563   send_reply(cptr, SND_EXPLICIT | RPL_STATSDEBUG,
564              ":Msgs allocated %d(%zu) used %d(%zu)", MQData.msgs.alloc,
565              MQData.msgs.alloc * sizeof(struct Msg), MQData.msgs.used,
566              MQData.msgs.used * sizeof(struct Msg));
567   /* count_memory() wants to know the total */
568   *msg_alloc = MQData.msgs.alloc * sizeof(struct Msg);
569
570   /* Ok, now walk through each size class */
571   for (i = MB_BASE_SHIFT; i < MB_MAX_SHIFT + 1; i++) {
572     size = sizeof(struct MsgBuf) + (1 << i); /* total size of a buffer */
573
574     /* Send information for this buffer size class */
575     send_reply(cptr, SND_EXPLICIT | RPL_STATSDEBUG,
576                ":MsgBufs of size %zu allocated %d(%zu) used %d(%zu)", 1 << i,
577                MQData.msgBufs[i - MB_BASE_SHIFT].alloc,
578                MQData.msgBufs[i - MB_BASE_SHIFT].alloc * size,
579                MQData.msgBufs[i - MB_BASE_SHIFT].used,
580                MQData.msgBufs[i - MB_BASE_SHIFT].used * size);
581
582     /* count_memory() wants to know the total */
583     total += MQData.msgBufs[i - MB_BASE_SHIFT].alloc * size;
584   }
585   *msgbuf_alloc = total;
586 }
587
588 /** Report remaining space in a MsgBuf.
589  * @param[in] mb Message buffer to check.
590  * @return Number of additional bytes that can be appended to the message.
591  */
592 unsigned int
593 msgq_bufleft(struct MsgBuf *mb)
594 {
595   assert(0 != mb);
596
597   return bufsize(mb) - mb->length; /* \r\n counted in mb->length */
598 }
599
600 /** Send histogram of message lengths to a client.
601  * @param[in] cptr Client requesting statistics.
602  * @param[in] sd Stats descriptor for request (ignored).
603  * @param[in] param Extra parameter from user (ignored).
604  */
605 void
606 msgq_histogram(struct Client *cptr, const struct StatDesc *sd, char *param)
607 {
608   struct MsgSizes tmp = MQData.sizes; /* All hail structure copy! */
609   int i;
610
611   send_reply(cptr, SND_EXPLICIT | RPL_STATSDEBUG,
612              ":Histogram of message lengths (%lu messages)", tmp.msgs);
613   for (i = 0; i + 16 <= BUFSIZE; i += 16)
614     send_reply(cptr, SND_EXPLICIT | RPL_STATSDEBUG, ":% 4d: %lu %lu %lu %lu "
615                "%lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu", i + 1,
616                tmp.sizes[i +  0], tmp.sizes[i +  1], tmp.sizes[i +  2],
617                tmp.sizes[i +  3], tmp.sizes[i +  4], tmp.sizes[i +  5],
618                tmp.sizes[i +  6], tmp.sizes[i +  7], tmp.sizes[i +  8],
619                tmp.sizes[i +  9], tmp.sizes[i + 10], tmp.sizes[i + 11],
620                tmp.sizes[i + 12], tmp.sizes[i + 13], tmp.sizes[i + 14],
621                tmp.sizes[i + 15]);
622 }