Index: thread_pthread.c =================================================================== --- thread_pthread.c (revision 16676) +++ thread_pthread.c (working copy) @@ -418,7 +418,14 @@ } } - th->status = THREAD_STOPPED; + if (tv) { + th->status = THREAD_STOPPED; + } + else { + th->status = THREAD_STOPPED_FOREVER; + th->vm->sleeper++; + rb_check_deadlock(th->vm); + } thread_debug("native_sleep %ld\n", tv ? tv->tv_sec : -1); GVL_UNLOCK_BEGIN(); @@ -455,9 +462,10 @@ th->unblock_function_arg = 0; pthread_mutex_unlock(&th->interrupt_lock); - th->status = prev_status; } GVL_UNLOCK_END(); + th->status = prev_status; + if (!tv) th->vm->sleeper--; RUBY_VM_CHECK_INTS(); thread_debug("native_sleep done\n"); Index: bootstraptest/test_thread.rb =================================================================== --- bootstraptest/test_thread.rb (revision 16676) +++ bootstraptest/test_thread.rb (working copy) @@ -268,3 +268,66 @@ at_exit { Fiber.new{}.resume } } +assert_equal 'ok', %q{ + begin + Thread.new { sleep } + sleep + :ng + rescue Exception + :ok + end +} + +assert_equal 'ok', %q{ + begin + m1, m2 = Mutex.new, Mutex.new + Thread.new { m1.lock; sleep 1; m2.lock } + m2.lock; sleep 1; m1.lock + sleep + :ng + rescue Exception + :ok + end +} + +assert_equal 'ok', %q{ + begin + m = Mutex.new + Thread.new { m.lock }; m.lock + :ok + rescue Exception + :ng + end +} + +assert_equal 'ok', %q{ + begin + m = Mutex.new + Thread.new { m.lock }.join; m.lock + :ok + rescue Exception + :ng + end +} + +assert_equal 'ok', %q{ + begin + m = Mutex.new + Thread.new { m.lock; sleep 2 } + sleep 1; m.lock + :ok + rescue Exception + :ng + end +} + +assert_equal 'ok', %q{ + begin + m = Mutex.new + Thread.new { m.lock; sleep 2; m.unlock } + sleep 1; m.lock + :ok + rescue Exception + :ng + end +} Index: vm_core.h =================================================================== --- vm_core.h (revision 16676) +++ vm_core.h (working copy) @@ -300,6 +300,7 @@ int running; int 
thread_abort_on_exception; unsigned long trace_flag; + volatile int sleeper; /* object management */ VALUE mark_object_ary; @@ -354,6 +355,7 @@ THREAD_TO_KILL, THREAD_RUNNABLE, THREAD_STOPPED, + THREAD_STOPPED_FOREVER, THREAD_KILLED, }; @@ -421,6 +423,8 @@ rb_unblock_function_t *unblock_function; void *unblock_function_arg; rb_thread_lock_t interrupt_lock; + VALUE locking_mutex; + st_table *keeping_mutexes; struct rb_vm_tag *tag; struct rb_vm_trap_tag *trap_tag; Index: thread.c =================================================================== --- thread.c (revision 16676) +++ thread.c (working copy) @@ -62,6 +62,9 @@ struct timeval rb_time_interval(VALUE); static int rb_thread_dead(rb_thread_t *th); +static int unlock_i(st_data_t key, st_data_t val, rb_thread_t *th); +static void rb_check_deadlock(rb_vm_t *vm); + void rb_signal_exec(rb_thread_t *th, int sig); void rb_disable_interrupt(void); @@ -92,13 +95,13 @@ rb_thread_set_current(_th_stored); \ } while(0) -#define BLOCKING_REGION(exec, ubf, ubfarg) do { \ +#define BLOCKING_REGION(exec, ubf, ubfarg, stopped) do { \ rb_thread_t *__th = GET_THREAD(); \ int __prev_status = __th->status; \ rb_unblock_function_t *__oldubf; \ void *__oldubfarg; \ set_unblock_function(__th, ubf, ubfarg, &__oldubf, &__oldubfarg); \ - __th->status = THREAD_STOPPED; \ + if (stopped) __th->status = THREAD_STOPPED; \ thread_debug("enter blocking region (%p)\n", __th); \ GVL_UNLOCK_BEGIN(); {\ exec; \ @@ -107,10 +110,9 @@ thread_debug("leave blocking region (%p)\n", __th); \ remove_signal_thread_list(__th); \ set_unblock_function(__th, __oldubf, __oldubfarg, 0, 0); \ - if (__th->status == THREAD_STOPPED) { \ + if (stopped && __th->status == THREAD_STOPPED) { \ __th->status = __prev_status; \ } \ - RUBY_VM_CHECK_INTS(); \ } while(0) #if THREAD_DEBUG @@ -197,19 +199,11 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, rb_unblock_function_t **oldfunc, void **oldarg) { - check_ints: - RUBY_VM_CHECK_INTS(); /* 
check signal or so */ native_mutex_lock(&th->interrupt_lock); - if (th->interrupt_flag) { - native_mutex_unlock(&th->interrupt_lock); - goto check_ints; - } - else { - if (oldfunc) *oldfunc = th->unblock_function; - if (oldarg) *oldarg = th->unblock_function_arg; - th->unblock_function = func; - th->unblock_function_arg = arg; - } + if (oldfunc) *oldfunc = th->unblock_function; + if (oldarg) *oldarg = th->unblock_function_arg; + th->unblock_function = func; + th->unblock_function_arg = arg; native_mutex_unlock(&th->interrupt_lock); } @@ -259,6 +253,11 @@ thread_debug("rb_thread_terminate_all (main thread: %p)\n", th); st_foreach(vm->living_threads, terminate_i, (st_data_t)th); + /* unlock all locking mutexes */ + if (th->keeping_mutexes) { + st_foreach(th->keeping_mutexes, unlock_i, 0); + } + while (!rb_thread_alone()) { PUSH_TAG(); if (EXEC_TAG() == 0) { @@ -354,6 +353,17 @@ } TH_POP_TAG(); + /* locking_mutex must be Qfalse */ + if (th->locking_mutex != Qfalse) { + rb_bug("thread_start_func_2: locking_mutex must be NULL (%p:%p)", th, (void*)th->locking_mutex); + } + + /* unlock all locking mutexes */ + if (th->keeping_mutexes) { + st_foreach(th->keeping_mutexes, unlock_i, 0); + } + + /* delete self from living_threads */ st_delete_wrap(th->vm->living_threads, th->self); /* wake up joinning threads */ @@ -363,7 +373,7 @@ rb_thread_interrupt(join_th); join_th = join_th->join_list_next; } - st_delete_wrap(th->vm->living_threads, th->self); + rb_check_deadlock(th->vm); if (!th->root_fiber) { rb_thread_recycle_stack_release(th->stack); @@ -775,7 +785,8 @@ BLOCKING_REGION({ val = func(data1); - }, ubf, data2); + }, ubf, data2, 1); + RUBY_VM_CHECK_INTS(); return val; } @@ -1135,6 +1146,7 @@ switch (th->status) { case THREAD_RUNNABLE: case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: case THREAD_TO_KILL: rb_ary_push(ary, th->self); default: @@ -1329,6 +1341,7 @@ case THREAD_RUNNABLE: return "run"; case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: return "sleep"; case 
THREAD_TO_KILL: return "aborting"; @@ -1428,7 +1441,7 @@ if (rb_thread_dead(th)) return Qtrue; - if (th->status == THREAD_STOPPED) + if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) return Qtrue; return Qfalse; } @@ -1868,14 +1881,16 @@ if (except) *except = orig_except; wait = &wait_100ms; } while (__th->interrupt_flag == 0 && (timeout == 0 || subst(timeout, &wait_100ms))); - }, 0, 0); + }, 0, 0, 1); + RUBY_VM_CHECK_INTS(); } while (result == 0 && (timeout == 0 || subst(timeout, &wait_100ms))); } #else BLOCKING_REGION({ result = select(n, read, write, except, timeout); if (result < 0) lerrno = errno; - }, ubf_select, GET_THREAD()); + }, ubf_select, GET_THREAD(), 1); + RUBY_VM_CHECK_INTS(); #endif errno = lerrno; @@ -2070,6 +2085,7 @@ st_foreach(vm->living_threads, terminate_atfork_i, (st_data_t)th); st_clear(vm->living_threads); st_insert(vm->living_threads, thval, (st_data_t) th->thread_id); + vm->sleeper = 0; } static int @@ -2096,6 +2112,7 @@ st_foreach(vm->living_threads, terminate_atfork_before_exec_i, (st_data_t)th); st_clear(vm->living_threads); st_insert(vm->living_threads, thval, (st_data_t) th->thread_id); + vm->sleeper = 0; } struct thgroup { @@ -2312,7 +2329,7 @@ rb_thread_lock_t lock; rb_thread_cond_t cond; rb_thread_t volatile *th; - volatile int cond_waiting; + volatile int cond_waiting, cond_notified; } mutex_t; #define GetMutexPtr(obj, tobj) \ @@ -2384,6 +2401,15 @@ return mutex->th ? 
Qtrue : Qfalse; } +static void +mutex_locked(rb_thread_t *th, VALUE self) +{ + if (!th->keeping_mutexes) { + th->keeping_mutexes = st_init_numtable(); + } + st_insert(th->keeping_mutexes, self, (st_data_t) th->thread_id); +} + /* * call-seq: * mutex.try_lock => true or false @@ -2406,6 +2432,8 @@ if (mutex->th == 0) { mutex->th = GET_THREAD(); locked = Qtrue; + + mutex_locked(GET_THREAD(), self); } native_mutex_unlock(&mutex->lock); @@ -2413,17 +2441,23 @@ } static int -lock_func(rb_thread_t *th, mutex_t *mutex) +lock_func(rb_thread_t *th, mutex_t *mutex, int last_thread) { int interrupted = Qfalse; native_mutex_lock(&mutex->lock); while (mutex->th || (mutex->th = th, 0)) { + if (last_thread) { + interrupted = 2; + break; + } + mutex->cond_waiting++; native_cond_wait(&mutex->cond, &mutex->lock); + mutex->cond_notified--; - if (th->interrupt_flag) { - interrupted = Qtrue; + if (RUBY_VM_INTERRUPTED(th)) { + interrupted = 1; break; } } @@ -2438,6 +2472,7 @@ native_mutex_lock(&mutex->lock); if (mutex->cond_waiting > 0) { native_cond_broadcast(&mutex->cond); + mutex->cond_notified += mutex->cond_waiting; mutex->cond_waiting = 0; } native_mutex_unlock(&mutex->lock); @@ -2460,11 +2495,30 @@ while (mutex->th != th) { int interrupted; + int prev_status = th->status; + int last_thread = 0; + th->locking_mutex = self; + th->status = THREAD_STOPPED_FOREVER; + th->vm->sleeper++; + if (th->vm->living_threads->num_entries == th->vm->sleeper) { + last_thread = 1; + } + BLOCKING_REGION({ - interrupted = lock_func(th, mutex); - }, lock_interrupt, mutex); + interrupted = lock_func(th, mutex, last_thread); + }, lock_interrupt, mutex, 0); + th->locking_mutex = Qfalse; + if (interrupted == 2) { + /* assert: mutex->th != th */ + rb_check_deadlock(th->vm); + } + th->status = prev_status; + th->vm->sleeper--; + + if (mutex->th == th) mutex_locked(th, self); + if (interrupted) { RUBY_VM_CHECK_INTS(); } @@ -2473,15 +2527,8 @@ return self; } -/* - * call-seq: - * mutex.unlock => self - * - * 
Releases the lock. - * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. - */ -VALUE -rb_mutex_unlock(VALUE self) +static char * +mutex_unlock(VALUE self) { mutex_t *mutex; char *err = NULL; @@ -2501,16 +2548,45 @@ /* waiting thread */ native_cond_signal(&mutex->cond); mutex->cond_waiting--; + mutex->cond_notified++; } } native_mutex_unlock(&mutex->lock); + return err; +} + +/* + * call-seq: + * mutex.unlock => self + * + * Releases the lock. + * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. + */ +VALUE +rb_mutex_unlock(VALUE self) +{ + char *err; + + err = mutex_unlock(self); + + if (!err) st_delete_wrap(GET_THREAD()->keeping_mutexes, self); if (err) rb_raise(rb_eThreadError, err); return self; } +static int +unlock_i(st_data_t key, st_data_t val, rb_thread_t *th) +{ + VALUE mtxval = key; + + mutex_unlock(mtxval); + + return ST_CONTINUE; +} + static VALUE rb_mutex_sleep_forever(VALUE time) { @@ -2579,6 +2655,51 @@ return rb_ensure(func, arg, rb_mutex_unlock, mutex); } +static int +check_deadlock_i(st_data_t key, st_data_t val, int *found) +{ + VALUE thval = key; + rb_thread_t *th; + GetThreadPtr(thval, th); + + if (th->status != THREAD_STOPPED_FOREVER) { + rb_bug("check_deadlock_i: thread that is not THREAD_STOPPED_FOREVER found (%p:%d)", th, th->status); + } + + if (RUBY_VM_INTERRUPTED(th)) { + *found = 1; + } + else if (th->locking_mutex) { + mutex_t *mutex; + GetMutexPtr(th->locking_mutex, mutex); + + native_mutex_lock(&mutex->lock); + if (mutex->th == th || (!mutex->th && mutex->cond_notified)) { + *found = 1; + } + native_mutex_unlock(&mutex->lock); + } + + return (*found) ? 
ST_STOP : ST_CONTINUE; +} + +static void +rb_check_deadlock(rb_vm_t *vm) +{ + int found = 0; + + if (vm->living_threads->num_entries != vm->sleeper) return; + + st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); + + if (!found) { + VALUE argv[2]; + argv[0] = rb_eFatal; + argv[1] = rb_str_new2("deadlock detected"); + rb_thread_raise(2, argv, vm->main_thread); + } +} + /* * Document-class: Barrier */ Index: vm.c =================================================================== --- vm.c (revision 16676) +++ vm.c (working copy) @@ -1445,6 +1445,13 @@ RUBY_FREE_UNLESS_NULL(th->stack); } + if (th->locking_mutex != Qfalse) { + rb_bug("thread_free: locking_mutex must be NULL (%p:%p)", th, (void*)th->locking_mutex); + } + if (th->keeping_mutexes) { + st_free_table(th->keeping_mutexes); + } + if (th->local_storage) { st_free_table(th->local_storage); } @@ -1512,6 +1519,12 @@ RUBY_MARK_UNLESS_NULL(th->root_fiber); RUBY_MARK_UNLESS_NULL(th->stat_insn_usage); + RUBY_MARK_UNLESS_NULL(th->locking_mutex); + + if (th->keeping_mutexes) { + st_foreach(th->keeping_mutexes, vm_mark_each_thread_func, 0); + } + + rb_mark_tbl(th->local_storage); if (GET_THREAD() != th && th->machine_stack_start && th->machine_stack_end) {