Project

General

Profile

Actions

Feature #17

closed

deadlock detection for 1.9

Added by mame (Yusuke Endoh) over 16 years ago. Updated about 12 years ago.

Status:
Closed
Target version:
-

Description

=begin
遠藤です。

redmine から投稿してみます。

1.9 のスレッドにデッドロック検出を実装してみました。

  • スレッドの状態種別に THREAD_STOPPED_FOREVER を追加。
    無期限の sleep 状態と mutex 解放待ち状態を表す。

  • rb_vm_t に変数 sleeper を追加。
    THREAD_STOPPED_FOREVER 状態のスレッドの数を表す。

  • rb_thread_t に変数 locking_mutex を追加。
    このスレッドが待っている mutex を表す。

  • rb_thread_t に st_table *keeping_mutexes を追加。
    このスレッドがロックしている mutex たちを表す。

  • mutex_t に変数 cond_notified を追加。
    cond_signal されてまだ起動していないスレッドの数を表す。

  • スレッドの終了時にロックしていた mutex をすべて
    解放するようにした。

  • rb_mutex_lock や rb_mutex_unlock で追加した変数たちを
    適宜更新するようにした。

  • 唯一動いていそうなスレッドが native_cond_wait しそうな
    時は、lock_func を抜けてデッドロック検査するようにした。

  • rb_check_deadlock では vm->living_threads を列挙して、

    • 例外状態のスレッド、または
    • locking_mutex のロックに成功したスレッド、または
    • locking_mutex が誰にもロックされていないスレッド、
      のいずれもなければ main_thread にデッドロックを投げる
      ようにした。

どんなもんでしょう。

Index: thread_pthread.c

--- thread_pthread.c (revision 16676)
+++ thread_pthread.c (working copy)
@@ -418,7 +418,14 @@
}
}

-    th->status = THREAD_STOPPED;
+    if (tv) {
+        th->status = THREAD_STOPPED;
+    }
+    else {
+        th->status = THREAD_STOPPED_FOREVER;
+        th->vm->sleeper++;
+        rb_check_deadlock(th->vm);
+    }

    thread_debug("native_sleep %ld\n", tv ? tv->tv_sec : -1);
    GVL_UNLOCK_BEGIN();
    @@ -455,9 +462,10 @@
    th->unblock_function_arg = 0;

pthread_mutex_unlock(&th->interrupt_lock);
-        th->status = prev_status;
    }
    GVL_UNLOCK_END();
+    th->status = prev_status;
+    if (!tv) th->vm->sleeper--;
    RUBY_VM_CHECK_INTS();

    thread_debug("native_sleep done\n");
    Index: bootstraptest/test_thread.rb
    ===================================================================
    --- bootstraptest/test_thread.rb (revision 16676)
    +++ bootstraptest/test_thread.rb (working copy)
    @@ -268,3 +268,66 @@
    at_exit { Fiber.new{}.resume }
    }

+assert_equal 'ok', %q{
+  begin
+    Thread.new { sleep }
+    sleep
+    :ng
+  rescue Exception
+    :ok
+  end
+}
+
+assert_equal 'ok', %q{
+  begin
+    m1, m2 = Mutex.new, Mutex.new
+    Thread.new { m1.lock; sleep 1; m2.lock }
+    m2.lock; sleep 1; m1.lock
+    sleep
+    :ng
+  rescue Exception
+    :ok
+  end
+}
+
+assert_equal 'ok', %q{
+  begin
+    m = Mutex.new
+    Thread.new { m.lock }; m.lock
+    :ok
+  rescue Exception
+    :ng
+  end
+}
+
+assert_equal 'ok', %q{
+  begin
+    m = Mutex.new
+    Thread.new { m.lock }.join; m.lock
+    :ok
+  rescue Exception
+    :ng
+  end
+}
+
+assert_equal 'ok', %q{
+  begin
+    m = Mutex.new
+    Thread.new { m.lock; sleep 2 }
+    sleep 1; m.lock
+    :ok
+  rescue Exception
+    :ng
+  end
+}
+
+assert_equal 'ok', %q{
+  begin
+    m = Mutex.new
+    Thread.new { m.lock; sleep 2; m.unlock }
+    sleep 1; m.lock
+    :ok
+  rescue Exception
+    :ng
+  end
+}
    Index: vm_core.h
    ===================================================================
    --- vm_core.h (revision 16676)
    +++ vm_core.h (working copy)
    @@ -300,6 +300,7 @@
    int running;
    int thread_abort_on_exception;
    unsigned long trace_flag;

