Feature #10600 » queue-close.diff
ext/thread/thread.c | ||
---|---|---|
/*
 * Member indices used with RSTRUCT_GET/RSTRUCT_SET on the Struct-backed
 * Queue and SizedQueue objects.  The order must match the member lists
 * given when the classes are defined:
 *   Queue:      "que", "waiters", "closed"
 *   SizedQueue: "que", "waiters", "closed", "queue_waiters", "size"
 * QUEUE_CLOSED sits before the SizedQueue-only members so that both
 * classes share the same index for the closed flag.
 */
enum {
    QUEUE_QUE       = 0,
    QUEUE_WAITERS   = 1,
    QUEUE_CLOSED    = 2,
    SZQUEUE_WAITERS = 3,
    SZQUEUE_MAX     = 4
};
|
||
/* Expands to get_array(cv, CONDVAR_WAITERS): passes cv and the CONDVAR_WAITERS index to get_array(). */
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
|
||
... | ... | |
{
|
||
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
|
||
return self;
|
||
}
|
||
static VALUE
|
||
queue_raise_if_closed(VALUE self)
|
||
{
|
||
if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qtrue ) {
|
||
rb_raise(rb_eThreadError, "queue closed");
|
||
}
|
||
return Qnil;
|
||
}
|
||
static VALUE
|
||
queue_do_push(VALUE self, VALUE obj)
|
||
{
|
||
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
||
... | ... | |
static VALUE
|
||
rb_queue_push(VALUE self, VALUE obj)
|
||
{
|
||
queue_raise_if_closed(self);
|
||
return queue_do_push(self, obj);
|
||
}
|
||
... | ... | |
return RARRAY_LEN(waiters);
|
||
}
|
||
/*
|
||
* Document-method: Queue#close
|
||
* call-seq: close
|
||
*
|
||
* Closes the queue.
|
||
*
|
||
* Once the queue is closed, enq will raise an exception, although remaining
|
||
* items in the queue can be deq'd as usual. Any threads waiting on
|
||
* pop(non_block=false) at the time the queue is closed will be woken and
|
||
* given a nil.
|
||
*
|
||
* Once a closed queue is empty, deq with non_block=false will always return
|
||
* nil.
|
||
*
|
||
* A closed queue cannot be reopened.
|
||
*/
|
||
static VALUE
|
||
rb_queue_close(VALUE self)
|
||
{
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, Qtrue);
|
||
if (queue_num_waiting(self) > 0) {
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
return self;
|
||
}
|
||
struct waiting_delete {
|
||
VALUE waiting;
|
||
VALUE th;
|
||
... | ... | |
if (!should_block) {
|
||
rb_raise(rb_eThreadError, "queue empty");
|
||
}
|
||
if (RSTRUCT_GET(self,QUEUE_CLOSED) == Qtrue) {
|
||
return Qnil;
|
||
}
|
||
rb_ary_push(args.waiting, args.th);
|
||
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
}
|
||
... | ... | |
}
|
||
/*
|
||
* Document-method: Queue#closed?
|
||
* call-seq: closed?
|
||
*
|
||
* Returns +true+ if the queue is closed.
|
||
*/
|
||
static VALUE
|
||
rb_queue_closed_p(VALUE self)
|
||
{
|
||
return RSTRUCT_GET(self, QUEUE_CLOSED);
|
||
}
|
||
/*
|
||
* Document-method: Queue#empty?
|
||
* call-seq: empty?
|
||
*
|
||
... | ... | |
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
|
||
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
||
... | ... | |
{
|
||
struct waiting_delete args;
|
||
int should_block = szqueue_push_should_block(argc, argv);
|
||
queue_raise_if_closed(self);
|
||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
... | ... | |
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
||
"que", "waiters", NULL);
|
||
"que", "waiters", "closed", NULL);
|
||
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
||
"que", "waiters", "queue_waiters", "size", NULL);
|
||
"que", "waiters", "closed", "queue_waiters", "size", NULL);
|
||
#if 0
|
||
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
|
||
... | ... | |
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
||
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||
/* Alias for #push. */
|
||
rb_define_alias(rb_cQueue, "enq", "push");
|
test/thread/test_queue.rb | ||
---|---|---|
Marshal.dump(q)
|
||
end
|
||
end
|
||
def test_close
  # Run the same scenario against both queue classes.
  [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
    q = qcreate[]
    assert_equal false, q.closed?
    q << :something
    q.close
    assert_equal true, q.closed?
    # Pushing to a closed queue must raise.
    assert_raise_with_message(ThreadError, /closed/){q << :nothing}
    # Items queued before the close can still be popped (expected value goes first).
    assert_equal :something, q.pop
    # A closed, empty queue returns nil from a blocking pop instead of waiting.
    assert_nil q.pop
  end
end
|
||
def test_close_wakeup
  # Run the same scenario against both queue classes.
  [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
    q = qcreate[]
    threads = Array.new(4) { Thread.new { q.pop } }
    3.times { |i| q << i }

    # Number of consumer threads currently asleep (alive but stopped).
    sleeping = ->{ threads.count { |thr| thr.alive? && thr.stop? } }

    # Four consumers and three items: exactly one consumer should end up
    # blocked in pop.  Poll with a deadline instead of relying on a single
    # fixed sleep, so a slow scheduler does not make the test flaky.
    deadline = Time.now + 1
    sleep 0.01 until sleeping[] == 1 || Time.now > deadline
    assert_equal 1, sleeping[]

    # Closing the queue must wake the blocked consumer.
    q.close
    assert_equal 0, sleeping[]

    # Reap the consumers so no threads leak into later tests; the timeout
    # keeps a regression from hanging the suite.
    threads.each { |thr| thr.join(1) }
  end
end
|
||
end
|