Project

General

Profile

Feature #10600 ยป patch-25f99aef.diff

djellemah (John Anderson), 02/25/2015 07:51 PM

View differences:

ext/thread/thread.c
7 7
enum {
8 8
    QUEUE_QUE       = 0,
9 9
    QUEUE_WAITERS   = 1,
10
    SZQUEUE_WAITERS = 2,
11
    SZQUEUE_MAX     = 3
10
    QUEUE_PENDING   = 2,
11
    QUEUE_TOKEN     = 3,
12
    SZQUEUE_WAITERS = 4,
13
    SZQUEUE_MAX     = 5
12 14
};
13 15

  
14 16
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
15 17

  
16 18
#define GET_QUEUE_QUE(q)        get_array((q), QUEUE_QUE)
17 19
#define GET_QUEUE_WAITERS(q)    get_array((q), QUEUE_WAITERS)
20
#define GET_QUEUE_TOKEN(q)      get_array((q), QUEUE_TOKEN)
18 21
#define GET_SZQUEUE_WAITERS(q)  get_array((q), SZQUEUE_WAITERS)
19 22
#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
20 23
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
21 24

  
25
/*
26
    QUEUE_PENDING is the collection of producer threads at the point where close
27
    was called. Can't use SZQUEUE_WAITERS for this because
28
    rb_ensure(rb_thread_sleep_deadly ...) is not atomic with respect to
29
    add/remove of threads in SZQUEUE_WAITERS.
30
*/
31
#define GET_QUEUE_PENDING(q)     get_array((q), QUEUE_PENDING)
32

  
33
/* Has the close method been called? */
34
#define QUEUE_CLOSED_P(self)   RARRAY_LEN(GET_QUEUE_TOKEN(self))
35

  
22 36
static VALUE
23 37
get_array(VALUE obj, int idx)
24 38
{
......
179 193
 *
180 194
 *  This class provides a way to synchronize communication between threads.
181 195
 *
182
 *  Example:
183 196
 *
184
 *	require 'thread'
185
 *    	queue = Queue.new
186
 *
187
 *	producer = Thread.new do
188
 *	  5.times do |i|
189
 *	     sleep rand(i) # simulate expense
190
 *	     queue << i
191
 *	     puts "#{i} produced"
192
 *	  end
193
 *	end
194
 *
195
 *	consumer = Thread.new do
196
 *	  5.times do |i|
197
 *	     value = queue.pop
198
 *	     sleep rand(i/2) # simulate expense
199
 *	     puts "consumed #{value}"
200
 *	  end
201
 *	end
197
 *  Example using close(StopIteration):
198
 *
199
 *    require 'thread'
200
 *    queue = Queue.new
201
 *
202
 *    consumers = a_few.times.map do
203
 *      Thread.new do
204
 *        loop{ do_something_with queue.pop}
205
 *      end
206
 *    end
207
 *
208
 *    producers = some.times.map do
209
 *      Thread.new do
210
 *        several.times{ queue << something_interesting }
211
 *      end
212
 *    end
213
 *    producers.each(&:join)
214
 *    queue.close StopIteration
215
 *
216
 *
217
 *  Example using nil to close queue:
218
 *
219
 *    require 'thread'
220
 *    queue = Queue.new
221
 *
222
 *    consumers = a_few.times.map do
223
 *      Thread.new do
224
 *        while item = queue.pop
225
 *          do_something_with item
226
 *        end
227
 *      end
228
 *    end
229
 *
230
 *    producers = some.times.map do
231
 *      Thread.new do
232
 *        several.times{ queue << something_interesting }
233
 *      end
234
 *    end
235
 *    producers.each(&:join)
236
 *    queue.close
237
 *
238
 *
239
 *  Example using traditional consumer <-> producer coupling (1 producer, 1 consumer):
240
 *
241
 *    require 'thread'
242
 *    queue = Queue.new
243
 *
244
 *    producer = Thread.new do
245
 *      5.times do |i|
246
 *         sleep rand(i) # simulate expense
247
 *         queue << i
248
 *         puts "#{i} produced"
249
 *      end
250
 *    end
251
 *
252
 *    consumer = Thread.new do
253
 *      5.times do |i|
254
 *         value = queue.pop
255
 *         sleep rand(i/2) # simulate expense
256
 *         puts "consumed #{value}"
257
 *      end
258
 *    end
259
 *
260
 *  Example using queue poison token (1 producer, 1 consumer):
261
 *
262
 *    require 'thread'
263
 *    queue = Queue.new
264
 *
265
 *    producer = Thread.new do
266
 *      5.times do |i|
267
 *         sleep rand(i) # simulate expense
268
 *         queue << i
269
 *         puts "#{i} produced"
270
 *      end
271
 *      queue << :poison
272
 *    end
273
 *
274
 *    consumer = Thread.new do
275
 *      while item = queue.pop
276
 *         break if item == :poison
277
 *         sleep rand(item/2) # simulate expense
278
 *         puts "consumed #{item}"
279
 *      end
280
 *    end
202 281
 *
203 282
 */
204 283

  
......
213 292
{
214 293
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
215 294
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
295
    RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
296
    RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
216 297
    return self;
217 298
}
218 299

  
219 300
static VALUE
301
queue_raise_if_closed(VALUE self)
302
{
303
    if (QUEUE_CLOSED_P(self)) {
304
	rb_raise(rb_eThreadError, "queue closed");
305
    }
306
    return Qnil;
307
}
308

  
309
static VALUE
220 310
queue_do_push(VALUE self, VALUE obj)
221 311
{
312
    /* TODO would be nice to not have to use macro to check for correct
313
       initialization on every single call to push. */
222 314
    rb_ary_push(GET_QUEUE_QUE(self), obj);
223 315
    wakeup_first_thread(GET_QUEUE_WAITERS(self));
224 316
    return self;
......
237 329
static VALUE
238 330
rb_queue_push(VALUE self, VALUE obj)
239 331
{
332
    queue_raise_if_closed(self);
240 333
    return queue_do_push(self, obj);
241 334
}
242 335

  
......
254 347
    return RARRAY_LEN(waiters);
255 348
}
256 349

  
350
/*
351
    Do close accounting.
352

  
353
    1) Set QUEUE_TOKEN
354

  
355
    2) copy pending producers to QUEUE_PENDING. This is to work around
356
       SZQUEUE_WAITERS updating non-atomically in the thread
357
       wake/sleep handling.
358

  
359
    QUEUE_TOKEN contains an array with 0 or 1 elements. This is to allow the
360
    queue to be closed several times, provided the same value is used each
361
    time, and having an array makes that test easier.
362
*/
363
static VALUE
364
queue_set_close_state(int argc, VALUE *argv, VALUE self, VALUE pending_producers)
365
{
366
    VALUE close_token = Qnil;
367
    VALUE existing_close_token = Qnil;
368
    VALUE close_token_ary = GET_QUEUE_TOKEN(self);
369

  
370
    rb_check_arity(argc, 0, 1);
371

  
372
    /* handle arg defaults, and conversion to an Exception instance. */
373
    if (argc > 0) {
374
	close_token = argv[0];
375
	if (rb_obj_is_kind_of(close_token,rb_cClass) && rb_class_inherited_p(close_token,rb_eException)) {
376
	    close_token = rb_exc_new2(close_token, "queue closed");
377
	}
378
    }
379

  
380
    /* Allow close to be called several times, with the same argument (same-ness defined by eql?, via rb_eql). */
381
    if (RARRAY_LEN(close_token_ary) == 0) {
382
	rb_ary_store(close_token_ary, 0, close_token);
383
	/* Start accounting for pending producers. Can only do this once. */
384
	rb_ary_concat(GET_QUEUE_PENDING(self), pending_producers);
385
	return close_token;
386
    } else {
387
	existing_close_token = RARRAY_AREF(close_token_ary, 0);
388
	if (!rb_eql(existing_close_token, close_token)) {
389
	    rb_raise(rb_eThreadError, "already closed with %"PRIsVALUE, rb_inspect(existing_close_token));
390
	}
391
	return existing_close_token;
392
    }
393
}
394

  
395
/*
396
 * Document-method: Queue#close
397
 * call-seq:
398
 *   close(token=nil)
399
 *
400
 * Closes the queue to producers. A closed queue cannot be re-opened.
401
 *
402
 * +token+ can be any object, or an instance of an Exception subclass, or an
403
 * Exception subclass. +close+ can be called repeatedly on a single queue as
404
 * long as the same (defined by eql?) +token+ is used.
405
 *
406
 * After the call to close completes, the following are true:
407
 *
408
 * - +closed?+ will return true
409
 *
410
 * - calling enq/push/<< will raise ThreadError('queue closed')
411
 *
412
 * - when +empty?+ is false, calling deq/pop/shift will return an object from the queue as usual.
413
 *
414
 * - when +empty?+ is true, deq(non_block=false) will not suspend and
415
 *   will either return the +token+, or raise if +token+ was an exception
416
 *   instance or class. deq(non_block=true) will ignore the parameter and
417
 *   raise a ThreadError('queue empty').
418
 *
419
 * And for SizedQueue, these will also be true:
420
 *
421
 * - each thread already suspended in enq at the time of the call
422
 *   to close will be allowed to push its object as usual.
423
 *
424
 * - +empty?+ will be false when there are either objects in the queue, or
425
 *   producers which were suspended at the time of the call to +close+ but whose
426
 *   objects are not yet in the queue. Therefore, it can be true (very
427
 *   briefly) that empty? == false && size == 0, since +size+ returns the number
428
 *   of objects actually in the queue.
429
 */
430

  
431
static VALUE
432
rb_queue_close(int argc, VALUE *argv, VALUE self)
433
{
434
    /* Never any pending producers for ordinary queue, so pending is empty. */
435
    queue_set_close_state(argc, argv, self, ary_buf_new());
436
    if (queue_length(self) < queue_num_waiting(self)) {
437
	wakeup_all_threads(GET_QUEUE_WAITERS(self));
438
    }
439
    return self;
440
}
441

  
442
static VALUE
443
rb_szqueue_close(int argc, VALUE *argv, VALUE self)
444
{
445
    queue_set_close_state(argc, argv, self, GET_SZQUEUE_WAITERS(self));
446
    if (RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
447
	/* Wake up all consumers because there will never be more items. */
448
	wakeup_all_threads(GET_QUEUE_WAITERS(self));
449
    }
450
    return self;
451
}
452

  
257 453
struct waiting_delete {
258 454
    VALUE waiting;
259 455
    VALUE th;
456
    VALUE pending;
457
    VALUE self;
260 458
};
261 459

  
262 460
static VALUE
263 461
queue_delete_from_waiting(struct waiting_delete *p)
264 462
{
463
    /* The waiting queues */
464
    rb_ary_delete(p->waiting, p->th);
465
    return Qnil;
466
}
467

  
468
static ID id_status;
469

  
470
static VALUE
471
szqueue_delete_from_waiting(struct waiting_delete *p)
472
{
473
    /* The waiting queues */
265 474
    rb_ary_delete(p->waiting, p->th);
475

  
476
    /* Handle the post-close pending producers on thread abort. Only applies to SizedQueue. */
477
    if (QUEUE_CLOSED_P(p->self)) {
478
    /* TODO This is not great, but all the better ways seem to involve rb_thread_t.status.
479
       And it won't get in the way of normal queue operation. */
480
	if (strcmp("aborting",RSTRING_PTR(rb_funcall(p->th,id_status,0))) == 0) {
481
	    rb_ary_delete(p->pending, p->th);
482
	}
483
    }
266 484
    return Qnil;
267 485
}
268 486

  
......
274 492
}
275 493

  
276 494
static VALUE
495
queue_close_pop(VALUE self)
496
{
497
    VALUE token = RARRAY_AREF(GET_QUEUE_TOKEN(self),0);
498
    if (rb_obj_is_kind_of(token,rb_eException)) {
499
	rb_exc_raise(token);
500
    }
501
    return token;
502
}
503

  
504
static VALUE
277 505
queue_do_pop(VALUE self, int should_block)
278 506
{
279 507
    struct waiting_delete args;
280
    args.waiting = GET_QUEUE_WAITERS(self);
281
    args.th	 = rb_thread_current();
282 508

  
283 509
    while (queue_length(self) == 0) {
284 510
	if (!should_block) {
285 511
	    rb_raise(rb_eThreadError, "queue empty");
286 512
	}
513

  
514
	if (QUEUE_CLOSED_P(self) && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
515
	return queue_close_pop(self);
516
	}
517

  
518
	args.waiting = GET_QUEUE_WAITERS(self);
519
	args.th      = rb_thread_current();
520
	args.pending = Qnil;
521
	args.self    = Qnil;
522

  
287 523
	rb_ary_push(args.waiting, args.th);
288 524
	rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
289 525
    }
......
324 560
}
325 561

  
326 562
/*
563
 * Document-method: Queue#closed?
564
 * call-seq: closed?
565
 *
566
 * Returns +true+ if the queue is closed.
567
 */
