0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2024-12-22 09:41:54 +00:00

Sockets are forcibly closed synchronously.

This enables birdloop merging (future commits) and also fixes possible
future bugs when trying to reopen sockets too early.
This commit is contained in:
Maria Matejka 2022-11-10 15:52:04 +01:00
parent 2fa31ea38d
commit 45832e5385
2 changed files with 61 additions and 60 deletions

View File

@ -59,6 +59,12 @@ birdloop_inside(struct birdloop *loop)
return 0; return 0;
} }
_Bool
birdloop_in_this_thread(struct birdloop *loop)
{
return pthread_equal(pthread_self(), loop->thread_id);
}
void void
birdloop_flag(struct birdloop *loop, u32 flag) birdloop_flag(struct birdloop *loop, u32 flag)
{ {
@ -160,7 +166,7 @@ pipe_kick(struct pipe *p)
while (1) { while (1) {
rv = write(p->fd[1], &v, sizeof(v)); rv = write(p->fd[1], &v, sizeof(v));
if ((rv >= 0) || (errno == EAGAIN)) if ((rv >= 0) || (errno == EAGAIN))
return; return;
if (errno != EINTR) if (errno != EINTR)
bug("wakeup write: %m"); bug("wakeup write: %m");
@ -224,10 +230,12 @@ sockets_init(struct birdloop *loop)
{ {
init_list(&loop->sock_list); init_list(&loop->sock_list);
loop->sock_num = 0; loop->sock_num = 0;
atomic_store_explicit(&loop->sock_close_requests, 0, memory_order_relaxed);
sem_init(&loop->sock_close_sem, 0, 0);
BUFFER_INIT(loop->poll_sk, loop->pool, 4);
BUFFER_INIT(loop->poll_fd, loop->pool, 4); BUFFER_INIT(loop->poll_fd, loop->pool, 4);
loop->poll_changed = 1; /* add wakeup fd */ loop->poll_changed = 1; /* add wakeup fd */
loop->poll_domain = DOMAIN_NEW(resource, "Poll");
} }
static void static void
@ -258,16 +266,25 @@ sockets_remove(struct birdloop *loop, sock *s)
rem_node(&s->n); rem_node(&s->n);
loop->sock_num--; loop->sock_num--;
if (s->index >= 0) if (birdloop_in_this_thread(loop))
{ {
loop->poll_sk.data[s->index] = NULL; if (s->index >= 0)
s->index = -1; {
loop->poll_changed = 1; s->index = -1;
loop->close_scheduled = 1; loop->poll_changed = 1;
birdloop_ping(loop); }
close(s->fd);
} }
else else
{
atomic_fetch_add_explicit(&loop->sock_close_requests, 1, memory_order_acq_rel);
wakeup_do_kick(loop);
LOCK_DOMAIN(resource, loop->poll_domain);
s->index = -1;
close(s->fd); close(s->fd);
UNLOCK_DOMAIN(resource, loop->poll_domain);
sem_post(&loop->sock_close_sem);
}
} }
void void
@ -279,25 +296,13 @@ sk_stop(sock *s)
static inline uint sk_want_events(sock *s) static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); } { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
/*
FIXME: this should be called from sock code
static void
sockets_update(struct birdloop *loop, sock *s)
{
if (s->index >= 0)
loop->poll_fd.data[s->index].events = sk_want_events(s);
}
*/
static void static void
sockets_prepare(struct birdloop *loop) sockets_prepare(struct birdloop *loop)
{ {
BUFFER_SET(loop->poll_sk, loop->sock_num + 1); LOCK_DOMAIN(resource, loop->poll_domain);
BUFFER_SET(loop->poll_fd, loop->sock_num + 1); BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
struct pollfd *pfd = loop->poll_fd.data; struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
uint i = 0; uint i = 0;
node *n; node *n;
@ -308,38 +313,21 @@ sockets_prepare(struct birdloop *loop)
ASSERT(i < loop->sock_num); ASSERT(i < loop->sock_num);
s->index = i; s->index = i;
*psk = s;
pfd->fd = s->fd; pfd->fd = s->fd;
pfd->events = sk_want_events(s); pfd->events = sk_want_events(s);
pfd->revents = 0; pfd->revents = 0;
pfd++; pfd++;
psk++;
i++; i++;
} }
ASSERT(i == loop->sock_num); ASSERT(i == loop->sock_num);
/* Add internal wakeup fd */ /* Add internal wakeup fd */
*psk = NULL;
pipe_pollin(&loop->wakeup, pfd); pipe_pollin(&loop->wakeup, pfd);
loop->poll_changed = 0; loop->poll_changed = 0;
} UNLOCK_DOMAIN(resource, loop->poll_domain);
static void
sockets_close_fds(struct birdloop *loop)
{
struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
int poll_num = loop->poll_fd.used - 1;
int i;
for (i = 0; i < poll_num; i++)
if (psk[i] == NULL)
close(pfd[i].fd);
loop->close_scheduled = 0;
} }
int sk_read(sock *s, int revents); int sk_read(sock *s, int revents);
@ -349,7 +337,6 @@ static void
sockets_fire(struct birdloop *loop) sockets_fire(struct birdloop *loop)
{ {
struct pollfd *pfd = loop->poll_fd.data; struct pollfd *pfd = loop->poll_fd.data;
sock **psk = loop->poll_sk.data;
int poll_num = loop->poll_fd.used - 1; int poll_num = loop->poll_fd.used - 1;
times_update(); times_update();
@ -358,27 +345,33 @@ sockets_fire(struct birdloop *loop)
if (pfd[poll_num].revents & POLLIN) if (pfd[poll_num].revents & POLLIN)
wakeup_drain(loop); wakeup_drain(loop);
int i; sock *s; node *n, *nxt;
for (i = 0; i < poll_num; pfd++, psk++, i++) WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
{ {
int e = 1; if (s->index < 0)
if (! pfd->revents)
continue; continue;
if (pfd->revents & POLLNVAL) LOCK_DOMAIN(resource, loop->poll_domain);
bug("poll: invalid fd %d", pfd->fd); int rev = loop->poll_fd.data[s->index].revents;
UNLOCK_DOMAIN(resource, loop->poll_domain);
if (pfd->revents & POLLIN) if (! rev)
while (e && *psk && (*psk)->rx_hook) continue;
e = sk_read(*psk, pfd->revents);
e = 1; if (rev & POLLNVAL)
if (pfd->revents & POLLOUT) bug("poll: invalid fd %d", s->fd);
int e = 1;
if (rev & POLLIN)
while (e && s->rx_hook)
e = sk_read(s, rev);
if (rev & POLLOUT)
{ {
loop->poll_changed = 1; loop->poll_changed = 1;
while (e && *psk) while (e = sk_write(s))
e = sk_write(*psk); ;
} }
} }
} }
@ -475,7 +468,7 @@ void
birdloop_free(struct birdloop *loop) birdloop_free(struct birdloop *loop)
{ {
ASSERT_DIE(loop->links == 0); ASSERT_DIE(loop->links == 0);
ASSERT_DIE(pthread_equal(pthread_self(), loop->thread_id)); ASSERT_DIE(birdloop_in_this_thread(loop));
rcu_birdloop_stop(&loop->rcu); rcu_birdloop_stop(&loop->rcu);
pthread_attr_destroy(&loop->thread_attr); pthread_attr_destroy(&loop->thread_attr);
@ -596,6 +589,7 @@ birdloop_main(void *arg)
birdloop_leave(loop); birdloop_leave(loop);
LOCK_DOMAIN(resource, loop->poll_domain);
try: try:
rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout); rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
if (rv < 0) if (rv < 0)
@ -604,12 +598,15 @@ birdloop_main(void *arg)
goto try; goto try;
bug("poll: %m"); bug("poll: %m");
} }
UNLOCK_DOMAIN(resource, loop->poll_domain);
/* Wait until remote requestors close their sockets */
int close_count = atomic_exchange_explicit(&loop->sock_close_requests, 0, memory_order_acq_rel);
while (close_count--)
sem_wait(&loop->sock_close_sem);
birdloop_enter(loop); birdloop_enter(loop);
if (loop->close_scheduled)
sockets_close_fds(loop);
if (loop->stopped) if (loop->stopped)
break; break;

