Feature #17
[Closed] deadlock detection for 1.9
Description
=begin
遠藤です。
# redmine から投稿してみます。
1.9 のスレッドにデッドロック検出を実装してみました。
- スレッドの状態種別に THREAD_STOPPED_FOREVER を追加。無期限の sleep 状態と mutex 解放待ち状態を表す。
- rb_vm_t に変数 sleeper を追加。THREAD_STOPPED_FOREVER 状態のスレッドの数を表す。
- rb_thread_t に変数 locking_mutex を追加。このスレッドが待っている mutex を表す。
- rb_thread_t に st_table *keeping_mutexes を追加。このスレッドがロックしている mutex たちを表す。
- mutex_t に変数 cond_notified を追加。cond_signal されてまだ起動していないスレッドの数を表す。
- スレッドの終了時に、ロックしていた mutex をすべて解放するようにした。
- rb_mutex_lock や rb_mutex_unlock で、追加した変数たちを適宜更新するようにした。
- 唯一動いていそうなスレッドが native_cond_wait しそうな時は、lock_func を抜けてデッドロック検査するようにした。
- rb_check_deadlock では vm->living_threads を列挙して、
  - 例外状態のスレッド、または
  - locking_mutex のロックに成功したスレッド、または
  - locking_mutex が誰にもロックされていないスレッド
  のいずれも見つからなければ、main_thread にデッドロック (fatal) を投げるようにした。
