From d3e2fe4535ebe46e614f186996fc610ff42af746 Mon Sep 17 00:00:00 2001 From: Maria Matejka Date: Thu, 10 Oct 2024 19:12:43 +0200 Subject: [PATCH] Container streaming --- flock/container.c | 175 ++++++++++++++++++++++------------------------ 1 file changed, 83 insertions(+), 92 deletions(-) diff --git a/flock/container.c b/flock/container.c index 98f3db32..568433ec 100644 --- a/flock/container.c +++ b/flock/container.c @@ -21,7 +21,7 @@ static struct hypervisor_container_forker { sock *s; pool *p; - struct cbor_stream *stream; + struct cbor_stream stream; struct birdloop *loop; HASH(struct container_runtime) hash; struct container_runtime *cur_crt; @@ -654,8 +654,9 @@ container_cleanup(struct container_runtime *crt) mb_free(crt); } -struct container_ctl_channel { +struct container_ctl_msg { struct cbor_channel cch; + struct cbor_channel *ctl_ch; int msg_state; int down_signal; }; @@ -663,7 +664,7 @@ struct container_ctl_channel { static enum cbor_parse_result container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res) { - SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch); + SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch); SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream); struct cbor_parser_context *ctx = &crt->stream.parser; @@ -727,7 +728,7 @@ container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res) static enum cbor_parse_result container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res) { - ASSERT_DIE(cch->stream == hcf.stream); + ASSERT_DIE(cch->stream == &hcf.stream); SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch); struct container_runtime *crt = cfr->crt; @@ -817,59 +818,6 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re #undef FAIL } -static void -hypervisor_container_forker_err(sock *sk, int e UNUSED) -{ - sk_close(sk); -} - -/* The child */ - -#if 0 -static void -crt_err(sock *s, int err UNUSED) -{ - struct container_runtime *crt = s->data; - s->data = crt->ccc->data; - callback_cancel(&crt->ccc->cb); - mb_free(crt->ccc); - crt->ccc = NULL; -} - -int -container_ctl_fd(const char *name) -{ - uint h = mem_hash(name, strlen(name)); - struct container_runtime *crt = HASH_FIND(hcf.hash, CRT, name, h); - return (crt && crt->s) ? crt->s->fd : -1; -} - -static void -container_created(callback *cb) -{ - SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb); - - sock *s = ccc->s; - struct { - struct cbor_writer w; - struct cbor_writer_stack_item si[2]; - } _cw; - - struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize); - CBOR_PUT_MAP(cw) - { - cbor_put_int(cw, -1); - cbor_put_string(cw, "OK"); - } - sk_send(s, cw->data.pos - cw->data.start); - - s->data = ccc->data; - sk_resume_rx(s->loop, s); - - mb_free(ccc); -} -#endif - void hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_container_config *ccf) { @@ -902,7 +850,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai /* Create a new channel atop the forker stream */ log(L_INFO "requesting machine creation, name %s", name); - SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream)); + SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(&hcf.stream)); cfr->ctl_ch = cch; cfr->crt = crt; cfr->cch.parse = container_fork_request_reply; @@ -924,30 +872,73 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai birdloop_leave(hcf.loop); } -static void -container_stopped(callback *cb) +static enum cbor_parse_result +container_stopped(struct cbor_channel *cch, enum cbor_parse_result res) { - SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb); + SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch); + SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream); + struct cbor_parser_context *ctx = &crt->stream.parser; - sock *s = ccc->s; - struct { - struct cbor_writer w; - struct cbor_writer_stack_item si[2]; - } _cw; +#define FAIL(...) do { log(L_ERR "Container stopped parse: " __VA_ARGS__); return CPR_ERROR; } while (0) - struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize); - CBOR_PUT_MAP(cw) + switch (res) { - cbor_put_int(cw, -1); - cbor_put_string(cw, "OK"); + case CPR_MAJOR: + switch (ccc->msg_state) + { + case 0: + if ((ctx->type != CBOR_MAP) || (ctx->value != 1)) + FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value); + + ccc->msg_state = 1; + return CPR_MORE; + + case 1: + if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3)) + FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value); + + ccc->msg_state = 2; + return CPR_MORE; + + case 2: + CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal); + ccc->msg_state = 3; + return CPR_MORE; + + default: + FAIL("Input overflow to state %d", ccc->msg_state); + } + bug("Overrun switch"); + + case CPR_STR_END: + FAIL("Unexpected string end"); + + case CPR_BLOCK_END: + switch (ccc->msg_state) { + case 3: + ccc->msg_state = 4; + break; + + default: + FAIL("Unexpected block end in state %d", ccc->msg_state); + } + break; + + case CPR_ERROR: + case CPR_MORE: + FAIL("Invalid input"); } - sk_send(s, cw->data.pos - cw->data.start); + CBOR_REPLY(ccc->ctl_ch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -1); + cbor_put_string(cw, "OK"); + } - s->data = ccc->data; - sk_resume_rx(s->loop, s); - - mb_free(ccc); + cbor_done_channel(&ccc->cch); + return CPR_BLOCK_END; +#undef FAIL } void @@ -973,22 +964,16 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con return; } - struct cbor_channel *xch = cbor_channel_new(crt->stream); - CBOR_REPLY(xch, cw) + SKIP_BACK_DECLARE(struct container_ctl_msg, ccr, cch, cbor_channel_new(&crt->stream)); + CBOR_REPLY(&ccr->cch, cw) CBOR_PUT_MAP(cw) { cbor_put_int(cw, 0); cbor_put_null(cw); } - struct container_operation_callback *ccc = mb_alloc(cch->pool, sizeof *ccc); - *ccc = (struct container_operation_callback) { - .cch = cch, - .ccf = ccf, - .cancel = container_operation_hangup, - }; - callback_init(&ccc->cb, container_stopped, s->loop); - crt->ccc = ccc; + ccr->cch.parse = container_stopped; + ccr->ctl_ch = cch; birdloop_leave(hcf.loop); } @@ -1000,6 +985,7 @@ struct ccs_parser_context { u64 major_state; }; +#undef CBOR_PARSER_ERROR #define CBOR_PARSER_ERROR bug static struct ccs_parser_context ccx_, *ccx = &ccx_; @@ -1010,6 +996,8 @@ hcf_parse(byte *buf, int size) ASSERT_DIE(size > 0); struct cbor_parser_context *ctx = ccx->ctx; + static struct flock_machine_container_config ccf; + for (int pos = 0; pos < size; pos++) { switch (cbor_parse_byte(ctx, buf[pos])) @@ -1028,7 +1016,7 @@ hcf_parse(byte *buf, int size) if (ctx->type != 5) CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type); - ccf = (struct container_config) {}; + ccf = (struct flock_machine_container_config) {}; ccx->major_state = 1; break; @@ -1050,11 +1038,11 @@ hcf_parse(byte *buf, int size) if (ctx->tflags & CPT_VARLEN) CBOR_PARSER_ERROR("Variable length string not supported yet"); - if (ccf.hostname) + if (ccf.cf.name) CBOR_PARSER_ERROR("Duplicate argument 0 / hostname"); ASSERT_DIE(!ctx->target_buf); - ccf.hostname = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); + ccf.cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); ctx->target_len = ctx->value; break; @@ -1109,6 +1097,8 @@ hcf_parse(byte *buf, int size) } break; + case CPR_BLOCK_END: + bug("invalid parser state"); } /* End of array or map */ @@ -1128,7 +1118,7 @@ hcf_parse(byte *buf, int size) return; case 1: /* the mapping ended */ - if (!ccf.hostname) + if (!ccf.cf.name) CBOR_PARSER_ERROR("Missing hostname"); if (!ccf.workdir) @@ -1137,7 +1127,7 @@ hcf_parse(byte *buf, int size) if (!ccf.basedir) CBOR_PARSER_ERROR("Missing basedir"); - container_start(); + container_start(&ccf); ccx->major_state = 0; break; @@ -1175,8 +1165,6 @@ hypervisor_container_fork(void) hcf.s = sk_new(hcf.p); hcf.s->type = SK_MAGIC; /* Set the hooks and fds according to the side we are at */ - hcf.s->rx_hook = hypervisor_container_forker_rx; - hcf.s->err_hook = hypervisor_container_forker_err; sk_set_tbsize(hcf.s, 16384); sk_set_rbsize(hcf.s, 128); hcf.s->fd = fds[0]; @@ -1188,6 +1176,9 @@ hypervisor_container_fork(void) bug("Container forker parent: sk_open failed"); hcf.s->type = SK_UNIX_MSG; + hcf.stream.parse = container_fork_request_reply; + cbor_stream_init(&hcf.stream, 3); + cbor_stream_attach(&hcf.stream, hcf.s); birdloop_leave(hcf.loop); return;