0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-24 10:41:54 +00:00

Lib: CBOR message streams and channels

This commit is contained in:
Maria Matejka 2024-10-07 09:45:16 +02:00
parent ccfc127505
commit df43110102
2 changed files with 205 additions and 3 deletions

View File

@ -8,6 +8,11 @@
#include "lib/birdlib.h"
#include "lib/cbor.h"
#include "lib/hash.h"
/*
* Basic parser bits
*/
struct cbor_parser_context *
cbor_parser_new(pool *p, uint stack_max_depth)
@ -178,3 +183,145 @@ cbor_parse_block_end(struct cbor_parser_context *ctx)
return true;
}
/*
* CBOR channel multiplexer
*/
#define CCH_EQ(a,b) (a)->id == (b)->id
#define CCH_FN(x) (x)->idhash
#define CCH_KEY(x) (x)
#define CCH_NEXT(x) (x)->next_hash
struct cbor_channel cbor_channel_parse_error;
#define CSTR_PARSER_ERROR(...) do { \
log(L_ERR __VA_ARGS__); \
sk_close(s); \
return 0; \
} while (0)
#define CCH_PARSE(kind) do { \
ASSERT_DIE(cch); \
switch (cch->parse(cch, kind)) { \
case CPR_MORE: continue; \
case CPR_ERROR: sk_close(s); \
return 0; \
case CPR_BLOCK_END: stream->state = CSTR_FINISH; \
break; \
default: bug("Invalid return value from channel parser"); \
}} while(0)
static int
cbor_stream_rx(sock *s, uint sz)
{
struct cbor_stream *stream = s->data;
struct cbor_parser_context *ctx = &stream->parser;
struct cbor_channel *cch = stream->cur_rx_channel;
struct cbor_channel cchloc = {};
for (uint pos = 0; pos < sz; pos++)
{
switch (cbor_parse_byte(ctx, s->rbuf[pos]))
{
case CPR_MORE:
continue;
case CPR_ERROR:
log(L_ERR "CBOR parser failure: %s", ctx->error);
sk_close(s);
return 0;
case CPR_MAJOR:
switch (stream->state)
{
case CSTR_INIT:
if (ctx->type != 4)
CSTR_PARSER_ERROR("Expected array, got %u", ctx->type);
if (ctx->value != 3)
CSTR_PARSER_ERROR("Expected array of length exactly 3");
stream->state = CSTR_EXPECT_ID;
break;
case CSTR_EXPECT_ID:
CBOR_PARSE_ONLY(ctx, POSINT, cchloc.id);
stream->state = CSTR_MSG;
cchloc.idhash = cchloc.id * stream->hmul;
stream->cur_rx_channel = cch = HASH_FIND(stream->channels, CCH, &cchloc);
if (cch)
break;
stream->cur_rx_channel = cch = sl_alloc(stream->slab);
*cch = cchloc;
cch->p = rp_newf(stream->p, stream->p->domain, "Channel 0x%lx", cchloc.id);
HASH_INSERT(stream->channels, CCH, cch);
break;
case CSTR_MSG:
CCH_PARSE(CPR_MAJOR);
break;
case CSTR_FINISH:
case CSTR_CLEANUP:
bug("Invalid stream pre-parser state");
}
break;
case CPR_STR_END:
ASSERT_DIE(stream->state == CSTR_MSG);
CCH_PARSE(CPR_STR_END);
break;
case CPR_BLOCK_END:
bug("Impossible value returned from cbor_parse_byte()");
}
while (cbor_parse_block_end(ctx))
{
switch (stream->state)
{
case CSTR_INIT:
case CSTR_EXPECT_ID:
case CSTR_CLEANUP:
CSTR_PARSER_ERROR("Invalid stream pre-parser state");
case CSTR_MSG:
CCH_PARSE(CPR_BLOCK_END);
break;
case CSTR_FINISH:
stream->state = CSTR_CLEANUP;
break;
}
}
if (stream->state == CSTR_CLEANUP)
{
if (ctx->partial_state != CPE_EXIT)
CSTR_PARSER_ERROR("Garbled end of message");
ctx->partial_state = CPE_TYPE;
stream->state = CSTR_INIT;
if (pos + 1 < sz)
{
memmove(s->rbuf, s->rbuf + pos + 1, sz - pos - 1);
s->rpos = s->rbuf + sz - pos - 1;
}
return 0;
}
}
return 1;
}
void
cbor_stream_attach(struct cbor_stream *stream, sock *sk)
{
sk->data = stream;
sk->rx_hook = cbor_stream_rx;
}

View File

@ -2,6 +2,8 @@
#define CBOR_H
#include "nest/bird.h"
#include "lib/hash.h"
#include "lib/socket.h"
/**
* CBOR Commonalities
@ -167,9 +169,10 @@ void cbor_parser_reset(struct cbor_parser_context *ctx);
enum cbor_parse_result {
CPR_ERROR = 0,
CPR_MORE = 1,
CPR_MAJOR = 2,
CPR_STR_END = 3,
CPR_MORE,
CPR_MAJOR,
CPR_STR_END,
CPR_BLOCK_END,
} cbor_parse_byte(struct cbor_parser_context *, const byte);
bool cbor_parse_block_end(struct cbor_parser_context *);
@ -187,4 +190,56 @@ bool cbor_parse_block_end(struct cbor_parser_context *);
#define CBOR_STORE_TEXT CBOR_STORE_BYTES
/*
* Message channels
*/
struct cbor_channel;
typedef enum cbor_parse_result (*cbor_stream_parse_fn)(struct cbor_channel *, enum cbor_parse_result);
struct cbor_stream {
HASH(struct cbor_channel) channels;
pool *p;
slab *slab;
cbor_stream_parse_fn parse;
struct cbor_channel *cur_rx_channel;
u64 hmul;
enum {
CSTR_INIT,
CSTR_EXPECT_ID,
CSTR_MSG,
CSTR_FINISH,
CSTR_CLEANUP,
} state;
struct cbor_parser_context parser;
};
/* Init and cleanup of CBOR stream */
struct cbor_stream *cbor_stream_init(struct cbor_stream *, uint ctx_sz);
void cbor_stream_attach(struct cbor_stream *, sock *);
void cbor_stream_cleanup(struct cbor_stream *);
struct cbor_channel {
struct cbor_channel *next_hash;
struct cbor_stream *stream;
cbor_stream_parse_fn parse;
void (*cancel)(struct cbor_channel *);
pool *p;
u64 id;
u64 idhash;
};
extern struct cbor_channel cbor_channel_parse_error;
/* Locally define a new channel */
struct cbor_channel *cbor_channel_new(struct cbor_stream *);
/* Drop the channel */
void cbor_done_channel(struct cbor_channel *);
struct cbor_writer *cbor_reply_init(struct cbor_channel *);
void cbor_reply_send(struct cbor_channel *, struct cbor_writer *);
#define CBOR_REPLY(ch, cw) for (struct cbor_writer *cw = cbor_reply_init(ch); cw; cbor_reply_send(ch, cw), cw = NULL)
#endif