Project

General

Profile

Feature #10600 » queue-close-2.diff

djellemah (John Anderson), 12/17/2014 05:38 PM

View differences:

ext/thread/thread.c
/*
 * Slot indices used with RSTRUCT_GET / RSTRUCT_SET on the Queue and
 * SizedQueue structs.
 *
 * NOTE(review): this is a rendered diff. The first pair of rows carrying a
 * single gutter number (10, 11) appear to be the pre-patch enumerators; the
 * following rows (10, 11, 12) appear to be the patched ones, which insert
 * QUEUE_CLOSED at slot 2 and shift the SizedQueue-only slots up by one.
 */
7 7
enum {
8 8
    QUEUE_QUE       = 0,
9 9
    QUEUE_WAITERS   = 1,
10
    SZQUEUE_WAITERS = 2,
11
    SZQUEUE_MAX     = 3
10
    QUEUE_CLOSED    = 2,
11
    SZQUEUE_WAITERS = 3,
12
    SZQUEUE_MAX     = 4
12 13
};
13 14

  
14 15
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
......
19 20
#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
20 21
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
21 22

  
23
#define GET_QUEUE_CLOSED(q)     get_array((q), QUEUE_CLOSED)
24
/* Has the close method been called? True once the QUEUE_CLOSED slot is non-nil. Expansion and argument are parenthesized so the macro is safe inside larger expressions. */
25
#define QUEUE_CLOSED_P(self)    (!NIL_P(RSTRUCT_GET((self), QUEUE_CLOSED)))
26

  
22 27
static VALUE
23 28
get_array(VALUE obj, int idx)
24 29
{
......
213 218
{
214 219
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
215 220
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
221
    RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
216 222
    return self;
217 223
}
218 224

  
225
// Snapshot of a queue's shutdown state, as filled in by queue_closing().
226
// After a call to close, countdown is the number of known items remaining before the queue is really empty.
227
struct queue_shutdown
228
{
229
    VALUE closed;            /* Qtrue when the QUEUE_CLOSED slot is non-nil, otherwise Qfalse */
230
    unsigned long countdown; /* length of the QUEUE_CLOSED array; 0 while the queue is open */
231
};
232

  
233
/*
234
    QUEUE_CLOSED serving double-duty as a collection of threads that were
235
    pending (for SizedQueue) when close was called.
236

    queue_closing() reads that slot and returns it as a struct queue_shutdown:
    closed is Qtrue when the slot is non-nil, and countdown is the length of
    the array stored there. When the slot is nil the result is
    { Qfalse, 0 }.
*/
237
static struct queue_shutdown
238
queue_closing(VALUE self)
239
{
240
    struct queue_shutdown retval;
241
    VALUE closed = RSTRUCT_GET(self,QUEUE_CLOSED);
242

  
243
    if (closed != Qnil) {
244
	retval.closed = Qtrue;
245
	retval.countdown = RARRAY_LEN(closed); /* threads still listed as pending */
246
    } else {
247
	retval.closed = Qfalse;
248
	retval.countdown = 0;
249
    }
250

  
251
    return retval;
252
}
253

  
/*
 * Raises ThreadError ("queue closed") when the queue has been closed and the
 * countdown reported by queue_closing() is zero; otherwise returns Qnil.
 *
 * NOTE(review): while the countdown is non-zero (a SizedQueue closed with
 * producers still listed as pending) this does not raise, so a producer that
 * calls push after close appears to get through. That seems to contradict the
 * Queue#close documentation in this file ("calling enq/push/<< will raise an
 * exception"). Callers run this check before they can become pending, so
 * raising whenever QUEUE_CLOSED_P(self) is true may be what was intended --
 * confirm with the author.
 */
254
static VALUE
255
queue_raise_if_closed(VALUE self)
256
{
257
    struct queue_shutdown queue_shutdown;
258
    // Optimise the frequent case, which is queue open
259
    if (!QUEUE_CLOSED_P(self)) return Qnil;
260

  
261
    // queue is closing, which is not frequent, so don't worry about the extra fetch of QUEUE_CLOSED
262
    queue_shutdown = queue_closing(self);
263
    if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) {
264
	rb_raise(rb_eThreadError, "queue closed");
265
    }
266
    return Qnil;
267
}
268

  
219 269
static VALUE
220 270
queue_do_push(VALUE self, VALUE obj)
221 271
{
272
    // TODO would be nice to not have to use macro to check for correct
273
    // initialization on every single call to push.
222 274
    rb_ary_push(GET_QUEUE_QUE(self), obj);
223 275
    wakeup_first_thread(GET_QUEUE_WAITERS(self));
224 276
    return self;
......
/*
 * Pushes obj onto the queue and returns the result of queue_do_push().
 * queue_raise_if_closed() runs first, so the push is rejected with a
 * ThreadError whenever that helper decides the queue is closed.
 */
237 289
static VALUE
238 290
rb_queue_push(VALUE self, VALUE obj)
239 291
{
292
    queue_raise_if_closed(self);
240 293
    return queue_do_push(self, obj);
241 294
}
242 295

  
......
254 307
    return RARRAY_LEN(waiters);
255 308
}
256 309

  
310
/*
311
 * Document-method: Queue#close
312
 * call-seq: close
313
 *
314
 * Closes the queue to producers. A closed queue cannot be re-opened.
315
 *
316
 * After the call to close completes, the following are true:
317
 *
318
 * - closed? will return true
319
 *
320
 * - calling enq/push/<< will raise an exception
321
 *
322
 * - calling deq/pop/shift will return an object from the queue as usual.
323
 *
324
 * - when empty? is true, deq(non_block=false) will not suspend and
325
 *   will return nil. deq(non_block=true) will raise an exception.
326
 *
327
 * And for SizedQueue, these will also be true:
328
 *
329
 * - each thread already suspended in enq at the time of the call
330
 *   to close will be allowed to push its object as usual.
331
 *
332
 * - empty? will be false when there are either objects in the queue, or
333
 *   producers which were suspended at the time of the call to close but whose
334
 *   objects are not yet in the queue. Therefore, it can be true (very
335
 *   briefly) that empty? == false && size == 0, since size returns the number
336
 *   of objects actually in the queue.
337
 */
