From fd3d15b6b5545483e10594a3d6fd01ebffe78732 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Tue, 18 Jul 2017 22:21:27 +0200 Subject: [PATCH 01/13] A simple experiment with coroutine-based CLI --- lib/socket.h | 4 ++ nest/cli.c | 31 +++--------- nest/cli.h | 6 +++ sysdep/unix/io.c | 110 ++++++++++++++++++++++++++++++++++++++++ sysdep/unix/main.c | 122 +++++++++++++++++++++++++++++++-------------- sysdep/unix/unix.h | 7 +++ 6 files changed, 218 insertions(+), 62 deletions(-) diff --git a/lib/socket.h b/lib/socket.h index d5281b83..540193b3 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -36,6 +36,8 @@ struct ssh_sock { }; #endif +struct coroutine; + typedef struct birdsock { resource r; pool *pool; /* Pool where incoming connections should be allocated (for SK_xxx_PASSIVE) */ @@ -78,6 +80,8 @@ typedef struct birdsock { char *password; /* Password for MD5 authentication */ const char *err; /* Error message */ struct ssh_sock *ssh; /* Used in SK_SSH */ + + struct coroutine *rx_coroutine; } sock; sock *sock_new(pool *); /* Allocate new socket */ diff --git a/nest/cli.c b/nest/cli.c index ad81d384..b8f6956c 100644 --- a/nest/cli.c +++ b/nest/cli.c @@ -67,6 +67,7 @@ #include "nest/cli.h" #include "conf/conf.h" #include "lib/string.h" +#include "sysdep/unix/unix.h" // FIXME pool *cli_pool; @@ -251,8 +252,8 @@ cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd) return max; } -static void -cli_command(struct cli *c) +void +cli_command(cli *c) { struct config f; int res; @@ -279,28 +280,15 @@ static void cli_event(void *data) { cli *c = data; - int err; while (c->ring_read != c->ring_write && c->async_msg_size < CLI_MAX_ASYNC_QUEUE) cli_copy_message(c); - if (c->tx_pos) - ; - else if (c->cont) - c->cont(c); - else - { - err = cli_get_command(c); - if (!err) - return; - if (err < 0) - cli_printf(c, 9000, "Command too long"); - else - cli_command(c); - } - cli_write_trigger(c); + + if (c->sleeping_on_yield) + coro_resume(c->coro); } cli * @@ -323,13 +311,6 @@ cli_new(void *priv) return c; } -void -cli_kick(cli *c) -{ - if (!c->cont && !c->tx_pos) - ev_schedule(c->event); -} - static list cli_log_hooks; static int cli_log_inited; diff --git a/nest/cli.h b/nest/cli.h index 6040be91..904ca8e8 100644 --- a/nest/cli.h +++ b/nest/cli.h @@ -25,6 +25,8 @@ struct cli_out { byte buf[0]; }; +struct coroutine; + typedef struct cli { node n; /* Node in list of all log hooks */ pool *pool; @@ -45,6 +47,9 @@ typedef struct cli { uint log_mask; /* Mask of allowed message levels */ uint log_threshold; /* When free < log_threshold, store only important messages */ uint async_msg_size; /* Total size of async messages queued in tx_buf */ + struct coroutine *coro; + int sleeping_on_tx; + int sleeping_on_yield; } cli; extern pool *cli_pool; @@ -66,6 +71,7 @@ void cli_free(cli *); void cli_kick(cli *); void cli_written(cli *); void cli_echo(uint class, byte *msg); +void cli_command(cli *c); static inline int cli_access_restricted(void) { diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 0cf48c9d..44d0d8b5 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -2665,3 +2665,113 @@ test_old_bird(char *path) die("I found another BIRD running."); close(fd); } + +/* EXPERIMENTAL: Support for coroutines */ + +#include + +struct coroutine { + resource r; + ucontext_t ctx; + void *stack; + void (*entry_point)(void *arg); + void *arg; +}; + +static ucontext_t *main_context; +static coroutine *coro_current; // NULL for main context + +static void +coro_free(resource *r) +{ + coroutine *c = (coroutine *) r; + xfree(c->stack); +} + +static void +coro_dump(resource *r UNUSED) +{ + debug("\n"); +} + +static struct resclass coro_class = { + .name = "Coroutine", + .size = sizeof(struct coroutine), + .free = coro_free, + .dump = coro_dump, + // FIXME: Implement memsize +}; + +static void +coro_do_start(void) +{ + ASSERT(coro_current); + coro_current->entry_point(coro_current->arg); + bug("Coroutine returned unexpectedly"); +} + +struct coroutine * +coro_new(pool *p, void (*entry_point)(void *), void *arg) +{ + if (!main_context) + { + main_context = xmalloc(sizeof(*main_context)); + if (getcontext(main_context) < 0) + bug("getcontext() failed"); + } + + coroutine *c = ralloc(p, &coro_class); + c->entry_point = entry_point; + c->arg = arg; + if (getcontext(&c->ctx) < 0) + bug("getcontext() failed"); + c->stack = xmalloc(65536); + c->ctx.uc_stack.ss_sp = c->stack; + c->ctx.uc_stack.ss_size = 65536; + + makecontext(&c->ctx, coro_do_start, 0); + + return c; +} + +// Return to main context +void +coro_suspend(void) +{ + ASSERT(coro_current); + ASSERT(main_context); + coroutine *c = coro_current; + coro_current = NULL; + swapcontext(&c->ctx, main_context); + ASSERT(coro_current == c); +} + +// Resume context +void +coro_resume(coroutine *c) +{ + ASSERT(!coro_current); + coro_current = c; + swapcontext(main_context, &c->ctx); + ASSERT(!coro_current); +} + +static int +coro_sk_rx_hook(sock *sk, uint size UNUSED) +{ + ASSERT(sk->rx_coroutine); + ASSERT(!coro_current); + coro_resume(sk->rx_coroutine); + return 0; +} + +int +coro_sk_read(sock *s) +{ + ASSERT(coro_current); + s->rx_coroutine = coro_current; + s->rx_hook = coro_sk_rx_hook; + coro_suspend(); + s->rx_hook = NULL; + return s->rpos - s->rbuf; +} diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index c1b92b7e..c625a859 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -374,6 +374,9 @@ cli_write(cli *c) /* Everything is written */ s->tbuf = NULL; cli_written(c); + + if (c->sleeping_on_tx) + coro_resume(c->coro); } void @@ -391,42 +394,6 @@ cli_tx(sock *s) cli_write(s->data); } -int -cli_get_command(cli *c) -{ - sock *s = c->priv; - byte *t = c->rx_aux ? : s->rbuf; - byte *tend = s->rpos; - byte *d = c->rx_pos; - byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; - - while (t < tend) - { - if (*t == '\r') - t++; - else if (*t == '\n') - { - t++; - c->rx_pos = c->rx_buf; - c->rx_aux = t; - *d = 0; - return (d < dend) ? 1 : -1; - } - else if (d < dend) - *d++ = *t++; - } - c->rx_aux = s->rpos = s->rbuf; - c->rx_pos = d; - return 0; -} - -static int -cli_rx(sock *s, uint size UNUSED) -{ - cli_kick(s->data); - return 0; -} - static void cli_err(sock *s, int err) { @@ -440,6 +407,86 @@ cli_err(sock *s, int err) cli_free(s->data); } +static int +cli_getchar(cli *c) +{ + sock *s = c->priv; + + if (c->rx_aux == s->rpos) + { + log(L_INFO "CLI wants to read"); + c->rx_aux = s->rpos = s->rbuf; + int n = coro_sk_read(s); + log(L_INFO "CLI read %d bytes", n); + ASSERT(n); + } + return *c->rx_aux++; +} + +static void +cli_yield(cli *c) +{ + log(L_INFO "CLI: Yield"); + c->sleeping_on_yield = 1; + ev_schedule(c->event); + coro_suspend(); + c->sleeping_on_yield = 0; + log(L_INFO "CLI: Resumed after yield"); +} + +static void +cli_coroutine(void *_c) +{ + cli *c = _c; + sock *s = c->priv; + + log(L_INFO "CLI coroutine reached"); + c->rx_aux = s->rbuf; + for (;;) + { + while (c->tx_pos) + { + log(L_INFO "CLI sleeps on write"); + c->sleeping_on_tx = 1; + coro_suspend(); + c->sleeping_on_tx = 0; + log(L_INFO "CLI wakeup on write"); + } + + if (c->cont) + { + c->cont(c); + cli_write_trigger(c); + cli_yield(c); + continue; + } + + byte *d = c->rx_buf; + byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; + for (;;) + { + int ch = cli_getchar(c); + if (ch == '\r') + ; + else if (ch == '\n') + break; + else if (d < dend) + *d++ = ch; + } + + if (d >= dend) + { + cli_printf(c, 9000, "Command too long"); + cli_write_trigger(c); + continue; + } + + *d = 0; + cli_command(c); + cli_write_trigger(c); + } +} + static int cli_connect(sock *s, uint size UNUSED) { @@ -447,7 +494,6 @@ cli_connect(sock *s, uint size UNUSED) if (config->cli_debug) log(L_INFO "CLI connect"); - s->rx_hook = cli_rx; s->tx_hook = cli_tx; s->err_hook = cli_err; s->data = c = cli_new(s); @@ -456,6 +502,8 @@ cli_connect(sock *s, uint size UNUSED) c->rx_pos = c->rx_buf; c->rx_aux = NULL; rmove(s, c->pool); + c->coro = coro_new(c->pool, cli_coroutine, c); + coro_resume(c->coro); return 1; } diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index dcaab729..4a9b4253 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -102,6 +102,13 @@ int sk_open_unix(struct birdsock *s, char *name); void *tracked_fopen(struct pool *, char *name, char *mode); void test_old_bird(char *path); +/* Co-routines */ +typedef struct coroutine coroutine; +coroutine *coro_new(struct pool *pool, void (*entry_point)(void *arg), void *arg); +void coro_suspend(void); +void coro_resume(coroutine *c); +int coro_sk_read(struct birdsock *s); + /* krt.c bits */ void krt_io_init(void); From 774121633f1e5a32a62d0f40ef05d0c04d908819 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Tue, 18 Jul 2017 23:01:06 +0200 Subject: [PATCH 02/13] Co-routines moved to a separate module with sysdep implementation --- lib/coroutine.h | 22 +++++++ nest/cli.c | 2 +- sysdep/unix/Makefile | 2 +- sysdep/unix/coroutine.c | 135 ++++++++++++++++++++++++++++++++++++++++ sysdep/unix/io.c | 110 -------------------------------- sysdep/unix/main.c | 1 + sysdep/unix/unix.h | 7 --- 7 files changed, 160 insertions(+), 119 deletions(-) create mode 100644 lib/coroutine.h create mode 100644 sysdep/unix/coroutine.c diff --git a/lib/coroutine.h b/lib/coroutine.h new file mode 100644 index 00000000..f64a9540 --- /dev/null +++ b/lib/coroutine.h @@ -0,0 +1,22 @@ +/* + * BIRD Coroutines + * + * (c) 2017 Martin Mares + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#ifndef _BIRD_COROUTINE_H_ +#define _BIRD_COROUTINE_H_ + +// The structure is completely opaque, implemented by sysdep +typedef struct coroutine coroutine; + +coroutine *coro_new(struct pool *pool, void (*entry_point)(void *arg), void *arg); +void coro_suspend(void); +void coro_resume(coroutine *c); + +struct birdsock; +int coro_sk_read(struct birdsock *s); + +#endif diff --git a/nest/cli.c b/nest/cli.c index b8f6956c..bf4b47a8 100644 --- a/nest/cli.c +++ b/nest/cli.c @@ -66,8 +66,8 @@ #include "nest/bird.h" #include "nest/cli.h" #include "conf/conf.h" +#include "lib/coroutine.h" #include "lib/string.h" -#include "sysdep/unix/unix.h" // FIXME pool *cli_pool; diff --git a/sysdep/unix/Makefile b/sysdep/unix/Makefile index f592399c..9da83613 100644 --- a/sysdep/unix/Makefile +++ b/sysdep/unix/Makefile @@ -1,4 +1,4 @@ -src := io.c krt.c log.c main.c random.c +src := io.c krt.c log.c main.c random.c coroutine.c obj := $(src-o-files) $(all-daemon) $(cf-local) diff --git a/sysdep/unix/coroutine.c b/sysdep/unix/coroutine.c new file mode 100644 index 00000000..3d648eb4 --- /dev/null +++ b/sysdep/unix/coroutine.c @@ -0,0 +1,135 @@ +/* + * BIRD Coroutines + * + * (c) 2017 Martin Mares + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include +#include + +#include "nest/bird.h" +#include "lib/coroutine.h" +#include "lib/resource.h" +#include "lib/socket.h" +#include "sysdep/unix/unix.h" + +struct coroutine { + resource r; + ucontext_t ctx; + void *stack; + void (*entry_point)(void *arg); + void *arg; +}; + +static ucontext_t *main_context; +static coroutine *coro_current; // NULL for main context + +#define CORO_STACK_SIZE 65536 + +static void +coro_free(resource *r) +{ + coroutine *c = (coroutine *) r; + xfree(c->stack); +} + +static void +coro_dump(resource *r UNUSED) +{ + debug("\n"); +} + +static size_t +coro_memsize(resource *r) +{ + coroutine *c = (coroutine *) r; + return sizeof(*c) + CORO_STACK_SIZE + 2*ALLOC_OVERHEAD; +} + +static struct resclass coro_class = { + .name = "Coroutine", + .size = sizeof(struct coroutine), + .free = coro_free, + .dump = coro_dump, + .memsize = coro_memsize, +}; + +static void +coro_do_start(void) +{ + ASSERT(coro_current); + coro_current->entry_point(coro_current->arg); + bug("Coroutine returned unexpectedly"); +} + +struct coroutine * +coro_new(pool *p, void (*entry_point)(void *), void *arg) +{ + if (!main_context) + { + main_context = xmalloc(sizeof(*main_context)); + if (getcontext(main_context) < 0) + bug("getcontext() failed"); + } + + coroutine *c = ralloc(p, &coro_class); + c->entry_point = entry_point; + c->arg = arg; + if (getcontext(&c->ctx) < 0) + bug("getcontext() failed"); + c->stack = xmalloc(CORO_STACK_SIZE); + c->ctx.uc_stack.ss_sp = c->stack; + c->ctx.uc_stack.ss_size = CORO_STACK_SIZE; + + makecontext(&c->ctx, coro_do_start, 0); + + return c; +} + +void +coro_suspend(void) +{ + ASSERT(coro_current); + ASSERT(main_context); + coroutine *c = coro_current; + coro_current = NULL; + swapcontext(&c->ctx, main_context); + ASSERT(coro_current == c); +} + +void +coro_resume(coroutine *c) +{ + ASSERT(!coro_current); + coro_current = c; + swapcontext(main_context, &c->ctx); + ASSERT(!coro_current); +} + +/* Coroutine-based I/O */ + +static int +coro_sk_rx_hook(sock *sk, uint size UNUSED) +{ + ASSERT(sk->rx_coroutine); + ASSERT(!coro_current); + coro_resume(sk->rx_coroutine); + return 0; +} + +int +coro_sk_read(sock *s) +{ + ASSERT(coro_current); + s->rx_coroutine = coro_current; + s->rx_hook = coro_sk_rx_hook; + coro_suspend(); + s->rx_hook = NULL; + return s->rpos - s->rbuf; +} diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 44d0d8b5..0cf48c9d 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -2665,113 +2665,3 @@ test_old_bird(char *path) die("I found another BIRD running."); close(fd); } - -/* EXPERIMENTAL: Support for coroutines */ - -#include - -struct coroutine { - resource r; - ucontext_t ctx; - void *stack; - void (*entry_point)(void *arg); - void *arg; -}; - -static ucontext_t *main_context; -static coroutine *coro_current; // NULL for main context - -static void -coro_free(resource *r) -{ - coroutine *c = (coroutine *) r; - xfree(c->stack); -} - -static void -coro_dump(resource *r UNUSED) -{ - debug("\n"); -} - -static struct resclass coro_class = { - .name = "Coroutine", - .size = sizeof(struct coroutine), - .free = coro_free, - .dump = coro_dump, - // FIXME: Implement memsize -}; - -static void -coro_do_start(void) -{ - ASSERT(coro_current); - coro_current->entry_point(coro_current->arg); - bug("Coroutine returned unexpectedly"); -} - -struct coroutine * -coro_new(pool *p, void (*entry_point)(void *), void *arg) -{ - if (!main_context) - { - main_context = xmalloc(sizeof(*main_context)); - if (getcontext(main_context) < 0) - bug("getcontext() failed"); - } - - coroutine *c = ralloc(p, &coro_class); - c->entry_point = entry_point; - c->arg = arg; - if (getcontext(&c->ctx) < 0) - bug("getcontext() failed"); - c->stack = xmalloc(65536); - c->ctx.uc_stack.ss_sp = c->stack; - c->ctx.uc_stack.ss_size = 65536; - - makecontext(&c->ctx, coro_do_start, 0); - - return c; -} - -// Return to main context -void -coro_suspend(void) -{ - ASSERT(coro_current); - ASSERT(main_context); - coroutine *c = coro_current; - coro_current = NULL; - swapcontext(&c->ctx, main_context); - ASSERT(coro_current == c); -} - -// Resume context -void -coro_resume(coroutine *c) -{ - ASSERT(!coro_current); - coro_current = c; - swapcontext(main_context, &c->ctx); - ASSERT(!coro_current); -} - -static int -coro_sk_rx_hook(sock *sk, uint size UNUSED) -{ - ASSERT(sk->rx_coroutine); - ASSERT(!coro_current); - coro_resume(sk->rx_coroutine); - return 0; -} - -int -coro_sk_read(sock *s) -{ - ASSERT(coro_current); - s->rx_coroutine = coro_current; - s->rx_hook = coro_sk_rx_hook; - coro_suspend(); - s->rx_hook = NULL; - return s->rpos - s->rbuf; -} diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index c625a859..35615831 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -23,6 +23,7 @@ #include #include "nest/bird.h" +#include "lib/coroutine.h" #include "lib/lists.h" #include "lib/resource.h" #include "lib/socket.h" diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index 4a9b4253..dcaab729 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -102,13 +102,6 @@ int sk_open_unix(struct birdsock *s, char *name); void *tracked_fopen(struct pool *, char *name, char *mode); void test_old_bird(char *path); -/* Co-routines */ -typedef struct coroutine coroutine; -coroutine *coro_new(struct pool *pool, void (*entry_point)(void *arg), void *arg); -void coro_suspend(void); -void coro_resume(coroutine *c); -int coro_sk_read(struct birdsock *s); - /* krt.c bits */ void krt_io_init(void); From ceaa316125230ea9d285fcc45bcbdb96b5e98dbf Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Tue, 18 Jul 2017 23:48:25 +0200 Subject: [PATCH 03/13] Configure: Fix a typo in checking of backtrace() --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index 9e375762..5514fb56 100644 --- a/configure.ac +++ b/configure.ac @@ -314,7 +314,7 @@ if test "$enable_debug" = yes ; then [ AC_DEFINE([HAVE_EXECINFO_H], [1], [Define to 1 if you have the header file.]) AC_SEARCH_LIBS([backtrace], [execinfo], - [] + [], [AC_MSG_ERROR([Function backtrace not available.])] ) ] From 966609903524d2415ca40f31ed9fd364af7f226b Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Tue, 18 Jul 2017 23:51:09 +0200 Subject: [PATCH 04/13] Rewritten CLI based on coroutines I also moved the boundary between generic parts of the CLI and sysdep code: the generic parts now assume that CLI runs over a socket, but the actual creation of the socket is still kept in sysdep. The documentation does not reflect these changes yet. --- nest/cli.c | 328 ++++++++++++++++++++++++++++++++++----------- nest/cli.h | 44 +++--- sysdep/unix/main.c | 147 +------------------- 3 files changed, 280 insertions(+), 239 deletions(-) diff --git a/nest/cli.c b/nest/cli.c index bf4b47a8..ac0558bc 100644 --- a/nest/cli.c +++ b/nest/cli.c @@ -1,7 +1,7 @@ /* * BIRD Internet Routing Daemon -- Command-Line Interface * - * (c) 1999--2000 Martin Mares + * (c) 1999--2017 Martin Mares * * Can be freely distributed and used under the terms of the GNU GPL. */ @@ -60,9 +60,11 @@ * the new one. When the consumer processes everything in the buffer * queue, it calls cli_written(), tha frees all buffers (except the * first one) and schedules cli.event . - * + * */ +#undef LOCAL_DEBUG + #include "nest/bird.h" #include "nest/cli.h" #include "conf/conf.h" @@ -71,6 +73,13 @@ pool *cli_pool; +/* Hack for scheduled undo notification */ +extern cli *cmd_reconfig_stored_cli; + +/* + * Output buffering + */ + static byte * cli_alloc_out(cli *c, int size) { @@ -222,95 +231,61 @@ cli_free_out(cli *c) c->async_msg_size = 0; } -void -cli_written(cli *c) +static void +cli_write(cli *c) { + sock *s = c->socket; + + while (c->tx_pos) + { + struct cli_out *o = c->tx_pos; + + int len = o->wpos - o->outpos; + s->tbuf = o->outpos; + o->outpos = o->wpos; + + if (sk_send(s, len) <= 0) + return; + + c->tx_pos = o->next; + } + + /* Everything is written */ + s->tbuf = NULL; cli_free_out(c); ev_schedule(c->event); } - -static byte *cli_rh_pos; -static uint cli_rh_len; -static int cli_rh_trick_flag; -struct cli *this_cli; - -static int -cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd) -{ - if (!cli_rh_trick_flag) - { - cli_rh_trick_flag = 1; - buf[0] = '!'; - return 1; - } - if (max > cli_rh_len) - max = cli_rh_len; - memcpy(buf, cli_rh_pos, max); - cli_rh_pos += max; - cli_rh_len -= max; - return max; -} - void -cli_command(cli *c) +cli_write_trigger(cli *c) { - struct config f; - int res; - - if (config->cli_debug > 1) - log(L_TRACE "CLI: %s", c->rx_buf); - bzero(&f, sizeof(f)); - f.mem = c->parser_pool; - f.pool = rp_new(c->pool, "Config"); - cf_read_hook = cli_cmd_read_hook; - cli_rh_pos = c->rx_buf; - cli_rh_len = strlen(c->rx_buf); - cli_rh_trick_flag = 0; - this_cli = c; - lp_flush(c->parser_pool); - res = cli_parse(&f); - if (!res) - cli_printf(c, 9001, f.err_msg); - - config_free(&f); + if (c->tx_pos && c->socket->tbuf == NULL) + cli_write(c); } static void -cli_event(void *data) +cli_tx_hook(sock *s) { - cli *c = data; - - while (c->ring_read != c->ring_write && - c->async_msg_size < CLI_MAX_ASYNC_QUEUE) - cli_copy_message(c); - - cli_write_trigger(c); - - if (c->sleeping_on_yield) - coro_resume(c->coro); + cli_write(s->data); } -cli * -cli_new(void *priv) +static void +cli_err_hook(sock *s, int err) { - pool *p = rp_new(cli_pool, "CLI"); - cli *c = mb_alloc(p, sizeof(cli)); - - bzero(c, sizeof(cli)); - c->pool = p; - c->priv = priv; - c->event = ev_new(p); - c->event->hook = cli_event; - c->event->data = c; - c->cont = cli_hello; - c->parser_pool = lp_new_default(c->pool); - c->show_pool = lp_new_default(c->pool); - c->rx_buf = mb_alloc(c->pool, CLI_RX_BUF_SIZE); - ev_schedule(c->event); - return c; + if (config->cli_debug) + { + if (err) + log(L_INFO "CLI connection dropped: %s", strerror(err)); + else + log(L_INFO "CLI connection closed"); + } + cli_free(s->data); } +/* + * Echoing of asynchronous messages + */ + static list cli_log_hooks; static int cli_log_inited; @@ -381,12 +356,211 @@ cli_echo(uint class, byte *msg) } } -/* Hack for scheduled undo notification */ -extern cli *cmd_reconfig_stored_cli; +/* + * Reading of input + */ + +static int +cli_getchar(cli *c) +{ + sock *s = c->socket; + + if (c->rx_aux == s->rpos) + { + DBG("CLI: Waiting on read\n"); + c->rx_aux = s->rpos = s->rbuf; + c->state = CLI_STATE_WAIT_RX; + int n = coro_sk_read(s); + c->state = CLI_STATE_RUN; + DBG("CLI: Read returned %d bytes\n", n); + ASSERT(n); + } + return *c->rx_aux++; +} + +static int +cli_read_line(cli *c) +{ + byte *d = c->rx_buf; + byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; + for (;;) + { + int ch = cli_getchar(c); + if (ch == '\r') + ; + else if (ch == '\n') + break; + else if (d < dend) + *d++ = ch; + } + + if (d >= dend) + return 0; + + *d = 0; + return 1; +} + +/* + * Execution of commands + */ + +static byte *cli_rh_pos; +static uint cli_rh_len; +static int cli_rh_trick_flag; +struct cli *this_cli; + +static int +cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd) +{ + if (!cli_rh_trick_flag) + { + cli_rh_trick_flag = 1; + buf[0] = '!'; + return 1; + } + if (max > cli_rh_len) + max = cli_rh_len; + memcpy(buf, cli_rh_pos, max); + cli_rh_pos += max; + cli_rh_len -= max; + return max; +} + +static void +cli_command(cli *c) +{ + struct config f; + int res; + + if (config->cli_debug > 1) + log(L_TRACE "CLI: %s", c->rx_buf); + bzero(&f, sizeof(f)); + f.mem = c->parser_pool; + f.pool = rp_new(c->pool, "Config"); + cf_read_hook = cli_cmd_read_hook; + cli_rh_pos = c->rx_buf; + cli_rh_len = strlen(c->rx_buf); + cli_rh_trick_flag = 0; + this_cli = c; + lp_flush(c->parser_pool); + res = cli_parse(&f); + if (!res) + cli_printf(c, 9001, f.err_msg); + + config_free(&f); +} + +/* + * Session control + */ + +static void +cli_event(void *data) +{ + cli *c = data; + DBG("CLI: Event in state %u\n", (int) c->state); + + while (c->ring_read != c->ring_write && + c->async_msg_size < CLI_MAX_ASYNC_QUEUE) + cli_copy_message(c); + + cli_write_trigger(c); + + if (c->state == CLI_STATE_YIELD || + c->state == CLI_STATE_WAIT_TX && !c->tx_pos) + coro_resume(c->coro); +} + +void +cli_yield(cli *c) +{ + c->state = CLI_STATE_YIELD; + DBG("CLI: Yielding\n"); + ev_schedule(c->event); + coro_suspend(); + c->state = CLI_STATE_RUN; + DBG("CLI: Yield resumed\n"); +} + +static void +cli_coroutine(void *_c) +{ + cli *c = _c; + sock *s = c->socket; + + DBG("CLI: Coroutine started\n"); + c->rx_aux = s->rbuf; + + for (;;) + { + while (c->tx_pos) + { + DBG("CLI: Sleeping on write\n"); + c->state = CLI_STATE_WAIT_TX; + coro_suspend(); + c->state = CLI_STATE_RUN; + DBG("CLI: Woke up on write\n"); + } + + if (c->cont) + { + c->cont(c); + cli_write_trigger(c); + cli_yield(c); + continue; + } + + if (!cli_read_line(c)) + cli_printf(c, 9000, "Command too long"); + else + cli_command(c); + cli_write_trigger(c); + } +} + +cli * +cli_new(sock *s) +{ + pool *p = rp_new(cli_pool, "CLI session"); + cli *c = mb_alloc(p, sizeof(cli)); + DBG("CLI: Created new session\n"); + + bzero(c, sizeof(cli)); + c->pool = p; + c->socket = s; + c->event = ev_new(p); + c->event->hook = cli_event; + c->event->data = c; + c->cont = cli_hello; + c->parser_pool = lp_new_default(c->pool); + c->show_pool = lp_new_default(c->pool); + c->rx_buf = mb_alloc(c->pool, CLI_RX_BUF_SIZE); + + s->pool = c->pool; /* We need to have all the socket buffers allocated in the cli pool */ + rmove(s, c->pool); + s->tx_hook = cli_tx_hook; + s->err_hook = cli_err_hook; + s->data = c; + + return c; +} + +void +cli_run(cli *c) +{ + DBG("CLI: Running\n"); + c->state = CLI_STATE_RUN; + c->rx_pos = c->rx_buf; + c->rx_aux = NULL; + c->coro = coro_new(c->pool, cli_coroutine, c); + coro_resume(c->coro); +} void cli_free(cli *c) { + DBG("CLI: Destroying session\n"); cli_set_log_echo(c, 0, 0); if (c->cleanup) c->cleanup(c); diff --git a/nest/cli.h b/nest/cli.h index 904ca8e8..18e680e2 100644 --- a/nest/cli.h +++ b/nest/cli.h @@ -1,7 +1,7 @@ /* * BIRD Internet Routing Daemon -- Command-Line Interface * - * (c) 1999--2000 Martin Mares + * (c) 1999--2017 Martin Mares * * Can be freely distributed and used under the terms of the GNU GPL. */ @@ -10,7 +10,9 @@ #define _BIRD_CLI_H_ #include "lib/resource.h" +#include "lib/coroutine.h" #include "lib/event.h" +#include "lib/socket.h" #define CLI_RX_BUF_SIZE 4096 #define CLI_TX_BUF_SIZE 4096 @@ -25,31 +27,44 @@ struct cli_out { byte buf[0]; }; -struct coroutine; +enum cli_state { + CLI_STATE_INIT, + CLI_STATE_RUN, + CLI_STATE_WAIT_RX, + CLI_STATE_WAIT_TX, + CLI_STATE_YIELD, +}; typedef struct cli { node n; /* Node in list of all log hooks */ pool *pool; - void *priv; /* Private to sysdep layer */ - byte *rx_buf, *rx_pos, *rx_aux; /* sysdep */ + coroutine *coro; + enum cli_state state; + int restricted; /* CLI is restricted to read-only commands */ + + /* I/O */ + sock *socket; + byte *rx_buf, *rx_pos, *rx_aux; struct cli_out *tx_buf, *tx_pos, *tx_write; event *event; + + /* Continuation mechanism */ void (*cont)(struct cli *c); void (*cleanup)(struct cli *c); void *rover; /* Private to continuation routine */ int last_reply; - int restricted; /* CLI is restricted to read-only commands */ + + /* Pools */ struct linpool *parser_pool; /* Pool used during parsing */ struct linpool *show_pool; /* Pool used during route show */ + + /* Asynchronous messages */ byte *ring_buf; /* Ring buffer for asynchronous messages */ byte *ring_end, *ring_read, *ring_write; /* Pointers to the ring buffer */ uint ring_overflow; /* Counter of ring overflows */ uint log_mask; /* Mask of allowed message levels */ uint log_threshold; /* When free < log_threshold, store only important messages */ uint async_msg_size; /* Total size of async messages queued in tx_buf */ - struct coroutine *coro; - int sleeping_on_tx; - int sleeping_on_yield; } cli; extern pool *cli_pool; @@ -61,17 +76,17 @@ extern struct cli *this_cli; /* Used during parsing */ void cli_printf(cli *, int, char *, ...); #define cli_msg(x...) cli_printf(this_cli, x) +void cli_write_trigger(cli *c); void cli_set_log_echo(cli *, uint mask, uint size); +void cli_yield(cli *c); /* Functions provided to sysdep layer */ -cli *cli_new(void *); void cli_init(void); +cli *cli_new(sock *s); +void cli_run(cli *); void cli_free(cli *); -void cli_kick(cli *); -void cli_written(cli *); void cli_echo(uint class, byte *msg); -void cli_command(cli *c); static inline int cli_access_restricted(void) { @@ -81,9 +96,4 @@ static inline int cli_access_restricted(void) return 0; } -/* Functions provided by sysdep layer */ - -void cli_write_trigger(cli *); -int cli_get_command(cli *); - #endif diff --git a/sysdep/unix/main.c b/sysdep/unix/main.c index 35615831..b73ede3b 100644 --- a/sysdep/unix/main.c +++ b/sysdep/unix/main.c @@ -352,142 +352,6 @@ cmd_reconfig_undo(void) static sock *cli_sk; static char *path_control_socket = PATH_CONTROL_SOCKET; - -static void -cli_write(cli *c) -{ - sock *s = c->priv; - - while (c->tx_pos) - { - struct cli_out *o = c->tx_pos; - - int len = o->wpos - o->outpos; - s->tbuf = o->outpos; - o->outpos = o->wpos; - - if (sk_send(s, len) <= 0) - return; - - c->tx_pos = o->next; - } - - /* Everything is written */ - s->tbuf = NULL; - cli_written(c); - - if (c->sleeping_on_tx) - coro_resume(c->coro); -} - -void -cli_write_trigger(cli *c) -{ - sock *s = c->priv; - - if (s->tbuf == NULL) - cli_write(c); -} - -static void -cli_tx(sock *s) -{ - cli_write(s->data); -} - -static void -cli_err(sock *s, int err) -{ - if (config->cli_debug) - { - if (err) - log(L_INFO "CLI connection dropped: %s", strerror(err)); - else - log(L_INFO "CLI connection closed"); - } - cli_free(s->data); -} - -static int -cli_getchar(cli *c) -{ - sock *s = c->priv; - - if (c->rx_aux == s->rpos) - { - log(L_INFO "CLI wants to read"); - c->rx_aux = s->rpos = s->rbuf; - int n = coro_sk_read(s); - log(L_INFO "CLI read %d bytes", n); - ASSERT(n); - } - return *c->rx_aux++; -} - -static void -cli_yield(cli *c) -{ - log(L_INFO "CLI: Yield"); - c->sleeping_on_yield = 1; - ev_schedule(c->event); - coro_suspend(); - c->sleeping_on_yield = 0; - log(L_INFO "CLI: Resumed after yield"); -} - -static void -cli_coroutine(void *_c) -{ - cli *c = _c; - sock *s = c->priv; - - log(L_INFO "CLI coroutine reached"); - c->rx_aux = s->rbuf; - for (;;) - { - while (c->tx_pos) - { - log(L_INFO "CLI sleeps on write"); - c->sleeping_on_tx = 1; - coro_suspend(); - c->sleeping_on_tx = 0; - log(L_INFO "CLI wakeup on write"); - } - - if (c->cont) - { - c->cont(c); - cli_write_trigger(c); - cli_yield(c); - continue; - } - - byte *d = c->rx_buf; - byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; - for (;;) - { - int ch = cli_getchar(c); - if (ch == '\r') - ; - else if (ch == '\n') - break; - else if (d < dend) - *d++ = ch; - } - - if (d >= dend) - { - cli_printf(c, 9000, "Command too long"); - cli_write_trigger(c); - continue; - } - - *d = 0; - cli_command(c); - cli_write_trigger(c); - } -} - static int cli_connect(sock *s, uint size UNUSED) { @@ -495,16 +359,9 @@ cli_connect(sock *s, uint size UNUSED) if (config->cli_debug) log(L_INFO "CLI connect"); - s->tx_hook = cli_tx; - s->err_hook = cli_err; - s->data = c = cli_new(s); - s->pool = c->pool; /* We need to have all the socket buffers allocated in the cli pool */ + c = cli_new(s); s->fast_rx = 1; - c->rx_pos = c->rx_buf; - c->rx_aux = NULL; - rmove(s, c->pool); - c->coro = coro_new(c->pool, cli_coroutine, c); - coro_resume(c->coro); + cli_run(c); return 1; } From f9cbd6205175ad6300b6f6f3ee1d1516bef72d92 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Wed, 19 Jul 2017 00:21:42 +0200 Subject: [PATCH 05/13] Another implementation of coroutines using pthreads --- sysdep/unix/coroutine.c | 125 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 3 deletions(-) diff --git a/sysdep/unix/coroutine.c b/sysdep/unix/coroutine.c index 3d648eb4..5e986a7c 100644 --- a/sysdep/unix/coroutine.c +++ b/sysdep/unix/coroutine.c @@ -11,7 +11,6 @@ #endif #include -#include #include "nest/bird.h" #include "lib/coroutine.h" @@ -19,6 +18,16 @@ #include "lib/socket.h" #include "sysdep/unix/unix.h" +#define CORO_STACK_SIZE 65536 + +#if 1 + +/* + * Implementation of coroutines based on + */ + +#include + struct coroutine { resource r; ucontext_t ctx; @@ -30,8 +39,6 @@ struct coroutine { static ucontext_t *main_context; static coroutine *coro_current; // NULL for main context -#define CORO_STACK_SIZE 65536 - static void coro_free(resource *r) { @@ -112,6 +119,118 @@ coro_resume(coroutine *c) ASSERT(!coro_current); } +#else + +/* + * Implementation of coroutines based on POSIX threads + */ + +#include +#include + +struct coroutine { + resource r; + pthread_t thread; + void (*entry_point)(void *arg); + void *arg; + sem_t sem; +}; + +static coroutine *coro_current; // NULL for main context +static int coro_inited; +static sem_t coro_main_sem; +static pthread_attr_t coro_thread_attrs; + +static void +coro_free(resource *r) +{ + coroutine *c = (coroutine *) r; + pthread_cancel(c->thread); + pthread_join(c->thread, NULL); +} + +static void +coro_dump(resource *r UNUSED) +{ + debug("\n"); +} + +static size_t +coro_memsize(resource *r) +{ + coroutine *c = (coroutine *) r; + return sizeof(*c) + CORO_STACK_SIZE + 2*ALLOC_OVERHEAD; +} + +static struct resclass coro_class = { + .name = "Coroutine", + .size = sizeof(struct coroutine), + .free = coro_free, + .dump = coro_dump, + .memsize = coro_memsize, +}; + +static void * +coro_do_start(void *c_) +{ + coroutine *c = c_; + while (sem_wait(&c->sem) < 0) + ; + coro_current = c; + c->entry_point(c->arg); + bug("Coroutine returned unexpectedly"); +} + +struct coroutine * +coro_new(pool *p, void (*entry_point)(void *), void *arg) +{ + if (!coro_inited) + { + if (sem_init(&coro_main_sem, 0, 0) < 0) + bug("sem_init() failed"); + if (pthread_attr_init(&coro_thread_attrs)) + bug("pthread_attr_init() failed"); + if (pthread_attr_setstacksize(&coro_thread_attrs, CORO_STACK_SIZE)) + bug("pthread_attr_setstacksize() failed"); + coro_inited = 1; + } + + coroutine *c = ralloc(p, &coro_class); + c->entry_point = entry_point; + c->arg = arg; + if (sem_init(&c->sem, 0, 0) < 0) + bug("sem_init() failed"); + if (pthread_create(&c->thread, &coro_thread_attrs, coro_do_start, c)) + bug("pthread_create() failed"); + + return c; +} + +void +coro_suspend(void) +{ + ASSERT(coro_inited); + ASSERT(coro_current); + coroutine *c = coro_current; + sem_post(&coro_main_sem); + while (sem_wait(&c->sem) < 0) + ; + coro_current = c; +} + +void +coro_resume(coroutine *c) +{ + ASSERT(coro_inited); + ASSERT(!coro_current); + sem_post(&c->sem); + while (sem_wait(&coro_main_sem) < 0) + ; + coro_current = NULL; +} + +#endif + /* Coroutine-based I/O */ static int From a5ea67d7d0216786549d650018adeac2f7986afe Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Wed, 19 Jul 2017 00:42:47 +0200 Subject: [PATCH 06/13] Update development docs on the CLI --- nest/cli.c | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/nest/cli.c b/nest/cli.c index ac0558bc..65e9d9b7 100644 --- a/nest/cli.c +++ b/nest/cli.c @@ -32,35 +32,36 @@ * Each CLI session is internally represented by a &cli structure and a * resource pool containing all resources associated with the connection, * so that it can be easily freed whenever the connection gets closed, not depending - * on the current state of command processing. + * on the current state of command processing. A socket is associated with + * the session, over which requests and replies are sent. * * The CLI commands are declared as a part of the configuration grammar * by using the |CF_CLI| macro. When a command is received, it is processed * by the same lexical analyzer and parser as used for the configuration, but * it's switched to a special mode by prepending a fake token to the text, * so that it uses only the CLI command rules. Then the parser invokes - * an execution routine corresponding to the command, which either constructs - * the whole reply and returns it back or (in case it expects the reply will be long) - * it prints a partial reply and asks the CLI module (using the @cont hook) - * to call it again when the output is transferred to the user. + * an execution routine corresponding to the command, which constructs the + * reply. + * + * Replies are buffered in memory and then sent asynchronously. Commands + * which produce long outputs must split them to pieces and yield to other + * operations between pieces. To simplify this (and possibly also complex + * parsing of input), the CLI session runs in a coroutine with its own + * execution context. At any time, cli_yield() can be called to interrupt + * the current coroutine and have the buffered output sent. + * + * Alternatively, a long sequence of replies can be split to parts + * using the @cont hook, which translates to yielding internally. * * The @this_cli variable points to a &cli structure of the session being - * currently parsed, but it's of course available only in command handlers - * not entered using the @cont hook. - * - * TX buffer management works as follows: At cli.tx_buf there is a - * list of TX buffers (struct cli_out), cli.tx_write is the buffer - * currently used by the producer (cli_printf(), cli_alloc_out()) and - * cli.tx_pos is the buffer currently used by the consumer - * (cli_write(), in system dependent code). The producer uses - * cli_out.wpos ptr as the current write position and the consumer - * uses cli_out.outpos ptr as the current read position. When the - * producer produces something, it calls cli_write_trigger(). If there - * is not enough space in the current buffer, the producer allocates - * the new one. When the consumer processes everything in the buffer - * queue, it calls cli_written(), tha frees all buffers (except the - * first one) and schedules cli.event . + * currently parsed, but it's available only before the first yield. * + * A note on transmit buffer management: cli.tx_buf is a head of a list + * of TX buffers (struct cli_out). A buffer pointed to by cli.tx_write + * is the one currently written to using cli_printf() and cli_alloc_out(), + * its wpos field points to the position of the write head in that buffer. + * On the other side, cli.tx_pos is the buffer being set to the socket + * and its outpos field is the position of the read head. */ #undef LOCAL_DEBUG From a043f2d79488cdfbc4d97ac0bad4926d29bf9a82 Mon Sep 17 00:00:00 2001 From: "Ondrej Zajicek (work)" Date: Fri, 24 Aug 2018 18:54:27 +0200 Subject: [PATCH 07/13] Doc: Fix description of 'description' Thanks to Clemens Schrimpe for the bugreport. --- doc/bird.sgml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/bird.sgml b/doc/bird.sgml index 0057d2c5..fdfae2a6 100644 --- a/doc/bird.sgml +++ b/doc/bird.sgml @@ -653,7 +653,7 @@ agreement").