568

  
569
static VALUE
570
rb_queue_closed_p(VALUE self)
571
{
572
    return QUEUE_CLOSED_P(self) ? Qtrue : Qfalse;
573
}
574

  
575
/*
327 576
 * Document-method: Queue#empty?
328 577
 * call-seq: empty?
329 578
 *
......
333 582
static VALUE
334 583
rb_queue_empty_p(VALUE self)
335 584
{
336
    return queue_length(self) == 0 ? Qtrue : Qfalse;
585
    unsigned long items = queue_length(self);
586
    if (QUEUE_CLOSED_P(self)) {
587
	/* Add number of known pending items. */
588
	items += RARRAY_LEN(GET_QUEUE_PENDING(self));
589
    }
590
    return items == 0 ? Qtrue : Qfalse;
337 591
}
338 592

  
339 593
/*
......
381 635
/*
382 636
 *  Document-class: SizedQueue
383 637
 *
384
 * This class represents queues of specified size capacity.  The push operation
385
 * may be blocked if the capacity is full.
638
 * This class represents queues of specified size capacity, also known as a
639
 * bounded queue.  The push operation may be blocked if the capacity is full.
386 640
 *
387 641
 * See Queue for an example of how a SizedQueue works.
388 642
 */
......
397 651
static VALUE
398 652
rb_szqueue_initialize(VALUE self, VALUE vmax)
399 653
{
400
    long max;
401

  
402
    max = NUM2LONG(vmax);
654
    long max = NUM2LONG(vmax);
403 655
    if (max <= 0) {
404 656
	rb_raise(rb_eArgError, "queue size must be positive");
405 657
    }
406 658

  
407 659
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
408 660
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
661
    RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
662
    RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
409 663
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
410 664
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
411 665

  
......
472 726
 *
473 727
 * If there is no space left in the queue, waits until space becomes
474 728
 * available, unless +non_block+ is true.  If +non_block+ is true, the
475
 * thread isn't suspended, and an exception is raised.
729
 * thread isn't suspended, and ThreadError('queue full') is raised.
476 730
 */
477 731

  
478 732
static VALUE
479 733
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
480 734
{
481
    struct waiting_delete args;
482 735
    int should_block = szqueue_push_should_block(argc, argv);
483
    args.waiting = GET_SZQUEUE_WAITERS(self);
484
    args.th      = rb_thread_current();
736
    struct waiting_delete args;
737

  
738
    queue_raise_if_closed(self);
485 739

  
486 740
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
487 741
	if (!should_block) {
488 742
	    rb_raise(rb_eThreadError, "queue full");
489 743
	}
744

  
745
	args.waiting = GET_SZQUEUE_WAITERS(self);
746
	args.th      = rb_thread_current();
747
	args.pending = GET_QUEUE_PENDING(self);
748
	args.self    = self;
749

  
490 750
	rb_ary_push(args.waiting, args.th);
491
	rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
751
	rb_ensure(queue_sleep, (VALUE)0, szqueue_delete_from_waiting, (VALUE)&args);
492 752
    }
493
    return queue_do_push(self, argv[0]);
753

  
754
    queue_do_push(self, argv[0]);
755

  
756
    if (QUEUE_CLOSED_P(self)) {
757
	VALUE removed_thread = rb_ary_delete(GET_QUEUE_PENDING(self), rb_thread_current());
758
	if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
759
	    /* wake all waiting consumers, because there will never be more items. */
760
	    wakeup_all_threads(GET_QUEUE_WAITERS(self));
761
	}
762
    }
