mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2025-01-18 15:01:53 +00:00
Coroutines: A simple and lightweight parallel execution framework.
This commit is contained in:
parent
1db83a507a
commit
1289c1c5ee
26
lib/coro.h
Normal file
26
lib/coro.h
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
/*
|
||||||
|
* BIRD Coroutines
|
||||||
|
*
|
||||||
|
* (c) 2017 Martin Mares <mj@ucw.cz>
|
||||||
|
* (c) 2020 Maria Matejka <mq@jmq.cz>
|
||||||
|
*
|
||||||
|
* Can be freely distributed and used under the terms of the GNU GPL.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _BIRD_CORO_H_
|
||||||
|
#define _BIRD_CORO_H_
|
||||||
|
|
||||||
|
#include "lib/resource.h"
|
||||||
|
|
||||||
|
/* A completely opaque coroutine handle. */
|
||||||
|
struct coroutine;
|
||||||
|
|
||||||
|
/* Coroutines are independent threads bound to pools.
|
||||||
|
* You request a coroutine by calling coro_run().
|
||||||
|
* It is forbidden to free a running coroutine from outside.
|
||||||
|
* The running coroutine must free itself by rfree() before returning.
|
||||||
|
*/
|
||||||
|
struct coroutine *coro_run(pool *, void (*entry)(void *), void *data);
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
@ -17,7 +17,14 @@
|
|||||||
|
|
||||||
#include "lib/birdlib.h"
|
#include "lib/birdlib.h"
|
||||||
#include "lib/locking.h"
|
#include "lib/locking.h"
|
||||||
|
#include "lib/coro.h"
|
||||||
#include "lib/resource.h"
|
#include "lib/resource.h"
|
||||||
|
#include "lib/timer.h"
|
||||||
|
|
||||||
|
/* Using a rather big stack for coroutines to allow for stack-local allocations.
|
||||||
|
* In real world, the kernel doesn't alloc this memory until it is used.
|
||||||
|
* */
|
||||||
|
#define CORO_STACK_SIZE 1048576
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Implementation of coroutines based on POSIX threads
|
* Implementation of coroutines based on POSIX threads
|
||||||
@ -100,3 +107,65 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp)
|
|||||||
pthread_mutex_unlock(&dg->mutex);
|
pthread_mutex_unlock(&dg->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Coroutines */
|
||||||
|
struct coroutine {
|
||||||
|
resource r;
|
||||||
|
pthread_t id;
|
||||||
|
pthread_attr_t attr;
|
||||||
|
void (*entry)(void *);
|
||||||
|
void *data;
|
||||||
|
};
|
||||||
|
|
||||||
|
static _Thread_local _Bool coro_cleaned_up = 0;
|
||||||
|
|
||||||
|
static void coro_free(resource *r)
|
||||||
|
{
|
||||||
|
struct coroutine *c = (void *) r;
|
||||||
|
ASSERT_DIE(pthread_equal(pthread_self(), c->id));
|
||||||
|
pthread_attr_destroy(&c->attr);
|
||||||
|
coro_cleaned_up = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct resclass coro_class = {
|
||||||
|
.name = "Coroutine",
|
||||||
|
.size = sizeof(struct coroutine),
|
||||||
|
.free = coro_free,
|
||||||
|
};
|
||||||
|
|
||||||
|
static void *coro_entry(void *p)
|
||||||
|
{
|
||||||
|
struct coroutine *c = p;
|
||||||
|
ASSERT_DIE(c->entry);
|
||||||
|
|
||||||
|
c->entry(c->data);
|
||||||
|
ASSERT_DIE(coro_cleaned_up);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data)
|
||||||
|
{
|
||||||
|
ASSERT_DIE(entry);
|
||||||
|
ASSERT_DIE(p);
|
||||||
|
|
||||||
|
struct coroutine *c = ralloc(p, &coro_class);
|
||||||
|
|
||||||
|
c->entry = entry;
|
||||||
|
c->data = data;
|
||||||
|
|
||||||
|
int e = 0;
|
||||||
|
|
||||||
|
if (e = pthread_attr_init(&c->attr))
|
||||||
|
die("pthread_attr_init() failed: %M", e);
|
||||||
|
|
||||||
|
if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE))
|
||||||
|
die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e);
|
||||||
|
|
||||||
|
if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED))
|
||||||
|
die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
|
||||||
|
|
||||||
|
if (e = pthread_create(&c->id, &c->attr, coro_entry, c))
|
||||||
|
die("pthread_create() failed: %M", e);
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
@ -2176,6 +2176,15 @@ static int short_loops = 0;
|
|||||||
#define SHORT_LOOP_MAX 10
|
#define SHORT_LOOP_MAX 10
|
||||||
#define WORK_EVENTS_MAX 10
|
#define WORK_EVENTS_MAX 10
|
||||||
|
|
||||||
|
static int poll_reload_pipe[2];
|
||||||
|
|
||||||
|
void
|
||||||
|
io_loop_reload(void)
|
||||||
|
{
|
||||||
|
char b;
|
||||||
|
write(poll_reload_pipe[1], &b, 1);
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
io_loop(void)
|
io_loop(void)
|
||||||
{
|
{
|
||||||
@ -2187,6 +2196,9 @@ io_loop(void)
|
|||||||
int fdmax = 256;
|
int fdmax = 256;
|
||||||
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
|
struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
|
||||||
|
|
||||||
|
if (pipe(poll_reload_pipe) < 0)
|
||||||
|
die("pipe(poll_reload_pipe) failed: %m");
|
||||||
|
|
||||||
watchdog_start1();
|
watchdog_start1();
|
||||||
for(;;)
|
for(;;)
|
||||||
{
|
{
|
||||||
@ -2205,7 +2217,12 @@ io_loop(void)
|
|||||||
poll_tout = MIN(poll_tout, timeout);
|
poll_tout = MIN(poll_tout, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
nfds = 0;
|
/* A hack to reload main io_loop() when something has changed asynchronously. */
|
||||||
|
pfd[0].fd = poll_reload_pipe[0];
|
||||||
|
pfd[0].events = POLLIN;
|
||||||
|
|
||||||
|
nfds = 1;
|
||||||
|
|
||||||
WALK_LIST(n, sock_list)
|
WALK_LIST(n, sock_list)
|
||||||
{
|
{
|
||||||
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
|
pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
|
||||||
@ -2277,6 +2294,14 @@ io_loop(void)
|
|||||||
}
|
}
|
||||||
if (pout)
|
if (pout)
|
||||||
{
|
{
|
||||||
|
if (pfd[0].revents & POLLIN)
|
||||||
|
{
|
||||||
|
/* IO loop reload requested */
|
||||||
|
char b;
|
||||||
|
read(poll_reload_pipe[0], &b, 1);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
times_update(&main_timeloop);
|
times_update(&main_timeloop);
|
||||||
|
|
||||||
/* guaranteed to be non-empty */
|
/* guaranteed to be non-empty */
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
* user's manual.
|
* user's manual.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <stdatomic.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdarg.h>
|
#include <stdarg.h>
|
||||||
@ -35,6 +36,10 @@ static FILE *dbgf;
|
|||||||
static list *current_log_list;
|
static list *current_log_list;
|
||||||
static char *current_syslog_name; /* NULL -> syslog closed */
|
static char *current_syslog_name; /* NULL -> syslog closed */
|
||||||
|
|
||||||
|
static _Atomic uint max_coro_id = ATOMIC_VAR_INIT(1);
|
||||||
|
static _Thread_local uint this_coro_id;
|
||||||
|
|
||||||
|
#define THIS_CORO_ID (this_coro_id ?: (this_coro_id = atomic_fetch_add_explicit(&max_coro_id, 1, memory_order_acq_rel)))
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
@ -178,7 +183,7 @@ log_commit(int class, buffer *buf)
|
|||||||
l->pos += msg_len;
|
l->pos += msg_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(l->fh, "%s <%s> ", tbuf, class_names[class]);
|
fprintf(l->fh, "%s [%04x] <%s> ", tbuf, THIS_CORO_ID, class_names[class]);
|
||||||
}
|
}
|
||||||
fputs(buf->start, l->fh);
|
fputs(buf->start, l->fh);
|
||||||
fputc('\n', l->fh);
|
fputc('\n', l->fh);
|
||||||
@ -288,6 +293,8 @@ die(const char *msg, ...)
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct timespec dbg_time_start;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* debug - write to debug output
|
* debug - write to debug output
|
||||||
* @msg: a printf-like message
|
* @msg: a printf-like message
|
||||||
@ -300,12 +307,33 @@ debug(const char *msg, ...)
|
|||||||
{
|
{
|
||||||
#define MAX_DEBUG_BUFSIZE 16384
|
#define MAX_DEBUG_BUFSIZE 16384
|
||||||
va_list args;
|
va_list args;
|
||||||
char buf[MAX_DEBUG_BUFSIZE];
|
char buf[MAX_DEBUG_BUFSIZE], *pos = buf;
|
||||||
|
int max = MAX_DEBUG_BUFSIZE;
|
||||||
|
|
||||||
va_start(args, msg);
|
va_start(args, msg);
|
||||||
if (dbgf)
|
if (dbgf)
|
||||||
{
|
{
|
||||||
if (bvsnprintf(buf, MAX_DEBUG_BUFSIZE, msg, args) < 0)
|
struct timespec dbg_time;
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &dbg_time);
|
||||||
|
uint nsec;
|
||||||
|
uint sec;
|
||||||
|
|
||||||
|
if (dbg_time.tv_nsec > dbg_time_start.tv_nsec)
|
||||||
|
{
|
||||||
|
nsec = dbg_time.tv_nsec - dbg_time_start.tv_nsec;
|
||||||
|
sec = dbg_time.tv_sec - dbg_time_start.tv_sec;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nsec = 1000000000 + dbg_time.tv_nsec - dbg_time_start.tv_nsec;
|
||||||
|
sec = dbg_time.tv_sec - dbg_time_start.tv_sec - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int n = bsnprintf(pos, max, "%u.%09u: [%04x] ", sec, nsec, THIS_CORO_ID);
|
||||||
|
pos += n;
|
||||||
|
max -= n;
|
||||||
|
|
||||||
|
if (bvsnprintf(pos, max, msg, args) < 0)
|
||||||
bug("Extremely long debug output, split it.");
|
bug("Extremely long debug output, split it.");
|
||||||
|
|
||||||
fputs(buf, dbgf);
|
fputs(buf, dbgf);
|
||||||
|
@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
|
|||||||
|
|
||||||
void io_init(void);
|
void io_init(void);
|
||||||
void io_loop(void);
|
void io_loop(void);
|
||||||
|
void io_loop_reload(void);
|
||||||
void io_log_dump(void);
|
void io_log_dump(void);
|
||||||
int sk_open_unix(struct birdsock *s, char *name);
|
int sk_open_unix(struct birdsock *s, char *name);
|
||||||
struct rfile *rf_open(struct pool *, const char *name, const char *mode);
|
struct rfile *rf_open(struct pool *, const char *name, const char *mode);
|
||||||
|
Loading…
Reference in New Issue
Block a user