diff --git a/ext/thread/thread.c b/ext/thread/thread.c index b8656a1..561129a 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -13,8 +13,6 @@ enum { #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) -#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) -#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) @@ -106,7 +104,7 @@ struct sleep_call { VALUE timeout; }; -static ID id_sleep; +static ID id_sleep, id_length; static VALUE do_sleep(VALUE args) @@ -174,84 +172,11 @@ rb_condvar_broadcast(VALUE self) return self; } -/* - * Document-class: Queue - * - * This class provides a way to synchronize communication between threads. - * - * Example: - * - * require 'thread' - * queue = Queue.new - * - * producer = Thread.new do - * 5.times do |i| - * sleep rand(i) # simulate expense - * queue << i - * puts "#{i} produced" - * end - * end - * - * consumer = Thread.new do - * 5.times do |i| - * value = queue.pop - * sleep rand(i/2) # simulate expense - * puts "consumed #{value}" - * end - * end - * - */ - -/* - * Document-method: Queue::new - * - * Creates a new queue instance. - */ - -static VALUE -rb_queue_initialize(VALUE self) -{ - RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); - RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); - return self; -} - -static VALUE -queue_do_push(VALUE self, VALUE obj) -{ - rb_ary_push(GET_QUEUE_QUE(self), obj); - wakeup_first_thread(GET_QUEUE_WAITERS(self)); - return self; -} - -/* - * Document-method: Queue#push - * call-seq: - * push(object) - * enq(object) - * <<(object) - * - * Pushes the given +object+ to the queue. - */ - -static VALUE -rb_queue_push(VALUE self, VALUE obj) -{ - return queue_do_push(self, obj); -} - static unsigned long queue_length(VALUE self) { - VALUE que = GET_QUEUE_QUE(self); - return RARRAY_LEN(que); -} - -static unsigned long -queue_num_waiting(VALUE self) -{ - VALUE waiters = GET_QUEUE_WAITERS(self); - return RARRAY_LEN(waiters); + VALUE length_v = rb_funcall2(self, id_length, 0, NULL); + return NUM2ULONG(length_v); } struct waiting_delete { @@ -266,118 +191,6 @@ queue_delete_from_waiting(struct waiting_delete *p) return Qnil; } -static VALUE -queue_sleep(VALUE arg) -{ - rb_thread_sleep_deadly(); - return Qnil; -} - -static VALUE -queue_do_pop(VALUE self, int should_block) -{ - struct waiting_delete args; - args.waiting = GET_QUEUE_WAITERS(self); - args.th = rb_thread_current(); - - while (queue_length(self) == 0) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue empty"); - } - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); - } - - return rb_ary_shift(GET_QUEUE_QUE(self)); -} - -static int -queue_pop_should_block(int argc, const VALUE *argv) -{ - int should_block = 1; - rb_check_arity(argc, 0, 1); - if (argc > 0) { - should_block = !RTEST(argv[0]); - } - return should_block; -} - -/* - * Document-method: Queue#pop - * call-seq: - * pop(non_block=false) - * deq(non_block=false) - * shift(non_block=false) - * - * Retrieves data from the queue. - * - * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and an - * exception is raised. - */ - -static VALUE -rb_queue_pop(int argc, VALUE *argv, VALUE self) -{ - int should_block = queue_pop_should_block(argc, argv); - return queue_do_pop(self, should_block); -} - -/* - * Document-method: Queue#empty? - * call-seq: empty? - * - * Returns +true+ if the queue is empty. - */ - -static VALUE -rb_queue_empty_p(VALUE self) -{ - return queue_length(self) == 0 ? Qtrue : Qfalse; -} - -/* - * Document-method: Queue#clear - * - * Removes all objects from the queue. - */ - -static VALUE -rb_queue_clear(VALUE self) -{ - rb_ary_clear(GET_QUEUE_QUE(self)); - return self; -} - -/* - * Document-method: Queue#length - * call-seq: - * length - * size - * - * Returns the length of the queue. - */ - -static VALUE -rb_queue_length(VALUE self) -{ - unsigned long len = queue_length(self); - return ULONG2NUM(len); -} - -/* - * Document-method: Queue#num_waiting - * - * Returns the number of threads waiting on the queue. - */ - -static VALUE -rb_queue_num_waiting(VALUE self) -{ - unsigned long len = queue_num_waiting(self); - return ULONG2NUM(len); -} - /* * Document-class: SizedQueue * @@ -490,19 +303,7 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) rb_ary_push(args.waiting, args.th); rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } - return queue_do_push(self, argv[0]); -} - -static VALUE -szqueue_do_pop(VALUE self, int should_block) -{ - VALUE retval = queue_do_pop(self, should_block); - - if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) { - wakeup_first_thread(GET_SZQUEUE_WAITERS(self)); - } - - return retval; + return rb_call_super(argc, argv); } /* @@ -522,12 +323,17 @@ szqueue_do_pop(VALUE self, int should_block) static VALUE rb_szqueue_pop(int argc, VALUE *argv, VALUE self) { - int should_block = queue_pop_should_block(argc, argv); - return szqueue_do_pop(self, should_block); + VALUE retval = rb_call_super(argc, argv); + + if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) { + wakeup_first_thread(GET_SZQUEUE_WAITERS(self)); + } + + return retval; } /* - * Document-method: Queue#clear + * Document-method: SizedQueue#clear * * Removes all objects from the queue. */ @@ -535,9 +341,8 @@ rb_szqueue_pop(int argc, VALUE *argv, VALUE self) static VALUE rb_szqueue_clear(VALUE self) { - rb_ary_clear(GET_QUEUE_QUE(self)); wakeup_all_threads(GET_SZQUEUE_WAITERS(self)); - return self; + return rb_call_super(0, NULL); } /* @@ -549,7 +354,8 @@ rb_szqueue_clear(VALUE self) static VALUE rb_szqueue_num_waiting(VALUE self) { - long len = queue_num_waiting(self); + VALUE len_v = rb_call_super(0, NULL); + long len = NUM2LONG(len_v); VALUE waiters = GET_SZQUEUE_WAITERS(self); len += RARRAY_LEN(waiters); return ULONG2NUM(len); @@ -587,22 +393,19 @@ Init_thread(void) OUTER, "ConditionVariable", rb_cObject, rb_struct_alloc_noinit, "waiters", NULL); - VALUE rb_cQueue = rb_struct_define_without_accessor_under( - OUTER, - "Queue", rb_cObject, rb_struct_alloc_noinit, - "que", "waiters", NULL); + VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under( - OUTER, + rb_cQueue, "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, "que", "waiters", "queue_waiters", "size", NULL); #if 0 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */ - rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */ rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */ #endif id_sleep = rb_intern("sleep"); + id_length = rb_intern("length"); rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0); rb_undef_method(rb_cConditionVariable, "initialize_copy"); @@ -611,27 +414,6 @@ Init_thread(void) rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); - rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); - rb_undef_method(rb_cQueue, "initialize_copy"); - rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); - rb_define_method(rb_cQueue, "push", rb_queue_push, 1); - rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); - rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); - rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); - rb_define_method(rb_cQueue, "length", rb_queue_length, 0); - rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); - - /* Alias for #push. */ - rb_define_alias(rb_cQueue, "enq", "push"); - /* Alias for #push. */ - rb_define_alias(rb_cQueue, "<<", "push"); - /* Alias for #pop. */ - rb_define_alias(rb_cQueue, "deq", "pop"); - /* Alias for #pop. */ - rb_define_alias(rb_cQueue, "shift", "pop"); - /* Alias for #length. */ - rb_define_alias(rb_cQueue, "size", "length"); - rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); @@ -651,6 +433,5 @@ Init_thread(void) rb_provide("thread.rb"); ALIAS_GLOBAL_CONST(ConditionVariable); - ALIAS_GLOBAL_CONST(Queue); ALIAS_GLOBAL_CONST(SizedQueue); } diff --git a/include/ruby/ruby.h b/include/ruby/ruby.h index fb51132..07c0755 100644 --- a/include/ruby/ruby.h +++ b/include/ruby/ruby.h @@ -1605,6 +1605,7 @@ RUBY_EXTERN VALUE rb_cThread; RUBY_EXTERN VALUE rb_cTime; RUBY_EXTERN VALUE rb_cTrueClass; RUBY_EXTERN VALUE rb_cUnboundMethod; +RUBY_EXTERN VALUE rb_cQueue; RUBY_EXTERN VALUE rb_eException; RUBY_EXTERN VALUE rb_eStandardError; diff --git a/thread.c b/thread.c index a8cc250..e1df766 100644 --- a/thread.c +++ b/thread.c @@ -76,6 +76,7 @@ #endif VALUE rb_cMutex; +VALUE rb_cQueue; VALUE rb_cThreadShield; static VALUE sym_immediate; @@ -4556,6 +4557,256 @@ void rb_mutex_allow_trap(VALUE self, int val) m->allow_trap = val; } +enum { + CONDVAR_WAITERS = 0 +}; + +enum { + QUEUE_QUE = 0, + QUEUE_WAITERS = 1 +}; + +#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) +#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) + +static VALUE +get_array(VALUE obj, int idx) +{ + VALUE ary = RSTRUCT_GET(obj, idx); + if (!RB_TYPE_P(ary, T_ARRAY)) { + rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj); + } + return ary; +} + +static VALUE +ary_buf_new(void) +{ + return rb_ary_tmp_new(1); +} + +static void +wakeup_first_thread(VALUE list) +{ + VALUE thread; + + while (!NIL_P(thread = rb_ary_shift(list))) { + if (RTEST(rb_thread_wakeup_alive(thread))) break; + } +} + +/* + * Document-class: Queue + * + * This class provides a way to synchronize communication between threads. + * + * Example: + * + * require 'thread' + * queue = Queue.new + * + * producer = Thread.new do + * 5.times do |i| + * sleep rand(i) # simulate expense + * queue << i + * puts "#{i} produced" + * end + * end + * + * consumer = Thread.new do + * 5.times do |i| + * value = queue.pop + * sleep rand(i/2) # simulate expense + * puts "consumed #{value}" + * end + * end + * + */ + +/* + * Document-method: Queue::new + * + * Creates a new queue instance. + */ + +static VALUE +rb_queue_initialize(VALUE self) +{ + RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + return self; +} + +static VALUE +queue_do_push(VALUE self, VALUE obj) +{ + rb_ary_push(GET_QUEUE_QUE(self), obj); + wakeup_first_thread(GET_QUEUE_WAITERS(self)); + return self; +} + +/* + * Document-method: Queue#push + * call-seq: + * push(object) + * enq(object) + * <<(object) + * + * Pushes the given +object+ to the queue. + */ + +static VALUE +rb_queue_push(VALUE self, VALUE obj) +{ + return queue_do_push(self, obj); +} + +static unsigned long +queue_length(VALUE self) +{ + VALUE que = GET_QUEUE_QUE(self); + return RARRAY_LEN(que); +} + +static unsigned long +queue_num_waiting(VALUE self) +{ + VALUE waiters = GET_QUEUE_WAITERS(self); + return RARRAY_LEN(waiters); +} + +struct waiting_delete { + VALUE waiting; + VALUE th; +}; + +static VALUE +queue_delete_from_waiting(struct waiting_delete *p) +{ + rb_ary_delete(p->waiting, p->th); + return Qnil; +} + +static VALUE +queue_sleep(VALUE arg) +{ + rb_thread_sleep_deadly(); + return Qnil; +} + +static VALUE +queue_do_pop(VALUE self, int should_block) +{ + struct waiting_delete args; + args.waiting = GET_QUEUE_WAITERS(self); + args.th = rb_thread_current(); + + while (queue_length(self) == 0) { + if (!should_block) { + rb_raise(rb_eThreadError, "queue empty"); + } + rb_ary_push(args.waiting, args.th); + rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } + + return rb_ary_shift(GET_QUEUE_QUE(self)); +} + +static int +queue_pop_should_block(int argc, const VALUE *argv) +{ + int should_block = 1; + rb_check_arity(argc, 0, 1); + if (argc > 0) { + should_block = !RTEST(argv[0]); + } + return should_block; +} + +/* + * Document-method: Queue#pop + * call-seq: + * pop(non_block=false) + * deq(non_block=false) + * shift(non_block=false) + * + * Retrieves data from the queue. + * + * If the queue is empty, the calling thread is suspended until data is pushed + * onto the queue. If +non_block+ is true, the thread isn't suspended, and an + * exception is raised. + */ + +static VALUE +rb_queue_pop(int argc, VALUE *argv, VALUE self) +{ + int should_block = queue_pop_should_block(argc, argv); + return queue_do_pop(self, should_block); +} + +/* + * Document-method: Queue#empty? + * call-seq: empty? + * + * Returns +true+ if the queue is empty. + */ + +static VALUE +rb_queue_empty_p(VALUE self) +{ + return queue_length(self) == 0 ? Qtrue : Qfalse; +} + +/* + * Document-method: Queue#clear + * + * Removes all objects from the queue. + */ + +static VALUE +rb_queue_clear(VALUE self) +{ + rb_ary_clear(GET_QUEUE_QUE(self)); + return self; +} + +/* + * Document-method: Queue#length + * call-seq: + * length + * size + * + * Returns the length of the queue. + */ + +static VALUE +rb_queue_length(VALUE self) +{ + unsigned long len = queue_length(self); + return ULONG2NUM(len); +} + +/* + * Document-method: Queue#num_waiting + * + * Returns the number of threads waiting on the queue. + */ + +static VALUE +rb_queue_num_waiting(VALUE self) +{ + unsigned long len = queue_num_waiting(self); + return ULONG2NUM(len); +} + +/* :nodoc: */ +static VALUE +undumpable(VALUE obj) +{ + rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj)); + UNREACHABLE; +} + /* * Document-class: ThreadShield */ @@ -5062,6 +5313,36 @@ Init_Thread(void) rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0); rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); + rb_cQueue = rb_struct_define_without_accessor( + "Queue", rb_cObject, rb_struct_alloc_noinit, + "que", "waiters", NULL + ); + +#if 0 + rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */ +#endif + + rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); + rb_undef_method(rb_cQueue, "initialize_copy"); + rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); + rb_define_method(rb_cQueue, "push", rb_queue_push, 1); + rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); + rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); + rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); + rb_define_method(rb_cQueue, "length", rb_queue_length, 0); + rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + + /* Alias for #push. */ + rb_define_alias(rb_cQueue, "enq", "push"); + /* Alias for #push. */ + rb_define_alias(rb_cQueue, "<<", "push"); + /* Alias for #pop. */ + rb_define_alias(rb_cQueue, "deq", "pop"); + /* Alias for #pop. */ + rb_define_alias(rb_cQueue, "shift", "pop"); + /* Alias for #length. */ + rb_define_alias(rb_cQueue, "size", "length"); + recursive_key = rb_intern("__recursive_key__"); rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);