void
ioset_init(void)
{
- assert(engine == NULL);
+ if (engine) /* someone beat us to it */
+ return;
#if WITH_IOSET_KQUEUE
if (!engine && io_engine_kqueue.init())
log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd);
return 0;
}
+ if (!engine)
+ ioset_init();
res = calloc(1, sizeof(*res));
if (!res)
return 0;
res->fd = fd;
ioq_init(&res->send, 1024);
ioq_init(&res->recv, 1024);
- engine->add(res);
flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags|O_NONBLOCK);
+ engine->add(res);
return res;
}
struct io_fd *
ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) {
- int fd, res;
+ struct addrinfo hints;
+ struct addrinfo *ai;
struct io_fd *io_fd;
- struct addrinfo hints, *ai;
+ struct io_fd *old_active;
+ int res;
+ int fd;
char portnum[10];
memset(&hints, 0, sizeof(hints));
}
}
io_fd->state = IO_CONNECTED;
+ old_active = active_fd;
if (connect_cb)
connect_cb(io_fd, ((res < 0) ? errno : 0));
- engine->update(io_fd);
+ if (active_fd)
+ engine->update(io_fd);
+ if (old_active != io_fd)
+ active_fd = old_active;
return io_fd;
}
+void ioset_update(struct io_fd *fd) {
+ engine->update(fd);
+}
+
static void
ioset_try_write(struct io_fd *fd) {
int res;
static void
ioset_accept(struct io_fd *listener)
{
- struct io_fd *old_active_fd;
+ struct io_fd *old_active;
struct io_fd *new_fd;
int fd;
new_fd = ioset_add(fd);
new_fd->state = IO_CONNECTED;
- old_active_fd = active_fd;
+ old_active = active_fd;
active_fd = new_fd;
listener->accept_cb(listener, new_fd);
assert(active_fd == NULL || active_fd == new_fd);
else
engine->update(new_fd);
}
- active_fd = old_active_fd;
+ active_fd = old_active;
}
static int
if (fd->recv.put == fd->recv.size)
fd->recv.put = 0;
fdnum = fd->fd;
- while (fd->wants_reads && (fd->line_len > 0)) {
+ while (fd->line_len > 0) {
struct io_fd *old_active;
+ int died = 0;
old_active = active_fd;
active_fd = fd;
fd->readable_cb(fd);
if (active_fd)
ioset_find_line_length(fd);
+ else
+ died = 1;
if (old_active != fd)
active_fd = old_active;
+ if (died)
+ break;
}
}
}
break;
case IO_CONNECTING:
assert(active_fd == NULL || active_fd == fd);
- if (active_fd && writable) {
+ if (active_fd && readable) {
socklen_t arglen;
int rc;
arglen = sizeof(rc);
if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0)
rc = errno;
- fd->state = IO_CONNECTED;
+ fd->state = IO_CLOSED;
if (fd->connect_cb)
fd->connect_cb(fd, rc);
- if (active_fd == fd)
- engine->update(fd);
+ } else if (active_fd && writable) {
+ fd->state = IO_CONNECTED;
+ if (fd->connect_cb)
+ fd->connect_cb(fd, 0);
}
+ if (active_fd != fd)
+ break;
+ engine->update(fd);
/* and fall through */
case IO_CONNECTED:
assert(active_fd == NULL || active_fd == fd);