diff --git a/ext/thread/thread.c b/ext/thread/thread.c index b8656a1..5693098 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -7,8 +7,9 @@ enum { enum { QUEUE_QUE = 0, QUEUE_WAITERS = 1, - SZQUEUE_WAITERS = 2, - SZQUEUE_MAX = 3 + QUEUE_CLOSED = 2, + SZQUEUE_WAITERS = 3, + SZQUEUE_MAX = 4 }; #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) @@ -19,6 +20,10 @@ enum { #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) +#define GET_QUEUE_CLOSED(q) get_array((q), QUEUE_CLOSED) +/* Has the close method been called? */ +#define QUEUE_CLOSED_P(self) !NIL_P(RSTRUCT_GET(self,QUEUE_CLOSED)) + static VALUE get_array(VALUE obj, int idx) { @@ -213,12 +218,59 @@ rb_queue_initialize(VALUE self) { RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_CLOSED, Qnil); return self; } +// after call to close, this is the number of known items remaining before +// queue is really empty. +struct queue_shutdown +{ + VALUE closed; + unsigned long countdown; +}; + +/* + QUEUE_CLOSED serving double-duty as a collection of threads that were + pending (for SizedQueue) when close was called. +*/ +static struct queue_shutdown +queue_closing(VALUE self) +{ + struct queue_shutdown retval; + VALUE closed = RSTRUCT_GET(self,QUEUE_CLOSED); + + if (closed != Qnil) { + retval.closed = Qtrue; + retval.countdown = RARRAY_LEN(closed); + } else { + retval.closed = Qfalse; + retval.countdown = 0; + } + + return retval; +} + +static VALUE +queue_raise_if_closed(VALUE self) +{ + struct queue_shutdown queue_shutdown; + // Optimise the frequent case, which is queue open + if (!QUEUE_CLOSED_P(self)) return Qnil; + + // queue is closing, which is not frequent, so don't worry about the extra fetch of QUEUE_CLOSED + queue_shutdown = queue_closing(self); + if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) { + rb_raise(rb_eThreadError, "queue closed"); + } + return Qnil; +} + static VALUE queue_do_push(VALUE self, VALUE obj) { + // TODO would be nice to not have to use macro to check for correct + // initialization on every single call to push. rb_ary_push(GET_QUEUE_QUE(self), obj); wakeup_first_thread(GET_QUEUE_WAITERS(self)); return self; @@ -237,6 +289,7 @@ queue_do_push(VALUE self, VALUE obj) static VALUE rb_queue_push(VALUE self, VALUE obj) { + queue_raise_if_closed(self); return queue_do_push(self, obj); } @@ -254,6 +307,66 @@ queue_num_waiting(VALUE self) return RARRAY_LEN(waiters); } +/* + * Document-method: Queue#close + * call-seq: close + * + * Closes the queue to producers. A closed queue cannot be re-opened. + * + * After the call to close completes, the following are true: + * + * - closed? will return true + * + * - calling enq/push/<< will raise an exception + * + * - calling deq/pop/shift will return an object from the queue as usual. + * + * - when empty? is true, deq(non_block=false) will not suspend and + * will return nil. deq(non_block=true) will raise an exception. + * + * And for SizedQueue, these will also be true: + * + * - each thread already suspended in enq at the time of the call + * to close will be allowed to push its object as usual. + * + * - empty? will be false when there are either objects in the queue, or + * producers which were suspended at the time of the call to close but whose + * objects are not yet in the queue. Therefore, it can be true (very + * briefly) that empty? == false && size == 0, since size returns the number + * of objects actually in the queue. + */ + +static VALUE +rb_queue_close(VALUE self) +{ + // Never any pending producers for ordinary queue, so create empty array. + RSTRUCT_SET(self, QUEUE_CLOSED, ary_buf_new()); + if (queue_length(self) < queue_num_waiting(self)) { + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + return self; +} + +static VALUE +rb_szqueue_close(VALUE self) +{ + VALUE remaining_producers = rb_ary_dup(GET_SZQUEUE_WAITERS(self)); + RSTRUCT_SET(self, QUEUE_CLOSED, remaining_producers); + + if (queue_length(self) + RARRAY_LEN(remaining_producers) < queue_num_waiting(self)) { + // There may be more consumers waiting than pending items, so wake + // up consumers and some will go away. + + // TODO is this only necessary when + // queue_length(self) + // RARRAY_LEN(remaining_producers) == 0 ? + + // Because if there's even 1 blocked producer, the countdown will kick + // in, and that culminates in wakeup_all_threads(...) + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + return self; +} + struct waiting_delete { VALUE waiting; VALUE th; @@ -276,14 +389,30 @@ queue_sleep(VALUE arg) static VALUE queue_do_pop(VALUE self, int should_block) { + struct queue_shutdown queue_shutdown; struct waiting_delete args; - args.waiting = GET_QUEUE_WAITERS(self); - args.th = rb_thread_current(); while (queue_length(self) == 0) { + // non-blocking case will use empty?, so that must only return + // true when the queue is really closed, ie not ever any more pending items. + // But in the non-blocking case, there will never be blocked producers, so + // it's not really relevant. if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } + + // only checking for shutdown when queue is empty, so less of a + // performance drain. + queue_shutdown = queue_closing(self); + if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) { + // TODO signal queue closed to consumers + // ie could also raise StopIteration or other exception here. + return Qnil; + } + + // put this consumer to sleep until there's an item available + args.waiting = GET_QUEUE_WAITERS(self); + args.th = rb_thread_current(); rb_ary_push(args.waiting, args.th); rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } @@ -324,6 +453,19 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) } /* + * Document-method: Queue#closed? + * call-seq: closed? + * + * Returns +true+ if the queue is closed. + */ + +static VALUE +rb_queue_closed_p(VALUE self) +{ + return RSTRUCT_GET(self, QUEUE_CLOSED) != Qnil ? Qtrue : Qfalse; +} + +/* * Document-method: Queue#empty? * call-seq: empty? * @@ -333,7 +475,13 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) static VALUE rb_queue_empty_p(VALUE self) { - return queue_length(self) == 0 ? Qtrue : Qfalse; + struct queue_shutdown queue_shutdown; + if (QUEUE_CLOSED_P(self)) { + queue_shutdown = queue_closing(self); + return queue_length(self) + queue_shutdown.countdown == 0 ? Qtrue : Qfalse; + } else { + return queue_length(self) == 0 ? Qtrue : Qfalse; + } } /* @@ -406,6 +554,7 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_CLOSED, Qnil); RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_MAX, vmax); @@ -478,19 +627,41 @@ szqueue_push_should_block(int argc, const VALUE *argv) static VALUE rb_szqueue_push(int argc, VALUE *argv, VALUE self) { - struct waiting_delete args; int should_block = szqueue_push_should_block(argc, argv); - args.waiting = GET_SZQUEUE_WAITERS(self); - args.th = rb_thread_current(); + struct waiting_delete args; + + queue_raise_if_closed(self); while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) { if (!should_block) { rb_raise(rb_eThreadError, "queue full"); } + // queue is full, so sleep until needed + args.waiting = GET_SZQUEUE_WAITERS(self); + args.th = rb_thread_current(); rb_ary_push(args.waiting, args.th); + // TODO should also handle countdown decrement if the thread is terminated while sleeping. + // TODO shouldn't this be queue_sleep as well? rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } - return queue_do_push(self, argv[0]); + + queue_do_push(self, argv[0]); + + // skip this for the normal case, which is queue open + if (QUEUE_CLOSED_P(self)) { + // At this point, we know we've actually pushed an item onto the queue, + // and we're in a closing state, so remove the current thread from the list + // of pending pushers, if it was one to start with. + VALUE removed_thread = rb_ary_delete(GET_QUEUE_CLOSED(self), rb_thread_current()); + + // was that the last pending thread? + if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_CLOSED(self)) == 0) { + // wake all waiting consumers, because there will never be more items + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + } + + return self; } static VALUE @@ -590,11 +761,11 @@ Init_thread(void) VALUE rb_cQueue = rb_struct_define_without_accessor_under( OUTER, "Queue", rb_cObject, rb_struct_alloc_noinit, - "que", "waiters", NULL); + "que", "waiters", "closed", NULL); VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under( OUTER, "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + "que", "waiters", "closed", "queue_waiters", "size", NULL); #if 0 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */ @@ -620,6 +791,8 @@ Init_thread(void) rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cQueue, "close", rb_queue_close, 0); + rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); /* Alias for #push. */ rb_define_alias(rb_cQueue, "enq", "push"); @@ -639,6 +812,7 @@ Init_thread(void) rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1); rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); + rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0); /* Alias for #push. */ rb_define_alias(rb_cSizedQueue, "enq", "push");