Feature #10600 » patch-25f99aef.diff
ext/thread/thread.c | ||
---|---|---|
/*
 * Slot indices into the Queue / SizedQueue struct. The order matches the
 * member names passed to rb_struct_define_without_accessor_under():
 * "que", "waiters", "pending", "token" for Queue, followed by
 * "queue_waiters", "size" for SizedQueue.
 */
enum {
    QUEUE_QUE       = 0,
    QUEUE_WAITERS   = 1,
    QUEUE_PENDING   = 2,
    QUEUE_TOKEN     = 3,
    SZQUEUE_WAITERS = 4,
    SZQUEUE_MAX     = 5
};
|
||
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
|
||
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
||
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
||
#define GET_QUEUE_TOKEN(q) get_array((q), QUEUE_TOKEN)
|
||
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
|
||
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
|
||
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
|
||
/*
|
||
QUEUE_PENDING is the collection of producer threads blocked in push at the point where close
|
||
was called. Can't use SZQUEUE_WAITERS for this because
|
||
rb_ensure(rb_thread_sleep_deadly ...) is not atomic with respect to
|
||
add/remove of threads in SZQUEUE_WAITERS.
|
||
*/
|
||
#define GET_QUEUE_PENDING(q) get_array((q), QUEUE_PENDING)
|
||
/* Has the close method been called? */
|
||
#define QUEUE_CLOSED_P(self) RARRAY_LEN(GET_QUEUE_TOKEN(self))
|
||
static VALUE
|
||
get_array(VALUE obj, int idx)
|
||
{
|
||
... | ... | |
*
|
||
* This class provides a way to synchronize communication between threads.
|
||
*
|
||
* Example:
|
||
*
|
||
* require 'thread'
|
||
* queue = Queue.new
|
||
*
|
||
* producer = Thread.new do
|
||
* 5.times do |i|
|
||
* sleep rand(i) # simulate expense
|
||
* queue << i
|
||
* puts "#{i} produced"
|
||
* end
|
||
* end
|
||
*
|
||
* consumer = Thread.new do
|
||
* 5.times do |i|
|
||
* value = queue.pop
|
||
* sleep rand(i/2) # simulate expense
|
||
* puts "consumed #{value}"
|
||
* end
|
||
* end
|
||
* Example using close(StopIteration):
|
||
*
|
||
* require 'thread'
|
||
* queue = Queue.new
|
||
*
|
||
* consumers = a_few.times.map do
|
||
* Thread.new do
|
||
* loop{ do_something_with queue.pop}
|
||
* end
|
||
* end
|
||
*
|
||
* producers = some.times.map do
|
||
* Thread.new do
|
||
* several.times{ queue << something_interesting }
|
||
* end
|
||
* end
|
||
* producers.each(&:join)
|
||
* queue.close StopIteration
|
||
*
|
||
*
|
||
* Example using nil to close queue:
|
||
*
|
||
* require 'thread'
|
||
* queue = Queue.new
|
||
*
|
||
* consumers = a_few.times.map do
|
||
* Thread.new do
|
||
* while item = queue.pop
|
||
* do_something_with item
|
||
* end
|
||
* end
|
||
* end
|
||
*
|
||
* producers = some.times.map do
|
||
* Thread.new do
|
||
* several.times{ queue << something_interesting }
|
||
* end
|
||
* end
|
||
* producers.each(&:join)
|
||
* queue.close
|
||
*
|
||
*
|
||
* Example using traditional consumer <-> producer coupling (1 producer, 1 consumer):
|
||
*
|
||
* require 'thread'
|
||
* queue = Queue.new
|
||
*
|
||
* producer = Thread.new do
|
||
* 5.times do |i|
|
||
* sleep rand(i) # simulate expense
|
||
* queue << i
|
||
* puts "#{i} produced"
|
||
* end
|
||
* end
|
||
*
|
||
* consumer = Thread.new do
|
||
* 5.times do |i|
|
||
* value = queue.pop
|
||
* sleep rand(i/2) # simulate expense
|
||
* puts "consumed #{value}"
|
||
* end
|
||
* end
|
||
*
|
||
* Example using queue poison token (1 producer, 1 consumer):
|
||
*
|
||
* require 'thread'
|
||
* queue = Queue.new
|
||
*
|
||
* producer = Thread.new do
|
||
* 5.times do |i|
|
||
* sleep rand(i) # simulate expense
|
||
* queue << i
|
||
* puts "#{i} produced"
|
||
* end
|
||
* queue << :poison
|
||
* end
|
||
*
|
||
* consumer = Thread.new do
|
||
* while item = queue.pop
|
||
* break if item == :poison
|
||
* sleep rand # simulate expense
|
||
* puts "consumed #{item}"
|
||
* end
|
||
* end
|
||
*
|
||
*/
|
||
... | ... | |
{
|
||
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
|
||
return self;
|
||
}
|
||
static VALUE
|
||
queue_raise_if_closed(VALUE self)
|
||
{
|
||
if (QUEUE_CLOSED_P(self)) {
|
||
rb_raise(rb_eThreadError, "queue closed");
|
||
}
|
||
return Qnil;
|
||
}
|
||
static VALUE
|
||
queue_do_push(VALUE self, VALUE obj)
|
||
{
|
||
/* TODO would be nice to not have to use macro to check for correct
|
||
initialization on every single call to push. */
|
||
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
||
wakeup_first_thread(GET_QUEUE_WAITERS(self));
|
||
return self;
|
||
... | ... | |
/*
 * Queue#push implementation: appends obj to the queue unless the queue has
 * been closed. Returns whatever queue_do_push returns.
 */
static VALUE
|
||
rb_queue_push(VALUE self, VALUE obj)
|
||
{
|
||
/* Producers are rejected once close has been called: this raises
   ThreadError("queue closed"). */
queue_raise_if_closed(self);
|
||
return queue_do_push(self, obj);
|
||
}
|
||
... | ... | |
return RARRAY_LEN(waiters);
|
||
}
|
||
/*
|
||
Do close accounting.
|
||
1) Set CLOSE_TOKEN
|
||
2) copy pending producers to QUEUE_PENDING. This is to work around
|
||
SZQUEUE_WAITERS updating non-atomically in the thread
|
||
wake/sleep handling.
|
||
CLOSE_TOKEN contains an array with 0 or 1 elements. This is to allow the
|
||
queue to be closed several times, provided the same value is used each
|
||
time, and having an array makes that test easier.
|
||
*/
|
||
static VALUE
|
||
queue_set_close_state(int argc, VALUE *argv, VALUE self, VALUE pending_producers)
|
||
{
|
||
VALUE close_token = Qnil;
|
||
VALUE existing_close_token = Qnil;
|
||
VALUE close_token_ary = GET_QUEUE_TOKEN(self);
|
||
rb_check_arity(argc, 0, 1);
|
||
/* handle arg defaults, and conversion to an Exception instance. */
|
||
if (argc > 0) {
|
||
close_token = argv[0];
|
||
if (rb_obj_is_kind_of(close_token,rb_cClass) && rb_class_inherited_p(close_token,rb_eException)) {
|
||
close_token = rb_exc_new2(close_token, "queue closed");
|
||
}
|
||
}
|
||
/* Allow close to be called several times, with the same argument (same-ness defined by ==). */
|
||
if (RARRAY_LEN(close_token_ary) == 0) {
|
||
rb_ary_store(close_token_ary, 0, close_token);
|
||
/* Start accounting for pending producers. Can only do this once. */
|
||
rb_ary_concat(GET_QUEUE_PENDING(self), pending_producers);
|
||
return close_token;
|
||
} else {
|
||
existing_close_token = RARRAY_AREF(close_token_ary, 0);
|
||
if (!rb_eql(existing_close_token, close_token)) {
|
||
rb_raise(rb_eThreadError, "already closed with %"PRIsVALUE, rb_inspect(existing_close_token));
|
||
}
|
||
return existing_close_token;
|
||
}
|
||
}
|
||
/*
|
||
* Document-method: Queue#close
|
||
* call-seq:
|
||
* close(token=nil)
|
||
*
|
||
* Closes the queue to producers. A closed queue cannot be re-opened.
|
||
*
|
||
* +token+ can be any object, or an instance of an Exception subclass, or an
|
||
* Exception subclass. +close+ can be called repeatedly on a single queue as
|
||
* long as the same (defined by ==) +token+ is used.
|
||
*
|
||
* After the call to close completes, the following are true:
|
||
*
|
||
* - +closed?+ will return true
|
||
*
|
||
* - calling enq/push/<< will raise ThreadError('queue closed')
|
||
*
|
||
* - when +empty?+ is false, calling deq/pop/shift will return an object from the queue as usual.
|
||
*
|
||
* - when +empty?+ is true, deq(non_block=false) will not suspend and
|
||
* will either return the +token+, or raise if +token+ was an exception
|
||
* instance or class. deq(non_block=true) will ignore the parameter and
|
||
* raise a ThreadError('queue empty').
|
||
*
|
||
* And for SizedQueue, these will also be true:
|
||
*
|
||
* - each thread already suspended in enq at the time of the call
|
||
* to close will be allowed to push its object as usual.
|
||
*
|
||
* - +empty?+ will be false when there are either objects in the queue, or
|
||
* producers which were suspended at the time of the call to +close+ but whose
|
||
* objects are not yet in the queue. Therefore, it can be true (very
|
||
* briefly) that empty? == false && size == 0, since +size+ returns the number
|
||
* of objects actually in the queue.
|
||
*/
|
||
static VALUE
|
||
rb_queue_close(int argc, VALUE *argv, VALUE self)
|
||
{
|
||
/* Never any pending producers for ordinary queue, so pending is empty. */
|
||
queue_set_close_state(argc, argv, self, ary_buf_new());
|
||
if (queue_length(self) < queue_num_waiting(self)) {
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
return self;
|
||
}
|
||
static VALUE
|
||
rb_szqueue_close(int argc, VALUE *argv, VALUE self)
|
||
{
|
||
queue_set_close_state(argc, argv, self, GET_SZQUEUE_WAITERS(self));
|
||
if (RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
|
||
/* Wake up all consumers because there will be never be more items. */
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
return self;
|
||
}
|
||
/* Arguments handed to the rb_ensure cleanup callbacks
   (queue_delete_from_waiting / szqueue_delete_from_waiting) that run after
   a thread stops sleeping on a queue. */
struct waiting_delete {
|
||
VALUE waiting; /* waiter array the thread was pushed onto before sleeping */
|
||
VALUE th; /* the thread that was waiting */
|
||
VALUE pending; /* QUEUE_PENDING array for producer waits; Qnil for consumer waits */
|
||
VALUE self; /* the queue for producer waits; Qnil for consumer waits */
|
||
};
|
||
/*
 * rb_ensure cleanup for a consumer wait: removes the thread from the waiter
 * array it was pushed onto. Only p->waiting and p->th are used.
 * Always returns Qnil.
 */
static VALUE
|
||
queue_delete_from_waiting(struct waiting_delete *p)
|
||
{
|
||
/* Remove this thread from the waiting array. */
|
||
rb_ary_delete(p->waiting, p->th);
|
||
return Qnil;
|
||
}
|
||
static ID id_status;
|
||
static VALUE
|
||
szqueue_delete_from_waiting(struct waiting_delete *p)
|
||
{
|
||
/* The waiting queues */
|
||
rb_ary_delete(p->waiting, p->th);
|
||
/* Handle the post-close pending producers on thread abort. Only applies to SizedQueue. */
|
||
if (QUEUE_CLOSED_P(p->self)) {
|
||
/* TODO This is not great, but all the better ways seem to involved rb_thread_t.status
|
||
And it won't get in the way of normal queue operation. */
|
||
if (strcmp("aborting",RSTRING_PTR(rb_funcall(p->th,id_status,0))) == 0) {
|
||
rb_ary_delete(p->pending, p->th);
|
||
}
|
||
}
|
||
return Qnil;
|
||
}
|
||
... | ... | |
}
|
||
static VALUE
|
||
queue_close_pop(VALUE self)
|
||
{
|
||
VALUE token = RARRAY_AREF(GET_QUEUE_TOKEN(self),0);
|
||
if (rb_obj_is_kind_of(token,rb_eException)) {
|
||
rb_exc_raise(token);
|
||
}
|
||
return token;
|
||
}
|
||
static VALUE
|
||
queue_do_pop(VALUE self, int should_block)
|
||
{
|
||
struct waiting_delete args;
|
||
args.waiting = GET_QUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
while (queue_length(self) == 0) {
|
||
if (!should_block) {
|
||
rb_raise(rb_eThreadError, "queue empty");
|
||
}
|
||
if (QUEUE_CLOSED_P(self) && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
|
||
return queue_close_pop(self);
|
||
}
|
||
args.waiting = GET_QUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
args.pending = Qnil;
|
||
args.self = Qnil;
|
||
rb_ary_push(args.waiting, args.th);
|
||
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
}
|
||
... | ... | |
}
|
||
/*
|
||
* Document-method: Queue#closed?
|
||
* call-seq: closed?
|
||
*
|
||
* Returns +true+ if the queue is closed.
|
||
*/
|
||
static VALUE
|
||
rb_queue_closed_p(VALUE self)
|
||
{
|
||
return QUEUE_CLOSED_P(self) ? Qtrue : Qfalse;
|
||
}
|
||
/*
|
||
* Document-method: Queue#empty?
|
||
* call-seq: empty?
|
||
*
|
||
... | ... | |
static VALUE
|
||
rb_queue_empty_p(VALUE self)
|
||
{
|
||
return queue_length(self) == 0 ? Qtrue : Qfalse;
|
||
unsigned long items = queue_length(self);
|
||
if (QUEUE_CLOSED_P(self)) {
|
||
/* Add number of known pending items. */
|
||
items += RARRAY_LEN(GET_QUEUE_PENDING(self));
|
||
}
|
||
return items == 0 ? Qtrue : Qfalse;
|
||
}
|
||
/*
|
||
... | ... | |
/*
|
||
* Document-class: SizedQueue
|
||
*
|
||
* This class represents queues of specified size capacity. The push operation
|
||
* may be blocked if the capacity is full.
|
||
* This class represents queues of specified size capacity, also known as a
|
||
* bounded queue. The push operation may be blocked if the capacity is full.
|
||
*
|
||
* See Queue for an example of how a SizedQueue works.
|
||
*/
|
||
... | ... | |
static VALUE
|
||
rb_szqueue_initialize(VALUE self, VALUE vmax)
|
||
{
|
||
long max;
|
||
max = NUM2LONG(vmax);
|
||
long max = NUM2LONG(vmax);
|
||
if (max <= 0) {
|
||
rb_raise(rb_eArgError, "queue size must be positive");
|
||
}
|
||
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
|
||
RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
|
||
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
|
||
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
||
... | ... | |
*
|
||
* If there is no space left in the queue, waits until space becomes
|
||
* available, unless +non_block+ is true. If +non_block+ is true, the
|
||
* thread isn't suspended, and an exception is raised.
|
||
* thread isn't suspended, and ThreadError('queue full') is raised.
|
||
*/
|
||
static VALUE
|
||
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
||
{
|
||
struct waiting_delete args;
|
||
int should_block = szqueue_push_should_block(argc, argv);
|
||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
struct waiting_delete args;
|
||
queue_raise_if_closed(self);
|
||
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
|
||
if (!should_block) {
|
||
rb_raise(rb_eThreadError, "queue full");
|
||
}
|
||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||
args.th = rb_thread_current();
|
||
args.pending = GET_QUEUE_PENDING(self);
|
||
args.self = self;
|
||
rb_ary_push(args.waiting, args.th);
|
||
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||
rb_ensure(queue_sleep, (VALUE)0, szqueue_delete_from_waiting, (VALUE)&args);
|
||
}
|
||
return queue_do_push(self, argv[0]);
|
||
queue_do_push(self, argv[0]);
|
||
if (QUEUE_CLOSED_P(self)) {
|
||
VALUE removed_thread = rb_ary_delete(GET_QUEUE_PENDING(self), rb_thread_current());
|
||
if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
|
||
/* wake all waiting consumers, because there will never be more items. */
|
||
wakeup_all_threads(GET_QUEUE_WAITERS(self));
|
||
}
|
||
}
|
||
return self;
|
||
}
|
||
static VALUE
|
||
... | ... | |
* Retrieves data from the queue.
|
||
*
|
||
* If the queue is empty, the calling thread is suspended until data is pushed
|
||
* onto the queue. If +non_block+ is true, the thread isn't suspended, and an
|
||
* exception is raised.
|
||
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
|
||
* ThreadError('queue empty') is raised.
|
||
*/
|
||
static VALUE
|
||
... | ... | |
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
||
"que", "waiters", NULL);
|
||
"que", "waiters", "pending", "token", NULL);
|
||
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
||
OUTER,
|
||
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
||
"que", "waiters", "queue_waiters", "size", NULL);
|
||
"que", "waiters", "pending", "token", "queue_waiters", "size", NULL);
|
||
#if 0
|
||
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
|
||
... | ... | |
#endif
|
||
id_sleep = rb_intern("sleep");
|
||
id_status = rb_intern("status");
|
||
rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
|
||
rb_undef_method(rb_cConditionVariable, "initialize_copy");
|
||
... | ... | |
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
||
rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
|
||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||
/* Alias for #push. */
|
||
rb_define_alias(rb_cQueue, "enq", "push");
|
||
... | ... | |
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
||
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
||
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
||
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
|
||
/* Alias for #push. */
|
||
rb_define_alias(rb_cSizedQueue, "enq", "push");
|
test/thread/test_queue.rb | ||
---|---|---|
}
|
||
}.join
|
||
# close the queue the old way to test for backwards-compatibility
|
||
num_threads.times { to_workers.push nil }
|
||
workers.each { |t| t.join }
|
||
... | ... | |
Marshal.dump(q)
|
||
end
|
||
end
|
||
# Basic close contract for both Queue and SizedQueue: closed? flips to true,
# further pushes raise, queued items can still be popped, and popping the
# empty closed queue returns the default (nil) token.
def test_close
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    q = qcreate.call
    assert_equal false, q.closed?
    q << :something
    assert_equal q, q.close
    assert q.closed?
    assert_raise_with_message(ThreadError, /closed/) { q << :nothing }
    # expected value goes first so failure messages read correctly
    assert_equal :something, q.pop
    assert_nil q.pop
    # non-blocking
    assert_raise_with_message(ThreadError, /queue empty/) { q.pop(true) }
  end
