Project

General

Profile

Feature #10600 ยป queue-close.diff

djellemah (John Anderson), 12/15/2014 09:10 AM

View differences:

ext/thread/thread.c
7 7
enum {
8 8
    QUEUE_QUE       = 0,
9 9
    QUEUE_WAITERS   = 1,
10
    SZQUEUE_WAITERS = 2,
11
    SZQUEUE_MAX     = 3
10
    QUEUE_CLOSED    = 2,
11
    SZQUEUE_WAITERS = 3,
12
    SZQUEUE_MAX     = 4
12 13
};
13 14

  
14 15
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
......
213 214
{
214 215
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
215 216
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
217
    RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
216 218
    return self;
217 219
}
218 220

  
219 221
static VALUE
222
queue_raise_if_closed(VALUE self)
223
{
224
    if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qtrue ) {
225
        rb_raise(rb_eThreadError, "queue closed");
226
    }
227
    return Qnil;
228
}
229

  
230
static VALUE
220 231
queue_do_push(VALUE self, VALUE obj)
221 232
{
222 233
    rb_ary_push(GET_QUEUE_QUE(self), obj);
......
237 248
static VALUE
238 249
rb_queue_push(VALUE self, VALUE obj)
239 250
{
251
    queue_raise_if_closed(self);
240 252
    return queue_do_push(self, obj);
241 253
}
242 254

  
......
254 266
    return RARRAY_LEN(waiters);
255 267
}
256 268

  
269
/*
270
 * Document-method: Queue#close
271
 * call-seq: close
272
 *
273
 * Closes the queue.
274
 *
275
 * Once the queue is closed, enq will raise an exception, although remaining
276
 * items in the queue can be deq'd as usual. Any threads waiting on
277
 * pop(non_block=false) at the time the queue is closed will be woken and
278
 * given a nil.
279
 *
280
 * Once a closed queue is empty, deq with non_block=false will always return
281
 * nil.
282
 *
283
 * A closed queue cannot be reopened.
284
 */
285

  
286
static VALUE
287
rb_queue_close(VALUE self)
288
{
289
    RSTRUCT_SET(self, QUEUE_CLOSED, Qtrue);
290
    if (queue_num_waiting(self) > 0) {
291
	wakeup_all_threads(GET_QUEUE_WAITERS(self));
292
    }
293
    return self;
294
}
295

  
257 296
struct waiting_delete {
258 297
    VALUE waiting;
259 298
    VALUE th;
......
284 323
	if (!should_block) {
285 324
	    rb_raise(rb_eThreadError, "queue empty");
286 325
	}
326

  
327
	if (RSTRUCT_GET(self,QUEUE_CLOSED) == Qtrue) {
328
	    return Qnil;
329
	}
330

  
287 331
	rb_ary_push(args.waiting, args.th);
288 332
	rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
289 333
    }
......
324 368
}
325 369

  
326 370
/*
371
 * Document-method: Queue#closed?
372
 * call-seq: closed?
373
 *
374
 * Returns +true+ if the queue is closed.
375
 */
376

  
377
static VALUE
378
rb_queue_closed_p(VALUE self)
379
{
380
    return RSTRUCT_GET(self, QUEUE_CLOSED);
381
}
382

  
383
/*
327 384
 * Document-method: Queue#empty?
328 385
 * call-seq: empty?
329 386
 *
......
406 463

  
407 464
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
408 465
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
466
    RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
409 467
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
410 468
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
411 469

  
......
480 538
{
481 539
    struct waiting_delete args;
482 540
    int should_block = szqueue_push_should_block(argc, argv);
541

  
542
    queue_raise_if_closed(self);
543

  
483 544
    args.waiting = GET_SZQUEUE_WAITERS(self);
484 545
    args.th      = rb_thread_current();
485 546

  
......
590 651
    VALUE rb_cQueue = rb_struct_define_without_accessor_under(
591 652
	OUTER,
592 653
	"Queue", rb_cObject, rb_struct_alloc_noinit,
593
	"que", "waiters", NULL);
654
	"que", "waiters", "closed", NULL);
594 655
    VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
595 656
	OUTER,
596 657
	"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
597
	"que", "waiters", "queue_waiters", "size", NULL);
658
	"que", "waiters", "closed", "queue_waiters", "size", NULL);
598 659

  
599 660
#if 0
600 661
    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
620 681
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
621 682
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
622 683
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
684
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
685
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
623 686

  
624 687
    /* Alias for #push. */
625 688
    rb_define_alias(rb_cQueue, "enq", "push");
test/thread/test_queue.rb
277 277
      Marshal.dump(q)
278 278
    end
279 279
  end
280

  
281
  def test_close
282
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
283
      q = qcreate[]
284
      assert_equal false, q.closed?
285
      q << :something
286
      q.close
287
      assert_equal true, q.closed?
288
      assert_raise_with_message(ThreadError, /closed/){q << :nothing}
289
      assert_equal q.pop, :something
290
      assert_equal q.pop, nil
291
    end
292
  end
293

  
294
  def test_close_wakeup
295
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
296
      q = qcreate[]
297
      threads = []
298
      4.times{threads << Thread.new{q.pop}}
299
      3.times{|i| q << i}
300
      sleep 0.01
301
      assert_equal 1, threads.count{|thr| thr.alive? && thr.stop?}
302
      q.close
303
      assert_equal 0, threads.count{|thr| thr.alive? && thr.stop?}
304
    end
305
  end
280 306
end