mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2025-01-18 06:51:54 +00:00
Container streaming
This commit is contained in:
parent
6cbda097c9
commit
d3e2fe4535
@ -21,7 +21,7 @@
|
|||||||
static struct hypervisor_container_forker {
|
static struct hypervisor_container_forker {
|
||||||
sock *s;
|
sock *s;
|
||||||
pool *p;
|
pool *p;
|
||||||
struct cbor_stream *stream;
|
struct cbor_stream stream;
|
||||||
struct birdloop *loop;
|
struct birdloop *loop;
|
||||||
HASH(struct container_runtime) hash;
|
HASH(struct container_runtime) hash;
|
||||||
struct container_runtime *cur_crt;
|
struct container_runtime *cur_crt;
|
||||||
@ -654,8 +654,9 @@ container_cleanup(struct container_runtime *crt)
|
|||||||
mb_free(crt);
|
mb_free(crt);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct container_ctl_channel {
|
struct container_ctl_msg {
|
||||||
struct cbor_channel cch;
|
struct cbor_channel cch;
|
||||||
|
struct cbor_channel *ctl_ch;
|
||||||
int msg_state;
|
int msg_state;
|
||||||
int down_signal;
|
int down_signal;
|
||||||
};
|
};
|
||||||
@ -663,7 +664,7 @@ struct container_ctl_channel {
|
|||||||
static enum cbor_parse_result
|
static enum cbor_parse_result
|
||||||
container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
|
container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
|
||||||
{
|
{
|
||||||
SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch);
|
SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch);
|
||||||
SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
|
SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
|
||||||
struct cbor_parser_context *ctx = &crt->stream.parser;
|
struct cbor_parser_context *ctx = &crt->stream.parser;
|
||||||
|
|
||||||
@ -727,7 +728,7 @@ container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
|
|||||||
static enum cbor_parse_result
|
static enum cbor_parse_result
|
||||||
container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res)
|
container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res)
|
||||||
{
|
{
|
||||||
ASSERT_DIE(cch->stream == hcf.stream);
|
ASSERT_DIE(cch->stream == &hcf.stream);
|
||||||
|
|
||||||
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch);
|
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch);
|
||||||
struct container_runtime *crt = cfr->crt;
|
struct container_runtime *crt = cfr->crt;
|
||||||
@ -817,59 +818,6 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re
|
|||||||
#undef FAIL
|
#undef FAIL
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
|
||||||
hypervisor_container_forker_err(sock *sk, int e UNUSED)
|
|
||||||
{
|
|
||||||
sk_close(sk);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* The child */
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
static void
|
|
||||||
crt_err(sock *s, int err UNUSED)
|
|
||||||
{
|
|
||||||
struct container_runtime *crt = s->data;
|
|
||||||
s->data = crt->ccc->data;
|
|
||||||
callback_cancel(&crt->ccc->cb);
|
|
||||||
mb_free(crt->ccc);
|
|
||||||
crt->ccc = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int
|
|
||||||
container_ctl_fd(const char *name)
|
|
||||||
{
|
|
||||||
uint h = mem_hash(name, strlen(name));
|
|
||||||
struct container_runtime *crt = HASH_FIND(hcf.hash, CRT, name, h);
|
|
||||||
return (crt && crt->s) ? crt->s->fd : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
container_created(callback *cb)
|
|
||||||
{
|
|
||||||
SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb);
|
|
||||||
|
|
||||||
sock *s = ccc->s;
|
|
||||||
struct {
|
|
||||||
struct cbor_writer w;
|
|
||||||
struct cbor_writer_stack_item si[2];
|
|
||||||
} _cw;
|
|
||||||
|
|
||||||
struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize);
|
|
||||||
CBOR_PUT_MAP(cw)
|
|
||||||
{
|
|
||||||
cbor_put_int(cw, -1);
|
|
||||||
cbor_put_string(cw, "OK");
|
|
||||||
}
|
|
||||||
sk_send(s, cw->data.pos - cw->data.start);
|
|
||||||
|
|
||||||
s->data = ccc->data;
|
|
||||||
sk_resume_rx(s->loop, s);
|
|
||||||
|
|
||||||
mb_free(ccc);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void
|
void
|
||||||
hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_container_config *ccf)
|
hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_container_config *ccf)
|
||||||
{
|
{
|
||||||
@ -902,7 +850,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
|
|||||||
|
|
||||||
/* Create a new channel atop the forker stream */
|
/* Create a new channel atop the forker stream */
|
||||||
log(L_INFO "requesting machine creation, name %s", name);
|
log(L_INFO "requesting machine creation, name %s", name);
|
||||||
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream));
|
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(&hcf.stream));
|
||||||
cfr->ctl_ch = cch;
|
cfr->ctl_ch = cch;
|
||||||
cfr->crt = crt;
|
cfr->crt = crt;
|
||||||
cfr->cch.parse = container_fork_request_reply;
|
cfr->cch.parse = container_fork_request_reply;
|
||||||
@ -924,30 +872,73 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
|
|||||||
birdloop_leave(hcf.loop);
|
birdloop_leave(hcf.loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static enum cbor_parse_result
|
||||||
container_stopped(callback *cb)
|
container_stopped(struct cbor_channel *cch, enum cbor_parse_result res)
|
||||||
{
|
{
|
||||||
SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb);
|
SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch);
|
||||||
|
SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
|
||||||
|
struct cbor_parser_context *ctx = &crt->stream.parser;
|
||||||
|
|
||||||
sock *s = ccc->s;
|
#define FAIL(...) do { log(L_ERR "Container stopped parse: " __VA_ARGS__); return CPR_ERROR; } while (0)
|
||||||
struct {
|
|
||||||
struct cbor_writer w;
|
|
||||||
struct cbor_writer_stack_item si[2];
|
|
||||||
} _cw;
|
|
||||||
|
|
||||||
struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize);
|
switch (res)
|
||||||
|
{
|
||||||
|
case CPR_MAJOR:
|
||||||
|
switch (ccc->msg_state)
|
||||||
|
{
|
||||||
|
case 0:
|
||||||
|
if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
|
||||||
|
FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
|
||||||
|
|
||||||
|
ccc->msg_state = 1;
|
||||||
|
return CPR_MORE;
|
||||||
|
|
||||||
|
case 1:
|
||||||
|
if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3))
|
||||||
|
FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value);
|
||||||
|
|
||||||
|
ccc->msg_state = 2;
|
||||||
|
return CPR_MORE;
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal);
|
||||||
|
ccc->msg_state = 3;
|
||||||
|
return CPR_MORE;
|
||||||
|
|
||||||
|
default:
|
||||||
|
FAIL("Input overflow to state %d", ccc->msg_state);
|
||||||
|
}
|
||||||
|
bug("Overrun switch");
|
||||||
|
|
||||||
|
case CPR_STR_END:
|
||||||
|
FAIL("Unexpected string end");
|
||||||
|
|
||||||
|
case CPR_BLOCK_END:
|
||||||
|
switch (ccc->msg_state) {
|
||||||
|
case 3:
|
||||||
|
ccc->msg_state = 4;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
FAIL("Unexpected block end in state %d", ccc->msg_state);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case CPR_ERROR:
|
||||||
|
case CPR_MORE:
|
||||||
|
FAIL("Invalid input");
|
||||||
|
}
|
||||||
|
|
||||||
|
CBOR_REPLY(ccc->ctl_ch, cw)
|
||||||
CBOR_PUT_MAP(cw)
|
CBOR_PUT_MAP(cw)
|
||||||
{
|
{
|
||||||
cbor_put_int(cw, -1);
|
cbor_put_int(cw, -1);
|
||||||
cbor_put_string(cw, "OK");
|
cbor_put_string(cw, "OK");
|
||||||
}
|
}
|
||||||
|
|
||||||
sk_send(s, cw->data.pos - cw->data.start);
|
cbor_done_channel(&ccc->cch);
|
||||||
|
return CPR_BLOCK_END;
|
||||||
s->data = ccc->data;
|
#undef FAIL
|
||||||
sk_resume_rx(s->loop, s);
|
|
||||||
|
|
||||||
mb_free(ccc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -973,22 +964,16 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct cbor_channel *xch = cbor_channel_new(crt->stream);
|
SKIP_BACK_DECLARE(struct container_ctl_msg, ccr, cch, cbor_channel_new(&crt->stream));
|
||||||
CBOR_REPLY(xch, cw)
|
CBOR_REPLY(&ccr->cch, cw)
|
||||||
CBOR_PUT_MAP(cw)
|
CBOR_PUT_MAP(cw)
|
||||||
{
|
{
|
||||||
cbor_put_int(cw, 0);
|
cbor_put_int(cw, 0);
|
||||||
cbor_put_null(cw);
|
cbor_put_null(cw);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct container_operation_callback *ccc = mb_alloc(cch->pool, sizeof *ccc);
|
ccr->cch.parse = container_stopped;
|
||||||
*ccc = (struct container_operation_callback) {
|
ccr->ctl_ch = cch;
|
||||||
.cch = cch,
|
|
||||||
.ccf = ccf,
|
|
||||||
.cancel = container_operation_hangup,
|
|
||||||
};
|
|
||||||
callback_init(&ccc->cb, container_stopped, s->loop);
|
|
||||||
crt->ccc = ccc;
|
|
||||||
|
|
||||||
birdloop_leave(hcf.loop);
|
birdloop_leave(hcf.loop);
|
||||||
}
|
}
|
||||||
@ -1000,6 +985,7 @@ struct ccs_parser_context {
|
|||||||
u64 major_state;
|
u64 major_state;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#undef CBOR_PARSER_ERROR
|
||||||
#define CBOR_PARSER_ERROR bug
|
#define CBOR_PARSER_ERROR bug
|
||||||
|
|
||||||
static struct ccs_parser_context ccx_, *ccx = &ccx_;
|
static struct ccs_parser_context ccx_, *ccx = &ccx_;
|
||||||
@ -1010,6 +996,8 @@ hcf_parse(byte *buf, int size)
|
|||||||
ASSERT_DIE(size > 0);
|
ASSERT_DIE(size > 0);
|
||||||
struct cbor_parser_context *ctx = ccx->ctx;
|
struct cbor_parser_context *ctx = ccx->ctx;
|
||||||
|
|
||||||
|
static struct flock_machine_container_config ccf;
|
||||||
|
|
||||||
for (int pos = 0; pos < size; pos++)
|
for (int pos = 0; pos < size; pos++)
|
||||||
{
|
{
|
||||||
switch (cbor_parse_byte(ctx, buf[pos]))
|
switch (cbor_parse_byte(ctx, buf[pos]))
|
||||||
@ -1028,7 +1016,7 @@ hcf_parse(byte *buf, int size)
|
|||||||
if (ctx->type != 5)
|
if (ctx->type != 5)
|
||||||
CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
|
CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
|
||||||
|
|
||||||
ccf = (struct container_config) {};
|
ccf = (struct flock_machine_container_config) {};
|
||||||
|
|
||||||
ccx->major_state = 1;
|
ccx->major_state = 1;
|
||||||
break;
|
break;
|
||||||
@ -1050,11 +1038,11 @@ hcf_parse(byte *buf, int size)
|
|||||||
if (ctx->tflags & CPT_VARLEN)
|
if (ctx->tflags & CPT_VARLEN)
|
||||||
CBOR_PARSER_ERROR("Variable length string not supported yet");
|
CBOR_PARSER_ERROR("Variable length string not supported yet");
|
||||||
|
|
||||||
if (ccf.hostname)
|
if (ccf.cf.name)
|
||||||
CBOR_PARSER_ERROR("Duplicate argument 0 / hostname");
|
CBOR_PARSER_ERROR("Duplicate argument 0 / hostname");
|
||||||
|
|
||||||
ASSERT_DIE(!ctx->target_buf);
|
ASSERT_DIE(!ctx->target_buf);
|
||||||
ccf.hostname = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
|
ccf.cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
|
||||||
ctx->target_len = ctx->value;
|
ctx->target_len = ctx->value;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1109,6 +1097,8 @@ hcf_parse(byte *buf, int size)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case CPR_BLOCK_END:
|
||||||
|
bug("invalid parser state");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* End of array or map */
|
/* End of array or map */
|
||||||
@ -1128,7 +1118,7 @@ hcf_parse(byte *buf, int size)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
case 1: /* the mapping ended */
|
case 1: /* the mapping ended */
|
||||||
if (!ccf.hostname)
|
if (!ccf.cf.name)
|
||||||
CBOR_PARSER_ERROR("Missing hostname");
|
CBOR_PARSER_ERROR("Missing hostname");
|
||||||
|
|
||||||
if (!ccf.workdir)
|
if (!ccf.workdir)
|
||||||
@ -1137,7 +1127,7 @@ hcf_parse(byte *buf, int size)
|
|||||||
if (!ccf.basedir)
|
if (!ccf.basedir)
|
||||||
CBOR_PARSER_ERROR("Missing basedir");
|
CBOR_PARSER_ERROR("Missing basedir");
|
||||||
|
|
||||||
container_start();
|
container_start(&ccf);
|
||||||
|
|
||||||
ccx->major_state = 0;
|
ccx->major_state = 0;
|
||||||
break;
|
break;
|
||||||
@ -1175,8 +1165,6 @@ hypervisor_container_fork(void)
|
|||||||
hcf.s = sk_new(hcf.p);
|
hcf.s = sk_new(hcf.p);
|
||||||
hcf.s->type = SK_MAGIC;
|
hcf.s->type = SK_MAGIC;
|
||||||
/* Set the hooks and fds according to the side we are at */
|
/* Set the hooks and fds according to the side we are at */
|
||||||
hcf.s->rx_hook = hypervisor_container_forker_rx;
|
|
||||||
hcf.s->err_hook = hypervisor_container_forker_err;
|
|
||||||
sk_set_tbsize(hcf.s, 16384);
|
sk_set_tbsize(hcf.s, 16384);
|
||||||
sk_set_rbsize(hcf.s, 128);
|
sk_set_rbsize(hcf.s, 128);
|
||||||
hcf.s->fd = fds[0];
|
hcf.s->fd = fds[0];
|
||||||
@ -1188,6 +1176,9 @@ hypervisor_container_fork(void)
|
|||||||
bug("Container forker parent: sk_open failed");
|
bug("Container forker parent: sk_open failed");
|
||||||
|
|
||||||
hcf.s->type = SK_UNIX_MSG;
|
hcf.s->type = SK_UNIX_MSG;
|
||||||
|
hcf.stream.parse = container_fork_request_reply;
|
||||||
|
cbor_stream_init(&hcf.stream, 3);
|
||||||
|
cbor_stream_attach(&hcf.stream, hcf.s);
|
||||||
|
|
||||||
birdloop_leave(hcf.loop);
|
birdloop_leave(hcf.loop);
|
||||||
return;
|
return;
|
||||||
|
Loading…
Reference in New Issue
Block a user