View File

@ -7,6 +7,7 @@
#ifndef _BIRD_SYSDEP_UNIX_IO_LOOP_H_ #ifndef _BIRD_SYSDEP_UNIX_IO_LOOP_H_
#define _BIRD_SYSDEP_UNIX_IO_LOOP_H_ #define _BIRD_SYSDEP_UNIX_IO_LOOP_H_
#include <semaphore.h>
#include "lib/rcu.h" #include "lib/rcu.h"
struct pipe struct pipe
@ -19,6 +20,8 @@ void pipe_pollin(struct pipe *, struct pollfd *);
void pipe_drain(struct pipe *); void pipe_drain(struct pipe *);
void pipe_kick(struct pipe *); void pipe_kick(struct pipe *);
DEFINE_DOMAIN(resource);
struct birdloop struct birdloop
{ {
pool *pool; pool *pool;
@ -27,11 +30,12 @@ struct birdloop
event_list event_list; event_list event_list;
list sock_list; list sock_list;
uint sock_num; uint sock_num;
_Atomic int sock_close_requests;
sem_t sock_close_sem;
BUFFER(sock *) poll_sk; DOMAIN(resource) poll_domain;
BUFFER(struct pollfd) poll_fd; BUFFER(struct pollfd) poll_fd;
u8 poll_changed; u8 poll_changed;
u8 close_scheduled;
uint ping_pending; uint ping_pending;
_Atomic u32 ping_sent; _Atomic u32 ping_sent;