end
|
||
# Helper: checks that consumers blocked in pop are woken up by close.
#
# num_items   - number of items pushed; must be smaller than num_threads.
# num_threads - number of consumer threads started.
# The block creates the queue under test.
def close_wakeup( num_items, num_threads, &qcreate )
  if num_items >= num_threads
    raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})"
  end

  # create the Queue
  q = yield
  threads = Array.new(num_threads) { Thread.new { q.pop } }
  num_items.times { |i| q << i }

  # wait until queue empty
  until q.size == 0
    Thread.pass
    sleep 0.01
  end

  # now there should be some waiting consumers
  waiting = threads.count { |thr| thr.alive? && thr.stop? }
  assert_equal num_threads - num_items, waiting

  # tell them all to go away
  q.close

  # wait for them to go away
  Thread.pass until threads.all? { |thr| thr.status == false }

  # check that they've gone away. Convert nil to -1 so we can sort and do the comparison
  expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a
  actual_values = threads.map { |thr| thr.value || -1 }.sort
  assert_equal expected_values, actual_values
end
|
||
# Runs the close_wakeup scenario against a plain Queue.
def test_queue_close_wakeup
  close_wakeup(15, 18) do
    Queue.new
  end
end
|
||
# Runs the close_wakeup scenario against a SizedQueue of capacity 9.
def test_size_queue_close_wakeup
  close_wakeup(5, 8) do
    SizedQueue.new(9)
  end
