0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 17:51:53 +00:00

First try of loop balancing

If a thread encounters timeout == 0 for poll, it considers itself
"busy" and with some hysteresis it tries to drop loops for others to
pick and thus better distribute work between threads.
This commit is contained in:
Maria Matejka 2023-04-30 22:17:42 +02:00
parent 9f25dd79b8
commit fa973c2c15
2 changed files with 131 additions and 52 deletions

View File

@ -406,7 +406,7 @@ socket_changed(sock *s)
struct birdloop *loop = s->loop; struct birdloop *loop = s->loop;
ASSERT_DIE(birdloop_inside(loop)); ASSERT_DIE(birdloop_inside(loop));
loop->sock_changed++; loop->sock_changed = 1;
birdloop_ping(loop); birdloop_ping(loop);
} }
@ -552,7 +552,7 @@ sockets_fire(struct birdloop *loop)
continue; continue;
if (!sk_tx_pending(s)) if (!sk_tx_pending(s))
loop->thread->sock_changed++; loop->thread->sock_changed = 1;
} }
if (rev & POLLIN) if (rev & POLLIN)
@ -590,6 +590,7 @@ struct birdloop_pickup_group {
list threads; list threads;
uint thread_count; uint thread_count;
uint loop_count; uint loop_count;
uint thread_busy_count;
btime max_latency; btime max_latency;
event start_threads; event start_threads;
} pickup_groups[2] = { } pickup_groups[2] = {
@ -624,25 +625,49 @@ birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr, struct birdl
} }
/* Now we are free of running pings */ /* Now we are free of running pings */
if (loop->thread = thr) /* Update the thread value */
loop->thread = thr;
/* Put into appropriate lists */
if (thr)
{ {
add_tail(&thr->loops, &loop->n); add_tail(&thr->loops, &loop->n);
thr->loop_count++; thr->loop_count++;
ev_send_loop(loop->thread->meta, &loop->event);
} }
else else
{ {
old->loop_count--; old->loop_count--;
/* Unschedule from Meta */
ev_postpone(&loop->event);
tm_stop(&loop->timer);
/* Request local socket reload */
this_thread->sock_changed = 1;
/* Put into pickup list */
LOCK_DOMAIN(resource, group->domain); LOCK_DOMAIN(resource, group->domain);
add_tail(&group->loops, &loop->n); add_tail(&group->loops, &loop->n);
UNLOCK_DOMAIN(resource, group->domain); UNLOCK_DOMAIN(resource, group->domain);
} }
/* Finished */ /* Allow pings */
atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel);
}
/* Request to run by force */ static void
ev_send_loop(loop->thread->meta, &loop->event); bird_thread_pickup_next(struct birdloop_pickup_group *group)
{
/* This thread goes to the end of the pickup list */
LOCK_DOMAIN(resource, group->domain);
rem_node(&this_thread->n);
add_tail(&group->threads, &this_thread->n);
/* If there are more loops to be picked up, wakeup the next thread in order */
if (!EMPTY_LIST(group->loops))
wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads)));
UNLOCK_DOMAIN(resource, group->domain);
} }
static struct birdloop * static struct birdloop *
@ -651,7 +676,32 @@ birdloop_take(struct birdloop_pickup_group *group)
struct birdloop *loop = NULL; struct birdloop *loop = NULL;
LOCK_DOMAIN(resource, group->domain); LOCK_DOMAIN(resource, group->domain);
if (!EMPTY_LIST(group->loops))
int drop =
this_thread->busy_active &&
(group->thread_busy_count < group->thread_count) &&
(this_thread->loop_count > 1);
int take = !EMPTY_LIST(group->loops);
if (drop)
{
UNLOCK_DOMAIN(resource, group->domain);
node *n;
WALK_LIST2(loop, n, this_thread->loops, n)
if (ev_active(&loop->event))
{
LOOP_TRACE(loop, "Moving to another thread");
/* Pass to another thread */
rem_node(&loop->n);
birdloop_set_thread(loop, NULL, group);
bird_thread_pickup_next(group);
break;
}
return NULL;
}
else if (take)
{ {
/* Take the first loop from the pickup list and unlock */ /* Take the first loop from the pickup list and unlock */
loop = SKIP_BACK(struct birdloop, n, HEAD(group->loops)); loop = SKIP_BACK(struct birdloop, n, HEAD(group->loops));
@ -659,41 +709,14 @@ birdloop_take(struct birdloop_pickup_group *group)
UNLOCK_DOMAIN(resource, group->domain); UNLOCK_DOMAIN(resource, group->domain);
birdloop_set_thread(loop, this_thread, group); birdloop_set_thread(loop, this_thread, group);
bird_thread_pickup_next(group);
/* This thread goes to the end of the pickup list */ return loop;
LOCK_DOMAIN(resource, group->domain);
rem_node(&this_thread->n);
add_tail(&group->threads, &this_thread->n);
/* If there are more loops to be picked up, wakeup the next thread in order */
if (!EMPTY_LIST(group->loops))
wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads)));
} }
UNLOCK_DOMAIN(resource, group->domain);
return loop;
}
static void
birdloop_drop(struct birdloop *loop, struct birdloop_pickup_group *group)
{
/* Remove loop from this thread's list */
rem_node(&loop->n);
/* Unset loop's thread */
if (birdloop_inside(loop))
birdloop_set_thread(loop, NULL, group);
else else
{ {
birdloop_enter(loop); UNLOCK_DOMAIN(resource, group->domain);
birdloop_set_thread(loop, NULL, group); return NULL;
birdloop_leave(loop);
} }
/* Put loop into pickup list */
LOCK_DOMAIN(resource, group->domain);
add_tail(&group->loops, &loop->n);
UNLOCK_DOMAIN(resource, group->domain);
} }
static int static int
@ -707,6 +730,38 @@ poll_timeout(struct birdloop *loop)
return remains TO_MS + ((remains TO_MS) MS < remains); return remains TO_MS + ((remains TO_MS) MS < remains);
} }
static void
bird_thread_busy_update(struct bird_thread *thr, int val)
{
if (thr->busy_active == val)
return;
if (val)
thr->busy_counter++;
else
thr->busy_counter--;
switch (thr->busy_counter)
{
case 0:
thr->busy_active = 0;
break;
case 4:
thr->busy_active = 1;
break;
default:
return;
}
LOCK_DOMAIN(resource, thr->group->domain);
if (val)
thr->group->thread_busy_count++;
else
thr->group->thread_busy_count--;
UNLOCK_DOMAIN(resource, thr->group->domain);
}
static void * static void *
bird_thread_main(void *arg) bird_thread_main(void *arg)
{ {
@ -795,6 +850,9 @@ bird_thread_main(void *arg)
else if (timeout > 5000) else if (timeout > 5000)
flush_local_pages(); flush_local_pages();
bird_thread_busy_update(thr, (timeout == 0));
account_to(&this_thread->idle);
poll_retry:; poll_retry:;
int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout);
if (rv < 0) if (rv < 0)
@ -804,6 +862,8 @@ poll_retry:;
bug("poll in %p: %m", thr); bug("poll in %p: %m", thr);
} }
account_to(&this_thread->overhead);
/* Drain wakeup fd */ /* Drain wakeup fd */
if (pfd.pfd.data[0].revents & POLLIN) if (pfd.pfd.data[0].revents & POLLIN)
{ {
@ -934,7 +994,22 @@ bird_thread_shutdown(void * _ UNUSED)
/* Drop loops including the thread dropper itself */ /* Drop loops including the thread dropper itself */
while (!EMPTY_LIST(thr->loops)) while (!EMPTY_LIST(thr->loops))
birdloop_drop(HEAD(thr->loops), group); {
struct birdloop *loop = HEAD(thr->loops);
/* Remove loop from this thread's list */
rem_node(&loop->n);
/* Unset loop's thread */
if (birdloop_inside(loop))
birdloop_set_thread(loop, NULL, group);
else
{
birdloop_enter(loop);
birdloop_set_thread(loop, NULL, group);
birdloop_leave(loop);
}
}
/* Let others know about new loops */ /* Let others know about new loops */
LOCK_DOMAIN(resource, group->domain); LOCK_DOMAIN(resource, group->domain);
@ -1022,11 +1097,12 @@ struct bird_thread_show_data {
u8 show_loops; u8 show_loops;
uint line_pos; uint line_pos;
uint line_max; uint line_max;
const char *lines[]; const char **lines;
}; };
#define tsd_append(...) do { \ #define tsd_append(...) do { \
ASSERT_DIE(tsd->line_pos < tsd->line_max); \ if (tsd->line_pos >= tsd->line_max) \
tsd->lines = mb_realloc(tsd->lines, sizeof (const char *) * (tsd->line_max *= 2)); \
tsd->lines[tsd->line_pos++] = lp_sprintf(tsd->lp, __VA_ARGS__); \ tsd->lines[tsd->line_pos++] = lp_sprintf(tsd->lp, __VA_ARGS__); \
} while (0) } while (0)
@ -1061,8 +1137,8 @@ static void
bird_thread_show_loop(struct bird_thread_show_data *tsd, struct birdloop *loop) bird_thread_show_loop(struct bird_thread_show_data *tsd, struct birdloop *loop)
{ {
tsd_append(" Loop %s", domain_name(loop->time.domain)); tsd_append(" Loop %s", domain_name(loop->time.domain));
bird_thread_show_spent_time(tsd, " Working", &loop->working); bird_thread_show_spent_time(tsd, "Working ", &loop->working);
bird_thread_show_spent_time(tsd, " Locking", &loop->locking); bird_thread_show_spent_time(tsd, "Locking ", &loop->locking);
} }
static void static void
@ -1089,7 +1165,8 @@ bird_thread_show(void *data)
if (tsd->show_loops) if (tsd->show_loops)
{ {
tsd_append(" Total working time: %t", total_time_ns NS); tsd_append(" Total working time: %t", total_time_ns NS);
bird_thread_show_spent_time(tsd, " Overhead", &this_thread->overhead); bird_thread_show_spent_time(tsd, "Overhead", &this_thread->overhead);
bird_thread_show_spent_time(tsd, "Idle ", &this_thread->idle);
} }
else else
tsd_append("Thread %p working %t s overhead %t s", tsd_append("Thread %p working %t s overhead %t s",
@ -1175,9 +1252,8 @@ cmd_show_threads(int show_loops)
} }
/* Total number of lines must be recalculated when changing the code! */ /* Total number of lines must be recalculated when changing the code! */
uint total_lines = total_threads * (show_loops + 1) + total_loops * 3 * show_loops + 10;
struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data) + total_lines * sizeof(const char *)); struct bird_thread_show_data *tsd = mb_allocz(p, sizeof(struct bird_thread_show_data));
tsd->cli = this_cli; tsd->cli = this_cli;
tsd->pool = p; tsd->pool = p;
tsd->lp = lp_new(p); tsd->lp = lp_new(p);
@ -1188,7 +1264,8 @@ cmd_show_threads(int show_loops)
.data = tsd, .data = tsd,
}; };
tsd->line_pos = 0; tsd->line_pos = 0;
tsd->line_max = total_lines; tsd->line_max = 64;
tsd->lines = mb_allocz(p, sizeof(const char *) * tsd->line_max);
this_cli->cont = bird_thread_show_cli_cont; this_cli->cont = bird_thread_show_cli_cont;
this_cli->cleanup = bird_thread_show_cli_cleanup; this_cli->cleanup = bird_thread_show_cli_cleanup;
@ -1294,7 +1371,7 @@ birdloop_stop_internal(struct birdloop *loop)
birdloop_leave(loop); birdloop_leave(loop);
/* Request local socket reload */ /* Request local socket reload */
this_thread->sock_changed++; this_thread->sock_changed = 1;
/* Call the stopped hook from the main loop */ /* Call the stopped hook from the main loop */
loop->event.hook = loop->stopped; loop->event.hook = loop->stopped;
@ -1353,7 +1430,7 @@ birdloop_run(void *_loop)
ev_send_loop(this_thread->meta, &loop->event); ev_send_loop(this_thread->meta, &loop->event);
/* Collect socket change requests */ /* Collect socket change requests */
this_thread->sock_changed += loop->sock_changed; this_thread->sock_changed |= loop->sock_changed;
loop->sock_changed = 0; loop->sock_changed = 0;
account_to(&this_thread->overhead); account_to(&this_thread->overhead);

View File

@ -51,7 +51,7 @@ struct birdloop
list sock_list; list sock_list;
struct birdsock *sock_active; struct birdsock *sock_active;
int sock_num; int sock_num;
uint sock_changed; uint sock_changed:1;
uint ping_pending; uint ping_pending;
@ -93,13 +93,15 @@ struct bird_thread
event cleanup_event; event cleanup_event;
int sock_changed; u8 sock_changed;
u8 busy_active;
u16 busy_counter;
uint loop_count; uint loop_count;
u64 max_latency_ns; u64 max_latency_ns;
u64 max_loop_time_ns; u64 max_loop_time_ns;
struct spent_time overhead; struct spent_time overhead, idle;
}; };
#endif #endif