+    volatile int sleeper;

    /* object management */
    VALUE mark_object_ary;
    @@ -354,6 +355,7 @@
    THREAD_TO_KILL,
    THREAD_RUNNABLE,
    THREAD_STOPPED,

+    THREAD_STOPPED_FOREVER,
    THREAD_KILLED,
    };

@@ -421,6 +423,8 @@
rb_unblock_function_t *unblock_function;
void *unblock_function_arg;
rb_thread_lock_t interrupt_lock;

+    VALUE locking_mutex;
+    st_table *keeping_mutexes;

    struct rb_vm_tag *tag;
    struct rb_vm_trap_tag *trap_tag;
    Index: thread.c
    ===================================================================
    --- thread.c (revision 16676)
    +++ thread.c (working copy)
    @@ -62,6 +62,9 @@
    struct timeval rb_time_interval(VALUE);
    static int rb_thread_dead(rb_thread_t *th);

+static int unlock_i(st_data_t key, st_data_t val, rb_thread_t *th);
+static void rb_check_deadlock(rb_vm_t *vm);
+
void rb_signal_exec(rb_thread_t *th, int sig);
void rb_disable_interrupt(void);

@@ -92,13 +95,13 @@
rb_thread_set_current(_th_stored);
} while(0)

-#define BLOCKING_REGION(exec, ubf, ubfarg) do {
+#define BLOCKING_REGION(exec, ubf, ubfarg, stopped) do {
rb_thread_t *__th = GET_THREAD();
int __prev_status = __th->status;
rb_unblock_function_t *__oldubf;
void *__oldubfarg;
set_unblock_function(__th, ubf, ubfarg, &__oldubf, &__oldubfarg); \

-    __th->status = THREAD_STOPPED; \
+    if (stopped) __th->status = THREAD_STOPPED; \
    thread_debug("enter blocking region (%p)\n", __th);
    GVL_UNLOCK_BEGIN(); {
    exec;
    @@ -107,10 +110,9 @@
    thread_debug("leave blocking region (%p)\n", __th);
    remove_signal_thread_list(__th);
    set_unblock_function(__th, __oldubf, __oldubfarg, 0, 0); \
-    if (__th->status == THREAD_STOPPED) { \
+    if (stopped && __th->status == THREAD_STOPPED) { \
    __th->status = __prev_status;
    } \
-    RUBY_VM_CHECK_INTS(); \
    } while(0)

#if THREAD_DEBUG
@@ -197,19 +199,11 @@
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
rb_unblock_function_t **oldfunc, void **oldarg)
{

-  check_ints:
-    RUBY_VM_CHECK_INTS(); /* check signal or so */
    native_mutex_lock(&th->interrupt_lock);
-    if (th->interrupt_flag) {
-        native_mutex_unlock(&th->interrupt_lock);
-        goto check_ints;
-    }
-    else {
-        if (oldfunc) *oldfunc = th->unblock_function;
-        if (oldarg) *oldarg = th->unblock_function_arg;
-        th->unblock_function = func;
-        th->unblock_function_arg = arg;
-    }
+    if (oldfunc) *oldfunc = th->unblock_function;
+    if (oldarg) *oldarg = th->unblock_function_arg;
+    th->unblock_function = func;
+    th->unblock_function_arg = arg;
    native_mutex_unlock(&th->interrupt_lock);
    }

