Project

General

Profile

Feature #10600 » queue-close-2.diff

djellemah (John Anderson), 12/17/2014 05:38 PM

View differences:

ext/thread/thread.c
/*
 * Slot indices into the struct that backs Queue and SizedQueue.
 *
 * QUEUE_CLOSED sits directly after the slots shared with Queue, so the
 * SizedQueue-only slots (SZQUEUE_WAITERS, SZQUEUE_MAX) each move up by
 * one. The order must match the member lists passed to
 * rb_struct_define_without_accessor_under() when the classes are created.
 */
enum {
    QUEUE_QUE = 0,
    QUEUE_WAITERS = 1,
    QUEUE_CLOSED = 2,
    SZQUEUE_WAITERS = 3,
    SZQUEUE_MAX = 4
};
/* Fetch the CONDVAR_WAITERS slot of +cv+ through get_array(). */
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
......
/* Raw value stored in the SZQUEUE_MAX slot of +q+. */
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
/* SZQUEUE_MAX slot of +q+ converted with NUM2ULONG. */
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
/*
 * QUEUE_CLOSED slot of +q+ fetched through get_array(); intended for use
 * once the queue has been closed and the slot holds an array.
 */
#define GET_QUEUE_CLOSED(q) get_array((q), QUEUE_CLOSED)
/*
 * Has the close method been called?  True when the QUEUE_CLOSED slot is
 * no longer nil.  The argument and the whole expansion are parenthesized
 * so the macro can be used safely inside larger expressions.
 */
#define QUEUE_CLOSED_P(self) (!NIL_P(RSTRUCT_GET((self), QUEUE_CLOSED)))
static VALUE
get_array(VALUE obj, int idx)
{
......
{
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
return self;
}
/*
 * Snapshot of a queue's close state, as returned by queue_closing().
 */
struct queue_shutdown
{
/* Qtrue when the QUEUE_CLOSED slot is set (close was called), else Qfalse. */
VALUE closed;
/*
 * After a call to close, the number of known items remaining before the
 * queue is really empty; 0 while the queue is still open.
 */
unsigned long countdown;
};
/*
 * Report the close state of +self+.
 *
 * The QUEUE_CLOSED slot does double duty: nil means the queue is open,
 * and any other value is the collection of threads that were pending
 * (for SizedQueue) when close was called.  The length of that collection
 * is returned as the countdown.
 */
static struct queue_shutdown
queue_closing(VALUE self)
{
    VALUE pending = RSTRUCT_GET(self, QUEUE_CLOSED);
    struct queue_shutdown state;

    if (pending == Qnil) {
        /* Still open: nothing is pending. */
        state.closed = Qfalse;
        state.countdown = 0;
        return state;
    }

    state.closed = Qtrue;
    state.countdown = RARRAY_LEN(pending);
    return state;
}
/*
 * Guard run at the start of a push: raises ThreadError ("queue closed")
 * when close has been called on +self+, otherwise returns Qnil.
 *
 * The calling thread has not started waiting yet, so it cannot be one of
 * the producers recorded in QUEUE_CLOSED at close time (those are already
 * past this check, blocked inside the push loop).  The decision therefore
 * must not depend on how many recorded producers are still pending:
 * doing so would let brand-new producers push into a closed SizedQueue
 * until the last recorded producer had finished, contradicting the
 * Queue#close contract that enq/push/<< raise once close has completed.
 */
static VALUE
queue_raise_if_closed(VALUE self)
{
    /* The frequent case is an open queue, which costs a single slot read. */
    if (QUEUE_CLOSED_P(self)) {
        rb_raise(rb_eThreadError, "queue closed");
    }
    return Qnil;
}
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
// TODO would be nice to not have to use macro to check for correct
// initialization on every single call to push.
rb_ary_push(GET_QUEUE_QUE(self), obj);
wakeup_first_thread(GET_QUEUE_WAITERS(self));
return self;
......
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    VALUE result;

    /* May raise instead of returning when the queue no longer accepts items. */
    queue_raise_if_closed(self);

    /* Hand over to the shared push helper and pass its result back. */
    result = queue_do_push(self, obj);
    return result;
}
......
return RARRAY_LEN(waiters);
}
/*
 * Document-method: Queue#close
 * call-seq: close
 *
 * Closes the queue to producers. A closed queue cannot be re-opened.
 *
 * Once the call to close has completed, all of the following hold:
 *
 * - closed? returns true
 *
 * - enq/push/<< raise an exception
 *
 * - deq/pop/shift return an object from the queue as usual.
 *
 * - when empty? is true, deq(non_block=false) does not suspend and
 *   returns nil, while deq(non_block=true) raises an exception.
 *
 * For SizedQueue the following hold as well:
 *
 * - each thread that was already suspended in enq when close was called
 *   is still allowed to push its object as usual.
 *
 * - empty? is false while there are objects in the queue, or producers
 *   that were suspended when close was called and whose objects are not
 *   yet in the queue. It can therefore be true (very briefly) that
 *   empty? == false && size == 0, since size returns the number of
 *   objects actually in the queue.
 */
