Project

General

Profile

Feature #8919 ยป patch.diff

Queue Implementation as embedded class - Glass_saga (Masaki Matsushita), 11/27/2014 12:30 AM

View differences:

ext/thread/thread.c
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
......
VALUE timeout;
};
static ID id_sleep;
static ID id_sleep, id_length;
static VALUE
do_sleep(VALUE args)
......
return self;
}
/*
* Document-class: Queue
*
* This class provides a way to synchronize communication between threads.
*
* Example:
*
* require 'thread'
* queue = Queue.new
*
* producer = Thread.new do
* 5.times do |i|
* sleep rand(i) # simulate expense
* queue << i
* puts "#{i} produced"
* end
* end
*
* consumer = Thread.new do
* 5.times do |i|
* value = queue.pop
* sleep rand(i/2) # simulate expense
* puts "consumed #{value}"
* end
* end
*
*/
/*
* Document-method: Queue::new
*
* Creates a new queue instance.
*/
static VALUE
rb_queue_initialize(VALUE self)
{
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
return self;
}
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
rb_ary_push(GET_QUEUE_QUE(self), obj);
wakeup_first_thread(GET_QUEUE_WAITERS(self));
return self;
}
/*
* Document-method: Queue#push
* call-seq:
* push(object)
* enq(object)
* <<(object)
*
* Pushes the given +object+ to the queue.
*/
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
return queue_do_push(self, obj);
}
static unsigned long
queue_length(VALUE self)
{
VALUE que = GET_QUEUE_QUE(self);
return RARRAY_LEN(que);
}
static unsigned long
queue_num_waiting(VALUE self)
{
VALUE waiters = GET_QUEUE_WAITERS(self);
return RARRAY_LEN(waiters);
VALUE length_v = rb_funcall2(self, id_length, 0, NULL);
return NUM2ULONG(length_v);
}
struct waiting_delete {
......
return Qnil;
}
static VALUE
queue_sleep(VALUE arg)
{
rb_thread_sleep_deadly();
return Qnil;
}
static VALUE
queue_do_pop(VALUE self, int should_block)
{
struct waiting_delete args;
args.waiting = GET_QUEUE_WAITERS(self);
args.th = rb_thread_current();
while (queue_length(self) == 0) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
return rb_ary_shift(GET_QUEUE_QUE(self));
}
static int
queue_pop_should_block(int argc, const VALUE *argv)
{
int should_block = 1;
rb_check_arity(argc, 0, 1);
if (argc > 0) {
should_block = !RTEST(argv[0]);
}
return should_block;
}
/*
* Document-method: Queue#pop
* call-seq:
* pop(non_block=false)
* deq(non_block=false)
* shift(non_block=false)
*
* Retrieves data from the queue.
*
* If the queue is empty, the calling thread is suspended until data is pushed
* onto the queue. If +non_block+ is true, the thread isn't suspended, and an
* exception is raised.
*/
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
return queue_do_pop(self, should_block);
}
/*
* Document-method: Queue#empty?
* call-seq: empty?
*
* Returns +true+ if the queue is empty.
*/
static VALUE
rb_queue_empty_p(VALUE self)
{
return queue_length(self) == 0 ? Qtrue : Qfalse;
}
/*
* Document-method: Queue#clear
*
* Removes all objects from the queue.
*/
static VALUE
rb_queue_clear(VALUE self)
{
rb_ary_clear(GET_QUEUE_QUE(self));
return self;
}
/*
* Document-method: Queue#length
* call-seq:
* length
* size
*
* Returns the length of the queue.
*/
static VALUE
rb_queue_length(VALUE self)
{
unsigned long len = queue_length(self);
return ULONG2NUM(len);
}
/*
* Document-method: Queue#num_waiting
*
* Returns the number of threads waiting on the queue.
*/
static VALUE
rb_queue_num_waiting(VALUE self)
{
unsigned long len = queue_num_waiting(self);
return ULONG2NUM(len);
}
/*
* Document-class: SizedQueue
*
......
rb_ary_push(args.waiting, args.th);
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
return queue_do_push(self, argv[0]);
}
static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
VALUE retval = queue_do_pop(self, should_block);
if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
}
return retval;
return rb_call_super(argc, argv);
}
/*
......
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
return szqueue_do_pop(self, should_block);
VALUE retval = rb_call_super(argc, argv);
if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
}
return retval;
}
/*
* Document-method: Queue#clear
* Document-method: SizedQueue#clear
*
* Removes all objects from the queue.
*/
......
static VALUE
rb_szqueue_clear(VALUE self)
{
rb_ary_clear(GET_QUEUE_QUE(self));
wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
return self;
return rb_call_super(0, NULL);
}
/*
......
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
long len = queue_num_waiting(self);
VALUE len_v = rb_call_super(0, NULL);
long len = NUM2LONG(len_v);
VALUE waiters = GET_SZQUEUE_WAITERS(self);
len += RARRAY_LEN(waiters);
return ULONG2NUM(len);
......
OUTER,
"ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
"waiters", NULL);
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
OUTER,
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL);
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
OUTER,
rb_cQueue,
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
"que", "waiters", "queue_waiters", "size", NULL);
#if 0
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */
#endif
id_sleep = rb_intern("sleep");
id_length = rb_intern("length");
rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
rb_undef_method(rb_cConditionVariable, "initialize_copy");
......
rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
rb_undef_method(rb_cQueue, "initialize_copy");
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
/* Alias for #push. */
rb_define_alias(rb_cQueue, "enq", "push");
/* Alias for #push. */
rb_define_alias(rb_cQueue, "<<", "push");
/* Alias for #pop. */
rb_define_alias(rb_cQueue, "deq", "pop");
/* Alias for #pop. */
rb_define_alias(rb_cQueue, "shift", "pop");
/* Alias for #length. */
rb_define_alias(rb_cQueue, "size", "length");
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
......
rb_provide("thread.rb");
ALIAS_GLOBAL_CONST(ConditionVariable);
ALIAS_GLOBAL_CONST(Queue);
ALIAS_GLOBAL_CONST(SizedQueue);
}
include/ruby/ruby.h
RUBY_EXTERN VALUE rb_cTime;
RUBY_EXTERN VALUE rb_cTrueClass;
RUBY_EXTERN VALUE rb_cUnboundMethod;
RUBY_EXTERN VALUE rb_cQueue;
RUBY_EXTERN VALUE rb_eException;
RUBY_EXTERN VALUE rb_eStandardError;
thread.c
#endif
VALUE rb_cMutex;
VALUE rb_cQueue;
VALUE rb_cThreadShield;
static VALUE sym_immediate;
......
m->allow_trap = val;
}
enum {
CONDVAR_WAITERS = 0
};
enum {
QUEUE_QUE = 0,
QUEUE_WAITERS = 1
};
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
static VALUE
get_array(VALUE obj, int idx)
{
VALUE ary = RSTRUCT_GET(obj, idx);
if (!RB_TYPE_P(ary, T_ARRAY)) {
rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
}
return ary;
}
static VALUE
ary_buf_new(void)
{
return rb_ary_tmp_new(1);
}
static void
wakeup_first_thread(VALUE list)
{
VALUE thread;
while (!NIL_P(thread = rb_ary_shift(list))) {
if (RTEST(rb_thread_wakeup_alive(thread))) break;
}
}
/*
* Document-class: Queue
*
* This class provides a way to synchronize communication between threads.
*
* Example:
*
* require 'thread'
* queue = Queue.new
*
* producer = Thread.new do
* 5.times do |i|
* sleep rand(i) # simulate expense
* queue << i
* puts "#{i} produced"
* end
* end
*
* consumer = Thread.new do
* 5.times do |i|
* value = queue.pop
* sleep rand(i/2) # simulate expense
* puts "consumed #{value}"
* end
* end
*
*/
/*
* Document-method: Queue::new
*
* Creates a new queue instance.
*/
static VALUE
rb_queue_initialize(VALUE self)
{
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
return self;
}
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
rb_ary_push(GET_QUEUE_QUE(self), obj);
wakeup_first_thread(GET_QUEUE_WAITERS(self));
return self;
}
/*
* Document-method: Queue#push
* call-seq:
* push(object)
* enq(object)
* <<(object)
*
* Pushes the given +object+ to the queue.
*/
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
return queue_do_push(self, obj);
}
static unsigned long
queue_length(VALUE self)
{
VALUE que = GET_QUEUE_QUE(self);
return RARRAY_LEN(que);
}
static unsigned long
queue_num_waiting(VALUE self)
{
VALUE waiters = GET_QUEUE_WAITERS(self);
return RARRAY_LEN(waiters);
}
struct waiting_delete {
VALUE waiting;
VALUE th;
};
static VALUE
queue_delete_from_waiting(struct waiting_delete *p)
{
rb_ary_delete(p->waiting, p->th);
return Qnil;
}
static VALUE
queue_sleep(VALUE arg)
{
rb_thread_sleep_deadly();
return Qnil;
}
static VALUE
queue_do_pop(VALUE self, int should_block)
{
struct waiting_delete args;
args.waiting = GET_QUEUE_WAITERS(self);
args.th = rb_thread_current();
while (queue_length(self) == 0) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
return rb_ary_shift(GET_QUEUE_QUE(self));
}
static int
queue_pop_should_block(int argc, const VALUE *argv)
{
int should_block = 1;
rb_check_arity(argc, 0, 1);
if (argc > 0) {
should_block = !RTEST(argv[0]);
}
return should_block;
}
/*
* Document-method: Queue#pop
* call-seq:
* pop(non_block=false)
* deq(non_block=false)
* shift(non_block=false)
*
* Retrieves data from the queue.
*
* If the queue is empty, the calling thread is suspended until data is pushed
* onto the queue. If +non_block+ is true, the thread isn't suspended, and an
* exception is raised.
*/
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
return queue_do_pop(self, should_block);
}
/*
* Document-method: Queue#empty?
* call-seq: empty?
*
* Returns +true+ if the queue is empty.
*/
static VALUE
rb_queue_empty_p(VALUE self)
{
return queue_length(self) == 0 ? Qtrue : Qfalse;
}
/*
* Document-method: Queue#clear
*
* Removes all objects from the queue.
*/
static VALUE
rb_queue_clear(VALUE self)
{
rb_ary_clear(GET_QUEUE_QUE(self));
return self;
}
/*
* Document-method: Queue#length
* call-seq:
* length
* size
*
* Returns the length of the queue.
*/
static VALUE
rb_queue_length(VALUE self)
{
unsigned long len = queue_length(self);
return ULONG2NUM(len);
}
/*
* Document-method: Queue#num_waiting
*
* Returns the number of threads waiting on the queue.
*/
static VALUE
rb_queue_num_waiting(VALUE self)
{
unsigned long len = queue_num_waiting(self);
return ULONG2NUM(len);
}
/* :nodoc: */
static VALUE
undumpable(VALUE obj)
{
rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
UNREACHABLE;
}
/*
* Document-class: ThreadShield
*/
......
rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
rb_cQueue = rb_struct_define_without_accessor(
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL
);
#if 0
rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
#endif
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
rb_undef_method(rb_cQueue, "initialize_copy");
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
/* Alias for #push. */
rb_define_alias(rb_cQueue, "enq", "push");
/* Alias for #push. */
rb_define_alias(rb_cQueue, "<<", "push");
/* Alias for #pop. */
rb_define_alias(rb_cQueue, "deq", "pop");
/* Alias for #pop. */
rb_define_alias(rb_cQueue, "shift", "pop");
/* Alias for #length. */
rb_define_alias(rb_cQueue, "size", "length");
recursive_key = rb_intern("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
    (1-1/1)