Feature #21869: Add receive_all Method to Ractor API for Message Batching

Added by synacker (Mikhail Milovidov) about 2 months ago. Updated 8 days ago.

Status:
Open
Assignee:
Target version:
-
[ruby-core:124711]

Description

Summary
The Ractor API provides an excellent mechanism for inter‑thread communication, but it currently lacks a built‑in message batching technique. I propose adding a receive_all method to enable batch processing of messages, which can significantly improve performance in high‑load scenarios.

Motivation
In distributed queued systems, processing messages one‑by‑one (as with the current receive method) can introduce unnecessary overhead. Batch processing allows:

Reduced context‑switching overhead.

More efficient I/O operations (e.g., fewer file writes).

Better throughput in high‑concurrency environments.

Proposed Solution
Add a receive_all method to the Ractor API that:

Returns all available messages in the Ractor’s mailbox at once (as an array).

Demonstration Code
Below is a benchmark comparing individual receive vs. batch receive_all:

require 'benchmark'
class RactorsTest

  def initialize(count)
    @count = count
    @ractor1 = Ractor.new(count, 'output1.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          message = receive
          file.write("Ractor 1 received message: #{message}\n")
          file.flush
          count -= 1
        end
      end
    end
    
    @ractor2 = Ractor.new(count, 'output2.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          messages = receive_all
          messages.each do |message|
            file.write("Ractor 2 received message: #{message}\n")
          end
          count -= messages.length
          file.flush
        end
      end
    end
  end

  def run1
    @count.times do |i|
      @ractor1.send("Message #{i + 1}")
    end
    @ractor1.join
  end

  def run2
    @count.times do |i|
      @ractor2.send("Message #{i + 1}")
    end
    @ractor2.join
  end
end

records = 1_000_000

test = RactorsTest.new(records)

p [:once, Benchmark.realtime { test.run1 }.round(2)]
p [:all, Benchmark.realtime { test.run2 }.round(2)]

Benchmark Results
On my system, receive_all shows a ~4x improvement over individual receive.

Key Observations:
Ractor1 (using receive): Processes each message individually, resulting in frequent I/O calls.

Ractor2 (using receive_all): Processes all queued messages at once, minimizing I/O overhead.

Updated by ko1 (Koichi Sasada) about 1 month ago · Edited Actions #2 [ruby-core:124752]

Does it block when the queue is empty, or does it return []? (I think the example expects blocking.)

Updated by Eregon (Benoit Daloze) about 1 month ago Actions #3 [ruby-core:124757]

synacker (Mikhail Milovidov) wrote:

More efficient I/O operations (e.g., fewer file writes).

Is it? In your example you call file.write for each message in both cases.

But you also call file.flush after each file.write in ractor1 and only once per batch in ractor2.
Could you benchmark without the file.flush calls? I suspect the difference is much smaller then.

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.
One could also flush after N messages/bytes in ractor1.
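The "flush after N messages" alternative can be sketched roughly as follows. This is only an illustration: the helper name write_with_periodic_flush and its flush_every: parameter are hypothetical, and a StringIO stands in for the output file so the snippet runs on its own.

```ruby
require 'stringio'

# Hypothetical helper (not part of the proposal): writes every message but
# flushes only once per `flush_every` messages, plus one final flush for any
# remainder. Returns the number of flush calls made.
def write_with_periodic_flush(io, messages, flush_every:)
  flushes = 0
  pending = 0
  messages.each do |message|
    io.write("received message: #{message}\n")
    pending += 1
    next if pending < flush_every

    io.flush
    flushes += 1
    pending = 0
  end
  # Flush whatever is left over from the last incomplete group.
  unless pending.zero?
    io.flush
    flushes += 1
  end
  flushes
end

io = StringIO.new
messages = (1..10).map { |i| "Message #{i}" }
flush_count = write_with_periodic_flush(io, messages, flush_every: 4)
```

With a fixed N, a message can sit unflushed until the group fills up, whereas a batch taken from the mailbox is flushed as soon as the currently queued messages have been written.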

Updated by synacker (Mikhail Milovidov) about 1 month ago · Edited Actions #4 [ruby-core:124759]

ko1 (Koichi Sasada) wrote in #note-2:

Does it block when the queue is empty, or does it return []? (I think the example expects blocking.)

Yes, it blocks if the queue is empty. The method receive_all accepts a limit parameter:

  1. limit > 0: collects up to limit messages (may return fewer if fewer are queued). Blocks if the queue is empty.
  2. limit == 0: returns an empty array immediately (no blocking)
  3. limit < 0 or nil (default): returns all messages from the queue or blocks if the queue is empty

Eregon (Benoit Daloze) wrote in #note-3:

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.

The example demonstrates how you can collect messages during long I/O operations and batch them together to reduce the number of subsequent I/O calls. The file.flush call simulates a long I/O operation; it could equally represent a database call or something similar.

Updated by synacker (Mikhail Milovidov) about 1 month ago Actions #5 [ruby-core:124760]

Eregon (Benoit Daloze) wrote in #note-3:

Is it? In your example you call file.write for each message in both cases.

But you also call file.flush after each file.write in ractor1 and only once per batch in ractor2.

This is also a realistic scenario. To guarantee messages are saved to file, you'd normally need to call flush after each message - but that's inefficient when processing single messages.

Updated by luke-gru (Luke Gruber) 19 days ago Actions #6

  • Assignee set to ractor

Updated by byroot (Jean Boussier) 18 days ago Actions #7 [ruby-core:124947]

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.

Not specific to Ractor, but I relatively often had a similar use case with Thread::Queue. When implementing instrumentation libraries (e.g. statsd or opentelemetry) what you generally want to do is to push as much work as possible to a background thread (possibly ractor in the future), but you also want this background thread to serialize and send packets in batch.

In such a context, being able to pop 1..N elements from the queue would be convenient. Currently with Queue it's semi-doable by first doing a blocking pop, followed by a bounded number of non-blocking ones.
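A minimal sketch of that Thread::Queue workaround, assuming a single consumer thread. The helper name pop_batch is hypothetical; Thread::Queue#pop(true) is the standard non-blocking form, which raises ThreadError when the queue is empty.

```ruby
# Hypothetical helper: blocks until at least one element is available, then
# takes up to max - 1 more elements without blocking.
def pop_batch(queue, max)
  batch = [queue.pop]            # blocking pop: waits for the first element
  while batch.size < max
    begin
      batch << queue.pop(true)   # non-blocking pop: raises ThreadError if empty
    rescue ThreadError
      break                      # queue drained, return what we have
    end
  end
  batch
end

queue = Thread::Queue.new
5.times { |i| queue << "metric #{i}" }

first  = pop_batch(queue, 3)  # takes three elements
second = pop_batch(queue, 3)  # takes the remaining two
```

The begin/rescue around every non-blocking pop is the awkward part of this pattern, and it is what a built-in batch operation would hide.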

Updated by ko1 (Koichi Sasada) 9 days ago Actions #8 [ruby-core:125027]

synacker (Mikhail Milovidov) wrote in #note-4:

Yes, it blocks if the queue is empty. The method receive_all accepts a limit parameter:

  1. limit > 0: collects up to limit messages (may return fewer if fewer are queued). Blocks if the queue is empty.
  2. limit == 0: returns an empty array immediately (no blocking)
  3. limit < 0 or nil (default): returns all messages from the queue or blocks if the queue is empty

The limit parameter is not in the description. Let me clarify limit == 0. In the above quote, it returns an empty array.
If it always returns [], I think we don't need to introduce it.
If it returns existing messages, we can consider it.

For this purpose, it is possible to introduce another parameter such as timeout: 0.

  • receive_all(3, timeout: 0) returns up to 3 messages that have already arrived. If no messages have arrived, it returns nil (timeout).
  • receive_all(timeout: 1) returns all messages in the queue, or waits up to 1 second. If a message arrives, it returns [received_one_message]. If no message arrives within 1 second, it returns nil.
  • receive_all(0 or negative parameter) raises an exception.

We can extend the receive method with timeout: in the same manner.
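The semantics listed above can be approximated today with Thread::Queue, which may help when comparing the options. This is a sketch under assumptions, not the Ractor implementation: receive_all_sketch is a hypothetical name, a Thread::Queue stands in for the Ractor mailbox, and Queue#pop(timeout:) requires Ruby 3.2 or later.

```ruby
# Hypothetical stand-in for the proposed semantics, built on Thread::Queue.
# Caveat: a nil message cannot be told apart from a timeout in this sketch.
def receive_all_sketch(queue, limit = nil, timeout: nil)
  raise ArgumentError, "limit must be positive" if limit && limit <= 0

  # timeout: nil waits indefinitely; otherwise pop returns nil on timeout.
  first = queue.pop(timeout: timeout)
  return nil if first.nil?

  batch = [first]
  until limit && batch.size >= limit
    message = queue.pop(timeout: 0)  # returns nil at once if the queue is empty
    break if message.nil?

    batch << message
  end
  batch
end

queue = Thread::Queue.new
5.times { |i| queue << "Message #{i + 1}" }

up_to_three = receive_all_sketch(queue, 3, timeout: 0)  # first three messages
the_rest    = receive_all_sketch(queue, timeout: 1)     # remaining two, no wait
timed_out   = receive_all_sketch(queue, timeout: 0)     # nil: nothing arrived
```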

Updated by synacker (Mikhail Milovidov) 8 days ago Actions #9 [ruby-core:125050]

ko1 (Koichi Sasada) wrote in #note-8:

For this purpose, it is possible to introduce another parameter such as timeout: 0.

  • receive_all(3, timeout: 0) returns up to 3 messages that have already arrived. If no messages have arrived, it returns nil (timeout).
  • receive_all(timeout: 1) returns all messages in the queue, or waits up to 1 second. If a message arrives, it returns [received_one_message]. If no message arrives within 1 second, it returns nil.
  • receive_all(0 or negative parameter) raises an exception.

LGTM, I’ll update the PR with this suggestion.

Updated by synacker (Mikhail Milovidov) 8 days ago Actions #10 [ruby-core:125056]

ko1 (Koichi Sasada) wrote in #note-8:

If it always returns [], I think we don't need to introduce it.

I've updated the PR. The limit parameter behaves as follows:

  1. limit == nil: returns all messages in the queue, or blocks if the queue is empty.
  2. limit > 0: returns an array of up to limit messages, or blocks if the queue is empty.
  3. limit <= 0: raises an ArgumentError.

ko1 (Koichi Sasada) wrote in #note-8:

For this purpose, it is possible to introduce another parameter such as timeout: 0.

Regarding the timeout suggestion: implementing it would require significant changes to the Ractor logic. I propose addressing this in a separate task.