end
|
||
# Kills 32 producers blocked on a full, closed SizedQueue and checks that
# only the original item is left and the queue then reports empty.
def test_sized_queue_closed_multi_interrupt
  q = SizedQueue.new 1
  q << :one

  prod_threads = Array.new(32) { |i| Thread.new { q << i } }
  sleep 0.01 until prod_threads.all?(&:stop?)

  q.close

  killers = prod_threads.map do |thr|
    Thread.new { thr.kill.join }
  end
  killers.each(&:join)

  assert_equal 1, q.size
  assert_equal :one, q.pop
  assert q.empty?, "queue not empty"
end
|
||
# this might be unnecessary, not sure
|
||
# Two producers are blocked on a full SizedQueue when it is closed. One is
# killed; the other must still deliver its item before the queue is empty.
def test_sized_queue_two_closed_interrupt
  q = SizedQueue.new 1
  q << :one

  t1 = Thread.new { q << :two }
  t2 = Thread.new { q << :tre }
  until t1.stop? && t2.stop?
    sleep 0.01
  end

  q.close
  t1.kill.join

  assert_equal 1, q.size
  assert_equal :one, q.pop
  assert !q.empty?, "queue empty"

  t2.join
  assert_equal :tre, q.pop
  assert q.empty?, "queue not empty"
