Index: ChangeLog =================================================================== --- ChangeLog (revision 51752) +++ ChangeLog (working copy) @@ -1,3 +1,10 @@ +Tue Sep 9 02:33:45 2015 Jonathan Cruz + + * thread_sync.c: Update Queue#pop(should_block=true) + Allow the caller to provide a block that accepts data from the + queue. Return the first element for which the block returns a + truthy value, and remove it from the queue. + Thu Sep 3 21:12:12 2015 Nobuyoshi Nakada * lib/cgi/session.rb (create_new_id): use SHA512 instead of MD5. Index: thread_sync.c =================================================================== --- thread_sync.c (revision 51752) +++ thread_sync.c (working copy) @@ -794,27 +794,47 @@ static VALUE queue_do_pop(VALUE self, int should_block) { + VALUE que = GET_QUEUE_QUE(self); struct waiting_delete args; args.waiting = GET_QUEUE_WAITERS(self); args.th = rb_thread_current(); - while (queue_length(self) == 0) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue empty"); - } - else if (queue_closed_p(self)) { - return queue_closed_result(self); - } - else { - assert(queue_length(self) == 0); - assert(queue_closed_p(self) == 0); + for(;;) { + VALUE obj = 0; + int obj_index = -1; - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); - } + if (rb_block_given_p()) { + for (i=0; i < RARRAY_LEN(que); i++) { + obj = RARRAY_AREF(que, i) + if (rb_yield(obj)) { + obj_index = i; + break; + } + } + } + else if (RARRAY_LEN(que) > 0) { + return rb_ary_shift(que); + } + + if (obj_index >= 0) { + rb_ary_delete_at(que, obj_index); + return obj; + } + + if (!should_block) { + rb_raise(rb_eThreadError, "queue empty"); + } + else if (queue_closed_p(self)) { + return queue_closed_result(self); + } + else { + assert(queue_length(self) == 0); + assert(queue_closed_p(self) == 0); + + rb_ary_push(args.waiting, args.th); + rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } } - - return rb_ary_shift(GET_QUEUE_QUE(self)); } static int @@ -835,11 +855,12 @@ * deq(non_block=false) * shift(non_block=false) * - * Retrieves data from the queue. + * Retrieves data from the queue. If +block+ is given, retrieves data from the + * queue for which the given +block+ returns a true value. * - * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and an - * exception is raised. + * If the queue is empty (or +block+ never returns true), the calling thread is + * suspended until data is pushed onto the queue. If +non_block+ is true, the + * thread isn't suspended, and an exception is raised. */ static VALUE @@ -1067,11 +1088,14 @@ * deq(non_block=false) * shift(non_block=false) * - * Retrieves data from the queue. + * Retrieves data from the queue. If +block+ is given, retrieves data from the + * queue for which the given +block+ returns a true value. * - * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and an - * exception is raised. + * If the queue is empty (or +block+ never returns true), the calling thread is + * suspended until data is pushed onto the queue. If +non_block+ is true, the + * thread isn't suspended, and an exception is raised. */ static VALUE