diff --git a/lib/thread.rb b/lib/thread.rb index 88e86ab..9857599 100644 --- a/lib/thread.rb +++ b/lib/thread.rb @@ -129,7 +129,7 @@ end # # consumer = Thread.new do # 5.times do |i| -# value = queue.pop +# value = queue.shift # sleep rand(i/2) # simulate expense # puts "consumed #{value}" # end diff --git a/lib/thread/queue.rb b/lib/thread/queue.rb new file mode 100644 index 0000000..3b8d436 --- /dev/null +++ b/lib/thread/queue.rb @@ -0,0 +1,187 @@ +require 'timeout' + +class Thread + # Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize + # communication between threads. This queue does not block when items are + # removed (see Thread::Queue#remove) + # + # This queue does not allow nil elements. + class Queue + class NoSuchElementError < StandardError + end + + # + # Creates a new queue. + # + def initialize + @que = [] + @que.taint # enable tainted communication + self.taint + @mutex = Mutex.new + end + + def to_a + @que.dup + end + + # + # Adds +obj+ to the head of the queue. + # + # Raises an ArgumentError if +obj+ is nil. + # + def add(obj) + raise ArgumentError if obj.nil? + @mutex.synchronize { @que.push obj } + self + end + + alias :push :add + alias :<< :add + + def offer(obj, timeout = nil) + add obj + end + + # Retrieves data from the queue head, and removes it. + # + # Raises a NoSuchElementError if the queue is empty. + def remove + @mutex.synchronize { + raise NoSuchElementError if empty? + @que.shift + } + end + + alias :pop :remove + alias :shift :remove + alias :deq :remove + + # Retrieves data from the queue head, and removes it. + # + # Returns nil if this queue is empty. + def poll + @mutex.synchronize { + if empty? + nil + else + @que.shift + end + } + end + + # Retrieves data from the queue head, but does not removes it. + # + # Returns nil if the queue is empty + def peek + @mutex.synchronize { @que.first } + end + + # Retrieves data from the queue head, but does not removes it. + # + # Raises NoSuchElementError if the queue is empty. + def element + @mutex.synchronize { + if empty? + raise NoSuchElementError + else + @que.first + end + } + end + + # + # Returns +true+ if the queue is empty. + # + def empty? + @que.empty? + end + + # + # Removes all objects from the queue. + # + def clear + @que.clear + end + + # + # Returns the length of the queue. + # + def length + @que.length + end + alias :size :length + end + + # Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize + # communication between threads. + # + # This queue does not allow nil elements. + class BlockingQueue < Queue + def initialize + @waiting = [] + @waiting.taint + super + end + + # Retrieves data from the queue head, and removes it. + # + # If the queue is empty, remove will block until there is something + # in the queue. + def take + @mutex.synchronize { + while true + if @que.empty? + # @waiting.include? check is necessary for avoiding a race against + # Thread.wakeup [Bug 5195] + @waiting.push Thread.current unless @waiting.include?(Thread.current) + @mutex.sleep + else + return @que.shift + end + end + } + end + + alias :pop :take + alias :shift :take + alias :deq :take + + # Adds +obj+ to the head of the queue. + # + # Raises an ArgumentError if +obj+ is nil. + # + def add(obj) + raise ArgumentError if obj.nil? + + @mutex.synchronize { + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + } + self + end + + alias :push :add + alias :<< :add + + # Retrieves data from the queue head, and removes it. + # + # Blocks for +timeout+ seconds if the queue is empty, and returns nil if + # the timeout expires. + def poll(timeout = nil) + return super() unless timeout + + begin + Timeout.timeout(timeout) do + take + end + rescue TimeoutError + nil + end + end + end +end diff --git a/test/thread/helper.rb b/test/thread/helper.rb new file mode 100644 index 0000000..d3c1258 --- /dev/null +++ b/test/thread/helper.rb @@ -0,0 +1,102 @@ +require 'minitest/autorun' +require 'thread/queue' + +class Thread + class TestCase < MiniTest::Unit::TestCase + class Latch + def initialize + @mutex = Mutex.new + @cond = ConditionVariable.new + end + + def release + @mutex.synchronize { @cond.broadcast } + end + + def await + @mutex.synchronize { @cond.wait @mutex } + end + end + + attr_reader :queue + + POISON = Object.new + + def grind(num_threads, num_objects, num_iterations, klass, *args) + from_workers = klass.new(*args) + to_workers = klass.new(*args) + + to_consumers = num_threads.times.map { + Thread.new { + while object = to_workers.pop + break if object == POISON + from_workers.push object + end + } + } + + from_consumer = Thread.new { + num_iterations.times { + num_objects.times { from_workers.pop } + } + } + + num_iterations.times { + num_objects.times { to_workers.push 99 } + } + num_threads.times { to_workers.push POISON } + + to_consumers.each { |t| t.join } + + from_consumer.join + + assert_equal 0, from_workers.size + assert_equal 0, to_workers.size + end + + def non_block_grind(num_threads, num_objects, num_iterations, klass, *args) + from_workers = klass.new(*args) + to_workers = klass.new(*args) + + to_latch = Latch.new + from_latch = Latch.new + + to_consumers = num_threads.times.map { + Thread.new { + to_latch.await + + while object = to_workers.pop + break if object == POISON + from_workers.push object + end + } + } + + from_consumer = Thread.new { + from_latch.await + + num_iterations.times { + num_objects.times { from_workers.pop } + } + } + + num_iterations.times { + num_objects.times { to_workers.push 99 } + } + num_threads.times { to_workers.push POISON } + + Thread.pass until to_consumers.all? { |c| c.status == "sleep" } + Thread.pass until from_consumer.status == "sleep" + + to_latch.release + + to_consumers.each { |t| t.join } + + from_latch.release + from_consumer.join + + assert_equal 0, from_workers.size + assert_equal 0, to_workers.size + end + end +end diff --git a/test/thread/test_blocking_queue.rb b/test/thread/test_blocking_queue.rb new file mode 100644 index 0000000..f80b063 --- /dev/null +++ b/test/thread/test_blocking_queue.rb @@ -0,0 +1,93 @@ +require 'helper' + +class Thread + class TestBlockingQueue < TestCase + attr_reader :queue + + def setup + @queue = Thread::BlockingQueue.new + super + end + + def test_add_returns_self + assert_equal queue, queue.add(1) + end + + def test_queue + grind(5, 1000, 15, Thread::BlockingQueue) + end + + def test_offer + assert queue.offer(1) + assert_equal 1, queue.length + end + + def test_clear + 10.times { |i| queue << i } + assert_equal 10, queue.length + queue.clear + assert_equal 0, queue.length + assert queue.empty? + end + + def test_add + queue.add "foo" + assert_equal "foo", queue.take + assert queue.empty? + end + + def test_add_nil + assert_raises(ArgumentError) do + queue.add nil + end + end + + def test_remove_empty + assert queue.empty? + t = Thread.new { queue.take } + queue << 1 + assert_equal 1, t.join.value + end + + def test_poll + queue.add "foo" + assert_equal "foo", queue.poll + end + + def test_poll_empty + assert_nil queue.poll + end + + def test_poll_timeout + assert_nil queue.poll(1) + + t = Thread.new { queue.poll(10) } + queue << "foo" + assert_equal "foo", t.join.value + end + + def test_peek + queue.add "foo" + assert_equal "foo", queue.peek + assert_equal "foo", queue.take + end + + def test_peek_empty + assert queue.empty? + assert_nil queue.peek + end + + def test_element + queue.add "foo" + assert_equal "foo", queue.element + assert_equal "foo", queue.take + end + + def test_element_empty + assert queue.empty? + assert_raises(Queue::NoSuchElementError) do + queue.element + end + end + end +end diff --git a/test/thread/test_non_block_queue.rb b/test/thread/test_non_block_queue.rb new file mode 100644 index 0000000..fa22a74 --- /dev/null +++ b/test/thread/test_non_block_queue.rb @@ -0,0 +1,90 @@ +require 'helper' + +class Thread + class TestQueue < TestCase + alias :grind :non_block_grind + + def setup + super + @queue = Thread::Queue.new + end + + def test_queue + grind(5, 1000, 15, Thread::Queue) + end + + def test_add_returns_self + assert_equal queue, queue.add(1) + end + + def test_offer + assert queue.offer(1) + end + + def test_clear + 10.times { |i| queue << i } + assert_equal 10, queue.length + queue.clear + assert_equal 0, queue.length + assert queue.empty? + end + + def test_add + queue.add "foo" + assert_equal "foo", queue.remove + assert queue.empty? + end + + def test_add_nil + assert_raises(ArgumentError) do + queue.add nil + end + end + + def test_remove_empty + assert queue.empty? + assert_raises(Queue::NoSuchElementError) do + queue.remove + end + end + + def test_poll + queue.add "foo" + assert_equal "foo", queue.poll + end + + def test_poll_empty + assert_nil queue.poll + end + + def test_peek + queue.add "foo" + assert_equal "foo", queue.peek + assert_equal "foo", queue.remove + end + + def test_peek_empty + assert queue.empty? + assert_nil queue.peek + end + + def test_element + queue.add "foo" + assert_equal "foo", queue.element + assert_equal "foo", queue.remove + end + + def test_element_empty + assert queue.empty? + assert_raises(Queue::NoSuchElementError) do + queue.element + end + end + + def test_offer_optionally_takes_timeout + assert queue.empty? + queue.offer 0, 10 + assert_equal 1, queue.length + end + end +end