Project

General

Profile

Bug #18818 » 0001-Mark-blocked-fibers-in-waitq-Mutex-Queue-etc.patch

nevans (Nicholas Evans), 07/09/2022 10:44 PM

View differences:

thread_sync.c
#define MUTEX_ALLOW_TRAP FL_USER1
/* iterates a waitq, while also pruning any dead threads. */
#define WAITQ_EACH_BEGIN(head, cur) do { \
struct sync_waiter *cur = 0, *next; \
ccan_list_for_each_safe(head, cur, next, node) { \
if (cur->th->status != THREAD_KILLED) { \
#define WAITQ_EACH_END(cur) \
} else { \
ccan_list_del_init(&cur->node); \
} \
} \
} while (0)
static void
sync_wakeup(struct ccan_list_head *head, long max)
{
......
sync_wakeup(head, LONG_MAX);
}
/*
* Dead threads are pruned. Everything else is reachable from the fiber.
*
* A blocking fiber on a running thread should also be marked by:
* GC roots: ractor -> waiter.th -> waiter.fiber -> stack -> waiter.self
*/
static void
waitq_mark(struct ccan_list_head *head)
{
WAITQ_EACH_BEGIN(head, waiter);
// Safe to mark fiber as movable, because we don't actually store the VALUE.
rb_gc_mark_movable(rb_fiberptr_self(waiter->fiber));
WAITQ_EACH_END(waiter);
}
/* same as waitq_mark, but can be used to update num_waiting cache */
static int
waitq_mark_and_count(struct ccan_list_head *head)
{
int count = 0;
WAITQ_EACH_BEGIN(head, waiter);
rb_gc_mark_movable(rb_fiberptr_self(waiter->fiber));
count++;
WAITQ_EACH_END(waiter);
return count;
}
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
......
*
*/
#define mutex_mark ((void(*)(void*))0)
static void
mutex_mark (void *ptr)
{
rb_mutex_t *mutex = ptr;
if (mutex->fiber) {
rb_gc_mark_movable(rb_fiberptr_self(mutex->fiber));
}
waitq_mark(&mutex->waitq);
}
static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
......
long max;
});
static int queue_fork_check(struct rb_queue *q);
static void
queue_mark(void *ptr)
{
struct rb_queue *q = ptr;
/* no need to mark threads in waitq, they are on stack */
rb_gc_mark(q->que);
rb_gc_mark(q->que); // const can't be movable
if (queue_fork_check(q) == 0) {
q->num_waiting = waitq_mark_and_count(queue_waitq(q));
}
}
static size_t
......
#define QUEUE_CLOSED FL_USER5
static int szqueue_fork_check(struct rb_szqueue *sq);
static void
szqueue_mark(void *ptr)
{
struct rb_szqueue *sq = ptr;
queue_mark(&sq->q);
if (szqueue_fork_check(sq) == 0) {
sq->num_waiting_push = waitq_mark_and_count(szqueue_pushq(sq));
}
}
static size_t
......
return obj;
}
static int
szqueue_fork_check(struct rb_szqueue *sq)
{
if (queue_fork_check(&sq->q) == 0) {
return 0;
}
ccan_list_head_init(szqueue_pushq(sq));
sq->num_waiting_push = 0;
return 1;
}
static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
struct rb_szqueue *sq;
TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
if (queue_fork_check(&sq->q)) {
ccan_list_head_init(szqueue_pushq(sq));
sq->num_waiting_push = 0;
}
szqueue_fork_check(sq);
return sq;
}
......
* }
*/
static int condvar_fork_check(struct rb_condvar *cv);
static void
condvar_mark(void *ptr)
{
struct rb_condvar *cv = ptr;
if (condvar_fork_check(cv) == 0) {
waitq_mark(&cv->waitq);
}
}
static size_t
condvar_memsize(const void *ptr)
{
......
static const rb_data_type_t cv_data_type = {
"condvar",
{0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
{condvar_mark, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
static int
condvar_fork_check(struct rb_condvar *cv)
{
rb_serial_t fork_gen = GET_VM()->fork_gen;
/* forked children can't reach into parent thread stacks */
if (cv->fork_gen == fork_gen) {
return 0;
}
cv->fork_gen = fork_gen;
ccan_list_head_init(&cv->waitq);
return 1;
}
static struct rb_condvar *
condvar_ptr(VALUE self)
{
struct rb_condvar *cv;
rb_serial_t fork_gen = GET_VM()->fork_gen;
TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
/* forked children can't reach into parent thread stacks */
if (cv->fork_gen != fork_gen) {
cv->fork_gen = fork_gen;
ccan_list_head_init(&cv->waitq);
}
condvar_fork_check(cv);
return cv;
}
(4-4/4)