diff --git a/flock/container.c b/flock/container.c index 2c0dcbe4..98f3db32 100644 --- a/flock/container.c +++ b/flock/container.c @@ -32,6 +32,7 @@ struct container_fork_request { struct cbor_channel cch; struct cbor_channel *ctl_ch; struct container_runtime *crt; + int reply_state; }; struct container_runtime { @@ -39,10 +40,13 @@ struct container_runtime { uint hash; pid_t pid; sock *s; + struct cbor_stream stream; char hostname[]; }; -#define CRT_KEY(c) c->ccf.hostname, c->hash +#define CBOR_PARSER_ERROR FAIL + +#define CRT_KEY(c) c->hostname, c->hash #define CRT_NEXT(c) c->next #define CRT_EQ(a,h,b,i) ((h) == (i)) && (!strcmp(a,b)) #define CRT_FN(a,h) h @@ -650,98 +654,167 @@ container_cleanup(struct container_runtime *crt) mb_free(crt); } -static int -hypervisor_container_rx(sock *sk, uint _sz UNUSED) +struct container_ctl_channel { + struct cbor_channel cch; + int msg_state; + int down_signal; +}; + +static enum cbor_parse_result +container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res) { - byte buf[128]; - ssize_t sz = read(sk->fd, buf, sizeof buf); - if (sz < 0) + SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch); + SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream); + struct cbor_parser_context *ctx = &crt->stream.parser; + +#define FAIL(...) do { log(L_ERR "Container ctl parse: " __VA_ARGS__); return CPR_ERROR; } while (0) + + switch (res) { - log(L_ERR "error reading data from %p (container_rx): %m", sk); - sk_close(sk); - return 0; - } + case CPR_MAJOR: + switch (ccc->msg_state) + { + case 0: + if ((ctx->type != CBOR_MAP) || (ctx->value != 1)) + FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value); - struct container_runtime *crt = sk->data; - ASSERT_DIE(crt->s == sk); + ccc->msg_state = 1; + return CPR_MORE; - ASSERT_DIE(sz >= 3); - ASSERT_DIE(buf[0] == 0xa1); + case 1: + if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3)) + FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value); - switch (buf[1]) { - case 0x23: - log(L_INFO "container %s ended by signal %d", crt->ccf.hostname, buf[2]); - if (crt->ccc) - callback_activate(&crt->ccc->cb); - container_cleanup(crt); + ccc->msg_state = 2; + return CPR_MORE; + + case 2: + CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal); + ccc->msg_state = 3; + return CPR_MORE; + + default: + FAIL("Input overflow to state %d", ccc->msg_state); + } + bug("Overrun switch"); + + case CPR_STR_END: + FAIL("Unexpected string end"); + + case CPR_BLOCK_END: + switch (ccc->msg_state) { + case 3: + ccc->msg_state = 4; + break; + + default: + FAIL("Unexpected block end in state %d", ccc->msg_state); + } break; - default: - log(L_ERR "container %s sent a weird message 0x%02x sz %d", crt->ccf.hostname, buf[1], sz); - break; + case CPR_ERROR: + case CPR_MORE: + FAIL("Invalid input"); } - - return 0; -} - -static void -hypervisor_container_err(sock *sk, int err) -{ - struct container_runtime *crt = sk->data; - log(L_ERR "Container %s socket closed unexpectedly: %s", crt->ccf.hostname, strerror(err)); + + log(L_INFO "container %s ended by signal %d", crt->hostname, ccc->down_signal); container_cleanup(crt); + +#undef FAIL + return CPR_BLOCK_END; } -static int -hypervisor_container_forker_rx(sock *sk, uint sz) +static enum cbor_parse_result +container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res) { - if ((sz < 3) || (sk->rxfd < 0)) + ASSERT_DIE(cch->stream == hcf.stream); + + SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch); + struct container_runtime *crt = cfr->crt; + struct cbor_parser_context *ctx = &cch->stream->parser; + +#define FAIL(...) do { log(L_ERR "Container fork request reply: " __VA_ARGS__); return CPR_ERROR; } while (0) + + switch (res) { - log(L_ERR "Container forker RX strange behavior, what the hell (sz %d, fd %d)", sz, sk->rxfd); - sk_close(sk); - ev_send_loop(&main_birdloop, &poweroff_event); - return 0; + case CPR_MAJOR: + switch (cfr->reply_state) { + case 0: + if ((ctx->type != CBOR_MAP) || (ctx->value != 1)) + FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value); + + cfr->reply_state = 1; + return CPR_MORE; + + case 1: + if ((ctx->type != CBOR_NEGINT) || (ctx->value != 1)) + FAIL("Expected key -2, got %d-%d", ctx->type, ctx->value); + + cfr->reply_state = 2; + return CPR_MORE; + + case 2: + CBOR_PARSE_ONLY(ctx, POSINT, crt->pid); + cfr->reply_state = 3; + return CPR_MORE; + + default: + FAIL("Input overflow to state %d", cfr->reply_state); + } + bug("Overrun switch"); + + case CPR_STR_END: + FAIL("Unexpected string end"); + + case CPR_BLOCK_END: + switch (cfr->reply_state) { + case 3: + cfr->reply_state = 4; + break; + + default: + FAIL("Unexpected block end in state %d", cfr->reply_state); + } + break; + + case CPR_ERROR: + case CPR_MORE: + FAIL("Invalid input"); } + + log(L_INFO "Machine started with PID %d", crt->pid); - ASSERT_DIE(sk->rbuf[0] == 0xa1); - ASSERT_DIE(sk->rbuf[1] == 0x21); - pid_t pid; - if (sk->rbuf[2] < 0x18) - pid = sk->rbuf[2]; - else if (sk->rbuf[2] == 24) - pid = sk->rbuf[3]; - else if (sk->rbuf[2] == 25) - pid = sk->rbuf[3] << 8 + sk->rbuf[4]; - else if (sk->rbuf[3] == 26) - pid = sk->rbuf[3] << 32 + sk->rbuf[4] << 24 + sk->rbuf[5] << 16 + sk->rbuf[6]; - else - bug("not implemented"); - - log(L_INFO "Machine started with PID %d", pid); - - sock *skl = sk_new(sk->pool); - skl->type = SK_MAGIC; - skl->rx_hook = hypervisor_container_rx; - skl->err_hook = hypervisor_container_err; - skl->fd = sk->rxfd; - sk_set_tbsize(skl, 1024); - - if (sk_open(skl, sk->loop) < 0) - bug("Machine control socket: sk_open failed"); + if (hcf.s->rxfd < 0) + FAIL("No control socket of the new machine"); ASSERT_DIE(birdloop_inside(hcf.loop)); - ASSERT_DIE(hcf.cur_crt); - skl->data = hcf.cur_crt; + sock *skl = sk_new(hcf.p); + skl->type = SK_MAGIC; - hcf.cur_crt->pid = pid; - hcf.cur_crt->s = skl; - if (hcf.cur_crt->ccc) - callback_activate(&hcf.cur_crt->ccc->cb); - hcf.cur_crt->ccc = NULL; - hcf.cur_crt = NULL; + skl->fd = hcf.s->rxfd; + hcf.s->rxfd = -1; - return 0; + if (sk_open(skl, hcf.loop) < 0) + bug("Machine control socket: sk_open failed"); + + sk_set_tbsize(skl, 1024); + skl->type = SK_UNIX_MSG; + + crt->s = skl; + cbor_stream_attach(&crt->stream, skl); + + CBOR_REPLY(cfr->ctl_ch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -1); + cbor_put_string(cw, "OK"); + } + + cbor_done_channel(&cfr->cch); + + return CPR_BLOCK_END; +#undef FAIL } static void @@ -827,12 +900,16 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai HASH_INSERT(hcf.hash, CRT, crt); + /* Create a new channel atop the forker stream */ log(L_INFO "requesting machine creation, name %s", name); SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream)); - cfr->crl_ch = cch; + cfr->ctl_ch = cch; cfr->crt = crt; cfr->cch.parse = container_fork_request_reply; + crt->stream.parse = container_ctl_parse; + cbor_stream_init(&crt->stream, 2); + CBOR_REPLY(&cfr->cch, cw) CBOR_PUT_MAP(cw) {