static VALUE
rb_queue_close(VALUE self)
{
    /* An ordinary queue never has pending producers, so record an empty array. */
    RSTRUCT_SET(self, QUEUE_CLOSED, ary_buf_new());

    /* Enough items for every waiting consumer: nobody needs waking. */
    if (queue_length(self) >= queue_num_waiting(self)) {
        return self;
    }

    /* More consumers are waiting than items are stored, so wake them all. */
    wakeup_all_threads(GET_QUEUE_WAITERS(self));
    return self;
}
/*
 * Close a SizedQueue.  A copy of the producers currently held in
 * SZQUEUE_WAITERS is stored in the QUEUE_CLOSED slot; a non-nil slot is
 * what marks the queue as closed.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    VALUE pending_producers = rb_ary_dup(GET_SZQUEUE_WAITERS(self));

    RSTRUCT_SET(self, QUEUE_CLOSED, pending_producers);

    /* Stored items plus pending producers cover every waiting consumer. */
    if (queue_length(self) + RARRAY_LEN(pending_producers) >= queue_num_waiting(self)) {
        return self;
    }

    /*
     * There may be more consumers waiting than pending items, so wake
     * up consumers and some will go away.
     * TODO is this only necessary when
     * queue_length(self) + RARRAY_LEN(pending_producers) == 0 ?
     * Because if there's even 1 blocked producer, the countdown will kick
     * in, and that culminates in wakeup_all_threads(...)
     */
    wakeup_all_threads(GET_QUEUE_WAITERS(self));
    return self;
}
struct waiting_delete {
VALUE waiting;
VALUE th;
......
static VALUE
queue_do_pop(VALUE self, int should_block)
{
struct queue_shutdown queue_shutdown;
struct waiting_delete args;
args.waiting = GET_QUEUE_WAITERS(self);
args.th = rb_thread_current();
while (queue_length(self) == 0) {
// non-blocking case will use empty?, so that must only return
// true when the queue is really closed, ie not ever any more pending items.
// But in the non-blocking case, there will never be blocked producers, so
// it's not really relevant.
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
// only checking for shutdown when queue is empty, so less of a
// performance drain.
queue_shutdown = queue_closing(self);
if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) {
// TODO signal queue closed to consumers
// ie could also raise StopIteration or other exception here.
return Qnil;
}
// put this consumer to sleep until there's an item available
args.waiting = GET_QUEUE_WAITERS(self);
args.th = rb_thread_current();
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
......
}
/*
 * Document-method: Queue#closed?
 * call-seq: closed?
 *
 * Returns +true+ if the queue is closed, +false+ otherwise.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    /* The QUEUE_CLOSED slot stays nil for as long as the queue is open. */
    if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qnil) {
        return Qfalse;
    }
    return Qtrue;
}
/*
* Document-method: Queue#empty?
* call-seq: empty?
*
......
static VALUE
rb_queue_empty_p(VALUE self)
{
    /*
     * While the queue is open, "empty" simply means no stored items.
     * Once it is closed, the countdown reported by queue_closing() is
     * added in, so the queue only reports empty when there are neither
     * stored items nor items still expected to arrive.
     */
    if (QUEUE_CLOSED_P(self)) {
        struct queue_shutdown state = queue_closing(self);

        return queue_length(self) + state.countdown == 0 ? Qtrue : Qfalse;
    }
    return queue_length(self) == 0 ? Qtrue : Qfalse;
}
/*
......
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
......
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct waiting_delete args;
    int should_block = szqueue_push_should_block(argc, argv);

    /* Refuse the push up front when the queue no longer accepts items. */
    queue_raise_if_closed(self);

    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        /* queue is full, so sleep until needed */
        args.waiting = GET_SZQUEUE_WAITERS(self);
        args.th = rb_thread_current();
        rb_ary_push(args.waiting, args.th);
        /* TODO should also handle countdown decrement if the thread is terminated while sleeping. */
        /* TODO shouldn't this be queue_sleep as well? */
        rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
    }

    queue_do_push(self, argv[0]);

    /* skip this for the normal case, which is queue open */
    if (QUEUE_CLOSED_P(self)) {
        /*
         * At this point, we know we've actually pushed an item onto the
         * queue, and we're in a closing state, so remove the current
         * thread from the list of pending pushers, if it was one to
         * start with.
         */
        VALUE removed_thread = rb_ary_delete(GET_QUEUE_CLOSED(self), rb_thread_current());

        /* was that the last pending thread? */
        if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_CLOSED(self)) == 0) {
            /* wake all waiting consumers, because there will never be more items */
            wakeup_all_threads(GET_QUEUE_WAITERS(self));
        }
    }
    return self;
}
static VALUE
......
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
OUTER,
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL);
"que", "waiters", "closed", NULL);
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
OUTER,
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
"que", "waiters", "queue_waiters", "size", NULL);
"que", "waiters", "closed", "queue_waiters", "size", NULL);
#if 0
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
/* Alias for #push. */
rb_define_alias(rb_cQueue, "enq", "push");
......
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
/* Alias for #push. */
rb_define_alias(rb_cSizedQueue, "enq", "push");
(2-2/4)