end
|
||
# A single producer blocked on a full, closed SizedQueue is killed; the
# queue must be empty once the original item has been popped.
def test_sized_queue_one_closed_interrupt
  q = SizedQueue.new 1
  q << :one

  t1 = Thread.new { q << :two }
  until t1.stop?
    sleep 0.01
  end

  q.close
  t1.kill.join

  assert_equal 1, q.size
  assert_equal :one, q.pop
  assert q.empty?, "queue not empty"
end
|
||
# make sure that shutdown state is handled properly by empty? for the non-blocking case
|
||
# Drains a closed SizedQueue with non-blocking pops while blocked producers
# are still delivering, using empty? as the loop condition.
def test_empty_non_blocking
  q = SizedQueue.new 3
  3.times { |i| q << i }

  # these all block cos the queue is full
  prod_threads = Array.new(4) { |i| Thread.new { q << 3 + i } }
  sleep 0.01 until prod_threads.all? { |thr| thr.status == 'sleep' }

  q.close

  items = []
  # sometimes empty? is false but pop will raise ThreadError('empty'),
  # meaning a value is not immediately available but will be soon.
  until q.empty?
    begin
      items << q.pop(true)
    rescue StandardError
      nil
    end
  end
  items.compact!

  assert_equal 7.times.to_a, items.sort
  assert q.empty?