@@ -259,6 +253,11 @@
thread_debug("rb_thread_terminate_all (main thread: %p)\n", th);
st_foreach(vm->living_threads, terminate_i, (st_data_t)th);

+    /* unlock all locking mutexes */
+    if (th->keeping_mutexes) {
+        st_foreach(th->keeping_mutexes, unlock_i, 0);
+    }
+
    while (!rb_thread_alone()) {
    PUSH_TAG();
    if (EXEC_TAG() == 0) {
    @@ -354,6 +353,17 @@
    }
    TH_POP_TAG();

  • /* locking_mutex must be Qfalse */

  • if (th->locking_mutex != Qfalse) {

  •  rb_bug("thread_start_func_2: locking_mutex must be NULL (%p:%p)", th, (void*)th->locking_mutex);
    
  • }

  • /* unlock all locking mutexes */

  • if (th->keeping_mutexes) {

  •  st_foreach(th->keeping_mutexes, unlock_i, 0);
    
  • }

  • /* delete self from living_threads */
    st_delete_wrap(th->vm->living_threads, th->self);

    /* wake up joinning threads */
    @@ -363,7 +373,7 @@
    rb_thread_interrupt(join_th);
    join_th = join_th->join_list_next;
    }

-    st_delete_wrap(th->vm->living_threads, th->self);
+    rb_check_deadlock(th->vm);

    if (!th->root_fiber) {
    rb_thread_recycle_stack_release(th->stack);
    @@ -775,7 +785,8 @@

    BLOCKING_REGION({
    val = func(data1);

-    }, ubf, data2);
+    }, ubf, data2, 1);
+    RUBY_VM_CHECK_INTS();

    return val;
    }
    @@ -1135,6 +1146,7 @@
    switch (th->status) {
    case THREAD_RUNNABLE:
    case THREAD_STOPPED:

  •  case THREAD_STOPPED_FOREVER:
     case THREAD_TO_KILL:
    

    rb_ary_push(ary, th->self);
    default:
    @@ -1329,6 +1341,7 @@
    case THREAD_RUNNABLE:
    return "run";
    case THREAD_STOPPED:

  •  case THREAD_STOPPED_FOREVER:
    

    return "sleep";
    case THREAD_TO_KILL:
    return "aborting";
    @@ -1428,7 +1441,7 @@

    if (rb_thread_dead(th))
    return Qtrue;

-    if (th->status == THREAD_STOPPED)
+    if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
    return Qtrue;
    return Qfalse;
    }
    @@ -1868,14 +1881,16 @@
    if (except) *except = orig_except;
    wait = &wait_100ms;
    } while (__th->interrupt_flag == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));
  •  }, 0, 0);
    
  •  }, 0, 0, 1);
    
  •  RUBY_VM_CHECK_INTS();
    
    } while (result == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));
    }
    #else
    BLOCKING_REGION({
    result = select(n, read, write, except, timeout);
    if (result < 0) lerrno = errno;
  • }, ubf_select, GET_THREAD());
  • }, ubf_select, GET_THREAD(), 1);

  • RUBY_VM_CHECK_INTS();
    #endif

    errno = lerrno;
    @@ -2070,6 +2085,7 @@
    st_foreach(vm->living_threads, terminate_atfork_i, (st_data_t)th);
    st_clear(vm->living_threads);
    st_insert(vm->living_threads, thval, (st_data_t) th->thread_id);

  • vm->sleeper = 0;
    }

static int
@@ -2096,6 +2112,7 @@
st_foreach(vm->living_threads, terminate_atfork_before_exec_i, (st_data_t)th);
st_clear(vm->living_threads);
st_insert(vm->living_threads, thval, (st_data_t) th->thread_id);

  • vm->sleeper = 0;
    }

