0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 17:51:53 +00:00

Coroutines: A simple and lightweight parallel execution framework.

This commit is contained in:
Maria Matejka 2021-02-08 09:51:59 +01:00
parent cb030180ae
commit bc353f1b50
4 changed files with 126 additions and 1 deletions

26
lib/coro.h Normal file
View File

@ -0,0 +1,26 @@
/*
* BIRD Coroutines
*
* (c) 2017 Martin Mares <mj@ucw.cz>
* (c) 2020 Maria Matejka <mq@jmq.cz>
*
* Can be freely distributed and used under the terms of the GNU GPL.
*/
#ifndef _BIRD_CORO_H_
#define _BIRD_CORO_H_
#include "lib/resource.h"
/* A completely opaque coroutine handle. */
struct coroutine;
/* Coroutines are independent threads bound to pools.
* You request a coroutine by calling coro_run().
* It is forbidden to free a running coroutine from outside.
* The running coroutine must free itself by rfree() before returning.
*/
struct coroutine *coro_run(pool *, void (*entry)(void *), void *data);
#endif

View File

@ -17,7 +17,14 @@
#include "lib/birdlib.h" #include "lib/birdlib.h"
#include "lib/locking.h" #include "lib/locking.h"
#include "lib/coro.h"
#include "lib/resource.h" #include "lib/resource.h"
#include "lib/timer.h"
/* Using a rather big stack for coroutines to allow for stack-local allocations.
* In real world, the kernel doesn't alloc this memory until it is used.
* */
#define CORO_STACK_SIZE 1048576
/* /*
* Implementation of coroutines based on POSIX threads * Implementation of coroutines based on POSIX threads
@ -100,3 +107,69 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp)
pthread_mutex_unlock(&dg->mutex); pthread_mutex_unlock(&dg->mutex);
} }
/* Coroutines */
struct coroutine {
resource r;
pthread_t id;
pthread_attr_t attr;
void (*entry)(void *);
void *data;
};
static _Thread_local _Bool coro_cleaned_up = 0;
static void coro_free(resource *r)
{
struct coroutine *c = (void *) r;
ASSERT_DIE(pthread_equal(pthread_self(), c->id));
pthread_attr_destroy(&c->attr);
coro_cleaned_up = 1;
}
static struct resclass coro_class = {
.name = "Coroutine",
.size = sizeof(struct coroutine),
.free = coro_free,
};
extern pthread_key_t current_time_key;
static void *coro_entry(void *p)
{
struct coroutine *c = p;
ASSERT_DIE(c->entry);
pthread_setspecific(current_time_key, &main_timeloop);
c->entry(c->data);
ASSERT_DIE(coro_cleaned_up);
return NULL;
}
struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data)
{
ASSERT_DIE(entry);
ASSERT_DIE(p);
struct coroutine *c = ralloc(p, &coro_class);
c->entry = entry;
c->data = data;
int e = 0;
if (e = pthread_attr_init(&c->attr))
die("pthread_attr_init() failed: %M", e);
if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE))
die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e);
if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED))
die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
if (e = pthread_create(&c->id, &c->attr, coro_entry, c))
die("pthread_create() failed: %M", e);
return c;
}

View File

@ -2176,6 +2176,15 @@ static int short_loops = 0;
#define SHORT_LOOP_MAX 10 #define SHORT_LOOP_MAX 10
#define WORK_EVENTS_MAX 10 #define WORK_EVENTS_MAX 10
static int poll_reload_pipe[2];
void
io_loop_reload(void)
{
char b;
write(poll_reload_pipe[1], &b, 1);
}
void void
io_loop(void) io_loop(void)
{ {
@ -2187,6 +2196,9 @@ io_loop(void)
int fdmax = 256; int fdmax = 256;
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd)); struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
if (pipe(poll_reload_pipe) < 0)
die("pipe(poll_reload_pipe) failed: %m");
watchdog_start1(); watchdog_start1();
for(;;) for(;;)
{ {
@ -2205,7 +2217,12 @@ io_loop(void)
poll_tout = MIN(poll_tout, timeout); poll_tout = MIN(poll_tout, timeout);
} }
nfds = 0; /* A hack to reload main io_loop() when something has changed asynchronously. */
pfd[0].fd = poll_reload_pipe[0];
pfd[0].events = POLLIN;
nfds = 1;
WALK_LIST(n, sock_list) WALK_LIST(n, sock_list)
{ {
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */ pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
@ -2277,6 +2294,14 @@ io_loop(void)
} }
if (pout) if (pout)
{ {
if (pfd[0].revents & POLLIN)
{
/* IO loop reload requested */
char b;
read(poll_reload_pipe[0], &b, 1);
continue;
}
times_update(&main_timeloop); times_update(&main_timeloop);
/* guaranteed to be non-empty */ /* guaranteed to be non-empty */

View File

@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
void io_init(void); void io_init(void);
void io_loop(void); void io_loop(void);
void io_loop_reload(void);
void io_log_dump(void); void io_log_dump(void);
int sk_open_unix(struct birdsock *s, char *name); int sk_open_unix(struct birdsock *s, char *name);
struct rfile *rf_open(struct pool *, const char *name, const char *mode); struct rfile *rf_open(struct pool *, const char *name, const char *mode);