763

  
764
    return self;
494 765
}
495 766

  
496 767
static VALUE
......
515 786
 * Retrieves data from the queue.
516 787
 *
517 788
 * If the queue is empty, the calling thread is suspended until data is pushed
518
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
519
 * exception is raised.
789
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
790
 * ThreadError('queue empty') is raised.
520 791
 */
521 792

  
522 793
static VALUE
......
590 861
    VALUE rb_cQueue = rb_struct_define_without_accessor_under(
591 862
	OUTER,
592 863
	"Queue", rb_cObject, rb_struct_alloc_noinit,
593
	"que", "waiters", NULL);
864
	"que", "waiters", "pending", "token", NULL);
594 865
    VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
595 866
	OUTER,
596 867
	"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
597
	"que", "waiters", "queue_waiters", "size", NULL);
868
	"que", "waiters", "pending", "token", "queue_waiters", "size", NULL);
598 869

  
599 870
#if 0
600 871
    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
603 874
#endif
604 875

  
605 876
    id_sleep = rb_intern("sleep");
877
    id_status = rb_intern("status");
606 878

  
607 879
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
608 880
    rb_undef_method(rb_cConditionVariable, "initialize_copy");
......
620 892
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
621 893
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
622 894
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
895
    rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
896
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
623 897

  
624 898
    /* Alias for #push. */