struct thgroup {
@@ -2312,7 +2329,7 @@
rb_thread_lock_t lock;
rb_thread_cond_t cond;
rb_thread_t volatile *th;

-    volatile int cond_waiting;
+    volatile int cond_waiting, cond_notified;
    } mutex_t;

#define GetMutexPtr(obj, tobj)
@@ -2384,6 +2401,15 @@
return mutex->th ? Qtrue : Qfalse;
}

+static void
+mutex_locked(rb_thread_t *th, VALUE self)
+{

  • if (!th->keeping_mutexes) {
  • th->keeping_mutexes = st_init_numtable();
  • }
  • st_insert(th->keeping_mutexes, self, (st_data_t) th->thread_id);
    +}

/*

  • call-seq:
  • mutex.try_lock => true or false
    @@ -2406,6 +2432,8 @@
    if (mutex->th == 0) {
    mutex->th = GET_THREAD();
    locked = Qtrue;
  • mutex_locked(GET_THREAD(), self);
    }
    native_mutex_unlock(&mutex->lock);

@@ -2413,17 +2441,23 @@
}

static int
-lock_func(rb_thread_t *th, mutex_t *mutex)
+lock_func(rb_thread_t *th, mutex_t *mutex, int last_thread)
{
int interrupted = Qfalse;

  native_mutex_lock(&mutex->lock);
  while (mutex->th || (mutex->th = th, 0)) {
  • if (last_thread) {
  •  interrupted = 2;
    
  •  break;
    
  • }
  • mutex->cond_waiting++;
    native_cond_wait(&mutex->cond, &mutex->lock);
  • mutex->cond_notified--;
  • if (th->interrupt_flag) {
  •  interrupted = Qtrue;
    
  • if (RUBY_VM_INTERRUPTED(th)) {

  •  interrupted = 1;
     break;
    

    }
    }
    @@ -2438,6 +2472,7 @@
    native_mutex_lock(&mutex->lock);
    if (mutex->cond_waiting > 0) {
    native_cond_broadcast(&mutex->cond);

  • mutex->cond_notified += mutex->cond_waiting;
    mutex->cond_waiting = 0;
    }
    native_mutex_unlock(&mutex->lock);
    @@ -2460,11 +2495,30 @@

    while (mutex->th != th) {
    int interrupted;

  •  int prev_status = th->status;
    
  •  int last_thread = 0;
    
  •  th->locking_mutex = self;
    
  •  th->status = THREAD_STOPPED_FOREVER;
    
  •  th->vm->sleeper++;
    
  •  if (th->vm->living_threads->num_entries == th->vm->sleeper) {
    
  •  last_thread = 1;
    
  •  }
    
  •  BLOCKING_REGION({
    
  •  interrupted = lock_func(th, mutex);
    
  •  }, lock_interrupt, mutex);
    
  •  interrupted = lock_func(th, mutex, last_thread);
    
  •  }, lock_interrupt, mutex, 0);
    
  •  th->locking_mutex = Qfalse;
    
  •  if (interrupted == 2) {
    
  •  /* assert: mutex->th != th */
    
  •  rb_check_deadlock(th->vm);
    
  •  }
    
  •  th->status = prev_status;
    
  •  th->vm->sleeper--;
    
  •  if (mutex->th == th) mutex_locked(th, self);
    
  •  if (interrupted) {
     RUBY_VM_CHECK_INTS();
     }
    

@@ -2473,15 +2527,8 @@
return self;
}

-/*
- * call-seq:
- *    mutex.unlock    => self
- *
- * Releases the lock.
- * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
- */
-VALUE
-rb_mutex_unlock(VALUE self)
+static char *
+mutex_unlock(VALUE self)
    {
    mutex_t *mutex;
    char *err = NULL;
    @@ -2501,16 +2548,45 @@
    /* waiting thread */
    native_cond_signal(&mutex->cond);
    mutex->cond_waiting--;
  •  mutex->cond_notified++;
    

    }
    }

    native_mutex_unlock(&mutex->lock);

