diff --git a/lib/lockfree.c b/lib/lockfree.c index 2c2b1b56..11d98715 100644 --- a/lib/lockfree.c +++ b/lib/lockfree.c @@ -12,6 +12,18 @@ #define LOCAL_DEBUG +_Thread_local struct lfuc_unlock_queue *lfuc_unlock_queue; + +void lfuc_unlock_deferred(void *_q) +{ + struct lfuc_unlock_queue *q = _q; + for (u32 i = 0; i < q->pos; i++) + lfuc_unlock_immediately(q->block[i].c, q->block[i].el, q->block[i].ev); + + free_page(q); + lfuc_unlock_queue = NULL; +} + #if 0 #define lfjour_debug(...) log(L_TRACE __VA_ARGS__) #define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__) diff --git a/lib/lockfree.h b/lib/lockfree.h index 7b2ecf05..833b8366 100644 --- a/lib/lockfree.h +++ b/lib/lockfree.h @@ -14,6 +14,7 @@ #include "lib/rcu.h" #include "lib/settle.h" #include "lib/tlists.h" +#include "lib/io-loop.h" #include @@ -57,7 +58,7 @@ static inline u64 lfuc_lock_revive(struct lfuc *c) } /** - * lfuc_unlock - decrease an atomic usecount + * lfuc_unlock_immediately - decrease an atomic usecount * @c: the usecount structure * @el: prune event list * @ev: prune event itself @@ -65,7 +66,7 @@ static inline u64 lfuc_lock_revive(struct lfuc *c) * If the usecount reaches zero, a prune event is run to possibly free the object. * The prune event MUST use lfuc_finished() to check the object state. */ -static inline u64 lfuc_unlock(struct lfuc *c, event_list *el, event *ev) +static inline void lfuc_unlock_immediately(struct lfuc *c, event_list *el, event *ev) { /* Unlocking is tricky. We do it lockless so at the same time, the prune * event may be running, therefore if the unlock gets us to zero, it must be @@ -112,7 +113,49 @@ static inline u64 lfuc_unlock(struct lfuc *c, event_list *el, event *ev) * RCU synchronization instead of a busy loop. */ rcu_read_unlock(); - return uc - LFUC_IN_PROGRESS - 1; +// return uc - LFUC_IN_PROGRESS - 1; +} + +extern _Thread_local struct lfuc_unlock_queue { + event e; + u32 pos; + struct lfuc_unlock_queue_block { + struct lfuc *c; + event_list *el; + event *ev; + } block[0]; +} *lfuc_unlock_queue; + +void lfuc_unlock_deferred(void *queue); + +static inline void lfuc_unlock(struct lfuc *c, event_list *el, event *ev) +{ + static u32 queue_items = 0; + if (queue_items == 0) + { + ASSERT_DIE((u64) page_size > sizeof(struct lfuc_unlock_queue) + sizeof(struct lfuc_unlock_queue_block)); + queue_items = (page_size - OFFSETOF(struct lfuc_unlock_queue, block)) + / sizeof lfuc_unlock_queue->block[0]; + } + + if (!lfuc_unlock_queue || (lfuc_unlock_queue->pos >= queue_items)) + { + lfuc_unlock_queue = alloc_page(); + *lfuc_unlock_queue = (struct lfuc_unlock_queue) { + .e = { + .hook = lfuc_unlock_deferred, + .data = lfuc_unlock_queue, + }, + }; + + ev_send_this_thread(&lfuc_unlock_queue->e); + } + + lfuc_unlock_queue->block[lfuc_unlock_queue->pos++] = (struct lfuc_unlock_queue_block) { + .c = c, + .el = el, + .ev = ev, + }; } /** diff --git a/nest/mpls.c b/nest/mpls.c index bf7caab7..5400bcba 100644 --- a/nest/mpls.c +++ b/nest/mpls.c @@ -1196,8 +1196,8 @@ inline void mpls_lock_fec(struct mpls_fec *fec) inline void mpls_unlock_fec(struct mpls_fec *fec) { - UNUSED u64 s = lfuc_unlock(&fec->uc, birdloop_event_list(fec->map->loop), fec->map->cleanup_event); - DBGL("Unlocked FEC %p %u, now %lu", fec, fec->label, s); + lfuc_unlock(&fec->uc, birdloop_event_list(fec->map->loop), fec->map->cleanup_event); + DBGL("Unlocked FEC %p %u (deferred)", fec, fec->label); } static inline void