From df4311010242816c5d455e30265079f6ee886895 Mon Sep 17 00:00:00 2001 From: Maria Matejka Date: Mon, 7 Oct 2024 09:45:16 +0200 Subject: [PATCH] Lib: CBOR message streams and channels --- lib/cbor-parser.c | 147 ++++++++++++++++++++++++++++++++++++++++++++++ lib/cbor.h | 61 ++++++++++++++++++- 2 files changed, 205 insertions(+), 3 deletions(-) diff --git a/lib/cbor-parser.c b/lib/cbor-parser.c index caa3c728..cda3da60 100644 --- a/lib/cbor-parser.c +++ b/lib/cbor-parser.c @@ -8,6 +8,11 @@ #include "lib/birdlib.h" #include "lib/cbor.h" +#include "lib/hash.h" + +/* + * Basic parser bits + */ struct cbor_parser_context * cbor_parser_new(pool *p, uint stack_max_depth) @@ -178,3 +183,145 @@ cbor_parse_block_end(struct cbor_parser_context *ctx) return true; } + +/* + * CBOR channel multiplexer + */ + +#define CCH_EQ(a,b) (a)->id == (b)->id +#define CCH_FN(x) (x)->idhash +#define CCH_KEY(x) (x) +#define CCH_NEXT(x) (x)->next_hash + +struct cbor_channel cbor_channel_parse_error; + +#define CSTR_PARSER_ERROR(...) do { \ + log(L_ERR __VA_ARGS__); \ + sk_close(s); \ + return 0; \ +} while (0) + +#define CCH_PARSE(kind) do { \ + ASSERT_DIE(cch); \ + switch (cch->parse(cch, kind)) { \ + case CPR_MORE: continue; \ + case CPR_ERROR: sk_close(s); \ + return 0; \ + case CPR_BLOCK_END: stream->state = CSTR_FINISH; \ + break; \ + default: bug("Invalid return value from channel parser"); \ + }} while(0) + +static int +cbor_stream_rx(sock *s, uint sz) +{ + struct cbor_stream *stream = s->data; + struct cbor_parser_context *ctx = &stream->parser; + struct cbor_channel *cch = stream->cur_rx_channel; + struct cbor_channel cchloc = {}; + + for (uint pos = 0; pos < sz; pos++) + { + switch (cbor_parse_byte(ctx, s->rbuf[pos])) + { + case CPR_MORE: + continue; + + case CPR_ERROR: + log(L_ERR "CBOR parser failure: %s", ctx->error); + sk_close(s); + return 0; + + case CPR_MAJOR: + switch (stream->state) + { + case CSTR_INIT: + if (ctx->type != 4) + CSTR_PARSER_ERROR("Expected array, got %u", ctx->type); + + if (ctx->value != 3) + CSTR_PARSER_ERROR("Expected array of length exactly 3"); + + stream->state = CSTR_EXPECT_ID; + break; + + case CSTR_EXPECT_ID: + CBOR_PARSE_ONLY(ctx, POSINT, cchloc.id); + stream->state = CSTR_MSG; + + cchloc.idhash = cchloc.id * stream->hmul; + stream->cur_rx_channel = cch = HASH_FIND(stream->channels, CCH, &cchloc); + if (cch) + break; + + stream->cur_rx_channel = cch = sl_alloc(stream->slab); + *cch = cchloc; + cch->p = rp_newf(stream->p, stream->p->domain, "Channel 0x%lx", cchloc.id); + HASH_INSERT(stream->channels, CCH, cch); + break; + + case CSTR_MSG: + CCH_PARSE(CPR_MAJOR); + break; + + case CSTR_FINISH: + case CSTR_CLEANUP: + bug("Invalid stream pre-parser state"); + } + break; + + case CPR_STR_END: + ASSERT_DIE(stream->state == CSTR_MSG); + CCH_PARSE(CPR_STR_END); + break; + + case CPR_BLOCK_END: + bug("Impossible value returned from cbor_parse_byte()"); + } + + while (cbor_parse_block_end(ctx)) + { + switch (stream->state) + { + case CSTR_INIT: + case CSTR_EXPECT_ID: + case CSTR_CLEANUP: + CSTR_PARSER_ERROR("Invalid stream pre-parser state"); + + case CSTR_MSG: + CCH_PARSE(CPR_BLOCK_END); + break; + + case CSTR_FINISH: + stream->state = CSTR_CLEANUP; + break; + } + } + + if (stream->state == CSTR_CLEANUP) + { + if (ctx->partial_state != CPE_EXIT) + CSTR_PARSER_ERROR("Garbled end of message"); + + ctx->partial_state = CPE_TYPE; + stream->state = CSTR_INIT; + + if (pos + 1 < sz) + { + memmove(s->rbuf, s->rbuf + pos + 1, sz - pos - 1); + s->rpos = s->rbuf + sz - pos - 1; + } + + return 0; + } + } + + return 1; +} + +void +cbor_stream_attach(struct cbor_stream *stream, sock *sk) +{ + sk->data = stream; + sk->rx_hook = cbor_stream_rx; +} diff --git a/lib/cbor.h b/lib/cbor.h index c3d3b32a..9e5208e7 100644 --- a/lib/cbor.h +++ b/lib/cbor.h @@ -2,6 +2,8 @@ #define CBOR_H #include "nest/bird.h" +#include "lib/hash.h" +#include "lib/socket.h" /** * CBOR Commonalities @@ -167,9 +169,10 @@ void cbor_parser_reset(struct cbor_parser_context *ctx); enum cbor_parse_result { CPR_ERROR = 0, - CPR_MORE = 1, - CPR_MAJOR = 2, - CPR_STR_END = 3, + CPR_MORE, + CPR_MAJOR, + CPR_STR_END, + CPR_BLOCK_END, } cbor_parse_byte(struct cbor_parser_context *, const byte); bool cbor_parse_block_end(struct cbor_parser_context *); @@ -187,4 +190,56 @@ bool cbor_parse_block_end(struct cbor_parser_context *); #define CBOR_STORE_TEXT CBOR_STORE_BYTES +/* + * Message channels + */ + +struct cbor_channel; +typedef enum cbor_parse_result (*cbor_stream_parse_fn)(struct cbor_channel *, enum cbor_parse_result); + +struct cbor_stream { + HASH(struct cbor_channel) channels; + pool *p; + slab *slab; + cbor_stream_parse_fn parse; + struct cbor_channel *cur_rx_channel; + u64 hmul; + enum { + CSTR_INIT, + CSTR_EXPECT_ID, + CSTR_MSG, + CSTR_FINISH, + CSTR_CLEANUP, + } state; + struct cbor_parser_context parser; +}; + +/* Init and cleanup of CBOR stream */ +struct cbor_stream *cbor_stream_init(struct cbor_stream *, uint ctx_sz); +void cbor_stream_attach(struct cbor_stream *, sock *); +void cbor_stream_cleanup(struct cbor_stream *); + +struct cbor_channel { + struct cbor_channel *next_hash; + struct cbor_stream *stream; + cbor_stream_parse_fn parse; + void (*cancel)(struct cbor_channel *); + pool *p; + u64 id; + u64 idhash; +}; + +extern struct cbor_channel cbor_channel_parse_error; + +/* Locally define a new channel */ +struct cbor_channel *cbor_channel_new(struct cbor_stream *); + +/* Drop the channel */ +void cbor_done_channel(struct cbor_channel *); + +struct cbor_writer *cbor_reply_init(struct cbor_channel *); +void cbor_reply_send(struct cbor_channel *, struct cbor_writer *); +#define CBOR_REPLY(ch, cw) for (struct cbor_writer *cw = cbor_reply_init(ch); cw; cbor_reply_send(ch, cw), cw = NULL) + + #endif