+    return err;
+}
+
+/*
+ * call-seq:
+ *    mutex.unlock    => self
+ *
+ * Releases the lock.
+ * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
+ */
+VALUE
+rb_mutex_unlock(VALUE self)
+{
+    char *err;
+
+    err = mutex_unlock(self);
+    if (!err) st_delete_wrap(GET_THREAD()->keeping_mutexes, self);
    if (err) rb_raise(rb_eThreadError, err);

    return self;
    }

+static int
+unlock_i(st_data_t key, st_data_t val, rb_thread_t *th)
+{

  • VALUE mtxval = key;
  • mutex_unlock(mtxval);
  • return ST_CONTINUE;
    +}

static VALUE
rb_mutex_sleep_forever(VALUE time)
{
@@ -2579,6 +2655,51 @@
return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}

+static int
+check_deadlock_i(st_data_t key, st_data_t val, int *found)
+{

  • VALUE thval = key;
  • rb_thread_t *th;
  • GetThreadPtr(thval, th);
  • if (th->status != THREAD_STOPPED_FOREVER) {
  • rb_bug("check_deadlock_i: thread that is not THREAD_STOPPED_FOREVER found (%p:%d)", th, th->status);
  • }
  • if (RUBY_VM_INTERRUPTED(th)) {
  • *found = 1;
  • }
  • else if (th->locking_mutex) {
  • mutex_t *mutex;
  • GetMutexPtr(th->locking_mutex, mutex);
  • native_mutex_lock(&mutex->lock);
  • if (mutex->th == th || (!mutex->th && mutex->cond_notified)) {
  •  *found = 1;
    
  • }
  • native_mutex_unlock(&mutex->lock);
  • }
  • return (*found) ? ST_STOP : ST_CONTINUE;
    +}

+static void
+rb_check_deadlock(rb_vm_t *vm)
+{

  • int found = 0;
  • if (vm->living_threads->num_entries != vm->sleeper) return;
  • st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found);
  • if (!found) {
  • VALUE argv[2];
  • argv[0] = rb_eFatal;
  • argv[1] = rb_str_new2("deadlock detected");
  • rb_thread_raise(2, argv, vm->main_thread);
  • }
    +}

/*

  • Document-class: Barrier
    */
    Index: vm.c
    ===================================================================
    --- vm.c (revision 16676)
    +++ vm.c (working copy)
    @@ -1445,6 +1445,13 @@
    RUBY_FREE_UNLESS_NULL(th->stack);
    }
  • if (th->locking_mutex != Qfalse) {

  •  rb_bug("thread_free: locking_mutex must be NULL (%p:%d)", th, th->locking_mutex);
    
  • }

  • if (th->keeping_mutexes) {

  •  st_free_table(th->keeping_mutexes);
    
  • }

  • if (th->local_storage) {
    st_free_table(th->local_storage);
    }
    @@ -1512,6 +1519,12 @@
    RUBY_MARK_UNLESS_NULL(th->root_fiber);
    RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);

  • RUBY_MARK_UNLESS_NULL(th->locking_mutex);

  • if (th->keeping_mutexes) {

  •  st_foreach(th->keeping_mutexes, vm_mark_each_thread_func, 0);
    
  • }

  • rb_mark_tbl(th->local_storage);

    if (GET_THREAD() != th && th->machine_stack_start && th->machine_stack_end) {

--
Yusuke ENDOH
=end


Files

patch (13.2 KB) patch パッチ mame (Yusuke Endoh), 05/28/2008 11:31 PM
Actions #1

Updated by mame (Yusuke Endoh) over 16 years ago

  • Status changed from Open to Closed
  • Assignee changed from ko1 (Koichi Sasada) to mame (Yusuke Endoh)

=begin
integrated in r17110
=end

Actions

Also available in: Atom PDF

Like0
Like0