end
|
||
# A non-blocking push onto a closed SizedQueue must raise "queue closed".
def test_sized_queue_closed_push_non_blocking
  q = SizedQueue.new 7
  q.close
  # push takes (obj, non_block); pass an item plus the flag so the
  # non-blocking path is the one exercised.
  assert_raise_with_message(ThreadError, /queue closed/) { q.push(:item, true) }
end
|
||
# Producers that were already in push when the queue was closed must have
# their items delivered; producers that pushed after close must fail.
def test_blocked_pushers
  q = SizedQueue.new 3
  prod_threads = Array.new(6) do |i|
    Thread.new { q << i }.tap { |thr| thr[:pc] = i }
  end

  # wait until some producer threads have finished, and the other 3 are blocked
  sleep 0.01 while prod_threads.count { |t| !t.status } < 3
  # this would ensure that all producer threads call push before close
  # sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
  q.close

  # more than prod_threads
  cons_threads = Array.new(10) do |i|
    Thread.new { q.pop }.tap { |thr| thr[:pc] = i }
  end

  # values that came from the queue
  popped_values = cons_threads.map(&:value)

  # pick only the producer threads that got in before close
  successful_prod_threads = prod_threads.reject { |thr| thr.status.nil? }
  assert_nothing_raised { successful_prod_threads.map(&:value) }

  # the producer threads that tried to push after q.close should all fail
  unsuccessful_prod_threads = prod_threads - successful_prod_threads
  unsuccessful_prod_threads.each do |thr|
    assert_raise(ThreadError) { thr.value }
  end

  assert_equal cons_threads.size, popped_values.size
  assert_equal 0, q.size

  # check that consumer threads with values match producers that called push before close
  producer_ids = successful_prod_threads.map { |thr| thr[:pc] }
  assert_equal producer_ids, popped_values.compact.sort
  assert_nil q.pop
