Feature #10600 » queue-close.diff
| ext/thread/thread.c | ||
|---|---|---|
|
/*
 * Member slot indices for the Queue / SizedQueue structs.
 *
 * The order mirrors the member lists used when the structs are defined
 * ("que", "waiters", "closed", "queue_waiters", "size"): the closed flag
 * occupies slot 2, so the SizedQueue-only slots follow it at 3 and 4.
 * Each enumerator is declared exactly once.
 */
enum {
    QUEUE_QUE       = 0,
    QUEUE_WAITERS   = 1,
    QUEUE_CLOSED    = 2,
    SZQUEUE_WAITERS = 3,
    SZQUEUE_MAX     = 4
};
|
||
|
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
|
||
| ... | ... | |
|
{
|
||
|
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
|
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
|
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
|
||
|
return self;
|
||
|
}
|
||
|
static VALUE
|
||
|
queue_raise_if_closed(VALUE self)
|
||
|
{
|
||
|
if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qtrue ) {
|
||
|
rb_raise(rb_eThreadError, "queue closed");
|
||
|
}
|
||
|
return Qnil;
|
||
|
}
|
||
|
static VALUE
|
||
|
queue_do_push(VALUE self, VALUE obj)
|
||
|
{
|
||
|
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
||
| ... | ... | |
|
static VALUE
|
||
|
rb_queue_push(VALUE self, VALUE obj)
|
||
|
{
|
||
|
queue_raise_if_closed(self);
|
||
|
return queue_do_push(self, obj);
|
||
|
}
|
||
| ... | ... | |
|
return RARRAY_LEN(waiters);
|
||
|
}
|
||
|
/*
|
||
|
* Document-method: Queue#close
|
||
|
* call-seq: close
|
||
|
*
|
||
|
* Closes the queue.
|
||
|
*
|
||
|
* Once the queue is closed, enq will raise an exception, although remaining
|
||
|
* items in the queue can be deq'd as usual. Any threads waiting on
|
||
|
* pop(non_block=false) at the time the queue is closed will be woken and
|
||
|
* given a nil.
|
||
|
*
|
||
|
* Once a closed queue is empty, deq with non_block=false will always return
|
||
|
* nil.
|
||
|
*
|
||
|
* A closed queue cannot be reopened.
|
||
|
*/
|
||
|
static VALUE
|
||
|
rb_queue_close(VALUE self)
|
||
|
{
|
||
|
RSTRUCT_SET(self, QUEUE_CLOSED, Qtrue);
|
||
|
if (queue_num_waiting(self) > 0) {
|
||
|
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
|
}
|
||
|
return self;
|
||
|
}
|
||
|
struct waiting_delete {
|
||
|
VALUE waiting;
|
||
|
VALUE th;
|
||
| ... | ... | |
|
if (!should_block) {
|
||
|
rb_raise(rb_eThreadError, "queue empty");
|
||
|
}
|
||
|
if (RSTRUCT_GET(self,QUEUE_CLOSED) == Qtrue) {
|
||
|
return Qnil;
|
||
|
}
|
||
|
rb_ary_push(args.waiting, args.th);
|
||
|
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
|
}
|
||
| ... | ... | |
|
}
|
||
|
/*
|
||
|
* Document-method: Queue#closed?
|
||
|
* call-seq: closed?
|
||
|
*
|
||
|
* Returns +true+ if the queue is closed.
|
||
|
*/
|
||
|
static VALUE
|
||
|
rb_queue_closed_p(VALUE self)
|
||
|
{
|
||
|
return RSTRUCT_GET(self, QUEUE_CLOSED);
|
||
|
}
|
||
|
/*
|
||
|
* Document-method: Queue#empty?
|
||
|
* call-seq: empty?
|
||
|
*
|
||
| ... | ... | |
|
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
|
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
|
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
|
||
|
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
|
||
|
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
||
| ... | ... | |
|
{
|
||
|
struct waiting_delete args;
|
||
|
int should_block = szqueue_push_should_block(argc, argv);
|
||
|
queue_raise_if_closed(self);
|
||
|
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
|
args.th = rb_thread_current();
|
||
| ... | ... | |
|
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
|
||
|
OUTER,
|
||
|
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
||
|
"que", "waiters", NULL);
|
||
|
"que", "waiters", "closed", NULL);
|
||
|
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
||
|
OUTER,
|
||
|
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
||
|
"que", "waiters", "queue_waiters", "size", NULL);
|
||
|
"que", "waiters", "closed", "queue_waiters", "size", NULL);
|
||
|
#if 0
|
||
|
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
|
||
| ... | ... | |
|
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||
|
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||
|
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
||
|
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||
|
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||
|
/* Alias for #push. */
|
||
|
rb_define_alias(rb_cQueue, "enq", "push");
|
||
| test/thread/test_queue.rb | ||
|---|---|---|
|
Marshal.dump(q)
|
||
|
end
|
||
|
end
|
||
|
# Closing a Queue or SizedQueue: closed? flips to true, pushing raises
# ThreadError, items queued before the close are still delivered, and a
# blocking pop on the drained queue returns nil.
def test_close
  [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
    q = qcreate[]
    assert_equal false, q.closed?
    q << :something
    q.close
    assert_equal true, q.closed?
    assert_raise_with_message(ThreadError, /closed/){q << :nothing}
    # assert_equal takes (expected, actual); keeping that order makes the
    # failure message report the right side as "expected".
    assert_equal :something, q.pop
    assert_nil q.pop
  end
end
|
||
|
# Closing a queue must wake every thread blocked in pop. Four poppers are
# started and three items are pushed, so exactly one popper remains blocked
# until the queue is closed.
def test_close_wakeup
  [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
    q = qcreate[]
    threads = []
    4.times{threads << Thread.new{q.pop}}
    3.times{|i| q << i}

    # Thread#stop? is also true for dead threads, hence the alive? check.
    blocked = ->{threads.count{|thr| thr.alive? && thr.stop?}}

    # Wait (bounded) until three poppers have finished and the fourth is
    # asleep, instead of relying on a fixed sleep that races the scheduler.
    deadline = Time.now + 5
    until (threads.count(&:alive?) == 1 && blocked.call == 1) || Time.now > deadline
      sleep 0.01
    end
    assert_equal 1, blocked.call

    q.close
    assert_equal 0, blocked.call

    # Reap the poppers so no thread leaks into the next iteration or test.
    threads.each do |thr|
      assert thr.join(5), "popper thread did not terminate after close"
    end
  end
end
|
||
|
end
|
||