/* * BIRD -- I/O and event loop * * Can be freely distributed and used under the terms of the GNU GPL. */ #include #include #include #include #include #include #include #include #include #include "nest/bird.h" #include "lib/buffer.h" #include "lib/lists.h" #include "lib/resource.h" #include "lib/event.h" #include "lib/timer.h" #include "lib/socket.h" #include "lib/io-loop.h" #include "sysdep/unix/io-loop.h" #include "conf/conf.h" #define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ /* * Current thread context */ _Thread_local struct birdloop *birdloop_current; static _Thread_local struct birdloop *birdloop_wakeup_masked; static _Thread_local uint birdloop_wakeup_masked_count; event_list * birdloop_event_list(struct birdloop *loop) { return &loop->event_list; } struct timeloop * birdloop_time_loop(struct birdloop *loop) { return &loop->time; } _Bool birdloop_inside(struct birdloop *loop) { for (struct birdloop *c = birdloop_current; c; c = c->prev_loop) if (loop == c) return 1; return 0; } void birdloop_flag(struct birdloop *loop, u32 flag) { atomic_fetch_or_explicit(&loop->flags, flag, memory_order_acq_rel); birdloop_ping(loop); } void birdloop_flag_set_handler(struct birdloop *loop, struct birdloop_flag_handler *fh) { ASSERT_DIE(birdloop_inside(loop)); loop->flag_handler = fh; } static int birdloop_process_flags(struct birdloop *loop) { if (!loop->flag_handler) return 0; u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel); loop->flag_handler->hook(loop->flag_handler, flags); return !!flags; } static int birdloop_run_events(struct birdloop *loop) { btime begin = current_time(); while (current_time() - begin < 5 MS) { if (!ev_run_list(&loop->event_list)) return 0; times_update(); } return 1; } /* * Wakeup code for birdloop */ void pipe_new(struct pipe *p) { int rv = pipe(p->fd); if (rv < 0) die("pipe: %m"); if (fcntl(p->fd[0], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); if (fcntl(p->fd[1], F_SETFL, O_NONBLOCK) < 0) die("fcntl(O_NONBLOCK): %m"); } void pipe_drain(struct pipe *p) { while (1) { char buf[64]; int rv = read(p->fd[0], buf, sizeof(buf)); if ((rv < 0) && (errno == EAGAIN)) return; if (rv == 0) bug("wakeup read eof"); if ((rv < 0) && (errno != EINTR)) bug("wakeup read: %m"); } } int pipe_read_one(struct pipe *p) { while (1) { char v; int rv = read(p->fd[0], &v, sizeof(v)); if (rv == 1) return 1; if ((rv < 0) && (errno == EAGAIN)) return 0; if (rv > 1) bug("wakeup read more bytes than expected: %d", rv); if (rv == 0) bug("wakeup read eof"); if (errno != EINTR) bug("wakeup read: %m"); } } void pipe_kick(struct pipe *p) { char v = 1; int rv; while (1) { rv = write(p->fd[1], &v, sizeof(v)); if ((rv >= 0) || (errno == EAGAIN)) return; if (errno != EINTR) bug("wakeup write: %m"); } } void pipe_pollin(struct pipe *p, struct pollfd *pfd) { pfd->fd = p->fd[0]; pfd->events = POLLIN; pfd->revents = 0; } static inline void wakeup_init(struct birdloop *loop) { pipe_new(&loop->wakeup); } static inline void wakeup_drain(struct birdloop *loop) { pipe_drain(&loop->wakeup); } static inline void wakeup_do_kick(struct birdloop *loop) { pipe_kick(&loop->wakeup); } static inline void birdloop_do_ping(struct birdloop *loop) { if (atomic_fetch_add_explicit(&loop->ping_sent, 1, memory_order_acq_rel)) return; if (loop == birdloop_wakeup_masked) birdloop_wakeup_masked_count++; else wakeup_do_kick(loop); } void birdloop_ping(struct birdloop *loop) { if (birdloop_inside(loop) && !loop->ping_pending) loop->ping_pending++; else birdloop_do_ping(loop); } /* * Sockets */ static void sockets_init(struct birdloop *loop) { init_list(&loop->sock_list); loop->sock_num = 0; BUFFER_INIT(loop->poll_sk, loop->pool, 4); BUFFER_INIT(loop->poll_fd, loop->pool, 4); loop->poll_changed = 1; /* add wakeup fd */ } static void sockets_add(struct birdloop *loop, sock *s) { add_tail(&loop->sock_list, &s->n); loop->sock_num++; s->index = -1; loop->poll_changed = 1; birdloop_ping(loop); } void sk_start(sock *s) { ASSERT_DIE(birdloop_current != &main_birdloop); sockets_add(birdloop_current, s); } static void sockets_remove(struct birdloop *loop, sock *s) { if (!enlisted(&s->n)) return; rem_node(&s->n); loop->sock_num--; if (s->index >= 0) { loop->poll_sk.data[s->index] = NULL; s->index = -1; loop->poll_changed = 1; loop->close_scheduled = 1; birdloop_ping(loop); } else close(s->fd); } void sk_stop(sock *s) { sockets_remove(birdloop_current, s); } static inline uint sk_want_events(sock *s) { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } /* FIXME: this should be called from sock code static void sockets_update(struct birdloop *loop, sock *s) { if (s->index >= 0) loop->poll_fd.data[s->index].events = sk_want_events(s); } */ static void sockets_prepare(struct birdloop *loop) { BUFFER_SET(loop->poll_sk, loop->sock_num + 1); BUFFER_SET(loop->poll_fd, loop->sock_num + 1); struct pollfd *pfd = loop->poll_fd.data; sock **psk = loop->poll_sk.data; uint i = 0; node *n; WALK_LIST(n, loop->sock_list) { sock *s = SKIP_BACK(sock, n, n); ASSERT(i < loop->sock_num); s->index = i; *psk = s; pfd->fd = s->fd; pfd->events = sk_want_events(s); pfd->revents = 0; pfd++; psk++; i++; } ASSERT(i == loop->sock_num); /* Add internal wakeup fd */ *psk = NULL; pipe_pollin(&loop->wakeup, pfd); loop->poll_changed = 0; } static void sockets_close_fds(struct birdloop *loop) { struct pollfd *pfd = loop->poll_fd.data; sock **psk = loop->poll_sk.data; int poll_num = loop->poll_fd.used - 1; int i; for (i = 0; i < poll_num; i++) if (psk[i] == NULL) close(pfd[i].fd); loop->close_scheduled = 0; } int sk_read(sock *s, int revents); int sk_write(sock *s); static void sockets_fire(struct birdloop *loop) { struct pollfd *pfd = loop->poll_fd.data; sock **psk = loop->poll_sk.data; int poll_num = loop->poll_fd.used - 1; times_update(); /* Last fd is internal wakeup fd */ if (pfd[poll_num].revents & POLLIN) wakeup_drain(loop); int i; for (i = 0; i < poll_num; pfd++, psk++, i++) { int e = 1; if (! pfd->revents) continue; if (pfd->revents & POLLNVAL) bug("poll: invalid fd %d", pfd->fd); if (pfd->revents & POLLIN) while (e && *psk && (*psk)->rx_hook) e = sk_read(*psk, pfd->revents); e = 1; if (pfd->revents & POLLOUT) { loop->poll_changed = 1; while (e && *psk) e = sk_write(*psk); } } } /* * Birdloop */ struct birdloop main_birdloop; static void birdloop_enter_locked(struct birdloop *loop); void birdloop_init(void) { wakeup_init(&main_birdloop); main_birdloop.time.domain = the_bird_domain.the_bird; main_birdloop.time.loop = &main_birdloop; times_update(); timers_init(&main_birdloop.time, &root_pool); birdloop_enter_locked(&main_birdloop); } static void *birdloop_main(void *arg); struct birdloop * birdloop_new(pool *pp, uint order, const char *name) { struct domain_generic *dg = domain_new(name, order); pool *p = rp_new(pp, name); struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop)); loop->pool = p; loop->time.domain = dg; loop->time.loop = loop; birdloop_enter(loop); wakeup_init(loop); ev_init_list(&loop->event_list, loop, name); timers_init(&loop->time, p); sockets_init(loop); int e = 0; if (e = pthread_attr_init(&loop->thread_attr)) die("pthread_attr_init() failed: %M", e); if (e = pthread_attr_setstacksize(&loop->thread_attr, THREAD_STACK_SIZE)) die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); if (e = pthread_attr_setdetachstate(&loop->thread_attr, PTHREAD_CREATE_DETACHED)) die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); if (e = pthread_create(&loop->thread_id, &loop->thread_attr, birdloop_main, loop)) die("pthread_create() failed: %M", e); birdloop_leave(loop); return loop; } static void birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { loop->stopped = stopped; loop->stop_data = data; wakeup_do_kick(loop); } void birdloop_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) { DG_LOCK(loop->time.domain); birdloop_do_stop(loop, stopped, data); DG_UNLOCK(loop->time.domain); } void birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *data) { ASSERT_DIE(loop == birdloop_current); ASSERT_DIE(DG_IS_LOCKED(loop->time.domain)); birdloop_do_stop(loop, stopped, data); } void birdloop_free(struct birdloop *loop) { ASSERT_DIE(loop->links == 0); ASSERT_DIE(pthread_equal(pthread_self(), loop->thread_id)); rcu_birdloop_stop(&loop->rcu); pthread_attr_destroy(&loop->thread_attr); domain_free(loop->time.domain); rfree(loop->pool); } static void birdloop_enter_locked(struct birdloop *loop) { ASSERT_DIE(DG_IS_LOCKED(loop->time.domain)); ASSERT_DIE(!birdloop_inside(loop)); /* Store the old context */ loop->prev_loop = birdloop_current; /* Put the new context */ birdloop_current = loop; } void birdloop_enter(struct birdloop *loop) { DG_LOCK(loop->time.domain); return birdloop_enter_locked(loop); } static void birdloop_leave_locked(struct birdloop *loop) { /* Check the current context */ ASSERT_DIE(birdloop_current == loop); /* Send pending pings */ if (loop->ping_pending) { loop->ping_pending = 0; birdloop_do_ping(loop); } /* Restore the old context */ birdloop_current = loop->prev_loop; } void birdloop_leave(struct birdloop *loop) { birdloop_leave_locked(loop); DG_UNLOCK(loop->time.domain); } void birdloop_mask_wakeups(struct birdloop *loop) { ASSERT_DIE(birdloop_wakeup_masked == NULL); birdloop_wakeup_masked = loop; } void birdloop_unmask_wakeups(struct birdloop *loop) { ASSERT_DIE(birdloop_wakeup_masked == loop); birdloop_wakeup_masked = NULL; if (birdloop_wakeup_masked_count) wakeup_do_kick(loop); birdloop_wakeup_masked_count = 0; } void birdloop_link(struct birdloop *loop) { ASSERT_DIE(birdloop_inside(loop)); loop->links++; } void birdloop_unlink(struct birdloop *loop) { ASSERT_DIE(birdloop_inside(loop)); loop->links--; } static void * birdloop_main(void *arg) { struct birdloop *loop = arg; timer *t; int rv, timeout; rcu_birdloop_start(&loop->rcu); btime loop_begin = current_time(); tmp_init(loop->pool); birdloop_enter(loop); while (1) { timers_fire(&loop->time, 0); if (birdloop_process_flags(loop) + birdloop_run_events(loop)) timeout = 0; else if (t = timers_first(&loop->time)) timeout = (tm_remains(t) TO_MS) + 1; else timeout = -1; if (loop->poll_changed) sockets_prepare(loop); else if ((timeout < 0) || (timeout > 5000)) flush_local_pages(); btime duration = current_time() - loop_begin; if (duration > config->watchdog_warning) log(L_WARN "I/O loop cycle took %d ms", (int) (duration TO_MS)); birdloop_leave(loop); try: rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); if (rv < 0) { if (errno == EINTR || errno == EAGAIN) goto try; bug("poll: %m"); } birdloop_enter(loop); if (loop->close_scheduled) sockets_close_fds(loop); if (loop->stopped) break; loop_begin = current_time(); if (rv && !loop->poll_changed) sockets_fire(loop); atomic_exchange_explicit(&loop->ping_sent, 0, memory_order_acq_rel); } /* Flush remaining events */ ASSERT_DIE(!ev_run_list(&loop->event_list)); /* Drop timers */ while (t = timers_first(&loop->time)) tm_stop(t); /* No sockets allowed */ ASSERT_DIE(EMPTY_LIST(loop->sock_list)); ASSERT_DIE(loop->sock_num == 0); birdloop_leave(loop); loop->stopped(loop->stop_data); flush_local_pages(); return NULL; } void birdloop_yield(void) { usleep(100); }