end
|
||
# Every producer that pushes onto a closed queue must fail with ThreadError,
# for both Queue and SizedQueue.
def test_deny_pushers
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    prod_threads = nil
    q = qcreate.call

    # producers are started from a separate thread and hold off pushing
    # until prod_threads has been assigned
    Thread.new do
      prod_threads = Array.new(20) do |i|
        Thread.new do
          sleep 0.01 until prod_threads
          q << i
        end
      end
    end
    q.close

    # wait for all threads to be finished, because of exceptions
    sleep 0.01 until prod_threads && prod_threads.all? { |thr| thr.status.nil? }

    # check that all threads failed to call push
    prod_threads.each do |thr|
      outcome =
        begin
          thr.value
        rescue StandardError => e
          e
        end
      assert_kind_of ThreadError, outcome
    end
  end
end
|
||
# size should account for waiting pushers during shutdown
# NOTE(review): the method name has no test_ prefix, so presumably the test
# runner never executes it - confirm whether that is intentional.
# NOTE(review): q.items is not defined in any visible part of the patch, and
# the Queue#close documentation states that size returns only the objects
# actually in the queue - verify these expectations before enabling.
|
||
def sized_queue_size_close
|
||
q = SizedQueue.new 4
|
||
4.times{|i| q << i}
|
||
# the queue is full at this point, so these two pushes are expected to block
Thread.new{ q << 5 }
|
||
Thread.new{ q << 6 }
|
||
assert_equal 4, q.size
|
||
assert_equal 4, q.items
|
||
q.close
|
||
assert_equal 6, q.size
|
||
assert_equal 4, q.items
|
||
end
|
||
# With three producers blocked at close time, looping on empty? must yield
# all six items before the closed queue starts returning nil.
def test_blocked_pushers_empty
  q = SizedQueue.new 3
  prod_threads = Array.new(6) { |i| Thread.new { q << i } }

  # this ensures that all producer threads call push before close
  sleep 0.01 while prod_threads.count { |t| t.status == 'sleep' } < 3
  q.close

  ary = []
  ary << q.pop until q.empty?

  assert_equal 0, q.size
  assert_equal 6, ary.size
  assert_equal [0,1,2,3,4,5], ary.sort
  assert_nil q.pop