338

  
/*
 * Implementation of Queue#close: stores a fresh array in the QUEUE_CLOSED
 * slot (making QUEUE_CLOSED_P true), then wakes every thread in QUEUE_WAITERS
 * when queue_length(self) is smaller than queue_num_waiting(self).
 * Returns self.
 */
339
static VALUE
340
rb_queue_close(VALUE self)
341
{
342
    // Never any pending producers for ordinary queue, so create empty array.
343
    RSTRUCT_SET(self, QUEUE_CLOSED, ary_buf_new());
344
    if (queue_length(self) < queue_num_waiting(self)) {
345
	wakeup_all_threads(GET_QUEUE_WAITERS(self));
346
    }
347
    return self;
348
}
349

  
/*
 * SizedQueue variant of close: stores a copy of the SZQUEUE_WAITERS array
 * (the threads rb_szqueue_push registers while the queue is full) in the
 * QUEUE_CLOSED slot, then wakes every thread in QUEUE_WAITERS when the queued
 * items plus those recorded producers are fewer than queue_num_waiting(self).
 * Returns self.
 */
350
static VALUE
351
rb_szqueue_close(VALUE self)
352
{
353
    VALUE remaining_producers = rb_ary_dup(GET_SZQUEUE_WAITERS(self));
354
    RSTRUCT_SET(self, QUEUE_CLOSED, remaining_producers);
355

  
356
    if (queue_length(self) + RARRAY_LEN(remaining_producers) < queue_num_waiting(self)) {
357
	// There may be more consumers waiting than pending items, so wake
358
	// up consumers and some will go away.
359

  
360
	// TODO is this only necessary when
361
	// queue_length(self) + RARRAY_LEN(remaining_producers) == 0 ?
362

  
363
	// Because if there's even 1 blocked producer, the countdown will kick
364
	// in, and that culminates in wakeup_all_threads(...)
365
	wakeup_all_threads(GET_QUEUE_WAITERS(self));
366
    }
367
    return self;
368
}
369

  
257 370
struct waiting_delete {
258 371
    VALUE waiting;
259 372
    VALUE th;
......
276 389
static VALUE
277 390
queue_do_pop(VALUE self, int should_block)
278 391
{
392
    struct queue_shutdown queue_shutdown;
279 393
    struct waiting_delete args;
280
    args.waiting = GET_QUEUE_WAITERS(self);
281
    args.th	 = rb_thread_current();
282 394

  
283 395
    while (queue_length(self) == 0) {
396
	// non-blocking case will use empty?, so that must only return
397
	// true when the queue is really closed, ie not ever any more pending items.
398
	// But in the non-blocking case, there will never be blocked producers, so
399
	// it's not really relevant.
284 400
	if (!should_block) {
285 401
	    rb_raise(rb_eThreadError, "queue empty");
286 402
	}
403

  
404
	// only checking for shutdown when queue is empty, so less of a
405
	// performance drain.
406
	queue_shutdown = queue_closing(self);
407
	if (queue_shutdown.closed == Qtrue && queue_shutdown.countdown == 0) {
408
	    // TODO signal queue closed to consumers
409
	    // ie could also raise StopIteration or other exception here.
410
	    return Qnil;
411
	}
412

  
413
	// put this consumer to sleep until there's an item available
414
	args.waiting = GET_QUEUE_WAITERS(self);
415
	args.th  = rb_thread_current();
287 416
	rb_ary_push(args.waiting, args.th);
288 417
	rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
289 418
    }
......
324 453
}
325 454

  
326 455
/*
456
 * Document-method: Queue#closed?
457
 * call-seq: closed?
458
 *
459
 * Returns +true+ if the queue is closed.
460
 */
461

  
/*
 * Implementation of Queue#closed?: Qtrue when the QUEUE_CLOSED slot is
 * non-nil (the same condition QUEUE_CLOSED_P tests), otherwise Qfalse.
 */
462
static VALUE
463
rb_queue_closed_p(VALUE self)
464
{
465
    return RSTRUCT_GET(self, QUEUE_CLOSED) != Qnil ? Qtrue : Qfalse;
466
}
467

  
468
/*
327 469
 * Document-method: Queue#empty?
328 470
 * call-seq: empty?
329 471
 *
......
/*
 * Implementation of Queue#empty?. For an open queue: Qtrue when
 * queue_length(self) is 0. For a closed queue: Qtrue only when
 * queue_length(self) plus the countdown from queue_closing() is 0, so the
 * queue is not reported empty while the QUEUE_CLOSED list still has entries.
 *
 * NOTE(review): this is a rendered diff. The row with the single gutter
 * number 336 appears to be the pre-patch body that the rows 478-484 replace.
 */
333 475
static VALUE
334 476
rb_queue_empty_p(VALUE self)
335 477
{
336
    return queue_length(self) == 0 ? Qtrue : Qfalse;
478
    struct queue_shutdown queue_shutdown;
479
    if (QUEUE_CLOSED_P(self)) {
480
	queue_shutdown = queue_closing(self);
481
	return queue_length(self) + queue_shutdown.countdown == 0 ? Qtrue : Qfalse;
482
    } else {
483
	return queue_length(self) == 0 ? Qtrue : Qfalse;
484
    }
337 485
}
338 486

  
339 487
/*
......
406 554

  
407 555
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
408 556
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
557
    RSTRUCT_SET(self, QUEUE_CLOSED, Qnil);
409 558
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
410 559
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
411 560

  
......
/*
 * Implementation of SizedQueue#push.
 *
 * - Calls queue_raise_if_closed() first, which may raise ThreadError.
 * - While queue_length(self) is at or above the maximum: raises ThreadError
 *   ("queue full") when should_block is false; otherwise adds the current
 *   thread to SZQUEUE_WAITERS and sleeps, with queue_delete_from_waiting
 *   registered as the rb_ensure cleanup.
 * - Pushes argv[0] via queue_do_push().
 * - When the queue is closed, deletes the current thread from the
 *   QUEUE_CLOSED list and, if that removed the last entry, wakes every thread
 *   in QUEUE_WAITERS.
 *
 * Returns self.
 *
 * NOTE(review): this is a rendered diff. The rows with single gutter numbers
 * 481, 483, 484 and 493 appear to be pre-patch lines that the patch removes.
 */
478 627
static VALUE
479 628
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
480 629
{
481
    struct waiting_delete args;
482 630
    int should_block = szqueue_push_should_block(argc, argv);
483
    args.waiting = GET_SZQUEUE_WAITERS(self);
484
    args.th      = rb_thread_current();
631
    struct waiting_delete args;
632

  
633
    queue_raise_if_closed(self);
485 634

  
486 635
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
487 636
	if (!should_block) {
488 637
	    rb_raise(rb_eThreadError, "queue full");
489 638
	}
639
	// queue is full, so sleep until there is room
640
	args.waiting = GET_SZQUEUE_WAITERS(self);
641
	args.th      = rb_thread_current();
490 642
	rb_ary_push(args.waiting, args.th);
643
	// TODO should also handle countdown decrement if the thread is terminated while sleeping.
644
	// TODO shouldn't this be queue_sleep as well?
491 645
	rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
492 646
    }
493
    return queue_do_push(self, argv[0]);
647

  
648
    queue_do_push(self, argv[0]);
649

  
650
    // skip this for the normal case, which is queue open
651
    if (QUEUE_CLOSED_P(self)) {
652
	// At this point, we know we've actually pushed an item onto the queue,
653
	// and we're in a closing state, so remove the current thread from the list
654
	// of pending pushers, if it was one to start with.
655
	VALUE removed_thread = rb_ary_delete(GET_QUEUE_CLOSED(self), rb_thread_current());
656

  
657
	// was that the last pending thread?
658
	if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_CLOSED(self)) == 0) {
659
	    // wake all waiting consumers, because there will never be more items
660
	    wakeup_all_threads(GET_QUEUE_WAITERS(self));
661
        }
662
    }
663

  
664
    return self;
494 665
}
495 666

  
496 667
static VALUE
......
590 761
    VALUE rb_cQueue = rb_struct_define_without_accessor_under(
591 762
	OUTER,
592 763
	"Queue", rb_cObject, rb_struct_alloc_noinit,
593
	"que", "waiters", NULL);
764
	"que", "waiters", "closed", NULL);
594 765
    VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
595 766
	OUTER,
596 767
	"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
597
	"que", "waiters", "queue_waiters", "size", NULL);
768
	"que", "waiters", "closed", "queue_waiters", "size", NULL);
598 769

  
599 770
#if 0
600 771
    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
620 791
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
621 792
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
622 793
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
794
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
795
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
623 796

  
624 797
    /* Alias for #push. */
625 798
    rb_define_alias(rb_cQueue, "enq", "push");
......
639 812
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
640 813
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
641 814
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
815
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
642 816

  
643 817
    /* Alias for #push. */
644 818
    rb_define_alias(rb_cSizedQueue, "enq", "push");