どんなもんでしょう。
Index: thread_pthread.c
===================================================================
--- thread_pthread.c (revision 16676)
+++ thread_pthread.c (working copy)
@@ -418,7 +418,14 @@
}
}
- th->status = THREAD_STOPPED;
- if (tv) {
- th->status = THREAD_STOPPED;
- }
- else {
- th->status = THREAD_STOPPED_FOREVER;
- th->vm->sleeper++;
- rb_check_deadlock(th->vm);
}
thread_debug("native_sleep %ld\n", tv ? tv->tv_sec : -1);
GVL_UNLOCK_BEGIN();
@@ -455,9 +462,10 @@
th->unblock_function_arg = 0;pthread_mutex_unlock(&th->interrupt_lock);
th->status = prev_status;
}
GVL_UNLOCK_END();th->status = prev_status;
if (!tv) th->vm->sleeper--;
RUBY_VM_CHECK_INTS();thread_debug("native_sleep done\n");
Index: bootstraptest/test_thread.rb
--- bootstraptest/test_thread.rb (revision 16676)
+++ bootstraptest/test_thread.rb (working copy)
@@ -268,3 +268,66 @@
at_exit { Fiber.new{}.resume }
}
+assert_equal 'ok', %q{
- begin
- Thread.new { sleep }
- sleep
- :ng
- rescue Exception
- :ok
- end +} + +assert_equal 'ok', %q{
- begin
- m1, m2 = Mutex.new, Mutex.new
- Thread.new { m1.lock; sleep 1; m2.lock }
- m2.lock; sleep 1; m1.lock
- sleep
- :ng
- rescue Exception
- :ok
- end +} + +assert_equal 'ok', %q{
- begin
- m = Mutex.new
- Thread.new { m.lock }; m.lock
- :ok
- rescue Exception
- :ng
- end +} + +assert_equal 'ok', %q{
- begin
- m = Mutex.new
- Thread.new { m.lock }.join; m.lock
- :ok
- rescue Exception
- :ng
- end +} + +assert_equal 'ok', %q{
- begin
- m = Mutex.new
- Thread.new { m.lock; sleep 2 }
- sleep 1; m.lock
- :ok
- rescue Exception
- :ng
- end +} + +assert_equal 'ok', %q{
- begin
- m = Mutex.new
- Thread.new { m.lock; sleep 2; m.unlock }
- sleep 1; m.lock
- :ok
- rescue Exception
- :ng
- end +} Index: vm_core.h =================================================================== --- vm_core.h (revision 16676) +++ vm_core.h (working copy) @@ -300,6 +300,7 @@ int running; int thread_abort_on_exception; unsigned long trace_flag;
volatile int sleeper;
/* object management */
VALUE mark_object_ary;
@@ -354,6 +355,7 @@
THREAD_TO_KILL,
THREAD_RUNNABLE,
THREAD_STOPPED,THREAD_STOPPED_FOREVER,
THREAD_KILLED,
};
@@ -421,6 +423,8 @@
rb_unblock_function_t *unblock_function;
void *unblock_function_arg;
rb_thread_lock_t interrupt_lock;
- VALUE locking_mutex;
st_table *keeping_mutexes;
struct rb_vm_tag *tag;
struct rb_vm_trap_tag *trap_tag;Index: thread.c¶
--- thread.c (revision 16676)
+++ thread.c (working copy)
@@ -62,6 +62,9 @@
struct timeval rb_time_interval(VALUE);
static int rb_thread_dead(rb_thread_t *th);
+static int unlock_i(st_data_t key, st_data_t val, rb_thread_t *th);
+static void rb_check_deadlock(rb_vm_t *vm);
+
void rb_signal_exec(rb_thread_t *th, int sig);
void rb_disable_interrupt(void);
@@ -92,13 +95,13 @@
rb_thread_set_current(_th_stored); \
} while(0)
-#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
+#define BLOCKING_REGION(exec, ubf, ubfarg, stopped) do { \
rb_thread_t *th = GET_THREAD(); \
int __prev_status = __th->status; \
rb_unblock_function_t *oldubf; \
void *oldubfarg; \
set_unblock_function(th, ubf, ubfarg, &oldubf, &oldubfarg); \
- __th->status = THREAD_STOPPED; \
- if (stopped) th->status = THREAD_STOPPED; \ thread_debug("enter blocking region (%p)\n", __th); \ GVL_UNLOCK_BEGIN(); {\ exec; \ @@ -107,10 +110,9 @@ thread_debug("leave blocking region (%p)\n", __th); \ remove_signal_thread_list(th); \ set_unblock_function(__th, __oldubf, __oldubfarg, 0, 0); \
- if (__th->status == THREAD_STOPPED) { \
- if (stopped && __th->status == THREAD_STOPPED) { \ __th->status = __prev_status; \ } \
- RUBY_VM_CHECK_INTS(); \ } while(0)
#if THREAD_DEBUG
@@ -197,19 +199,11 @@
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
rb_unblock_function_t **oldfunc, void **oldarg)
{
- check_ints:
- RUBY_VM_CHECK_INTS(); /* check signal or so */ native_mutex_lock(&th->interrupt_lock);
- if (th->interrupt_flag) {
- native_mutex_unlock(&th->interrupt_lock);
- goto check_ints;
- }
- else {
- if (oldfunc) *oldfunc = th->unblock_function;
- if (oldarg) *oldarg = th->unblock_function_arg;
- th->unblock_function = func;
- th->unblock_function_arg = arg;
- }
- if (oldfunc) *oldfunc = th->unblock_function;
- if (oldarg) *oldarg = th->unblock_function_arg;
- th->unblock_function = func;
- th->unblock_function_arg = arg; native_mutex_unlock(&th->interrupt_lock); }
@@ -259,6 +253,11 @@
thread_debug("rb_thread_terminate_all (main thread: %p)\n", th);
st_foreach(vm->living_threads, terminate_i, (st_data_t)th);
- /* unlock all locking mutexes */
- if (th->keeping_mutexes) {
- st_foreach(th->keeping_mutexes, unlock_i, 0);
}
+
while (!rb_thread_alone()) {
PUSH_TAG();
if (EXEC_TAG() == 0) {
@@ -354,6 +353,17 @@
}
TH_POP_TAG();/* locking_mutex must be Qfalse */
if (th->locking_mutex != Qfalse) {
rb_bug("thread_start_func_2: locking_mutex must be NULL (%p:%p)", th, (void*)th->locking_mutex);
}
+/* unlock all locking mutexes */
if (th->keeping_mutexes) {
st_foreach(th->keeping_mutexes, unlock_i, 0);
}
+/* delete self from living_threads */
st_delete_wrap(th->vm->living_threads, th->self);/* wake up joinning threads */
@@ -363,7 +373,7 @@
rb_thread_interrupt(join_th);
join_th = join_th->join_list_next;
}st_delete_wrap(th->vm->living_threads, th->self);
rb_check_deadlock(th->vm);
if (!th->root_fiber) {
rb_thread_recycle_stack_release(th->stack);
@@ -775,7 +785,8 @@BLOCKING_REGION({
val = func(data1);}, ubf, data2);
}, ubf, data2, 1);
RUBY_VM_CHECK_INTS();
return val;
}
@@ -1135,6 +1146,7 @@
switch (th->status) {
case THREAD_RUNNABLE:
case THREAD_STOPPED:case THREAD_STOPPED_FOREVER: case THREAD_TO_KILL:
rb_ary_push(ary, th->self);
default:
@@ -1329,6 +1341,7 @@
case THREAD_RUNNABLE:
return "run";
case THREAD_STOPPED:case THREAD_STOPPED_FOREVER:
return "sleep";
case THREAD_TO_KILL:
return "aborting";
@@ -1428,7 +1441,7 @@if (rb_thread_dead(th))
return Qtrue;if (th->status == THREAD_STOPPED)
if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
return Qtrue;
return Qfalse;
}
@@ -1868,14 +1881,16 @@
if (except) *except = orig_except;
wait = &wait_100ms;
} while (__th->interrupt_flag == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));}, 0, 0);
}, 0, 0, 1);
RUBY_VM_CHECK_INTS();
} while (result == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));
}
#else
BLOCKING_REGION({
result = select(n, read, write, except, timeout);
if (result < 0) lerrno = errno;}, ubf_select, GET_THREAD());
}, ubf_select, GET_THREAD(), 1);
RUBY_VM_CHECK_INTS();
#endiferrno = lerrno;
@@ -2070,6 +2085,7 @@
st_foreach(vm->living_threads, terminate_atfork_i, (st_data_t)th);
st_clear(vm->living_threads);
st_insert(vm->living_threads, thval, (st_data_t) th->thread_id);vm->sleeper = 0;
}
static int
@@ -2096,6 +2112,7 @@
st_foreach(vm->living_threads, terminate_atfork_before_exec_i, (st_data_t)th);
st_clear(vm->living_threads);
st_insert(vm->living_threads, thval, (st_data_t) th->thread_id);
- vm->sleeper = 0; }
struct thgroup {
@@ -2312,7 +2329,7 @@
rb_thread_lock_t lock;
rb_thread_cond_t cond;
rb_thread_t volatile *th;
- volatile int cond_waiting;
- volatile int cond_waiting, cond_notified; } mutex_t;
#define GetMutexPtr(obj, tobj) \
@@ -2384,6 +2401,15 @@
return mutex->th ? Qtrue : Qfalse;
}
+static void
+mutex_locked(rb_thread_t *th, VALUE self)
+{
- if (!th->keeping_mutexes) {
- th->keeping_mutexes = st_init_numtable();
- }
- st_insert(th->keeping_mutexes, self, (st_data_t) th->thread_id);
+}
+
/*
- call-seq:
- mutex.try_lock => true or false @@ -2406,6 +2432,8 @@ if (mutex->th == 0) { mutex->th = GET_THREAD(); locked = Qtrue; +
- mutex_locked(GET_THREAD(), self); } native_mutex_unlock(&mutex->lock);
@@ -2413,17 +2441,23 @@
}
static int
-lock_func(rb_thread_t *th, mutex_t *mutex)
+lock_func(rb_thread_t *th, mutex_t *mutex, int last_thread)
{
int interrupted = Qfalse;
native_mutex_lock(&mutex->lock); while (mutex->th || (mutex->th = th, 0)) {
- if (last_thread) {
- interrupted = 2;
- break;
- } + mutex->cond_waiting++; native_cond_wait(&mutex->cond, &mutex->lock);
mutex->cond_notified--;
if (th->interrupt_flag) {
interrupted = Qtrue;
if (RUBY_VM_INTERRUPTED(th)) {
interrupted = 1; break;
}
}
@@ -2438,6 +2472,7 @@
native_mutex_lock(&mutex->lock);
if (mutex->cond_waiting > 0) {
native_cond_broadcast(&mutex->cond);mutex->cond_notified += mutex->cond_waiting;
mutex->cond_waiting = 0;
}
native_mutex_unlock(&mutex->lock);
@@ -2460,11 +2495,30 @@while (mutex->th != th) {
int interrupted;int prev_status = th->status;
int last_thread = 0;
th->locking_mutex = self;
th->status = THREAD_STOPPED_FOREVER;
th->vm->sleeper++;
if (th->vm->living_threads->num_entries == th->vm->sleeper) {
last_thread = 1;
}
+
BLOCKING_REGION({interrupted = lock_func(th, mutex);
}, lock_interrupt, mutex);
interrupted = lock_func(th, mutex, last_thread);
}, lock_interrupt, mutex, 0);
th->locking_mutex = Qfalse;
if (interrupted == 2) {
/* assert: mutex->th != th */
rb_check_deadlock(th->vm);
}
th->status = prev_status;
th->vm->sleeper--;
+
if (mutex->th == th) mutex_locked(th, self);
+
if (interrupted) {
RUBY_VM_CHECK_INTS();
}
@@ -2473,15 +2527,8 @@
return self;
}
-/*
- * call-seq:
- * mutex.unlock => self
- *
- * Releases the lock.
- * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
- / -VALUE -rb_mutex_unlock(VALUE self) +static char * +mutex_unlock(VALUE self) { mutex_t *mutex; char *err = NULL; @@ -2501,16 +2548,45 @@ / waiting thread */ native_cond_signal(&mutex->cond); mutex->cond_waiting--;
mutex->cond_notified++;
}
}native_mutex_unlock(&mutex->lock);
return err;
+}
+
+/*- call-seq:
- mutex.unlock => self
*
- Releases the lock.
- Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
*/
+VALUE
+rb_mutex_unlock(VALUE self)
+{char *err;
+err = mutex_unlock(self);
+if (!err) st_delete_wrap(GET_THREAD()->keeping_mutexes, self);
if (err) rb_raise(rb_eThreadError, err);return self;
}
+static int
+unlock_i(st_data_t key, st_data_t val, rb_thread_t *th)
+{
- VALUE mtxval = key; +
- mutex_unlock(mtxval); +
- return ST_CONTINUE; +} + static VALUE rb_mutex_sleep_forever(VALUE time) { @@ -2579,6 +2655,51 @@ return rb_ensure(func, arg, rb_mutex_unlock, mutex); }
+static int
+check_deadlock_i(st_data_t key, st_data_t val, int *found)
+{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th); +
- if (th->status != THREAD_STOPPED_FOREVER) {
- rb_bug("check_deadlock_i: thread that is not THREAD_STOPPED_FOREVER found (%p:%d)", th, th->status);
- } +
- if (RUBY_VM_INTERRUPTED(th)) {
- *found = 1;
- }
- else if (th->locking_mutex) {
- mutex_t *mutex;
- GetMutexPtr(th->locking_mutex, mutex); +
- native_mutex_lock(&mutex->lock);
- if (mutex->th == th || (!mutex->th && mutex->cond_notified)) {
- *found = 1;
- }
- native_mutex_unlock(&mutex->lock);
- } +
- return (*found) ? ST_STOP : ST_CONTINUE; +} + +static void +rb_check_deadlock(rb_vm_t *vm) +{
- int found = 0; +
- if (vm->living_threads->num_entries != vm->sleeper) return; +
- st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); +
- if (!found) {
- VALUE argv[2];
- argv[0] = rb_eFatal;
- argv[1] = rb_str_new2("deadlock detected");
- rb_thread_raise(2, argv, vm->main_thread);
}
+}
+
/*- Document-class: Barrier */ Index: vm.c =================================================================== --- vm.c (revision 16676) +++ vm.c (working copy) @@ -1445,6 +1445,13 @@ RUBY_FREE_UNLESS_NULL(th->stack); }
if (th->locking_mutex != Qfalse) {
rb_bug("thread_free: locking_mutex must be NULL (%p:%d)", th, th->locking_mutex);
}
if (th->keeping_mutexes) {
st_free_table(th->keeping_mutexes);
}
+
if (th->local_storage) {
st_free_table(th->local_storage);
}
@@ -1512,6 +1519,12 @@
RUBY_MARK_UNLESS_NULL(th->root_fiber);
RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);RUBY_MARK_UNLESS_NULL(th->locking_mutex);
+if (th->keeping_mutexes) {
st_foreach(th->keeping_mutexes, vm_mark_each_thread_func, 0);
}
+
rb_mark_tbl(th->local_storage);if (GET_THREAD() != th && th->machine_stack_start && th->machine_stack_end) {
--
Yusuke ENDOH mame@tsg.ne.jp
=end
Files