Feature #10600 » queue-close-2.diff
ext/thread/thread.c | ||
---|---|---|
enum {
|
||
QUEUE_QUE = 0,
|
||
QUEUE_WAITERS = 1,
|
||
SZQUEUE_WAITERS = 2,
|
||
SZQUEUE_MAX = 3
|
||
QUEUE_CLOSED = 2,
|
||
SZQUEUE_WAITERS = 3,
|
||
SZQUEUE_MAX = 4
|
||
};
|
||
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
|
||
... | ... | |
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
|
||
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
|
||
#define GET_QUEUE_CLOSED(q) get_array((q), QUEUE_CLOSED)
|
||
/* Has the close method been called? */
|
||
#define QUEUE_CLOSED_P(self) !NIL_P(RSTRUCT_GET(self,QUEUE_CLOSED))
|
||
static VALUE
|
||
get_array(VALUE obj, int idx)
|
||
{
|
||
... | ... | |
{
|
||
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
|
||
return self;
|
||
}
|
||
// after call to close, this is the number of known items remaining before
|
||
// queue is really empty.
|
||
struct queue_shutdown
|
||
{
|
||
VALUE closed;
|
||
unsigned long countdown;
|
||
};
|
||
/*
|
||
QUEUE_CLOSED serving double-duty as a collection of threads that were
|
||
pending (for SizedQueue) when close was called.
|
||
*/
|
||
static struct queue_shutdown
|
||
queue_closing(VALUE self)
|
||
{
|
||
struct queue_shutdown retval;
|
||
VALUE closed = RSTRUCT_GET(self,QUEUE_CLOSED);
|
||
if (closed != Qnil) {
|
||
retval.closed = Qtrue;
|
||
retval.countdown = RARRAY_LEN(closed);
|
||
} else {
|
||
retval.closed = Qfalse;
|
||
retval.countdown = 0;
|
||
}
|
||
return retval;
|
||
}
|
||
static VALUE
|
||
queue_raise_if_closed(VALUE self)
|
||
{
|
||
struct queue_shutdown queue_shutdown;
|
||
// Optimise the frequent case, which is queue open
|
||
if (!QUEUE_CLOSED_P(self)) return Qnil;
|
||
// queue is closing, which is not frequent, so don't worry about the extra fetch of QUEUE_CLOSED
|
||
queue_shutdown = queue_closing(self);
|
||
if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) {
|
||
rb_raise(rb_eThreadError, "queue closed");
|
||
}
|
||
return Qnil;
|
||
}
|
||
static VALUE
|
||
queue_do_push(VALUE self, VALUE obj)
|
||
{
|
||
// TODO would be nice to not have to use macro to check for correct
|
||
// initialization on every single call to push.
|
||
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
||
wakeup_first_thread(GET_QUEUE_WAITERS(self));
|
||
return self;
|
||
... | ... | |
static VALUE
|
||
rb_queue_push(VALUE self, VALUE obj)
|
||
{
|
||
queue_raise_if_closed(self);
|
||
return queue_do_push(self, obj);
|
||
}
|
||
... | ... | |
return RARRAY_LEN(waiters);
|
||
}
|
||
/*
|
||
* Document-method: Queue#close
|
||
* call-seq: close
|
||
*
|
||
* Closes the queue to producers. A closed queue cannot be re-opened.
|
||
*
|
||
* After the call to close completes, the following are true:
|
||
*
|
||
* - closed? will return true
|
||
*
|
||
* - calling enq/push/<< will raise an exception
|
||
*
|
||
* - calling deq/pop/shift will return an object from the queue as usual.
|
||
*
|
||
* - when empty? is true, deq(non_block=false) will not suspend and
|
||
* will return nil. deq(non_block=true) will raise an exception.
|
||
*
|
||
* And for SizedQueue, these will also be true:
|
||
*
|
||
* - each thread already suspended in enq at the time of the call
|
||
* to close will be allowed to push its object as usual.
|
||
*
|
||
* - empty? will be false when there are either objects in the queue, or
|
||
* producers which were suspended at the time of the call to close but whose
|
||
* objects are not yet in the queue. Therefore, it can be true (very
|
||
* briefly) that empty? == false && size == 0, since size returns the number
|
||
* of objects actually in the queue.
|
||
*/
|
||
static VALUE
|
||
rb_queue_close(VALUE self)
|
||
{
|
||
// Never any pending producers for ordinary queue, so create empty array.
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, ary_buf_new());
|
||
if (queue_length(self) < queue_num_waiting(self)) {
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
return self;
|
||
}
|
||
static VALUE
|
||
rb_szqueue_close(VALUE self)
|
||
{
|
||
VALUE remaining_producers = rb_ary_dup(GET_SZQUEUE_WAITERS(self));
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, remaining_producers);
|
||
if (queue_length(self) + RARRAY_LEN(remaining_producers) < queue_num_waiting(self)) {
|
||
// There may be more consumers waiting than pending items, so wake
|
||
// up consumers and some will go away.
|
||
// TODO is this only necessary when
|
||
// queue_length(self) + // RARRAY_LEN(remaining_producers) == 0 ?
|
||
// Because if there's even 1 blocked producer, the countdown will kick
|
||
// in, and that culminates in wakeup_all_threads(...)
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
return self;
|
||
}
|
||
struct waiting_delete {
|
||
VALUE waiting;
|
||
VALUE th;
|
||
... | ... | |
static VALUE
|
||
queue_do_pop(VALUE self, int should_block)
|
||
{
|
||
struct queue_shutdown queue_shutdown;
|
||
struct waiting_delete args;
|
||
args.waiting = GET_QUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
while (queue_length(self) == 0) {
|
||
// non-blocking case will use empty?, so that must only return
|
||
// true when the queue is really closed, ie not ever any more pending items.
|
||
// But in the non-blocking case, there will never be blocked producers, so
|
||
// it's not really relevant.
|
||
if (!should_block) {
|
||
rb_raise(rb_eThreadError, "queue empty");
|
||
}
|
||
// only checking for shutdown when queue is empty, so less of a
|
||
// performance drain.
|
||
queue_shutdown = queue_closing(self);
|
||
if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) {
|
||
// TODO signal queue closed to consumers
|
||
// ie could also raise StopIteration or other exception here.
|
||
return Qnil;
|
||
}
|
||
// put this consumer to sleep until there's an item available
|
||
args.waiting = GET_QUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
rb_ary_push(args.waiting, args.th);
|
||
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
}
|
||
... | ... | |
}
|
||
/*
|
||
* Document-method: Queue#closed?
|
||
* call-seq: closed?
|
||
*
|
||
* Returns +true+ if the queue is closed.
|
||
*/
|
||
static VALUE
|
||
rb_queue_closed_p(VALUE self)
|
||
{
|
||
return RSTRUCT_GET(self, QUEUE_CLOSED) != Qnil ? Qtrue : Qfalse;
|
||
}
|
||
/*
|
||
* Document-method: Queue#empty?
|
||
* call-seq: empty?
|
||
*
|
||
... | ... | |
static VALUE
|
||
rb_queue_empty_p(VALUE self)
|
||
{
|
||
return queue_length(self) == 0 ? Qtrue : Qfalse;
|
||
struct queue_shutdown queue_shutdown;
|
||
if (QUEUE_CLOSED_P(self)) {
|
||
queue_shutdown = queue_closing(self);
|
||
return queue_length(self) + queue_shutdown.countdown == 0 ? Qtrue : Qfalse;
|
||
} else {
|
||
return queue_length(self) == 0 ? Qtrue : Qfalse;
|
||
}
|
||
}
|
||
/*
|
||
... | ... | |
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
|
||
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
||
... | ... | |
static VALUE
|
||
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
||
{
|
||
struct waiting_delete args;
|
||
int should_block = szqueue_push_should_block(argc, argv);
|
||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
struct waiting_delete args;
|
||
queue_raise_if_closed(self);
|
||
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
|
||
if (!should_block) {
|
||
rb_raise(rb_eThreadError, "queue full");
|
||
}
|
||
// queue is full, so sleep until needed
|
||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
rb_ary_push(args.waiting, args.th);
|
||
// TODO should also handle countdown decrement if the thread is terminated while sleeping.
|
||
// TODO shouldn't this be queue_sleep as well?
|
||
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
}
|
||
return queue_do_push(self, argv[0]);
|
||
queue_do_push(self, argv[0]);
|
||
// skip this for the normal case, which is queue open
|
||
if (QUEUE_CLOSED_P(self)) {
|
||
// At this point, we know we've actually pushed an item onto the queue,
|
||
// and we're in a closing state, so remove the current thread from the list
|
||
// of pending pushers, if it was one to start with.
|
||
VALUE removed_thread = rb_ary_delete(GET_QUEUE_CLOSED(self), rb_thread_current());
|
||
// was that the last pending thread?
|
||
if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_CLOSED(self)) == 0) {
|
||
// wake all waiting consumers, because there will never be more items
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
}
|
||
return self;
|
||
}
|
||
static VALUE
|
||
... | ... | |
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
||
"que", "waiters", NULL);
|
||
"que", "waiters", "closed", NULL);
|
||
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
||
"que", "waiters", "queue_waiters", "size", NULL);
|
||
"que", "waiters", "closed", "queue_waiters", "size", NULL);
|
||
#if 0
|
||
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
|
||
... | ... | |
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
||
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||
/* Alias for #push. */
|
||
rb_define_alias(rb_cQueue, "enq", "push");
|
||
... | ... | |
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
||
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
||
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
||
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
|
||
/* Alias for #push. */
|
||
rb_define_alias(rb_cSizedQueue, "enq", "push");
|