625 899
    rb_define_alias(rb_cQueue, "enq", "push");
......
639 913
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
640 914
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
641 915
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
916
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
642 917

  
643 918
    /* Alias for #push. */
644 919
    rb_define_alias(rb_cSizedQueue, "enq", "push");
test/thread/test_queue.rb
43 43
      }
44 44
    }.join
45 45

  
46
    # close the queue the old way to test for backwards-compatibility
46 47
    num_threads.times { to_workers.push nil }
47 48
    workers.each { |t| t.join }
48 49

  
......
277 278
      Marshal.dump(q)
278 279
    end
279 280
  end
281

  
282
  def test_close
283
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
284
      q = qcreate.call
285
      assert_equal false, q.closed?
286
      q << :something
287
      assert_equal q, q.close
288
      assert q.closed?
289
      assert_raise_with_message(ThreadError, /closed/){q << :nothing}
290
      assert_equal q.pop, :something
291
      assert_nil q.pop
292
      # non-blocking
293
      assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
294
    end
295
  end
296

  
297
  # test that waiting consumers are woken up on close
298
  def close_wakeup( num_items, num_threads, &qcreate )
299
    raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads
300

  
301
    # create the Queue
302
    q = yield
303
    threads = num_threads.times.map{Thread.new{q.pop}}
