0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-03-22 06:17:04 +00:00

Flock: some more parsers

This commit is contained in:
Maria Matejka 2024-10-10 14:28:20 +02:00
parent 393559ace7
commit 6cbda097c9

View File

@ -32,6 +32,7 @@ struct container_fork_request {
struct cbor_channel cch; struct cbor_channel cch;
struct cbor_channel *ctl_ch; struct cbor_channel *ctl_ch;
struct container_runtime *crt; struct container_runtime *crt;
int reply_state;
}; };
struct container_runtime { struct container_runtime {
@ -39,10 +40,13 @@ struct container_runtime {
uint hash; uint hash;
pid_t pid; pid_t pid;
sock *s; sock *s;
struct cbor_stream stream;
char hostname[]; char hostname[];
}; };
#define CRT_KEY(c) c->ccf.hostname, c->hash #define CBOR_PARSER_ERROR FAIL
#define CRT_KEY(c) c->hostname, c->hash
#define CRT_NEXT(c) c->next #define CRT_NEXT(c) c->next
#define CRT_EQ(a,h,b,i) ((h) == (i)) && (!strcmp(a,b)) #define CRT_EQ(a,h,b,i) ((h) == (i)) && (!strcmp(a,b))
#define CRT_FN(a,h) h #define CRT_FN(a,h) h
@ -650,98 +654,167 @@ container_cleanup(struct container_runtime *crt)
mb_free(crt); mb_free(crt);
} }
static int struct container_ctl_channel {
hypervisor_container_rx(sock *sk, uint _sz UNUSED) struct cbor_channel cch;
int msg_state;
int down_signal;
};
static enum cbor_parse_result
container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
{ {
byte buf[128]; SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch);
ssize_t sz = read(sk->fd, buf, sizeof buf); SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
if (sz < 0) struct cbor_parser_context *ctx = &crt->stream.parser;
#define FAIL(...) do { log(L_ERR "Container ctl parse: " __VA_ARGS__); return CPR_ERROR; } while (0)
switch (res)
{ {
log(L_ERR "error reading data from %p (container_rx): %m", sk); case CPR_MAJOR:
sk_close(sk); switch (ccc->msg_state)
return 0; {
} case 0:
if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
struct container_runtime *crt = sk->data; ccc->msg_state = 1;
ASSERT_DIE(crt->s == sk); return CPR_MORE;
ASSERT_DIE(sz >= 3); case 1:
ASSERT_DIE(buf[0] == 0xa1); if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3))
FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value);
switch (buf[1]) { ccc->msg_state = 2;
case 0x23: return CPR_MORE;
log(L_INFO "container %s ended by signal %d", crt->ccf.hostname, buf[2]);
if (crt->ccc) case 2:
callback_activate(&crt->ccc->cb); CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal);
container_cleanup(crt); ccc->msg_state = 3;
return CPR_MORE;
default:
FAIL("Input overflow to state %d", ccc->msg_state);
}
bug("Overrun switch");
case CPR_STR_END:
FAIL("Unexpected string end");
case CPR_BLOCK_END:
switch (ccc->msg_state) {
case 3:
ccc->msg_state = 4;
break;
default:
FAIL("Unexpected block end in state %d", ccc->msg_state);
}
break; break;
default: case CPR_ERROR:
log(L_ERR "container %s sent a weird message 0x%02x sz %d", crt->ccf.hostname, buf[1], sz); case CPR_MORE:
break; FAIL("Invalid input");
} }
return 0; log(L_INFO "container %s ended by signal %d", crt->hostname, ccc->down_signal);
}
static void
hypervisor_container_err(sock *sk, int err)
{
struct container_runtime *crt = sk->data;
log(L_ERR "Container %s socket closed unexpectedly: %s", crt->ccf.hostname, strerror(err));
container_cleanup(crt); container_cleanup(crt);
#undef FAIL
return CPR_BLOCK_END;
} }
static int static enum cbor_parse_result
hypervisor_container_forker_rx(sock *sk, uint sz) container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res)
{ {
if ((sz < 3) || (sk->rxfd < 0)) ASSERT_DIE(cch->stream == hcf.stream);
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch);
struct container_runtime *crt = cfr->crt;
struct cbor_parser_context *ctx = &cch->stream->parser;
#define FAIL(...) do { log(L_ERR "Container fork request reply: " __VA_ARGS__); return CPR_ERROR; } while (0)
switch (res)
{ {
log(L_ERR "Container forker RX strange behavior, what the hell (sz %d, fd %d)", sz, sk->rxfd); case CPR_MAJOR:
sk_close(sk); switch (cfr->reply_state) {
ev_send_loop(&main_birdloop, &poweroff_event); case 0:
return 0; if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
cfr->reply_state = 1;
return CPR_MORE;
case 1:
if ((ctx->type != CBOR_NEGINT) || (ctx->value != 1))
FAIL("Expected key -2, got %d-%d", ctx->type, ctx->value);
cfr->reply_state = 2;
return CPR_MORE;
case 2:
CBOR_PARSE_ONLY(ctx, POSINT, crt->pid);
cfr->reply_state = 3;
return CPR_MORE;
default:
FAIL("Input overflow to state %d", cfr->reply_state);
}
bug("Overrun switch");
case CPR_STR_END:
FAIL("Unexpected string end");
case CPR_BLOCK_END:
switch (cfr->reply_state) {
case 3:
cfr->reply_state = 4;
break;
default:
FAIL("Unexpected block end in state %d", cfr->reply_state);
}
break;
case CPR_ERROR:
case CPR_MORE:
FAIL("Invalid input");
} }
ASSERT_DIE(sk->rbuf[0] == 0xa1); log(L_INFO "Machine started with PID %d", crt->pid);
ASSERT_DIE(sk->rbuf[1] == 0x21);
pid_t pid;
if (sk->rbuf[2] < 0x18)
pid = sk->rbuf[2];
else if (sk->rbuf[2] == 24)
pid = sk->rbuf[3];
else if (sk->rbuf[2] == 25)
pid = sk->rbuf[3] << 8 + sk->rbuf[4];
else if (sk->rbuf[3] == 26)
pid = sk->rbuf[3] << 32 + sk->rbuf[4] << 24 + sk->rbuf[5] << 16 + sk->rbuf[6];
else
bug("not implemented");
log(L_INFO "Machine started with PID %d", pid); if (hcf.s->rxfd < 0)
FAIL("No control socket of the new machine");
sock *skl = sk_new(sk->pool);
skl->type = SK_MAGIC;
skl->rx_hook = hypervisor_container_rx;
skl->err_hook = hypervisor_container_err;
skl->fd = sk->rxfd;
sk_set_tbsize(skl, 1024);
if (sk_open(skl, sk->loop) < 0)
bug("Machine control socket: sk_open failed");
ASSERT_DIE(birdloop_inside(hcf.loop)); ASSERT_DIE(birdloop_inside(hcf.loop));
ASSERT_DIE(hcf.cur_crt); sock *skl = sk_new(hcf.p);
skl->data = hcf.cur_crt; skl->type = SK_MAGIC;
hcf.cur_crt->pid = pid; skl->fd = hcf.s->rxfd;
hcf.cur_crt->s = skl; hcf.s->rxfd = -1;
if (hcf.cur_crt->ccc)
callback_activate(&hcf.cur_crt->ccc->cb);
hcf.cur_crt->ccc = NULL;
hcf.cur_crt = NULL;
return 0; if (sk_open(skl, hcf.loop) < 0)
bug("Machine control socket: sk_open failed");
sk_set_tbsize(skl, 1024);
skl->type = SK_UNIX_MSG;
crt->s = skl;
cbor_stream_attach(&crt->stream, skl);
CBOR_REPLY(cfr->ctl_ch, cw)
CBOR_PUT_MAP(cw)
{
cbor_put_int(cw, -1);
cbor_put_string(cw, "OK");
}
cbor_done_channel(&cfr->cch);
return CPR_BLOCK_END;
#undef FAIL
} }
static void static void
@ -827,12 +900,16 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
HASH_INSERT(hcf.hash, CRT, crt); HASH_INSERT(hcf.hash, CRT, crt);
/* Create a new channel atop the forker stream */
log(L_INFO "requesting machine creation, name %s", name); log(L_INFO "requesting machine creation, name %s", name);
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream)); SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream));
cfr->crl_ch = cch; cfr->ctl_ch = cch;
cfr->crt = crt; cfr->crt = crt;
cfr->cch.parse = container_fork_request_reply; cfr->cch.parse = container_fork_request_reply;
crt->stream.parse = container_ctl_parse;
cbor_stream_init(&crt->stream, 2);
CBOR_REPLY(&cfr->cch, cw) CBOR_REPLY(&cfr->cch, cw)
CBOR_PUT_MAP(cw) CBOR_PUT_MAP(cw)
{ {