* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "IOHandler.h"
+#include "IOEngine.h"
struct IODescriptor *first_descriptor = NULL;
static void iohandler_init_engine() {
//try other engines
- if(engine)
+ if(engine) {
//found an useable IO engine
- else if (engine_select.init())
+ } else if (engine_select.init())
engine = &engine_select;
else {
//STDERR: found no useable IO engine
}
}
-struct IODescriptor *iohandler_add(int sockfd, IOType type, iohandler_callback *callback) {
+struct IODescriptor *iohandler_add(int sockfd, enum IOType type, iohandler_callback *callback) {
//just add a new IODescriptor
struct IODescriptor *descriptor = calloc(1, sizeof(*descriptor));
if(!descriptor) return NULL;
descriptor->state = IO_CLOSED;
descriptor->callback = callback;
if(type != IOTYPE_TIMER) {
- descriptor->readbuf.buffer = malloc(IOREADBUFLEN + 2);
+ descriptor->readbuf.buffer = malloc(IO_READ_BUFLEN + 2);
descriptor->readbuf.bufpos = 0;
- descriptor->readbuf.buflen = IOREADBUFLEN;
- descriptor->writebuf.buffer = malloc(IOREADBUFLEN + 2);
+ descriptor->readbuf.buflen = IO_READ_BUFLEN;
+ descriptor->writebuf.buffer = malloc(IO_READ_BUFLEN + 2);
descriptor->writebuf.bufpos = 0;
- descriptor->writebuf.buflen = IOREADBUFLEN;
+ descriptor->writebuf.buflen = IO_READ_BUFLEN;
}
if(!engine) {
//add IODescriptor to the list
descriptor->prev = NULL;
descriptor->next = first_descriptor;
- first_descriptor->prev = descriptor;
+ if(first_descriptor)
+ first_descriptor->prev = descriptor;
first_descriptor = descriptor;
return descriptor;
}
-static void iohandler_remove(struct IODescriptor *descriptor) {
+static void iohandler_remove(struct IODescriptor *descriptor, int engine_remove) {
//remove IODescriptor from the list
if(descriptor->prev)
descriptor->prev->next = descriptor->next;
first_descriptor = descriptor->next;
if(descriptor->next)
descriptor->next->prev = descriptor->prev;
- engine->remove(descriptor);
+ if(engine_remove)
+ engine->remove(descriptor);
if(descriptor->readbuf.buffer)
free(descriptor->readbuf.buffer);
if(descriptor->writebuf.buffer)
}
}
-struct IODescriptor *iohandler_timer(timeval timeout, iohandler_callback *callback) {
+struct IODescriptor *iohandler_timer(struct timeval timeout, iohandler_callback *callback) {
struct IODescriptor *descriptor;
descriptor = iohandler_add(-1, IOTYPE_TIMER, callback);
if(!descriptor) return NULL;
return descriptor;
}
-struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, const char *bind, iohandler_callback *callback) {
+struct IODescriptor *iohandler_connect(const char *hostname, unsigned int port, const char *bindhost, iohandler_callback *callback) {
//non-blocking connect
int sockfd;
struct addrinfo hints, *res, *freeres;
ip6->sin6_port = htons(port);
struct sockaddr_in6 *ip6vhost = NULL;
- if (bind && !getaddrinfo(bind, NULL, &hints, &res)) {
+ if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) {
while (res) {
switch (res->ai_family) {
case AF_INET6:
ip4->sin_port = htons(port);
struct sockaddr_in *ip4vhost = NULL;
- if (bind && !getaddrinfo(bind, NULL, &hints, &res)) {
+ if (bindhost && !getaddrinfo(bindhost, NULL, &hints, &res)) {
while (res) {
switch (res->ai_family) {
case AF_INET:
/* I hope you're using the Win32 backend or something else that
* automatically marks the file descriptor non-blocking...
*/
- (void)flags;
#endif
descriptor = iohandler_add(sockfd, IOTYPE_CLIENT, callback);
if(!descriptor) {
descriptor->state = IO_CONNECTING;
descriptor->read_lines = 1;
engine->update(descriptor);
+ return descriptor;
}
struct IODescriptor *iohandler_listen(const char *hostname, unsigned int port, iohandler_callback *callback) {
/* I hope you're using the Win32 backend or something else that
* automatically marks the file descriptor non-blocking...
*/
- (void)flags;
#endif
descriptor = iohandler_add(sockfd, IOTYPE_SERVER, callback);
if(!descriptor) {
listen(sockfd, 1);
descriptor->state = IO_LISTENING;
engine->update(descriptor);
+ return descriptor;
}
void iohandler_write(struct IODescriptor *iofd, const char *line) {
}
void iohandler_send(struct IODescriptor *iofd, const char *data, size_t datalen) {
- if(iofd->type == IOTYPE_TIMER) return; //can not write to timer? :D
+ if(iofd->type == IOTYPE_TIMER || iofd->state == IO_CLOSED) return; //can not write to timer? :D
if(iofd->writebuf.buflen < iofd->writebuf.bufpos + datalen) {
iohandler_increase_iobuf(&iofd->writebuf, iofd->writebuf.bufpos + datalen);
if(iofd->writebuf.buflen < iofd->writebuf.bufpos + datalen)
return;
}
memcpy(iofd->writebuf.buffer + iofd->writebuf.bufpos, data, datalen);
+ iofd->writebuf.bufpos += datalen;
engine->update(iofd);
}
}
void iohandler_close(struct IODescriptor *iofd) {
+ int engine_remove = 1;
+ if(iofd->writebuf.bufpos) {
+ //try to send everything before closing
+#if defined(F_GETFL)
+ flags = fcntl(sockfd, F_GETFL);
+ fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);
+ flags = fcntl(sockfd, F_GETFD);
+ fcntl(sockfd, F_SETFD, flags|FD_CLOEXEC);
+#else
+ engine_remove = 0;
+ engine->remove(iofd);
+#endif
+ iohandler_try_write(iofd);
+ }
//close IODescriptor
if(iofd->type == IOTYPE_SERVER || iofd->type == IOTYPE_CLIENT || iofd->type == IOTYPE_STDIN)
close(iofd->fd);
- iohandler_remove(iofd);
+ iohandler_remove(iofd, engine_remove);
}
void iohandler_update(struct IODescriptor *iofd) {
callback_event.type = IOEVENT_TIMEOUT;
break;
case IO_LISTENING:
- callback_event.data.accept_fd = accept(iofd->fd, NULL, 0);
- if(callback_event.data.accept_fd < 0) {
- //error: could not accept
- } else
- callback_event.type = IOEVENT_ACCEPT;
+ if(readable) {
+ callback_event.data.accept_fd = accept(iofd->fd, NULL, 0);
+ if(callback_event.data.accept_fd < 0) {
+ //error: could not accept
+ } else
+ callback_event.type = IOEVENT_ACCEPT;
+ }
break;
case IO_CONNECTING:
if(readable) { //could not connect
callback_event.type = IOEVENT_NOTCONNECTED;
socklen_t arglen;
- arglen = sizeof(callback_event.errno);
- if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &callback_event.errno, &arglen) < 0)
- callback_event.data.errno = errno;
+ arglen = sizeof(callback_event.data.errid);
+ if (getsockopt(iofd->fd, SOL_SOCKET, SO_ERROR, &callback_event.data.errid, &arglen) < 0)
+ callback_event.data.errid = errno;
iofd->state = IO_CLOSED;
} else if(writeable) {
callback_event.type = IOEVENT_CONNECTED;
int bytes = recv(iofd->fd, iofd->readbuf.buffer + iofd->readbuf.bufpos, iofd->readbuf.buflen - iofd->readbuf.bufpos, 0);
if(bytes <= 0) {
if (errno != EAGAIN) {
+ iofd->state = IO_CLOSED;
callback_event.type = IOEVENT_CLOSED;
- callback_event.data.errno = errno;
+ callback_event.data.errid = errno;
}
} else {
- int i, used_bytes = 0, buffer_offset = 0;
+ int i, used_bytes = 0;
iofd->readbuf.bufpos += bytes;
callback_event.type = IOEVENT_RECV;
for(i = 0; i < iofd->readbuf.bufpos; i++) {
}
break;
}
+ if(callback_event.type == IOEVENT_IGNORE && !readable && !writeable)
+ callback_event.type = IOEVENT_TIMEOUT;
if(callback_event.type != IOEVENT_IGNORE)
iohandler_trigger_event(&callback_event);
}