end
|
||
# test thread wakeup on one-element SizedQueue with close
|
||
# A consumer popping from a one-element SizedQueue gets nil once the queue
# is closed.
def test_one_element_sized_queue
  q = SizedQueue.new(1)
  consumer = Thread.new { q.pop }
  q.close
  assert_nil consumer.value
end
|
||
# Popping an empty queue closed with a plain object returns that object.
def test_close_token
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    q = qcreate.call
    q.close(:token)
    assert_equal :token, q.pop
  end
end
|
||
# Popping an empty queue closed with an exception instance raises it.
def test_close_token_exception
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    q = qcreate.call
    q.close(RuntimeError.new("no more"))
    assert_raise(RuntimeError) { q.pop }
  end
end
|
||
# Closing with StopIteration ends a consumer's Kernel#loop cleanly after
# all queued items have been popped.
def test_close_token_loop
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    q = qcreate.call
    popped_items = []
    consumer_thread = Thread.new do
      loop { popped_items << q.pop }
      :done
    end

    7.times { |i| q << i }
    q.close(StopIteration)

    assert_equal :done, consumer_thread.value
    assert_equal (0...7).to_a, popped_items
  end
end
|
||
# Closing an already-closed queue with a different token raises ThreadError,
# whether the first close used an explicit token or the default.
def test_close_wrong_token
  [->{ Queue.new }, ->{ SizedQueue.new 3 }].each do |qcreate|
    with_token = qcreate.call
    with_token.close(:token)
    assert_raise(ThreadError) { with_token.close(:another_token) }

    with_default = qcreate.call
    with_default.close
    assert_raise(ThreadError) { with_default.close(:not_nil) }
  end
end
|
||
# Stress test: many producers and consumers share one SizedQueue. After all
# producers finish, the queue is closed with StopIteration and the consumers
# together must have received every item.
def test_queue_close_multi_multi
  q = SizedQueue.new rand(800..1200)

  count_items = rand(3000..5000)
  count_producers = rand(10..20)

  producers = count_producers.times.map do
    Thread.new do
      sleep(rand / 100)
      count_items.times { |i| q << [i, "#{i} for #{Thread.current.inspect}"] }
    end
  end

  consumers = rand(7..12).times.map do
    Thread.new do
      count = 0
      # Kernel#loop ends when pop raises the StopIteration close token
      loop do
        i, st = q.pop
        # Integer also covers Fixnum, which newer Rubies no longer define
        count += 1 if i.is_a?(Integer) && st.is_a?(String)
      end
      count
    end
  end

  # No dead or finished threads. Compare against the exact status strings;
  # status is false/nil for a finished thread, so avoid =~ on it.
  all_live = (consumers + producers).all? { |thr| %w[run sleep].include?(thr.status) }
  assert all_live, 'no threads runnning'

  # just exercising the concurrency of the support methods.
  counter = Thread.new do
    until q.closed? && q.empty?
      raise if q.size > q.max
      # otherwise this exercise causes too much contention on the lock
      sleep 0.01
    end
  end

  producers.each(&:join)
  q.close StopIteration

  # results not randomly distributed. Not sure why.
  # consumers.map{|thr| thr.value}.each do |x|
  #   assert_not_equal 0, x
  # end

  all_items_count = consumers.map { |thr| thr.value }.inject(:+)
  assert_equal count_items * count_producers, all_items_count

  # don't leak this thread
  assert_nothing_raised { counter.join }
end
|
||
end
|