Feature #14859 ยป 0001-implement-Timeout-in-VM.patch
| benchmark/bm_timeout_mt_nested.rb | ||
|---|---|---|
|
require 'timeout'
|
||
|
total = 10000
|
||
|
nthr = 8
|
||
|
nr = total / nthr
|
||
|
def nest_timeout(n, nthr)
|
||
|
n -= 1
|
||
|
if n > 0
|
||
|
Timeout.timeout(n) { nest_timeout(n, nthr) }
|
||
|
else
|
||
|
nthr.times { Thread.pass }
|
||
|
end
|
||
|
end
|
||
|
nthr.times.map do
|
||
|
Thread.new do
|
||
|
nr.times { nest_timeout(10, nthr) }
|
||
|
end
|
||
|
end.map(&:join)
|
||
| benchmark/bm_timeout_mt_same.rb | ||
|---|---|---|
|
require 'timeout'
|
||
|
total = 100000
|
||
|
nthr = 8
|
||
|
nr = total / nthr
|
||
|
nthr.times.map do
|
||
|
Thread.new do
|
||
|
nr.times { Timeout.timeout(5) { Thread.pass } }
|
||
|
end
|
||
|
end.map(&:join)
|
||
| benchmark/bm_timeout_mt_ugly.rb | ||
|---|---|---|
|
# unrealistic: this is the worst-case of insertion-sort-based timeout
|
||
|
require 'timeout'
|
||
|
total = 100000
|
||
|
nthr = 8
|
||
|
nr = total / nthr
|
||
|
nthr.times.map do
|
||
|
Thread.new do
|
||
|
i = nr
|
||
|
while (i -= 1) >= 0
|
||
|
Timeout.timeout(i + 1) { nthr.times { Thread.pass } }
|
||
|
end
|
||
|
end
|
||
|
end.map(&:join)
|
||
| benchmark/bm_timeout_nested.rb | ||
|---|---|---|
|
require 'timeout'
|
||
|
def nest_timeout(n)
|
||
|
n -= 1
|
||
|
if n > 0
|
||
|
Timeout.timeout(n) { nest_timeout(n) }
|
||
|
end
|
||
|
end
|
||
|
100000.times do
|
||
|
nest_timeout(10)
|
||
|
end
|
||
| benchmark/bm_timeout_same.rb | ||
|---|---|---|
|
require 'timeout'
|
||
|
100000.times { Timeout.timeout(5) {} }
|
||
| benchmark/bm_timeout_zero.rb | ||
|---|---|---|
|
require 'timeout'
|
||
|
100000.times { Timeout.timeout(0) {} }
|
||
| common.mk | ||
|---|---|---|
|
symbol.$(OBJEXT) \
|
||
|
thread.$(OBJEXT) \
|
||
|
time.$(OBJEXT) \
|
||
|
timeout.$(OBJEXT) \
|
||
|
transcode.$(OBJEXT) \
|
||
|
util.$(OBJEXT) \
|
||
|
variable.$(OBJEXT) \
|
||
| ... | ... | |
|
time.$(OBJEXT): {$(VPATH)}subst.h
|
||
|
time.$(OBJEXT): {$(VPATH)}time.c
|
||
|
time.$(OBJEXT): {$(VPATH)}timev.h
|
||
|
timeout.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
|
||
|
timeout.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
|
||
|
timeout.$(OBJEXT): $(CCAN_DIR)/list/list.h
|
||
|
timeout.$(OBJEXT): $(CCAN_DIR)/str/str.h
|
||
|
timeout.$(OBJEXT): $(hdrdir)/ruby/ruby.h
|
||
|
timeout.$(OBJEXT): $(top_srcdir)/include/ruby.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}config.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}defines.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}id.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}intern.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}internal.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}method.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}missing.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}node.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}ruby_assert.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}ruby_atomic.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}st.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}subst.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}thread_native.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}timeout.c
|
||
|
timeout.$(OBJEXT): {$(VPATH)}vm_core.h
|
||
|
timeout.$(OBJEXT): {$(VPATH)}vm_opts.h
|
||
|
transcode.$(OBJEXT): $(hdrdir)/ruby/ruby.h
|
||
|
transcode.$(OBJEXT): $(top_srcdir)/include/ruby.h
|
||
|
transcode.$(OBJEXT): {$(VPATH)}config.h
|
||
| inits.c | ||
|---|---|---|
|
CALL(version);
|
||
|
CALL(vm_trace);
|
||
|
CALL(ast);
|
||
|
CALL(timeout);
|
||
|
}
|
||
|
#undef CALL
|
||
| internal.h | ||
|---|---|---|
|
/* time.c */
|
||
|
struct timeval rb_time_timeval(VALUE);
|
||
|
/* timeout.c */
|
||
|
typedef struct rb_vm_struct rb_vm_t;
|
||
|
typedef struct rb_execution_context_struct rb_execution_context_t;
|
||
|
struct timespec *rb_timeout_sleep_interval(rb_vm_t *, struct timespec *);
|
||
|
void rb_timeout_expire(const rb_execution_context_t *);
|
||
|
/* thread.c */
|
||
|
#define COVERAGE_INDEX_LINES 0
|
||
|
#define COVERAGE_INDEX_BRANCHES 1
|
||
| ... | ... | |
|
void rb_mutex_allow_trap(VALUE self, int val);
|
||
|
VALUE rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data);
|
||
|
VALUE rb_mutex_owned_p(VALUE self);
|
||
|
void rb_getclockofday(struct timespec *);
|
||
|
/* thread_pthread.c, thread_win32.c */
|
||
|
int rb_divert_reserved_fd(int fd);
|
||
| test/test_timeout.rb | ||
|---|---|---|
|
# frozen_string_literal: false
|
||
|
require 'test/unit'
|
||
|
require 'timeout'
|
||
|
begin
|
||
|
require 'io/wait'
|
||
|
rescue LoadError
|
||
|
end
|
||
|
class TestTimeout < Test::Unit::TestCase
|
||
|
def test_queue
|
||
| ... | ... | |
|
}
|
||
|
assert(ok, bug11344)
|
||
|
end
|
||
|
def test_io
|
||
|
t = 0.001
|
||
|
IO.pipe do |r, w|
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(t) { r.read } }
|
||
|
if r.respond_to?(:wait)
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(t) { r.wait } }
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(t) { r.wait(9) } }
|
||
|
end
|
||
|
rset = [r, r.dup]
|
||
|
assert_raise(Timeout::Error) do
|
||
|
Timeout.timeout(t) { IO.select(rset, nil, nil, 9) }
|
||
|
end
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(t) { IO.select(rset) } }
|
||
|
rset.each(&:close)
|
||
|
end
|
||
|
end
|
||
|
def test_thread_join
|
||
|
th = Thread.new { sleep }
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(0.001) { th.join } }
|
||
|
ensure
|
||
|
th.kill
|
||
|
th.join
|
||
|
end
|
||
|
def test_mutex_lock
|
||
|
m = Mutex.new
|
||
|
m.lock
|
||
|
th = Thread.new { m.synchronize { :ok} }
|
||
|
assert_raise(Timeout::Error) { Timeout.timeout(0.001) { th.join } }
|
||
|
m.unlock
|
||
|
assert_equal :ok, th.value
|
||
|
end
|
||
|
def test_yield_and_return_value
|
||
|
r = Timeout.timeout(0) do |sec|
|
||
|
assert_equal 0, sec
|
||
|
sec
|
||
|
end
|
||
|
assert_equal 0, r
|
||
|
t = 123
|
||
|
r = Timeout.timeout(t) do |sec|
|
||
|
assert_same t, sec
|
||
|
sec
|
||
|
end
|
||
|
assert_same r, t
|
||
|
r = Timeout.timeout(t, RuntimeError) do |sec|
|
||
|
assert_same t, sec
|
||
|
sec
|
||
|
end
|
||
|
assert_same r, t
|
||
|
end
|
||
|
def test_timeout_thread
|
||
|
in_thread { sleep }
|
||
|
end
|
||
|
def test_timeout_loop
|
||
|
in_thread { loop {} }
|
||
|
end
|
||
|
def test_timeout_io_read
|
||
|
IO.pipe { |r, w| in_thread { r.read } }
|
||
|
end
|
||
|
def test_timeout_mutex
|
||
|
m = Mutex.new
|
||
|
m.synchronize { in_thread { m.synchronize {} } }
|
||
|
in_thread { m.synchronize { m.sleep } }
|
||
|
end
|
||
|
def in_thread(&blk)
|
||
|
th = Thread.new do
|
||
|
begin
|
||
|
Timeout.timeout(0.001) { blk.call }
|
||
|
rescue => e
|
||
|
e
|
||
|
end
|
||
|
end
|
||
|
assert_same th, th.join(0.3)
|
||
|
assert_kind_of Timeout::Error, th.value
|
||
|
end
|
||
|
end
|
||
| thread.c | ||
|---|---|---|
|
}
|
||
|
static void
|
||
|
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
|
||
|
rb_threadptr_interrupt_set(rb_thread_t *th, rb_atomic_t flag)
|
||
|
{
|
||
|
rb_native_mutex_lock(&th->interrupt_lock);
|
||
|
if (trap) {
|
||
|
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
|
||
|
}
|
||
|
else {
|
||
|
RUBY_VM_SET_INTERRUPT(th->ec);
|
||
|
}
|
||
|
ATOMIC_OR(th->ec->interrupt_flag, flag);
|
||
|
if (th->unblock.func != NULL) {
|
||
|
(th->unblock.func)(th->unblock.arg);
|
||
|
}
|
||
|
else {
|
||
|
/* none */
|
||
|
(th->unblock.func)(th->unblock.arg);
|
||
|
}
|
||
|
rb_native_mutex_unlock(&th->interrupt_lock);
|
||
|
}
|
||
| ... | ... | |
|
void
|
||
|
rb_threadptr_interrupt(rb_thread_t *th)
|
||
|
{
|
||
|
rb_threadptr_interrupt_common(th, 0);
|
||
|
rb_threadptr_interrupt_set(th, PENDING_INTERRUPT_MASK);
|
||
|
}
|
||
|
static void
|
||
|
threadptr_trap_interrupt(rb_thread_t *th)
|
||
|
{
|
||
|
rb_threadptr_interrupt_common(th, 1);
|
||
|
rb_threadptr_interrupt_set(th, TRAP_INTERRUPT_MASK);
|
||
|
}
|
||
|
static void
|
||
| ... | ... | |
|
rb_timespec_now(ts);
|
||
|
}
|
||
|
void
|
||
|
rb_getclockofday(struct timespec *ts)
|
||
|
{
|
||
|
getclockofday(ts);
|
||
|
}
|
||
|
static void
|
||
|
timespec_add(struct timespec *dst, const struct timespec *ts)
|
||
|
{
|
||
| ... | ... | |
|
int timer_interrupt;
|
||
|
int pending_interrupt;
|
||
|
int trap_interrupt;
|
||
|
int timeout_interrupt;
|
||
|
timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
|
||
|
pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
|
||
|
postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
|
||
|
trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
|
||
|
timeout_interrupt = interrupt & TIMEOUT_INTERRUPT_MASK;
|
||
|
if (postponed_job_interrupt) {
|
||
|
rb_postponed_job_flush(th->vm);
|
||
| ... | ... | |
|
}
|
||
|
}
|
||
|
if (timeout_interrupt) {
|
||
|
rb_timeout_expire(th->ec);
|
||
|
}
|
||
|
if (timer_interrupt) {
|
||
|
uint32_t limits_us = TIME_QUANTUM_USEC;
|
||
| ... | ... | |
|
}
|
||
|
rb_native_mutex_unlock(&vm->thread_destruct_lock);
|
||
|
if (vm->timer_thread_timeout >= 0) {
|
||
|
rb_threadptr_interrupt_set(vm->main_thread, TIMEOUT_INTERRUPT_MASK);
|
||
|
}
|
||
|
/* check signal */
|
||
|
rb_threadptr_check_signal(vm->main_thread);
|
||
|
vm->timer_thread_timeout = ATOMIC_EXCHANGE(vm->next_timeout, -1);
|
||
|
#if 0
|
||
|
/* prove profiler */
|
||
|
if (vm->prove_profile.enable) {
|
||
| ... | ... | |
|
if (vm_living_thread_num(vm) > vm->sleeper) return;
|
||
|
if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
|
||
|
if (patrol_thread && patrol_thread != GET_THREAD()) return;
|
||
|
if (rb_timeout_sleep_interval(vm, 0)) return;
|
||
|
list_for_each(&vm->living_threads, th, vmlt_node) {
|
||
|
if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
|
||
| thread_pthread.c | ||
|---|---|---|
|
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
|
||
|
void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
|
||
|
void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
|
||
|
static void rb_thread_wakeup_timer_thread_low(void);
|
||
|
void rb_thread_wakeup_timer_thread_low(void);
|
||
|
static struct {
|
||
|
pthread_t id;
|
||
|
int created;
|
||
| ... | ... | |
|
}
|
||
|
}
|
||
|
static void
|
||
|
void
|
||
|
rb_thread_wakeup_timer_thread_low(void)
|
||
|
{
|
||
|
if (timer_thread_pipe.owner_process == getpid()) {
|
||
| ... | ... | |
|
* @pre the calling context is in the timer thread.
|
||
|
*/
|
||
|
static inline void
|
||
|
timer_thread_sleep(rb_global_vm_lock_t* gvl)
|
||
|
timer_thread_sleep(rb_vm_t *vm)
|
||
|
{
|
||
|
int result;
|
||
|
int need_polling;
|
||
| ... | ... | |
|
need_polling = !ubf_threads_empty();
|
||
|
if (gvl->waiting > 0 || need_polling) {
|
||
|
if (vm->gvl.waiting > 0 || need_polling) {
|
||
|
/* polling (TIME_QUANTUM_USEC usec) */
|
||
|
result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
|
||
|
}
|
||
|
else {
|
||
|
/* wait (infinite) */
|
||
|
result = poll(pollfds, numberof(pollfds), -1);
|
||
|
/* wait (infinite, or whatever timeout.c sets) */
|
||
|
result = poll(pollfds, numberof(pollfds), vm->timer_thread_timeout);
|
||
|
}
|
||
|
if (result == 0) {
|
||
| ... | ... | |
|
#else /* USE_SLEEPY_TIMER_THREAD */
|
||
|
# define PER_NANO 1000000000
|
||
|
void rb_thread_wakeup_timer_thread(void) {}
|
||
|
static void rb_thread_wakeup_timer_thread_low(void) {}
|
||
|
void rb_thread_wakeup_timer_thread_low(void) {}
|
||
|
static rb_nativethread_lock_t timer_thread_lock;
|
||
|
static rb_nativethread_cond_t timer_thread_cond;
|
||
|
static inline void
|
||
|
timer_thread_sleep(rb_global_vm_lock_t* unused)
|
||
|
timer_thread_sleep(rb_vm_t *unused)
|
||
|
{
|
||
|
struct timespec ts;
|
||
|
ts.tv_sec = 0;
|
||
| ... | ... | |
|
static void *
|
||
|
thread_timer(void *p)
|
||
|
{
|
||
|
rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p;
|
||
|
rb_vm_t *vm = p;
|
||
|
if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
|
||
| ... | ... | |
|
if (TT_DEBUG) WRITE_CONST(2, "tick\n");
|
||
|
/* wait */
|
||
|
timer_thread_sleep(gvl);
|
||
|
timer_thread_sleep(vm);
|
||
|
}
|
||
|
#if USE_SLEEPY_TIMER_THREAD
|
||
|
CLOSE_INVALIDATE(normal[0]);
|
||
| ... | ... | |
|
if (timer_thread.created) {
|
||
|
rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
|
||
|
}
|
||
|
err = pthread_create(&timer_thread.id, &attr, thread_timer, &vm->gvl);
|
||
|
err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
|
||
|
pthread_attr_destroy(&attr);
|
||
|
if (err == EINVAL) {
|
||
| ... | ... | |
|
* default stack size is enough for them:
|
||
|
*/
|
||
|
stack_size = 0;
|
||
|
err = pthread_create(&timer_thread.id, NULL, thread_timer, &vm->gvl);
|
||
|
err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
|
||
|
}
|
||
|
if (err != 0) {
|
||
|
rb_warn("pthread_create failed for timer: %s, scheduling broken",
|
||
| thread_win32.c | ||
|---|---|---|
|
/* do nothing */
|
||
|
}
|
||
|
void
|
||
|
rb_thread_wakeup_timer_thread_low(void)
|
||
|
{
|
||
|
/* do nothing */
|
||
|
}
|
||
|
static void
|
||
|
rb_thread_create_timer_thread(void)
|
||
|
{
|
||
| timeout.c | ||
|---|---|---|
|
#include "internal.h"
|
||
|
#include "vm_core.h"
|
||
|
/* match ccan/timer/timer.h, which we may support in the future: */
|
||
|
struct timer {
|
||
|
struct list_node list;
|
||
|
uint64_t time; /* usec */
|
||
|
};
|
||
|
struct timeout {
|
||
|
rb_execution_context_t *ec;
|
||
|
VALUE sec;
|
||
|
VALUE klass;
|
||
|
VALUE message;
|
||
|
struct timer t;
|
||
|
};
|
||
|
static VALUE eTimeoutError, mTimeout, eUncaughtThrow;
|
||
|
static ID id_thread;
|
||
|
static uint64_t
|
||
|
timespec2usec(const struct timespec *ts)
|
||
|
{
|
||
|
return (uint64_t)ts->tv_sec * 1000000 + (uint64_t)ts->tv_nsec / 1000;
|
||
|
}
|
||
|
static void
|
||
|
timers_ll_add(struct list_head *timers, struct timer *t,
|
||
|
uint64_t rel_usec, uint64_t now_usec)
|
||
|
{
|
||
|
struct timer *i = 0;
|
||
|
t->time = rel_usec + now_usec;
|
||
|
/*
|
||
|
* search backwards: assume typical projects have multiple objects
|
||
|
* sharing the same timeout values, so new timers will expire later
|
||
|
* than existing timers
|
||
|
*/
|
||
|
list_for_each_rev(timers, i, list) {
|
||
|
if (t->time >= i->time) {
|
||
|
list_add_after(timers, &i->list, &t->list);
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
list_add(timers, &t->list);
|
||
|
}
|
||
|
static struct timer *
|
||
|
timers_ll_expire(struct list_head *timers, uint64_t now_usec)
|
||
|
{
|
||
|
struct timer *t = list_top(timers, struct timer, list);
|
||
|
if (t && now_usec >= t->time) {
|
||
|
list_del_init(&t->list);
|
||
|
return t;
|
||
|
}
|
||
|
return 0;
|
||
|
}
|
||
|
static struct timer *
|
||
|
timers_ll_earliest(const struct list_head *timers)
|
||
|
{
|
||
|
return list_top(timers, struct timer, list);
|
||
|
}
|
||
|
static VALUE
|
||
|
timeout_yield(VALUE tag, VALUE sec)
|
||
|
{
|
||
|
return rb_yield(sec);
|
||
|
}
|
||
|
static VALUE
|
||
|
timeout_run(VALUE x)
|
||
|
{
|
||
|
struct timeout *a = (struct timeout *)x;
|
||
|
if (RTEST(a->klass)) {
|
||
|
return rb_yield(a->sec);
|
||
|
}
|
||
|
/* for Timeout::Error#exception to throw */
|
||
|
a->message = rb_exc_new_str(eTimeoutError, a->message);
|
||
|
/* hide for rb_gc_force_recycle */
|
||
|
RBASIC_CLEAR_CLASS(a->message);
|
||
|
x = rb_catch_obj(a->message, timeout_yield, a->sec);
|
||
|
if (x == a->message) {
|
||
|
rb_attr_delete(x, id_thread);
|
||
|
rb_exc_raise(x);
|
||
|
}
|
||
|
/* common case, no timeout, so exc is still hidden and safe to recycle */
|
||
|
VM_ASSERT(!RBASIC_CLASS(a->message) && RB_TYPE_P(a->message, T_OBJECT));
|
||
|
if (FL_TEST(a->message, FL_EXIVAR)) {
|
||
|
rb_free_generic_ivar(a->message);
|
||
|
FL_UNSET(a->message, FL_EXIVAR);
|
||
|
}
|
||
|
rb_gc_force_recycle(a->message);
|
||
|
return x;
|
||
|
}
|
||
|
static VALUE
|
||
|
timeout_ensure(VALUE x)
|
||
|
{
|
||
|
struct timeout *a = (struct timeout *)x;
|
||
|
list_del_init(&a->t.list); /* inlined timer_del */
|
||
|
return Qfalse;
|
||
|
}
|
||
|
static struct timeout *
|
||
|
rb_timers_expire_one(rb_vm_t *vm, uint64_t now_usec)
|
||
|
{
|
||
|
struct timer *t = timers_ll_expire(&vm->timers, now_usec);
|
||
|
return t ? container_of(t, struct timeout, t) : 0;
|
||
|
}
|
||
|
static void
|
||
|
arm_timer(rb_vm_t *vm, uint64_t rel_usec)
|
||
|
{
|
||
|
int msec = rel_usec / 1000;
|
||
|
ATOMIC_EXCHANGE(vm->next_timeout, (rb_atomic_t)msec);
|
||
|
/* _low makes a difference in benchmark/bm_timeout_mt_nested.rb */
|
||
|
rb_thread_wakeup_timer_thread_low();
|
||
|
}
|
||
|
struct expire_args {
|
||
|
uint64_t now_usec;
|
||
|
rb_thread_t *current_th;
|
||
|
enum rb_thread_status prev_status;
|
||
|
};
|
||
|
static VALUE
|
||
|
do_expire(VALUE x)
|
||
|
{
|
||
|
struct expire_args *ea = (struct expire_args *)x;
|
||
|
rb_vm_t *vm = ea->current_th->vm;
|
||
|
struct timeout *a;
|
||
|
while ((a = rb_timers_expire_one(vm, ea->now_usec))) {
|
||
|
rb_thread_t *target_th = rb_ec_thread_ptr(a->ec);
|
||
|
VALUE exc;
|
||
|
if (RTEST(a->klass)) {
|
||
|
exc = rb_exc_new_str(a->klass, a->message);
|
||
|
}
|
||
|
else { /* default, pre-made Timeout::Error */
|
||
|
exc = a->message;
|
||
|
RBASIC_SET_CLASS_RAW(exc, eTimeoutError); /* reveal */
|
||
|
/* for Timeout::Error#exception to call `throw' */
|
||
|
rb_ivar_set(exc, id_thread, target_th->self);
|
||
|
}
|
||
|
if (ea->current_th == target_th) {
|
||
|
rb_threadptr_pending_interrupt_enque(target_th, exc);
|
||
|
rb_threadptr_interrupt(target_th);
|
||
|
}
|
||
|
else {
|
||
|
rb_funcall(target_th->self, rb_intern("raise"), 1, exc);
|
||
|
}
|
||
|
}
|
||
|
return Qfalse;
|
||
|
}
|
||
|
static VALUE
|
||
|
expire_ensure(VALUE p)
|
||
|
{
|
||
|
struct expire_args *ea = (struct expire_args *)p;
|
||
|
rb_vm_t *vm = ea->current_th->vm;
|
||
|
struct timer *t = timers_ll_earliest(&vm->timers);
|
||
|
if (t) {
|
||
|
arm_timer(vm, t->time > ea->now_usec ? t->time - ea->now_usec : 0);
|
||
|
}
|
||
|
ea->current_th->status = ea->prev_status;
|
||
|
return Qfalse;
|
||
|
}
|
||
|
void
|
||
|
rb_timeout_expire(const rb_execution_context_t *ec)
|
||
|
{
|
||
|
struct expire_args ea;
|
||
|
struct timespec ts;
|
||
|
rb_getclockofday(&ts);
|
||
|
ea.now_usec = timespec2usec(&ts);
|
||
|
ea.current_th = rb_ec_thread_ptr(ec);
|
||
|
ea.prev_status = ea.current_th->status;
|
||
|
ea.current_th->status = THREAD_RUNNABLE;
|
||
|
rb_ensure(do_expire, (VALUE)&ea, expire_ensure, (VALUE)&ea);
|
||
|
}
|
||
|
struct timespec *
|
||
|
rb_timeout_sleep_interval(rb_vm_t *vm, struct timespec *ts)
|
||
|
{
|
||
|
struct timer *t = timers_ll_earliest(&vm->timers);
|
||
|
if (t && !ts) {
|
||
|
return (struct timespec *)-1;
|
||
|
}
|
||
|
if (t) {
|
||
|
uint64_t now_usec;
|
||
|
rb_getclockofday(ts);
|
||
|
now_usec = timespec2usec(ts);
|
||
|
if (t->time >= now_usec) {
|
||
|
uint64_t rel_usec = t->time - now_usec;
|
||
|
ts->tv_sec = rel_usec / 1000000;
|
||
|
ts->tv_nsec = rel_usec % 1000000 * 1000;
|
||
|
}
|
||
|
else {
|
||
|
ts->tv_sec = 0;
|
||
|
ts->tv_nsec = 0;
|
||
|
}
|
||
|
return ts;
|
||
|
}
|
||
|
return 0;
|
||
|
}
|
||
|
static void
|
||
|
timeout_add(struct timeout *a)
|
||
|
{
|
||
|
rb_vm_t *vm = rb_ec_vm_ptr(a->ec);
|
||
|
struct timer *cur = timers_ll_earliest(&vm->timers);
|
||
|
uint64_t now_usec, rel_usec;
|
||
|
struct timeval tv = rb_time_interval(a->sec);
|
||
|
struct timespec ts;
|
||
|
ts.tv_sec = tv.tv_sec;
|
||
|
ts.tv_nsec = tv.tv_usec * 1000;
|
||
|
rel_usec = timespec2usec(&ts);
|
||
|
rb_getclockofday(&ts);
|
||
|
now_usec = timespec2usec(&ts);
|
||
|
timers_ll_add(&vm->timers, &a->t, rel_usec, now_usec);
|
||
|
if (!cur || timers_ll_earliest(&vm->timers) == &a->t) {
|
||
|
arm_timer(vm, rel_usec);
|
||
|
}
|
||
|
}
|
||
|
static VALUE
|
||
|
s_timeout(int argc, VALUE *argv, VALUE mod)
|
||
|
{
|
||
|
struct timeout a;
|
||
|
rb_scan_args(argc, argv, "12", &a.sec, &a.klass, &a.message);
|
||
|
if (NIL_P(a.sec) || rb_equal(a.sec, INT2FIX(0))) {
|
||
|
return rb_yield(a.sec);
|
||
|
}
|
||
|
if (!RTEST(a.message)) {
|
||
|
a.message = rb_fstring_cstr("execution expired");
|
||
|
}
|
||
|
a.ec = GET_EC();
|
||
|
timeout_add(&a);
|
||
|
return rb_ensure(timeout_run, (VALUE)&a, timeout_ensure, (VALUE)&a);
|
||
|
}
|
||
|
static VALUE
|
||
|
begin_throw(VALUE self)
|
||
|
{
|
||
|
rb_throw_obj(self, self);
|
||
|
return self;
|
||
|
}
|
||
|
static VALUE
|
||
|
rescue_throw(VALUE ignore, VALUE err)
|
||
|
{
|
||
|
return Qnil;
|
||
|
}
|
||
|
/*
|
||
|
* We don't want to generate a backtrace like the version
|
||
|
* in timeout.rb does. We also want to raise the same
|
||
|
* exception object so s_timeout (in core) can match
|
||
|
* against it without relying on an extra proc for:
|
||
|
*
|
||
|
* proc { |exception| return yield(sec) }
|
||
|
*/
|
||
|
static VALUE
|
||
|
timeout_error_exception(int argc, VALUE *argv, VALUE self)
|
||
|
{
|
||
|
if (rb_attr_get(self, id_thread) == rb_thread_current()) {
|
||
|
rb_rescue2(begin_throw, self, rescue_throw, Qfalse, eUncaughtThrow, 0);
|
||
|
}
|
||
|
return self;
|
||
|
}
|
||
|
static VALUE
|
||
|
timeout_compat(int argc, VALUE *argv, VALUE mod)
|
||
|
{
|
||
|
VALUE w[2];
|
||
|
w[0] = rb_funcall(mod, rb_intern("__method__"), 0);
|
||
|
w[0] = rb_sprintf("Object#%"PRIsVALUE
|
||
|
" is deprecated, use Timeout.timeout instead.", w[0]);
|
||
|
w[1] = rb_hash_new();
|
||
|
rb_hash_aset(w[1], ID2SYM(rb_intern("uplevel")), INT2FIX(1));
|
||
|
rb_funcallv(mod, rb_intern("warn"), 2, w);
|
||
|
return s_timeout(argc, argv, mTimeout);
|
||
|
}
|
||
|
void
|
||
|
Init_timeout(void)
|
||
|
{
|
||
|
#undef rb_intern
|
||
|
mTimeout = rb_define_module("Timeout");
|
||
|
eTimeoutError = rb_define_class_under(mTimeout, "Error", rb_eRuntimeError);
|
||
|
eUncaughtThrow = rb_const_get(rb_cObject, rb_intern("UncaughtThrowError"));
|
||
|
rb_define_method(mTimeout, "timeout", s_timeout, -1);
|
||
|
rb_define_singleton_method(mTimeout, "timeout", s_timeout, -1);
|
||
|
rb_define_method(eTimeoutError, "exception", timeout_error_exception, -1);
|
||
|
id_thread = rb_intern("@thread");
|
||
|
/* backwards compatibility */
|
||
|
rb_define_method(rb_mKernel, "timeout", timeout_compat, -1);
|
||
|
rb_const_set(rb_cObject, rb_intern("TimeoutError"), eTimeoutError);
|
||
|
rb_deprecate_constant(rb_cObject, "TimeoutError");
|
||
|
rb_provide("timeout.rb");
|
||
|
}
|
||
| vm.c | ||
|---|---|---|
|
{
|
||
|
MEMZERO(vm, rb_vm_t, 1);
|
||
|
rb_vm_living_threads_init(vm);
|
||
|
list_head_init(&vm->timers);
|
||
|
vm->thread_report_on_exception = 1;
|
||
|
vm->src_encoding_index = -1;
|
||
|
vm->next_timeout = (rb_atomic_t)-1;
|
||
|
vm->timer_thread_timeout = -1;
|
||
|
vm_default_params_setup(vm);
|
||
|
}
|
||
| vm_core.h | ||
|---|---|---|
|
rb_global_vm_lock_t gvl;
|
||
|
rb_nativethread_lock_t thread_destruct_lock;
|
||
|
struct list_head timers; /* TODO: consider moving to rb_thread_t */
|
||
|
rb_atomic_t next_timeout;
|
||
|
int timer_thread_timeout;
|
||
|
struct rb_thread_struct *main_thread;
|
||
|
struct rb_thread_struct *running_thread;
|
||
| ... | ... | |
|
void rb_thread_stop_timer_thread(void);
|
||
|
void rb_thread_reset_timer_thread(void);
|
||
|
void rb_thread_wakeup_timer_thread(void);
|
||
|
void rb_thread_wakeup_timer_thread_low(void);
|
||
|
static inline void
|
||
|
rb_vm_living_threads_init(rb_vm_t *vm)
|
||
| ... | ... | |
|
TIMER_INTERRUPT_MASK = 0x01,
|
||
|
PENDING_INTERRUPT_MASK = 0x02,
|
||
|
POSTPONED_JOB_INTERRUPT_MASK = 0x04,
|
||
|
TRAP_INTERRUPT_MASK = 0x08
|
||
|
TRAP_INTERRUPT_MASK = 0x08,
|
||
|
TIMEOUT_INTERRUPT_MASK = 0x10
|
||
|
};
|
||
|
#define RUBY_VM_SET_TIMER_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, TIMER_INTERRUPT_MASK)
|
||
|
#define RUBY_VM_SET_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, PENDING_INTERRUPT_MASK)
|
||
|
#define RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, POSTPONED_JOB_INTERRUPT_MASK)
|
||
|
#define RUBY_VM_SET_TRAP_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, TRAP_INTERRUPT_MASK)
|
||
|
#define RUBY_VM_INTERRUPTED(ec) ((ec)->interrupt_flag & ~(ec)->interrupt_mask & \
|
||
|
(PENDING_INTERRUPT_MASK|TRAP_INTERRUPT_MASK))
|
||
|
#define RUBY_VM_INTERRUPTED(ec) ((ec)->interrupt_flag & \
|
||
|
~(ec)->interrupt_mask & \
|
||
|
(PENDING_INTERRUPT_MASK|\
|
||
|
TRAP_INTERRUPT_MASK|\
|
||
|
TIMEOUT_INTERRUPT_MASK))
|
||
|
#define RUBY_VM_INTERRUPTED_ANY(ec) ((ec)->interrupt_flag & ~(ec)->interrupt_mask)
|
||
|
VALUE rb_exc_set_backtrace(VALUE exc, VALUE bt);
|
||
|
-
|
||