mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2024-12-22 17:51:53 +00:00
More efficient IO loop event execution to avoid long loops
If there are lots of loops in a single thread and only some of the loops are actually active, the other loops are now kept aside and not checked until they actually get some timers, events or active sockets. This should help with extreme loads like 100k tables and protocols. Also ping and loop pickup mechanism was allowing subtle race conditions. Now properly handling collisions between loop ping and pickup.
This commit is contained in:
parent
d9f0f4af7d
commit
571c4f69bf
@ -13,6 +13,7 @@ struct domain_generic;
|
||||
|
||||
/* Here define the global lock order; first to last. */
|
||||
struct lock_order {
|
||||
struct domain_generic *meta;
|
||||
struct domain_generic *the_bird;
|
||||
struct domain_generic *control;
|
||||
struct domain_generic *proto;
|
||||
|
@ -1131,6 +1131,7 @@ proto_loop_stopped(void *ptr)
|
||||
|
||||
birdloop_enter(&main_birdloop);
|
||||
|
||||
birdloop_free(p->loop);
|
||||
p->loop = &main_birdloop;
|
||||
proto_cleanup(p);
|
||||
|
||||
|
@ -4073,6 +4073,7 @@ rt_delete(void *tab_)
|
||||
|
||||
RT_UNLOCK(RT_PUB(tab));
|
||||
|
||||
birdloop_free(tab->loop);
|
||||
rfree(tab->rp);
|
||||
config_del_obstacle(conf);
|
||||
|
||||
|
@ -217,6 +217,8 @@ bgp_open(struct bgp_proto *p)
|
||||
req->port = p->cf->local_port;
|
||||
req->flags = p->cf->free_bind ? SKF_FREEBIND : 0;
|
||||
|
||||
BGP_TRACE(D_EVENTS, "Requesting listen socket at %I%J port %u", req->addr, req->iface, req->port);
|
||||
|
||||
add_tail(&bgp_listen_pending, &req->n);
|
||||
ev_schedule(&bgp_listen_event);
|
||||
}
|
||||
@ -243,7 +245,9 @@ bgp_listen_create(void *_ UNUSED)
|
||||
break;
|
||||
|
||||
/* Not found any */
|
||||
if (!NODE_VALID(bs))
|
||||
if (NODE_VALID(bs))
|
||||
BGP_TRACE(D_EVENTS, "Found a listening socket: %p", bs);
|
||||
else
|
||||
{
|
||||
sock *sk = sk_new(proto_pool);
|
||||
sk->type = SK_TCP_PASSIVE;
|
||||
@ -275,6 +279,8 @@ bgp_listen_create(void *_ UNUSED)
|
||||
|
||||
init_list(&bs->requests);
|
||||
add_tail(&bgp_sockets, &bs->n);
|
||||
|
||||
BGP_TRACE(D_EVENTS, "Created new listening socket: %p", bs);
|
||||
}
|
||||
|
||||
add_tail(&bs->requests, &req->n);
|
||||
|
@ -31,6 +31,8 @@
|
||||
|
||||
#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */
|
||||
|
||||
static struct birdloop *birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup);
|
||||
|
||||
/*
|
||||
* Nanosecond time for accounting purposes
|
||||
*
|
||||
@ -65,6 +67,9 @@ _Thread_local struct birdloop *birdloop_current;
|
||||
static _Thread_local struct birdloop *birdloop_wakeup_masked;
|
||||
static _Thread_local uint birdloop_wakeup_masked_count;
|
||||
|
||||
#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, domain_name((loop)->time.domain), (loop), ##args); } while (0)
|
||||
#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0)
|
||||
|
||||
event_list *
|
||||
birdloop_event_list(struct birdloop *loop)
|
||||
{
|
||||
@ -190,11 +195,13 @@ pipe_kick(struct pipe *p)
|
||||
}
|
||||
|
||||
void
|
||||
pipe_pollin(struct pipe *p, struct pollfd *pfd)
|
||||
pipe_pollin(struct pipe *p, struct pfd *pfd)
|
||||
{
|
||||
pfd->fd = p->fd[0];
|
||||
pfd->events = POLLIN;
|
||||
pfd->revents = 0;
|
||||
BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
|
||||
.fd = p->fd[0],
|
||||
.events = POLLIN,
|
||||
};
|
||||
BUFFER_PUSH(pfd->loop) = NULL;
|
||||
}
|
||||
|
||||
static inline void
|
||||
@ -215,28 +222,80 @@ wakeup_do_kick(struct bird_thread *loop)
|
||||
pipe_kick(&loop->wakeup);
|
||||
}
|
||||
|
||||
static inline _Bool
|
||||
birdloop_try_ping(struct birdloop *loop, u32 ltt)
|
||||
{
|
||||
/* Somebody else is already pinging, be idempotent */
|
||||
if (ltt & LTT_PING)
|
||||
{
|
||||
LOOP_TRACE(loop, "already being pinged");
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Thread moving is an implicit ping */
|
||||
if (ltt & LTT_MOVE)
|
||||
{
|
||||
LOOP_TRACE(loop, "ping while moving");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* No more flags allowed */
|
||||
ASSERT_DIE(!ltt);
|
||||
|
||||
/* No ping when not picked up */
|
||||
if (!loop->thread)
|
||||
{
|
||||
LOOP_TRACE(loop, "not picked up yet, can't ping");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* No ping when masked */
|
||||
if (loop == birdloop_wakeup_masked)
|
||||
{
|
||||
LOOP_TRACE(loop, "wakeup masked, can't ping");
|
||||
birdloop_wakeup_masked_count++;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Send meta event to ping */
|
||||
if ((loop != loop->thread->meta) && (loop != &main_birdloop))
|
||||
{
|
||||
LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta);
|
||||
ev_send_loop(loop->thread->meta, &loop->event);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Do the real ping */
|
||||
LOOP_TRACE(loop, "sending pipe ping");
|
||||
wakeup_do_kick(loop->thread);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline void
|
||||
birdloop_do_ping(struct birdloop *loop)
|
||||
{
|
||||
if (!loop->thread)
|
||||
return;
|
||||
/* Register our ping effort */
|
||||
u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel);
|
||||
|
||||
if (atomic_fetch_add_explicit(&loop->thread->ping_sent, 1, memory_order_acq_rel))
|
||||
return;
|
||||
|
||||
if (loop == birdloop_wakeup_masked)
|
||||
birdloop_wakeup_masked_count++;
|
||||
else
|
||||
wakeup_do_kick(loop->thread);
|
||||
/* Try to ping in multiple ways */
|
||||
if (birdloop_try_ping(loop, ltt))
|
||||
atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel);
|
||||
}
|
||||
|
||||
void
|
||||
birdloop_ping(struct birdloop *loop)
|
||||
{
|
||||
if (birdloop_inside(loop) && !loop->ping_pending)
|
||||
loop->ping_pending++;
|
||||
else
|
||||
if (!birdloop_inside(loop))
|
||||
{
|
||||
LOOP_TRACE(loop, "ping from outside");
|
||||
birdloop_do_ping(loop);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending);
|
||||
if (!loop->ping_pending)
|
||||
loop->ping_pending++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -254,6 +313,7 @@ sockets_init(struct birdloop *loop)
|
||||
static void
|
||||
sockets_add(struct birdloop *loop, sock *s)
|
||||
{
|
||||
LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
|
||||
add_tail(&loop->sock_list, &s->n);
|
||||
loop->sock_num++;
|
||||
|
||||
@ -278,6 +338,8 @@ sockets_remove(struct birdloop *loop, sock *s)
|
||||
return;
|
||||
|
||||
/* Decouple the socket from the loop at all. */
|
||||
LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
|
||||
|
||||
rem_node(&s->n);
|
||||
loop->sock_num--;
|
||||
if (loop->thread)
|
||||
@ -302,30 +364,30 @@ sk_stop(sock *s)
|
||||
static inline uint sk_want_events(sock *s)
|
||||
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
|
||||
|
||||
static struct pollfd *
|
||||
sockets_prepare(struct birdloop *loop, struct pollfd *pfd, struct pollfd *end)
|
||||
void
|
||||
sockets_prepare(struct birdloop *loop, struct pfd *pfd)
|
||||
{
|
||||
node *n;
|
||||
loop->pfd = pfd;
|
||||
|
||||
WALK_LIST(n, loop->sock_list)
|
||||
{
|
||||
sock *s = SKIP_BACK(sock, n, n);
|
||||
uint w = sk_want_events(s);
|
||||
|
||||
/* Out of space for pfds. Force reallocation. */
|
||||
if (pfd >= end)
|
||||
return NULL;
|
||||
|
||||
s->index = pfd - loop->pfd;
|
||||
|
||||
pfd->fd = s->fd;
|
||||
pfd->events = sk_want_events(s);
|
||||
pfd->revents = 0;
|
||||
|
||||
pfd++;
|
||||
if (!w)
|
||||
{
|
||||
s->index = -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
return pfd;
|
||||
s->index = pfd->pfd.used;
|
||||
LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index);
|
||||
|
||||
BUFFER_PUSH(pfd->pfd) = (struct pollfd) {
|
||||
.fd = s->fd,
|
||||
.events = sk_want_events(s),
|
||||
};
|
||||
BUFFER_PUSH(pfd->loop) = loop;
|
||||
}
|
||||
}
|
||||
|
||||
int sk_read(sock *s, int revents);
|
||||
@ -334,10 +396,9 @@ int sk_write(sock *s);
|
||||
static void
|
||||
sockets_fire(struct birdloop *loop)
|
||||
{
|
||||
struct pollfd *pfd = loop->pfd;
|
||||
|
||||
times_update();
|
||||
|
||||
struct pollfd *pfd = loop->thread->pfd->pfd.data;
|
||||
sock *s; node *n, *nxt;
|
||||
WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
|
||||
{
|
||||
@ -378,6 +439,99 @@ static list bird_thread_pickup;
|
||||
|
||||
static _Thread_local struct bird_thread *this_thread;
|
||||
|
||||
static void
|
||||
birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr)
|
||||
{
|
||||
/* Signal our moving effort */
|
||||
u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel);
|
||||
ASSERT_DIE((ltt & LTT_MOVE) == 0);
|
||||
|
||||
while (ltt & LTT_PING)
|
||||
{
|
||||
birdloop_yield();
|
||||
ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
|
||||
ASSERT_DIE(ltt & LTT_MOVE);
|
||||
}
|
||||
/* Now we are free of running pings */
|
||||
|
||||
if (loop->thread = thr)
|
||||
add_tail(&thr->loops, &loop->n);
|
||||
else
|
||||
{
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
add_tail(&birdloop_pickup, &loop->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
}
|
||||
|
||||
/* Finished */
|
||||
atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel);
|
||||
|
||||
/* Request to run by force */
|
||||
ev_send_loop(loop->thread->meta, &loop->event);
|
||||
}
|
||||
|
||||
static struct birdloop *
|
||||
birdloop_take(void)
|
||||
{
|
||||
struct birdloop *loop = NULL;
|
||||
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
if (!EMPTY_LIST(birdloop_pickup))
|
||||
{
|
||||
/* Take the first loop from the pickup list and unlock */
|
||||
loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup));
|
||||
rem_node(&loop->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
birdloop_set_thread(loop, this_thread);
|
||||
|
||||
/* This thread goes to the end of the pickup list */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
rem_node(&this_thread->n);
|
||||
add_tail(&bird_thread_pickup, &this_thread->n);
|
||||
|
||||
/* If there are more loops to be picked up, wakeup the next thread in order */
|
||||
if (!EMPTY_LIST(birdloop_pickup))
|
||||
wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
|
||||
}
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
return loop;
|
||||
}
|
||||
|
||||
static void
|
||||
birdloop_drop(struct birdloop *loop)
|
||||
{
|
||||
/* Remove loop from this thread's list */
|
||||
rem_node(&loop->n);
|
||||
|
||||
/* Unset loop's thread */
|
||||
if (birdloop_inside(loop))
|
||||
birdloop_set_thread(loop, NULL);
|
||||
else
|
||||
{
|
||||
birdloop_enter(loop);
|
||||
birdloop_set_thread(loop, NULL);
|
||||
birdloop_leave(loop);
|
||||
}
|
||||
|
||||
/* Put loop into pickup list */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
add_tail(&birdloop_pickup, &loop->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
}
|
||||
|
||||
static int
|
||||
poll_timeout(struct birdloop *loop)
|
||||
{
|
||||
timer *t = timers_first(&loop->time);
|
||||
if (!t)
|
||||
return -1;
|
||||
|
||||
btime remains = tm_remains(t);
|
||||
return remains TO_MS + ((remains TO_MS) MS < remains);
|
||||
}
|
||||
|
||||
static void *
|
||||
bird_thread_main(void *arg)
|
||||
{
|
||||
@ -389,103 +543,79 @@ bird_thread_main(void *arg)
|
||||
tmp_init(thr->pool);
|
||||
init_list(&thr->loops);
|
||||
|
||||
u32 refresh_sockets = 1;
|
||||
thr->meta = birdloop_new_internal(thr->pool, DOMAIN_ORDER(meta), "Thread Meta", 0);
|
||||
thr->meta->thread = thr;
|
||||
birdloop_enter(thr->meta);
|
||||
|
||||
struct pollfd *pfd, *end;
|
||||
u32 refresh_sockets = 1;
|
||||
struct pfd pfd;
|
||||
BUFFER_INIT(pfd.pfd, thr->pool, 16);
|
||||
BUFFER_INIT(pfd.loop, thr->pool, 16);
|
||||
thr->pfd = &pfd;
|
||||
|
||||
while (1)
|
||||
{
|
||||
/* Wakeup at least once a minute. */
|
||||
int timeout = 60000;
|
||||
int timeout;
|
||||
|
||||
/* Pickup new loops */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
if (!EMPTY_LIST(birdloop_pickup))
|
||||
struct birdloop *loop = birdloop_take();
|
||||
if (loop)
|
||||
{
|
||||
struct birdloop *loop = SKIP_BACK(struct birdloop, n, HEAD(birdloop_pickup));
|
||||
rem_node(&loop->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
add_tail(&thr->loops, &loop->n);
|
||||
|
||||
birdloop_enter(loop);
|
||||
loop->thread = thr;
|
||||
if (!EMPTY_LIST(loop->sock_list))
|
||||
refresh_sockets = 1;
|
||||
birdloop_leave(loop);
|
||||
|
||||
/* If there are more loops to be picked up, wakeup the next thread */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
rem_node(&thr->n);
|
||||
add_tail(&bird_thread_pickup, &thr->n);
|
||||
|
||||
if (!EMPTY_LIST(birdloop_pickup))
|
||||
wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
|
||||
}
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
struct birdloop *loop; node *nn;
|
||||
WALK_LIST2(loop, nn, thr->loops, n)
|
||||
/* Schedule all loops with timed out timers */
|
||||
timers_fire(&thr->meta->time, 0);
|
||||
|
||||
/* Run all scheduled loops */
|
||||
int more_events = ev_run_list(&thr->meta->event_list);
|
||||
if (more_events)
|
||||
{
|
||||
birdloop_enter(loop);
|
||||
u64 after_enter = ns_now();
|
||||
|
||||
timer *t;
|
||||
|
||||
times_update();
|
||||
timers_fire(&loop->time, 0);
|
||||
int again = birdloop_process_flags(loop) + ev_run_list(&loop->event_list);
|
||||
|
||||
#if 0
|
||||
if (loop->n.next->next)
|
||||
__builtin_prefetch(SKIP_BACK(struct birdloop, n, loop->n.next)->time.domain);
|
||||
#endif
|
||||
|
||||
if (again)
|
||||
timeout = MIN(0, timeout);
|
||||
else if (t = timers_first(&loop->time))
|
||||
timeout = MIN(((tm_remains(t) TO_MS) + 1), timeout);
|
||||
|
||||
u64 before_leave = ns_now();
|
||||
loop->total_time_spent_ns += (before_leave - after_enter);
|
||||
birdloop_leave(loop);
|
||||
|
||||
ev_run_list(&thr->priority_events);
|
||||
THREAD_TRACE("More events to run");
|
||||
timeout = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
timeout = poll_timeout(thr->meta);
|
||||
if (timeout == -1)
|
||||
THREAD_TRACE("No timers, no events");
|
||||
else
|
||||
THREAD_TRACE("Next timer in %d ms", timeout);
|
||||
}
|
||||
|
||||
/* Run priority events before sleeping */
|
||||
ev_run_list(&thr->priority_events);
|
||||
|
||||
/* Do we have to refresh sockets? */
|
||||
refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
|
||||
|
||||
if (!refresh_sockets && ((timeout < 0) || (timeout > 5000)))
|
||||
flush_local_pages();
|
||||
|
||||
while (refresh_sockets)
|
||||
if (refresh_sockets)
|
||||
{
|
||||
sock_retry:;
|
||||
end = (pfd = thr->pfd) + thr->pfd_max;
|
||||
BUFFER_FLUSH(pfd.pfd);
|
||||
BUFFER_FLUSH(pfd.loop);
|
||||
|
||||
/* Add internal wakeup fd */
|
||||
pipe_pollin(&thr->wakeup, pfd);
|
||||
pfd++;
|
||||
pipe_pollin(&thr->wakeup, &pfd);
|
||||
|
||||
node *nn;
|
||||
WALK_LIST2(loop, nn, thr->loops, n)
|
||||
{
|
||||
birdloop_enter(loop);
|
||||
pfd = sockets_prepare(loop, pfd, end);
|
||||
sockets_prepare(loop, &pfd);
|
||||
birdloop_leave(loop);
|
||||
|
||||
if (!pfd)
|
||||
{
|
||||
mb_free(thr->pfd);
|
||||
thr->pfd = mb_alloc(thr->pool, sizeof(struct pollfd) * (thr->pfd_max *= 2));
|
||||
goto sock_retry;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
|
||||
refresh_sockets = 0;
|
||||
}
|
||||
/* Nothing to do in at least 5 seconds, flush local hot page cache */
|
||||
else if (timeout > 5000)
|
||||
flush_local_pages();
|
||||
|
||||
poll_retry:;
|
||||
int rv = poll(thr->pfd, pfd - thr->pfd, timeout);
|
||||
int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout);
|
||||
if (rv < 0)
|
||||
{
|
||||
if (errno == EINTR || errno == EAGAIN)
|
||||
@ -494,51 +624,26 @@ poll_retry:;
|
||||
}
|
||||
|
||||
/* Drain wakeup fd */
|
||||
if (thr->pfd[0].revents & POLLIN)
|
||||
if (pfd.pfd.data[0].revents & POLLIN)
|
||||
{
|
||||
ASSERT_DIE(rv > 0);
|
||||
rv--;
|
||||
wakeup_drain(thr);
|
||||
}
|
||||
|
||||
atomic_exchange_explicit(&thr->ping_sent, 0, memory_order_acq_rel);
|
||||
atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel);
|
||||
|
||||
if (!rv && !atomic_exchange_explicit(&thr->run_cleanup, 0, memory_order_acq_rel))
|
||||
continue;
|
||||
|
||||
/* Process stops and regular sockets */
|
||||
node *nxt;
|
||||
WALK_LIST2_DELSAFE(loop, nn, nxt, thr->loops, n)
|
||||
/* Schedule loops with active sockets */
|
||||
if (rv)
|
||||
for (uint i = 1; i < pfd.pfd.used; i++)
|
||||
if (pfd.pfd.data[i].revents)
|
||||
{
|
||||
birdloop_enter(loop);
|
||||
|
||||
if (loop->stopped)
|
||||
{
|
||||
/* Flush remaining events */
|
||||
ASSERT_DIE(!ev_run_list(&loop->event_list));
|
||||
|
||||
/* Drop timers */
|
||||
timer *t;
|
||||
while (t = timers_first(&loop->time))
|
||||
tm_stop(t);
|
||||
|
||||
/* No sockets allowed */
|
||||
ASSERT_DIE(EMPTY_LIST(loop->sock_list));
|
||||
|
||||
/* Declare loop stopped */
|
||||
rem_node(&loop->n);
|
||||
birdloop_leave(loop);
|
||||
loop->stopped(loop->stop_data);
|
||||
|
||||
/* Birdloop already left */
|
||||
continue;
|
||||
}
|
||||
else if (rv)
|
||||
sockets_fire(loop);
|
||||
|
||||
birdloop_leave(loop);
|
||||
LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents);
|
||||
ev_send_loop(thr->meta, &pfd.loop.data[i]->event);
|
||||
}
|
||||
}
|
||||
|
||||
bug("An infinite loop has ended.");
|
||||
}
|
||||
|
||||
static void
|
||||
@ -563,11 +668,8 @@ bird_thread_start(void)
|
||||
|
||||
struct bird_thread *thr = mb_allocz(p, sizeof(*thr));
|
||||
thr->pool = p;
|
||||
thr->pfd = mb_alloc(p, sizeof(struct pollfd) * (thr->pfd_max = 16));
|
||||
thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, };
|
||||
|
||||
atomic_store_explicit(&thr->ping_sent, 0, memory_order_relaxed);
|
||||
|
||||
wakeup_init(thr);
|
||||
ev_init_list(&thr->priority_events, NULL, "Thread direct event list");
|
||||
|
||||
@ -615,7 +717,7 @@ bird_thread_shutdown(void * _ UNUSED)
|
||||
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
log(L_INFO "Thread pickup size differs from dropper goal by %d%s", dif, tdl_stop ? ", stopping" : "");
|
||||
DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : "");
|
||||
|
||||
if (tdl_stop)
|
||||
{
|
||||
@ -628,29 +730,11 @@ bird_thread_shutdown(void * _ UNUSED)
|
||||
/* Leave the thread-picker list to get no more loops */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
rem_node(&thr->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
/* Drop loops including the thread dropper itself */
|
||||
while (!EMPTY_LIST(thr->loops))
|
||||
{
|
||||
/* Remove loop from this thread's list */
|
||||
struct birdloop *loop = HEAD(thr->loops);
|
||||
rem_node(&loop->n);
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
|
||||
/* Unset loop's thread */
|
||||
if (birdloop_inside(loop))
|
||||
loop->thread = NULL;
|
||||
else
|
||||
{
|
||||
birdloop_enter(loop);
|
||||
loop->thread = NULL;
|
||||
birdloop_leave(loop);
|
||||
}
|
||||
|
||||
/* Put loop into pickup list */
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
add_tail(&birdloop_pickup, &loop->n);
|
||||
}
|
||||
birdloop_drop(HEAD(thr->loops));
|
||||
|
||||
/* Let others know about new loops */
|
||||
if (!EMPTY_LIST(birdloop_pickup))
|
||||
@ -660,6 +744,11 @@ bird_thread_shutdown(void * _ UNUSED)
|
||||
/* Leave the thread-dropper loop as we aren't going to return. */
|
||||
birdloop_leave(thread_dropper);
|
||||
|
||||
/* Stop the meta loop */
|
||||
birdloop_leave(thr->meta);
|
||||
domain_free(thr->meta->time.domain);
|
||||
rfree(thr->meta->pool);
|
||||
|
||||
/* Local pages not needed anymore */
|
||||
flush_local_pages();
|
||||
|
||||
@ -865,8 +954,81 @@ birdloop_init(void)
|
||||
birdloop_enter_locked(&main_birdloop);
|
||||
}
|
||||
|
||||
struct birdloop *
|
||||
birdloop_new(pool *pp, uint order, const char *name)
|
||||
static void
|
||||
birdloop_stop_internal(struct birdloop *loop)
|
||||
{
|
||||
/* Flush remaining events */
|
||||
ASSERT_DIE(!ev_run_list(&loop->event_list));
|
||||
|
||||
/* Drop timers */
|
||||
timer *t;
|
||||
while (t = timers_first(&loop->time))
|
||||
tm_stop(t);
|
||||
|
||||
/* No sockets allowed */
|
||||
ASSERT_DIE(EMPTY_LIST(loop->sock_list));
|
||||
|
||||
/* Unschedule from Meta */
|
||||
ev_postpone(&loop->event);
|
||||
tm_stop(&loop->timer);
|
||||
|
||||
/* Declare loop stopped */
|
||||
rem_node(&loop->n);
|
||||
birdloop_leave(loop);
|
||||
|
||||
/* Tail-call the stopped hook */
|
||||
loop->stopped(loop->stop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
birdloop_run(void *_loop)
|
||||
{
|
||||
/* Run priority events before the loop is executed */
|
||||
ev_run_list(&this_thread->priority_events);
|
||||
|
||||
struct birdloop *loop = _loop;
|
||||
birdloop_enter(loop);
|
||||
|
||||
if (loop->stopped)
|
||||
/* Birdloop left inside the helper function */
|
||||
return birdloop_stop_internal(loop);
|
||||
|
||||
/* Process sockets */
|
||||
sockets_fire(loop);
|
||||
|
||||
/* Run timers */
|
||||
timers_fire(&loop->time, 0);
|
||||
|
||||
/* Run flag handlers */
|
||||
if (birdloop_process_flags(loop))
|
||||
{
|
||||
LOOP_TRACE(loop, "Flag processing needs another run");
|
||||
ev_send_loop(this_thread->meta, &loop->event);
|
||||
}
|
||||
|
||||
/* Run events */
|
||||
ev_run_list(&loop->event_list);
|
||||
|
||||
/* Request meta timer */
|
||||
timer *t = timers_first(&loop->time);
|
||||
if (t)
|
||||
tm_start_in(&loop->timer, tm_remains(t), this_thread->meta);
|
||||
else
|
||||
tm_stop(&loop->timer);
|
||||
|
||||
birdloop_leave(loop);
|
||||
}
|
||||
|
||||
static void
|
||||
birdloop_run_timer(timer *tm)
|
||||
{
|
||||
struct birdloop *loop = tm->data;
|
||||
LOOP_TRACE(loop, "Timer ready, requesting run");
|
||||
ev_send_loop(loop->thread->meta, &loop->event);
|
||||
}
|
||||
|
||||
static struct birdloop *
|
||||
birdloop_new_internal(pool *pp, uint order, const char *name, int request_pickup)
|
||||
{
|
||||
struct domain_generic *dg = domain_new(name, order);
|
||||
|
||||
@ -877,32 +1039,45 @@ birdloop_new(pool *pp, uint order, const char *name)
|
||||
loop->time.domain = dg;
|
||||
loop->time.loop = loop;
|
||||
|
||||
atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed);
|
||||
|
||||
birdloop_enter(loop);
|
||||
|
||||
ev_init_list(&loop->event_list, loop, name);
|
||||
timers_init(&loop->time, p);
|
||||
sockets_init(loop);
|
||||
|
||||
loop->event = (event) { .hook = birdloop_run, .data = loop, };
|
||||
loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, };
|
||||
|
||||
if (request_pickup)
|
||||
{
|
||||
LOCK_DOMAIN(resource, birdloop_domain);
|
||||
add_tail(&birdloop_pickup, &loop->n);
|
||||
wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(bird_thread_pickup)));
|
||||
UNLOCK_DOMAIN(resource, birdloop_domain);
|
||||
}
|
||||
else
|
||||
loop->n.next = loop->n.prev = &loop->n;
|
||||
|
||||
birdloop_leave(loop);
|
||||
|
||||
return loop;
|
||||
}
|
||||
|
||||
struct birdloop *
|
||||
birdloop_new(pool *pp, uint order, const char *name)
|
||||
{
|
||||
return birdloop_new_internal(pp, order, name, 1);
|
||||
}
|
||||
|
||||
static void
|
||||
birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
|
||||
{
|
||||
loop->stopped = stopped;
|
||||
loop->stop_data = data;
|
||||
if (loop->thread)
|
||||
{
|
||||
atomic_store_explicit(&loop->thread->run_cleanup, 1, memory_order_release);
|
||||
wakeup_do_kick(loop->thread);
|
||||
}
|
||||
|
||||
birdloop_do_ping(loop);
|
||||
}
|
||||
|
||||
void
|
||||
@ -961,6 +1136,7 @@ birdloop_leave_locked(struct birdloop *loop)
|
||||
/* Send pending pings */
|
||||
if (loop->ping_pending)
|
||||
{
|
||||
LOOP_TRACE(loop, "sending pings on leave");
|
||||
loop->ping_pending = 0;
|
||||
birdloop_do_ping(loop);
|
||||
}
|
||||
|
@ -16,8 +16,15 @@ struct pipe
|
||||
int fd[2];
|
||||
};
|
||||
|
||||
struct pfd {
|
||||
BUFFER(struct pollfd) pfd;
|
||||
BUFFER(struct birdloop *) loop;
|
||||
};
|
||||
|
||||
void sockets_prepare(struct birdloop *, struct pfd *);
|
||||
|
||||
void pipe_new(struct pipe *);
|
||||
void pipe_pollin(struct pipe *, struct pollfd *);
|
||||
void pipe_pollin(struct pipe *, struct pfd *);
|
||||
void pipe_drain(struct pipe *);
|
||||
void pipe_kick(struct pipe *);
|
||||
|
||||
@ -25,6 +32,9 @@ struct birdloop
|
||||
{
|
||||
node n;
|
||||
|
||||
event event;
|
||||
timer timer;
|
||||
|
||||
pool *pool;
|
||||
|
||||
struct timeloop time;
|
||||
@ -36,6 +46,9 @@ struct birdloop
|
||||
|
||||
uint links;
|
||||
|
||||
_Atomic u32 thread_transition;
|
||||
#define LTT_PING 1
|
||||
#define LTT_MOVE 2
|
||||
_Atomic u32 flags;
|
||||
struct birdloop_flag_handler *flag_handler;
|
||||
|
||||
@ -45,7 +58,6 @@ struct birdloop
|
||||
struct birdloop *prev_loop;
|
||||
|
||||
struct bird_thread *thread;
|
||||
struct pollfd *pfd;
|
||||
|
||||
u64 total_time_spent_ns;
|
||||
};
|
||||
@ -54,16 +66,13 @@ struct bird_thread
|
||||
{
|
||||
node n;
|
||||
|
||||
struct pollfd *pfd;
|
||||
uint pfd_max;
|
||||
|
||||
_Atomic u32 ping_sent;
|
||||
_Atomic u32 run_cleanup;
|
||||
_Atomic u32 poll_changed;
|
||||
|
||||
struct pipe wakeup;
|
||||
event_list priority_events;
|
||||
|
||||
struct birdloop *meta;
|
||||
|
||||
pthread_t thread_id;
|
||||
pthread_attr_t thread_attr;
|
||||
|
||||
@ -71,6 +80,7 @@ struct bird_thread
|
||||
|
||||
list loops;
|
||||
pool *pool;
|
||||
struct pfd *pfd;
|
||||
|
||||
event cleanup_event;
|
||||
};
|
||||
|
@ -722,7 +722,6 @@ sk_log_error(sock *s, const char *p)
|
||||
* Actual struct birdsock code
|
||||
*/
|
||||
|
||||
static list sock_list;
|
||||
static struct birdsock *current_sock;
|
||||
static struct birdsock *stored_sock;
|
||||
|
||||
@ -1026,7 +1025,7 @@ sk_setup(sock *s)
|
||||
static void
|
||||
sk_insert(sock *s)
|
||||
{
|
||||
add_tail(&sock_list, &s->n);
|
||||
add_tail(&main_birdloop.sock_list, &s->n);
|
||||
}
|
||||
|
||||
static void
|
||||
@ -2049,7 +2048,7 @@ sk_dump_all(void)
|
||||
sock *s;
|
||||
|
||||
debug("Open sockets:\n");
|
||||
WALK_LIST(n, sock_list)
|
||||
WALK_LIST(n, main_birdloop.sock_list)
|
||||
{
|
||||
s = SKIP_BACK(sock, n, n);
|
||||
debug("%p ", s);
|
||||
@ -2208,7 +2207,7 @@ watchdog_stop(void)
|
||||
void
|
||||
io_init(void)
|
||||
{
|
||||
init_list(&sock_list);
|
||||
init_list(&main_birdloop.sock_list);
|
||||
ev_init_list(&global_event_list, &main_birdloop, "Global event list");
|
||||
ev_init_list(&global_work_list, &main_birdloop, "Global work list");
|
||||
ev_init_list(&main_birdloop.event_list, &main_birdloop, "Global fast event list");
|
||||
@ -2229,12 +2228,11 @@ void
|
||||
io_loop(void)
|
||||
{
|
||||
int poll_tout, timeout;
|
||||
int nfds, events, pout;
|
||||
int events, pout;
|
||||
timer *t;
|
||||
sock *s;
|
||||
node *n;
|
||||
int fdmax = 256;
|
||||
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
|
||||
struct pfd pfd;
|
||||
BUFFER_INIT(pfd.pfd, &root_pool, 16);
|
||||
BUFFER_INIT(pfd.loop, &root_pool, 16);
|
||||
|
||||
watchdog_start1();
|
||||
for(;;)
|
||||
@ -2255,39 +2253,11 @@ io_loop(void)
|
||||
poll_tout = MIN(poll_tout, timeout);
|
||||
}
|
||||
|
||||
/* A hack to reload main io_loop() when something has changed asynchronously. */
|
||||
pipe_pollin(&main_birdloop.thread->wakeup, &pfd[0]);
|
||||
BUFFER_FLUSH(pfd.pfd);
|
||||
BUFFER_FLUSH(pfd.loop);
|
||||
|
||||
nfds = 1;
|
||||
|
||||
WALK_LIST(n, sock_list)
|
||||
{
|
||||
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
|
||||
s = SKIP_BACK(sock, n, n);
|
||||
if (s->rx_hook)
|
||||
{
|
||||
pfd[nfds].fd = s->fd;
|
||||
pfd[nfds].events |= POLLIN;
|
||||
}
|
||||
if (s->tx_hook && s->ttx != s->tpos)
|
||||
{
|
||||
pfd[nfds].fd = s->fd;
|
||||
pfd[nfds].events |= POLLOUT;
|
||||
}
|
||||
if (pfd[nfds].fd != -1)
|
||||
{
|
||||
s->index = nfds;
|
||||
nfds++;
|
||||
}
|
||||
else
|
||||
s->index = -1;
|
||||
|
||||
if (nfds >= fdmax)
|
||||
{
|
||||
fdmax *= 2;
|
||||
pfd = xrealloc(pfd, fdmax * sizeof(struct pollfd));
|
||||
}
|
||||
}
|
||||
pipe_pollin(&main_birdloop.thread->wakeup, &pfd);
|
||||
sockets_prepare(&main_birdloop, &pfd);
|
||||
|
||||
/*
|
||||
* Yes, this is racy. But even if the signal comes before this test
|
||||
@ -2319,7 +2289,7 @@ io_loop(void)
|
||||
/* And finally enter poll() to find active sockets */
|
||||
watchdog_stop();
|
||||
birdloop_leave(&main_birdloop);
|
||||
pout = poll(pfd, nfds, poll_tout);
|
||||
pout = poll(pfd.pfd.data, pfd.pfd.used, poll_tout);
|
||||
birdloop_enter(&main_birdloop);
|
||||
watchdog_start();
|
||||
|
||||
@ -2331,18 +2301,18 @@ io_loop(void)
|
||||
}
|
||||
if (pout)
|
||||
{
|
||||
if (pfd[0].revents & POLLIN)
|
||||
if (pfd.pfd.data[0].revents & POLLIN)
|
||||
{
|
||||
/* IO loop reload requested */
|
||||
pipe_drain(&main_birdloop.thread->wakeup);
|
||||
atomic_exchange_explicit(&main_birdloop.thread->ping_sent, 0, memory_order_acq_rel);
|
||||
atomic_fetch_and_explicit(&main_birdloop.thread_transition, ~LTT_PING, memory_order_acq_rel);
|
||||
continue;
|
||||
}
|
||||
|
||||
times_update();
|
||||
|
||||
/* guaranteed to be non-empty */
|
||||
current_sock = SKIP_BACK(sock, n, HEAD(sock_list));
|
||||
current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
|
||||
|
||||
while (current_sock)
|
||||
{
|
||||
@ -2357,19 +2327,19 @@ io_loop(void)
|
||||
int steps;
|
||||
|
||||
steps = MAX_STEPS;
|
||||
if (s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook)
|
||||
if (s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
|
||||
do
|
||||
{
|
||||
steps--;
|
||||
io_log_event(s->rx_hook, s->data);
|
||||
e = sk_read(s, pfd[s->index].revents);
|
||||
e = sk_read(s, pfd.pfd.data[s->index].revents);
|
||||
if (s != current_sock)
|
||||
goto next;
|
||||
}
|
||||
while (e && s->rx_hook && steps);
|
||||
|
||||
steps = MAX_STEPS;
|
||||
if (pfd[s->index].revents & POLLOUT)
|
||||
if (pfd.pfd.data[s->index].revents & POLLOUT)
|
||||
do
|
||||
{
|
||||
steps--;
|
||||
@ -2392,7 +2362,7 @@ io_loop(void)
|
||||
int count = 0;
|
||||
current_sock = stored_sock;
|
||||
if (current_sock == NULL)
|
||||
current_sock = SKIP_BACK(sock, n, HEAD(sock_list));
|
||||
current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
|
||||
|
||||
while (current_sock && count < MAX_RX_STEPS)
|
||||
{
|
||||
@ -2403,18 +2373,18 @@ io_loop(void)
|
||||
goto next2;
|
||||
}
|
||||
|
||||
if (!s->fast_rx && (pfd[s->index].revents & POLLIN) && s->rx_hook)
|
||||
if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
|
||||
{
|
||||
count++;
|
||||
io_log_event(s->rx_hook, s->data);
|
||||
sk_read(s, pfd[s->index].revents);
|
||||
sk_read(s, pfd.pfd.data[s->index].revents);
|
||||
if (s != current_sock)
|
||||
goto next2;
|
||||
}
|
||||
|
||||
if (pfd[s->index].revents & (POLLHUP | POLLERR))
|
||||
if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
|
||||
{
|
||||
sk_err(s, pfd[s->index].revents);
|
||||
sk_err(s, pfd.pfd.data[s->index].revents);
|
||||
if (s != current_sock)
|
||||
goto next2;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user