304
    num_items.times{|i| q << i}
305

  
306
    # wait until queue empty
307
    (Thread.pass; sleep 0.01) until q.size == 0
308

  
309
    # now there should be some waiting consumers
310
    assert_equal num_threads - num_items, threads.count{|thr| thr.alive? && thr.stop?}
311

  
312
    # tell them all to go away
313
    q.close
314

  
315
    # wait for them to go away
316
    Thread.pass until threads.all?{|thr| thr.status == false}
317

  
318
    # check that they've gone away. Convert nil to -1 so we can sort and do the comparison
319
    expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a
320
    assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort
321
  end
322

  
323
  def test_queue_close_wakeup
324
    close_wakeup(15, 18){Queue.new}
325
  end
326

  
327
  def test_size_queue_close_wakeup
328
    close_wakeup(5, 8){SizedQueue.new 9}
329
  end
330

  
331
  def test_sized_queue_closed_multi_interrupt
332
    q = SizedQueue.new 1
333
    q << :one
334
    prod_threads = 32.times.map{|i| Thread.new{ q << i}}
335
    sleep 0.01 until prod_threads.all?{|thr| thr.stop?}
336

  
337
    more_threads = []
338
    q.close
339
    prod_threads.each{|thr| more_threads << Thread.new{ thr.kill.join }}
