0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-11 03:21:53 +00:00

Socket cork fixes

This commit is contained in:
Maria Matejka 2022-02-01 09:45:50 +01:00
parent 127862f626
commit 8447b24e59
6 changed files with 73 additions and 6 deletions

View File

@ -283,7 +283,13 @@ void ev_uncork(struct event_cork *ec)
birdloop_ping(el->loop); birdloop_ping(el->loop);
} }
UNLOCK_DOMAIN(cork, ec->lock); struct birdsock *sk;
WALK_LIST_FIRST2(sk, cork_node, ec->sockets)
{
// log(L_TRACE "Socket %p uncorked", sk);
rem_node(&sk->cork_node);
sk_ping(sk);
}
birdloop_ping(&main_birdloop); UNLOCK_DOMAIN(cork, ec->lock);
} }

View File

@ -38,6 +38,7 @@ struct event_cork {
DOMAIN(cork) lock; DOMAIN(cork) lock;
u32 count; u32 count;
list events; list events;
list sockets;
}; };
extern event_list global_event_list; extern event_list global_event_list;
@ -56,6 +57,7 @@ static inline void ev_init_list(event_list *el, struct birdloop *loop, const cha
static inline void ev_init_cork(struct event_cork *ec, const char *name) static inline void ev_init_cork(struct event_cork *ec, const char *name)
{ {
init_list(&ec->events); init_list(&ec->events);
init_list(&ec->sockets);
ec->lock = DOMAIN_NEW(cork, name); ec->lock = DOMAIN_NEW(cork, name);
ec->count = 0; ec->count = 0;
}; };

View File

@ -17,6 +17,7 @@
void sk_start(sock *s); void sk_start(sock *s);
void sk_stop(sock *s); void sk_stop(sock *s);
void sk_reloop(sock *s, struct birdloop *loop); void sk_reloop(sock *s, struct birdloop *loop);
void sk_ping(sock *s);
extern struct birdloop main_birdloop; extern struct birdloop main_birdloop;

View File

@ -59,6 +59,7 @@ typedef struct birdsock {
uint rbsize; uint rbsize;
int (*rx_hook)(struct birdsock *, uint size); /* NULL=receiving turned off, returns 1 to clear rx buffer */ int (*rx_hook)(struct birdsock *, uint size); /* NULL=receiving turned off, returns 1 to clear rx buffer */
struct event_cork *cork; /* Cork to temporarily stop receiving data */ struct event_cork *cork; /* Cork to temporarily stop receiving data */
node cork_node; /* Node in cork list */
byte *tbuf, *tpos; /* NULL=allocate automatically */ byte *tbuf, *tpos; /* NULL=allocate automatically */
byte *ttx; /* Internal */ byte *ttx; /* Internal */

View File

@ -218,7 +218,36 @@ sk_stop(sock *s)
} }
static inline uint sk_want_events(sock *s) static inline uint sk_want_events(sock *s)
{ return ((s->rx_hook && !ev_corked(s->cork)) ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } {
uint out = ((s->ttx != s->tpos) ? POLLOUT : 0);
if (s->rx_hook)
if (s->cork)
{
LOCK_DOMAIN(cork, s->cork->lock);
if (!enlisted(&s->cork_node))
if (s->cork->count)
{
// log(L_TRACE "Socket %p corked", s);
add_tail(&s->cork->sockets, &s->cork_node);
}
else
out |= POLLIN;
UNLOCK_DOMAIN(cork, s->cork->lock);
}
else
out |= POLLIN;
// log(L_TRACE "sk_want_events(%p) = %x", s, out);
return out;
}
void
sk_ping(sock *s)
{
s->loop->poll_changed = 1;
birdloop_ping(s->loop);
}
/* /*
FIXME: this should be called from sock code FIXME: this should be called from sock code
@ -284,7 +313,10 @@ sockets_fire(struct birdloop *loop)
/* Last fd is internal wakeup fd */ /* Last fd is internal wakeup fd */
if (pfd[poll_num].revents & POLLIN) if (pfd[poll_num].revents & POLLIN)
{
wakeup_drain(loop); wakeup_drain(loop);
loop->poll_changed = 1;
}
int i; int i;
for (i = 0; i < poll_num; pfd++, psk++, i++) for (i = 0; i < poll_num; pfd++, psk++, i++)

View File

@ -802,8 +802,13 @@ sk_free(resource *r)
sk_ssh_free(s); sk_ssh_free(s);
#endif #endif
if ((s->fd < 0) || (s->flags & SKF_THREAD)) if (s->cork)
return; {
LOCK_DOMAIN(cork, s->cork->lock);
if (enlisted(&s->cork_node))
rem_node(&s->cork_node);
UNLOCK_DOMAIN(cork, s->cork->lock);
}
if (!s->loop) if (!s->loop)
; ;
@ -820,7 +825,7 @@ sk_free(resource *r)
rem_node(&s->n); rem_node(&s->n);
} }
if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE) if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE && s->fd != -1)
close(s->fd); close(s->fd);
s->fd = -1; s->fd = -1;
@ -1710,6 +1715,7 @@ sk_maybe_write(sock *s)
s->err_hook(s, (errno != EPIPE) ? errno : 0); s->err_hook(s, (errno != EPIPE) ? errno : 0);
return -1; return -1;
} }
sk_ping(s);
return 0; return 0;
} }
s->ttx += e; s->ttx += e;
@ -1917,6 +1923,25 @@ sk_read(sock *s, int revents)
case SK_TCP: case SK_TCP:
case SK_UNIX: case SK_UNIX:
{ {
if (s->cork)
{
int cont = 0;
LOCK_DOMAIN(cork, s->cork->lock);
if (!enlisted(&s->cork_node))
if (s->cork->count)
{
// log(L_TRACE "Socket %p corked", s);
add_tail(&s->cork->sockets, &s->cork_node);
sk_ping(s);
}
else
cont = 1;
UNLOCK_DOMAIN(cork, s->cork->lock);
if (!cont)
return 0;
}
int c = read(s->fd, s->rpos, s->rbuf + s->rbsize - s->rpos); int c = read(s->fd, s->rpos, s->rbuf + s->rbsize - s->rpos);
if (c < 0) if (c < 0)