diff --git a/flock/container.c b/flock/container.c index 476ef8ff..02d5ef99 100644 --- a/flock/container.c +++ b/flock/container.c @@ -21,7 +21,7 @@ static struct hypervisor_container_forker { sock *s; pool *p; - struct cbor_stream stream; + CBOR_STREAM_EMBED(stream, 4); struct birdloop *loop; HASH(struct container_runtime) hash; struct container_runtime *cur_crt; @@ -29,7 +29,7 @@ static struct hypervisor_container_forker { } hcf; struct container_fork_request { - struct cbor_channel cch; + CBOR_CHANNEL_EMBED(cch, 4); struct cbor_channel *ctl_ch; struct container_runtime *crt; int reply_state; @@ -40,7 +40,7 @@ struct container_runtime { uint hash; pid_t pid; sock *s; - struct cbor_stream stream; + CBOR_STREAM_EMBED(stream, 4); char hostname[]; }; @@ -646,6 +646,20 @@ container_start(struct flock_machine_container_config *ccf) /* The Parent */ +static struct container_runtime * +container_find_by_name(const char *name) +{ + uint h = mem_hash(name, strlen(name)); + return HASH_FIND(hcf.hash, CRT, name, h); +} + +struct cbor_channel * +container_get_channel(const char *name) +{ + struct container_runtime *crt = container_find_by_name(name); + return crt ? cbor_channel_new(&crt->stream) : NULL; +} + static void container_cleanup(struct container_runtime *crt) { @@ -655,7 +669,7 @@ container_cleanup(struct container_runtime *crt) } struct container_ctl_msg { - struct cbor_channel cch; + CBOR_CHANNEL_EMBED(cch, 4); struct cbor_channel *ctl_ch; int msg_state; int down_signal; @@ -812,7 +826,7 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re cbor_put_string(cw, "OK"); } - cbor_done_channel(&cfr->cch); + cbor_channel_done(&cfr->cch); return CPR_BLOCK_END; #undef FAIL @@ -826,7 +840,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai #define FAIL(id, msg) do { \ CBOR_REPLY(cch, cw) CBOR_PUT_MAP(cw) { \ cbor_put_int(cw, id); cbor_put_string(cw, msg);\ - } cbor_done_channel(cch); \ + } cbor_channel_done(cch); \ birdloop_leave(hcf.loop); \ return; } while (0) @@ -860,7 +874,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai cfr->cch.parse = container_fork_request_reply; crt->stream.parse = container_ctl_parse; - cbor_stream_init(&crt->stream, 2); + CBOR_STREAM_INIT(crt, stream, cch, hcf.p, struct container_ctl_msg); CBOR_REPLY(&cfr->cch, cw) CBOR_PUT_MAP(cw) @@ -941,7 +955,7 @@ container_stopped(struct cbor_channel *cch, enum cbor_parse_result res) cbor_put_string(cw, "OK"); } - cbor_done_channel(&ccc->cch); + cbor_channel_done(&ccc->cch); return CPR_BLOCK_END; #undef FAIL } @@ -964,7 +978,7 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con cbor_put_string(cw, "BAD: Not found"); } - cbor_done_channel(cch); + cbor_channel_done(cch); birdloop_leave(hcf.loop); return; } @@ -1182,7 +1196,7 @@ hypervisor_container_fork(void) hcf.s->type = SK_UNIX_MSG; hcf.stream.parse = container_fork_request_reply; - cbor_stream_init(&hcf.stream, 3); + CBOR_STREAM_INIT(&hcf, stream, cch, hcf.p, struct container_fork_request); cbor_stream_attach(&hcf.stream, hcf.s); birdloop_leave(hcf.loop); diff --git a/flock/ctl.c b/flock/ctl.c index c5b4c801..66f323e4 100644 --- a/flock/ctl.c +++ b/flock/ctl.c @@ -27,11 +27,11 @@ struct hcs_parser_stream { u64 bytes_consumed; u64 major_state; - struct cbor_stream stream; + CBOR_STREAM_EMBED(stream, 4); }; struct hcs_parser_channel { - struct cbor_channel cch; + CBOR_CHANNEL_EMBED(cch, 4); struct hcs_parser_stream *htx; enum { @@ -58,18 +58,18 @@ hcs_request_poweroff(struct hcs_parser_channel *hpc) cbor_put_string(cw, "OK"); } - cbor_done_channel(&hpc->cch); + cbor_channel_done(&hpc->cch); } struct hcs_parser_stream * hcs_parser_init(sock *s) { - struct cbor_parser_context *ctx = cbor_parser_new(s->pool, 4); struct hcs_parser_stream *htx = mb_allocz(s->pool, sizeof *htx); - htx->ctx = ctx; - htx->sock = s; - cbor_stream_init(&htx->stream, 3); + CBOR_STREAM_INIT(htx, stream, cch, s->pool, struct hcs_parser_channel); + cbor_stream_attach(&htx->stream, s); + htx->stream.parse = hcs_parse; + htx->stream.cancel = hcs_parser_cleanup; return htx; } diff --git a/flock/flock-cli b/flock/flock-cli index 67e2153b..dc137dd1 100755 --- a/flock/flock-cli +++ b/flock/flock-cli @@ -35,6 +35,9 @@ class HypervisorStaleError(HandlerError): def __init__(self, *args, **kwargs): return super().__init__("Hypervisor stale", *args, **kwargs) +class GarbledReplyError(HandlerError): + pass + def connect(where: pathlib.Path): if not where.exists(): raise HypervisorNonexistentError() @@ -49,15 +52,20 @@ def connect(where: pathlib.Path): def ctl_path(name: str): return DEFAULT_RUN_PATH / f"{name}.ctl" -def msg(name: str, data: dict): +def msg(name: str, cmd: int, data): try: ctl = connect(ctl_path(name)) except HypervisorNonexistentError as e: e.add_note(f"Failed to send message {data} to {name}") raise e - ctl.sendall(cbor2.dumps(data)) - return cbor2.loads(ctl.recv(1024)) + ctl.sendall(cbor2.dumps([ 0, cmd, data])) + data = cbor2.loads(ctl.recv(1024)) + if len(data) != 2: + raise GarbledReplyError(f"Expected 2 array items, got {len(data)}") + if data[0] != 0: + raise GarbledReplyError(f"Expected zero ID, got {data[0]}") + return data[1] @handler def start(name: str): @@ -72,7 +80,7 @@ def start(name: str): @handler def stop(name: str): - for k,v in msg(name, { 0: None }).items(): + for k,v in msg(name, 1, None).items(): assert(k == -1) assert(v == "OK") @@ -86,28 +94,28 @@ def cleanup(name: str): @handler def telnet(name: str): - for k,v in msg(name, { 1: None}).items(): + for k,v in msg(name, 2, None).items(): assert(k == -2) os.execlp("telnet", "telnet", "localhost", str(v)) @handler def container_start(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 3: { + for k,v in msg(hypervisor, 3, { 0: name, 1: 1, 2: b"/", 3: bytes(DEFAULT_RUN_PATH / hypervisor / name), - }}).items(): + }).items(): print(k,v) @handler def container_stop(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 4: { 0: name, }}).items(): + for k,v in msg(hypervisor, 4, { 0: name, }).items(): print(k,v) @handler def container_telnet(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 1: name}).items(): + for k,v in msg(hypervisor, 2, name).items(): assert(k == -2) os.execlp("telnet", "telnet", "localhost", str(v)) diff --git a/flock/flock.h b/flock/flock.h index 1182fc85..eabef9be 100644 --- a/flock/flock.h +++ b/flock/flock.h @@ -52,7 +52,8 @@ union flock_machine_config { void hypervisor_container_start(struct cbor_channel *, struct flock_machine_container_config *); void hypervisor_container_shutdown(struct cbor_channel *, struct flock_machine_container_config *); -int container_ctl_fd(const char *name); + +struct cbor_channel *container_get_channel(const char *name); void hexp_cleanup_after_fork(void); diff --git a/flock/hypervisor.c b/flock/hypervisor.c index 1310d974..95cd5a4d 100644 --- a/flock/hypervisor.c +++ b/flock/hypervisor.c @@ -20,53 +20,12 @@ static pool *hcs_pool; OBSREF(struct shutdown_placeholder) hcs_shutdown_placeholder; -static int -hcs_rx(sock *s, uint size) -{ - s64 sz = hcs_parse(s->data, s->rbuf, size); - if (sz < 0) - { - log(L_INFO "CLI parser error at position %ld: %s", -sz-1, hcs_error(s->data)); - sk_close(s); - return 0; /* Must return 0 when closed */ - } - - if (!hcs_complete(s->data)) - { - ASSERT_DIE(sz == size); - return 1; - } - - log(L_INFO "Parsed command."); - - /* TODO do something more */ - if (sz < size) - memmove(s->rbuf, s->rbuf + sz, size - sz); - if (!s->rx_hook) - return (sz == size); - - hcs_parser_cleanup(s->data); - s->data = hcs_parser_init(s); - - return (sz < size) ? hcs_rx(s, size - sz) : 1; -} - -static void -hcs_err(sock *s, int err) -{ - log(L_INFO "CLI dropped: %s", strerror(err)); - hcs_parser_cleanup(s->data); - sk_close(s); -} - static int hcs_connect(sock *s, uint size UNUSED) { log(L_INFO "CLI connected: %p", s); - s->rx_hook = hcs_rx; - s->err_hook = hcs_err; - s->data = hcs_parser_init(s); + hcs_parser_init(s); return 1; } @@ -127,8 +86,7 @@ static struct hypervisor_exposed { pool *p; sock *s; struct birdloop *loop; - const char *port_name; - sock *port_sreq; + struct hcs_parser_channel *hpc; } he; /** @@ -281,13 +239,19 @@ hypervisor_exposed_child_rx(sock *sk, uint size) sk->txfd = sfd; - linpool *lp = lp_new(sk->pool); - struct cbor_writer *cw = cbor_init(sk->tbuf, sk->tbsize, lp); - cbor_open_block_with_length(cw, 1); - cbor_add_int(cw, -2); - cbor_add_int(cw, r); + struct { + struct cbor_writer cw; + struct cbor_writer_stack_item si[2]; + } cw; - e = sk_send(sk, cw->pt); + cbor_writer_init(&cw.cw, 2, sk->tbuf, sk->tbsize); + CBOR_PUT_MAP(&cw.cw) + { + cbor_put_int(&cw.cw, -2); + cbor_put_int(&cw.cw, r); + } + + e = sk_send(sk, cw.cw.data.pos - cw.cw.data.start); if (e < 0) log(L_ERR "Failed to send socket: %m"); @@ -388,9 +352,7 @@ hexp_cleanup_after_fork(void) static void hexp_sock_err(sock *s, int err UNUSED) { - ASSERT_DIE(s == he.port_sreq); - he.port_name = NULL; - he.port_sreq = NULL; + he.hpc = NULL; } void @@ -406,17 +368,31 @@ hexp_get_telnet(struct hcs_parser_channel *hpc) int e = write(he.s->fd, buf, sizeof buf); if (e != sizeof buf) bug("write error handling not implemented, got %d (%m)", e); - - s->err_paused = hexp_sock_err; - sk_pause_rx(s->loop, s); } -static void hexp_received_telnet(struct hexp_received_telnet *hrt) +struct hcs_parser_channel { + struct cbor_channel cch; + struct hcs_parser_stream *htx; + + enum { + HCS_CMD_SHUTDOWN = 1, + HCS_CMD_TELNET, + HCS_CMD_MACHINE_START, + HCS_CMD_MACHINE_STOP, + HCS_CMD__MAX, + } cmd; + + union flock_machine_config cfg; +}; + +static void hexp_received_telnet(void *_hrt) { - if (hrt->name[0]) + struct hexp_received_telnet *hrt = _hrt; + + if (he.hpc->cfg.cf.name[0]) { /* Transferring the received listening socket to the container */ - struct cbor_channel *ccc = container_get_channel(hrt->name); + struct cbor_channel *ccc = container_get_channel(he.hpc->cfg.cf.name); CBOR_REPLY(ccc, cw) CBOR_PUT_MAP(cw) { @@ -440,21 +416,16 @@ static void hexp_received_telnet(struct hexp_received_telnet *hrt) bug("Telnet listener: sk_open failed"); } - if (s) + if (he.hpc) { - linpool *lp = lp_new(hcs_pool); - struct cbor_writer *cw = cbor_init(s->tbuf, s->tbsize, lp); - cbor_open_block_with_length(cw, 1); - cbor_add_int(cw, -2); - cbor_add_int(cw, hrt->port); + CBOR_REPLY(&he.hpc->cch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -2); + cbor_put_int(cw, hrt->port); + } - sk_send(s, cw->pt); - sk_resume_rx(hcs_loop, s); - - hcs_parser_cleanup(s->data); - s->data = hcs_parser_init(s); - - rfree(lp); + cbor_channel_done(&he.hpc->cch); } birdloop_enter(he.loop);