340
    more_threads.each &:join
341

  
342
    assert_equal 1, q.size
343
    assert_equal :one, q.pop
344
    assert q.empty?, "queue not empty"
345
  end
346

  
347
  # this might be unnecessary, not sure
348
  def test_sized_queue_two_closed_interrupt
349
    q = SizedQueue.new 1
350
    q << :one
351
    t1 = Thread.new { q << :two }
352
    t2 = Thread.new { q << :tre }
353
    sleep 0.01 until t1.stop? && t2.stop?
354
    q.close
355

  
356
    t1.kill.join
357
    assert_equal 1, q.size
358
    assert_equal :one, q.pop
359
    assert !q.empty?, "queue empty"
360

  
361
    t2.join
362
    assert_equal :tre, q.pop
363
    assert q.empty?, "queue not empty"
364
  end
365

  
366
  def test_sized_queue_one_closed_interrupt
367
    q = SizedQueue.new 1
368
    q << :one
369
    t1 = Thread.new { q << :two }
370
    sleep 0.01 until t1.stop?
371
    q.close
372

  
373
    t1.kill.join
374
    assert_equal 1, q.size
375
    assert_equal :one, q.pop
376
    assert q.empty?, "queue not empty"
377
  end
378

  
379
  # make sure that shutdown state is handled properly by empty? for the non-blocking case
380
  def test_empty_non_blocking
381
    q = SizedQueue.new 3
382
    3.times{|i| q << i}
383

  
384
    # these all block cos the queue is full
385
    prod_threads = 4.times.map{|i| Thread.new{q << 3+i}}
386
    sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'}
387
    q.close
388

  
389
    items = []
390
    # sometimes empty? is false but pop will raise ThreadError('queue empty'),
391
    # meaning a value is not immediately available but will be soon.
392
    until q.empty?
393
      items << q.pop(non_block=true) rescue nil
394
    end
395
    items.compact!
396

  
397
    assert_equal 7.times.to_a, items.sort
398
    assert q.empty?
399
  end
400

  
401
  def test_sized_queue_closed_push_non_blocking
402
    q = SizedQueue.new 7
403
    q.close
404
    assert_raise_with_message(ThreadError, /queue closed/){q.push(non_block=true)}
405
  end
406

  
407
  def test_blocked_pushers
408
    q = SizedQueue.new 3
409
    prod_threads = 6.times.map do |i|
410
      thr = Thread.new{q << i}; thr[:pc] = i; thr
411
    end
412

  
413
    # wait until some producer threads have finished, and the other 3 are blocked
414
    sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3
415
    # this would ensure that all producer threads call push before close
416
    # sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
417
    q.close
418

  
419
    # more than prod_threads
420
    cons_threads = 10.times.map do |i|
421
      thr = Thread.new{q.pop}; thr[:pc] = i; thr
422
    end
423

  
424
    # values that came from the queue
425
    popped_values = cons_threads.map &:value
426

  
427
    # pick only the producer threads that got in before close
428
    successful_prod_threads = prod_threads.reject{|thr| thr.status == nil}
429
    assert_nothing_raised{ successful_prod_threads.map(&:value) }
430

  
431
    # the producer threads that tried to push after q.close should all fail
432
    unsuccessful_prod_threads = prod_threads - successful_prod_threads
433
    unsuccessful_prod_threads.each do |thr|
434
      assert_raise(ThreadError){ thr.value }
435
    end
436

  
437
    assert_equal cons_threads.size, popped_values.size
438
    assert_equal 0, q.size
439

  
440
    # check that consumer threads with values match producers that called push before close
441
    assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort
442
    assert_nil q.pop
443
  end
444

  
445
  def test_deny_pushers
446
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
447
      prod_threads = nil
448
      q = qcreate[]
449
      producers_start = Thread.new do
450
        prod_threads = 20.times.map do |i|
451
          Thread.new{ sleep 0.01 until prod_threads; q << i}
452
        end
453
      end
454
      q.close
455

  
456
      # wait for all threads to be finished, because of exceptions
457
      sleep 0.01 until prod_threads && prod_threads.all?{|thr| thr.status == nil}
458

  
459
      # check that all threads failed to call push
460
      prod_threads.each do |thr|
461
        assert_kind_of ThreadError, (thr.value rescue $!)
462
      end
463
    end
464
  end
465

  
466
  # size should account for waiting pushers during shutdown
467
  def sized_queue_size_close
468
    q = SizedQueue.new 4
469
    4.times{|i| q << i}
470
    Thread.new{ q << 5 }
471
    Thread.new{ q << 6 }
472
    assert_equal 4, q.size
473
    assert_equal 4, q.items
474
    q.close
475
    assert_equal 6, q.size
476
    assert_equal 4, q.items
477
  end
478

  
479
  def test_blocked_pushers_empty
480
    q = SizedQueue.new 3
481
    prod_threads = 6.times.map do |i|
482
      Thread.new{ q << i}
483
    end
484

  
485
    # this ensures that all producer threads call push before close
486
    sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
487
    q.close
488

  
489
    ary = []
490
    until q.empty?
491
      ary << q.pop
492
    end
493
    assert_equal 0, q.size
494

  
495
    assert_equal 6, ary.size
496
    assert_equal [0,1,2,3,4,5], ary.sort
497
    assert_nil q.pop
498
  end
499

  
500
  # test thread wakeup on one-element SizedQueue with close
501
  def test_one_element_sized_queue
502
    q = SizedQueue.new 1
503
    t = Thread.new{ q.pop }
504
    q.close
505
    assert_nil t.value
506
  end
507

  
508
  def test_close_token
509
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
510
      q = qcreate[]
511
      q.close :token
512
      assert_equal :token, q.pop
513
    end
514
  end
515

  
516
  def test_close_token_exception
517
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
518
      q = qcreate[]
519
      q.close RuntimeError.new("no more")
520
      assert_raise(RuntimeError){q.pop}
521
    end
522
  end
523

  
524
  def test_close_token_loop
525
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
526
      q = qcreate[]
527
      popped_items = []
528
      consumer_thread = Thread.new{loop{popped_items << q.pop}; :done}
529
      7.times{|i| q << i}
530
      q.close StopIteration
531
      assert_equal :done, consumer_thread.value
532
      assert_equal 7.times.to_a, popped_items
533
    end
534
  end
535

  
536
  def test_close_wrong_token
537
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
538
      q = qcreate[]
539
      q.close :token
540
      assert_raise(ThreadError){q.close :another_token}
541

  
542
      q = qcreate[]
543
      q.close
544
      assert_raise(ThreadError){q.close :not_nil}
545
    end
546
  end
547

  
548
  def test_queue_close_multi_multi
549
    q = SizedQueue.new rand(800..1200)
550

  
551
    count_items = rand(3000..5000)
552
    count_producers = rand(10..20)
553

  
554
    producers = count_producers.times.map do
555
      Thread.new do
556
        sleep(rand / 100)
557
        count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]}
558
      end
559
    end
560

  
561
    consumers = rand(7..12).times.map do
562
      Thread.new do
563
        count = 0
564
        loop do
565
          i, st = q.pop
566
          count += 1 if i.is_a?(Fixnum) && st.is_a?(String)
567
        end
568
        count
569
      end
570
    end
571

  
572
    # No dead or finished threads
573
    assert (consumers + producers).all?{|thr| thr.status =~ /\Arun|sleep\Z/}, 'no threads runnning'
574

  
575
    # just exercising the concurrency of the support methods.
576
    counter = Thread.new do
577
      until q.closed? && q.empty?
578
        raise if q.size > q.max
579
        # otherwise this exercise causes too much contention on the lock
580
        sleep 0.01
581
      end
582
    end
583

  
584
    producers.each &:join
585
    q.close StopIteration
586

  
587
    # results not randomly distributed. Not sure why.
588
    # consumers.map{|thr| thr.value}.each do |x|
589
    #   assert_not_equal 0, x
590
    # end
591

  
592
    all_items_count = consumers.map{|thr| thr.value}.inject(:+)
593
    assert_equal count_items * count_producers, all_items_count
594

  
595
    # don't leak this thread
596
    assert_nothing_raised{counter.join}
597
  end
280 598
end