Feature #17378

closed

Ractor#receive with filtering like other actor languages

Added by ko1 (Koichi Sasada) about 4 years ago. Updated about 4 years ago.

Status: Closed
Assignee: -
Target version: -
[ruby-core:101317]

Description

In discussion with @marcandre (Marc-Andre Lafortune), we found a good extension: Ractor.receive with a block.

Ractor.receive do |msg|
  if condition_matches?(msg) # pseudocode: does msg match your condition?
    true
  else
    false
  end
end

This block iterates over the values in the incoming queue, and each value is passed in msg.
If the passed value is one you want to process, the block should return true, and the value is removed from the queue.
Otherwise (the block returns a falsy value), the value remains in the queue, and later Ractor.receive calls can access it again.
If there are no more values in the queue, the interpreter sleeps until new values are received.

r = Ractor.new do |;r|
  loop do
    r, msg = Ractor.receive
    r << msg
    r << 1
    r << 2
    r << 3
  end
end

r.send([Ractor.current, :ping])

# incoming queue: [:ping, 1, 2, 3] (3 is last)

Ractor.receive #=> :ping # without a block

# incoming queue: [1, 2, 3]

p Ractor.receive{|msg| msg == 2}
#=> 2 

# incoming queue: [1, 3]
begin
  # exception in the block doesn't touch the queue
  p Ractor.receive{|msg| raise "err"}
rescue => e
  p e.message #=> "err"
end

# incoming queue: [1, 3]
p Ractor.receive #=> 1

# incoming queue: [3]
p Ractor.receive #=> 3
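
The queue behavior walked through above can be modeled in plain Ruby without any Ractor. This is only a sketch under stated assumptions: Mailbox and its methods are hypothetical names, it is single-threaded, and it returns nil instead of sleeping when nothing matches.

```ruby
# Hypothetical single-threaded model of the proposed filtering semantics.
# Mailbox is an illustration only; it is not how Ractor is implemented.
class Mailbox
  def initialize
    @queue = []
  end

  # Append a message to the end of the incoming queue.
  def push(msg)
    @queue << msg
    self
  end
  alias_method :<<, :push

  # Without a block: take the oldest message.
  # With a block: take the oldest message for which the block is truthy;
  # rejected messages stay queued in their original order.
  # Simplification: returns nil instead of sleeping when nothing matches.
  def receive
    return @queue.shift unless block_given?

    # If the block raises, the queue has not been modified yet.
    index = @queue.index { |msg| yield(msg) }
    index && @queue.delete_at(index)
  end

  # Snapshot of the messages still waiting.
  def pending
    @queue.dup
  end
end

box = Mailbox.new
box << :ping << 1 << 2 << 3

box.receive                     # takes :ping
box.receive { |msg| msg == 2 }  # takes 2, skipping over 1
box.pending                     # 1 and 3 remain, in order
```

Looking up the index first and deleting afterwards is what keeps an exception in the block from touching the queue in this model.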

With multiple servers, we can write:

morning_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good morning #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end

evening_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good evening #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end

morning_server << [Ractor.current, 'ko1']
morning_server << [Ractor.current, 'ko2']
morning_server << [Ractor.current, 'ko3']

evening_server << [Ractor.current, 'ko1']
evening_server << [Ractor.current, 'ko2']
evening_server << [Ractor.current, 'ko3']

def receive r
  Ractor.receive{|(from, msg)|
    r == from
  }[1]
end

p receive(morning_server) #=> "Good morning ko1"
p receive(evening_server) #=> "Good evening ko1"
p receive(morning_server) #=> "Good morning ko2"
p receive(evening_server) #=> "Good evening ko2"
p receive(morning_server) #=> "Good morning ko3"
p receive(evening_server) #=> "Good evening ko3"

Related issues 1 (0 open, 1 closed)

Related to Ruby master - Feature #17365: Can not respond (well) to a Ractor (Closed)
#2

Updated by ko1 (Koichi Sasada) about 4 years ago

  • Description updated (diff)
#3

Updated by Eregon (Benoit Daloze) about 4 years ago

  • Description updated (diff)
#4

Updated by Eregon (Benoit Daloze) about 4 years ago

Updated by matz (Yukihiro Matsumoto) about 4 years ago

The receive method with a block suggests that the block takes an incoming message as a block parameter. So I agree with the proposal if the name is receive_if.

Matz.

Updated by marcandre (Marc-Andre Lafortune) about 4 years ago

When discussing with @ko1 (Koichi Sasada) and @Eregon (Benoit Daloze), I had an idea for a better API.

I'd like to be able to call Ractor.receive this way:

def example
  loop do
    Ractor.receive do |message|
      case message
      in [:cmd1, arg] then task1(arg)
      in [:cmd2] then return task2
      end
    end
  end
end

It does not matter what task1 returns; the value of the block is not used to determine filtering.

task1 and task2 can also call Ractor.receive, for example:

def task1(arg)
  other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [:do_something_response, :success, response] then process(response)
    in [:do_something_response, :failure] then raise 'Oh oh'
    end
  end
end

The filtering is implicit in these examples, like in Elixir/Erlang.

If there is no match, case/in raises a NoMatchingPatternError. This is a way to signal that the message was not consumed. Ractor.receive rescues the exception and saves the message.

For example, if a :cmd2 message is sent just while task1 is in its Ractor.receive, it will not match the case, and the message is not consumed. Once the response from other_ractor arrives and task1 finishes, the :cmd2 will be received in example.

This has several benefits:

  • task1 and task2 can return anything, including nil or false (otherwise this can be error prone)
  • you can return/break out of receive
  • implicit handling of exception case
  • very close to Elixir/Erlang semantics
  • also would make case/in more popular!
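
The implicit filtering described above can be sketched over a plain Array. This is a rough model, not the proposed implementation: receive_matching is a hypothetical name, and it returns nil where the proposal would wait for more messages.

```ruby
# Hypothetical model: each queued message is offered to the block, and a
# NoMatchingPatternError raised by the block means "not consumed".
# receive_matching is an illustrative name, not a Ractor method.
def receive_matching(queue)
  queue.each_with_index do |message, index|
    begin
      result = yield(message)
    rescue NoMatchingPatternError
      next # leave the message queued and try the next one
    end
    queue.delete_at(index)
    return result # the block's value is returned, even if nil or false
  end
  nil # simplification: the proposal would wait for more messages
end

queue = [[:cmd2], [:do_something_response, :success, 42]]

# An inner receive skips [:cmd2] because none of its patterns match it.
response = receive_matching(queue) do |message|
  case message
  in [:do_something_response, :success, value] then value
  in [:do_something_response, :failure] then raise 'Oh oh'
  end
end

# The outer receive can still pick up [:cmd2] afterwards.
command = receive_matching(queue) do |message|
  case message
  in [:cmd1, arg] then [:ran_cmd1, arg]
  in [:cmd2] then :ran_cmd2
  end
end
```

Note that this sketch also treats a NoMatchingPatternError raised deep inside a task as a rejection, since rescue cannot tell where the error came from.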

There would be an additional way to reject a message, with a second parameter with meta information (client ractor and unique request ID):

Ractor.receive do |message, req|
  cmd, arg = message
  case cmd
  when :cmd1 then task1(arg)
  when :cmd2 then return task2
  else
    req.reject # => same effect as `raise NoMatchingPatternError`
  end
end

The req object would have methods returning the ractor that sent the request, as well as a unique id. This makes responding correctly very easy. Another thread could be communicating with the same ractor, and the responses would not get mixed up:

def task1(arg)
  req = other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [^req, :success, response] then process(response)
    in [^req, :failure] then raise 'Oh oh'
    end
  end
end
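
How pinning a request token keeps responses apart can be illustrated with plain data. This is a sketch under assumptions: the Request struct and take_response are hypothetical, since the thread only says the token would expose the sending ractor and a unique id.

```ruby
# Hypothetical request token; its exact shape is an assumption.
Request = Struct.new(:sender, :id)

# Return the payload of the first queued response that belongs to `req`,
# leaving responses to other requests in the queue.
def take_response(queue, req)
  index = queue.index do |message|
    case message
    in [^req, *] then true # pinned: only responses tagged with this token
    else false
    end
  end
  return nil unless index # simplification: no waiting in this model

  case queue.delete_at(index)
  in [_, :success, response] then response
  in [_, :failure] then raise 'Oh oh'
  end
end

first  = Request.new(:thread_a, 1)
second = Request.new(:thread_b, 2)

# Responses arrive in a different order than the callers ask for them.
queue = [[second, :success, 'for b'], [first, :success, 'for a']]

a = take_response(queue, first)
b = take_response(queue, second)
```

Each caller only sees the response tagged with its own token, regardless of arrival order.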

Matz, what do you think of this API?

Note: there are remaining questions on how to handle receive/receive_if with block and multi-thread; hopefully we can keep this issue focused on the API, assuming we have such a method.

#7

Updated by ko1 (Koichi Sasada) about 4 years ago

  • Status changed from Open to Closed

Applied in changeset git|a9a7f4d8b8ec30abc7a47ce700edc7209ae12279.


Ractor#receive_if to receive only matched messages

Instead of Ractor.receive, Ractor.receive_if can provide a pattern
by a block and you can choose the receiving message.

[Feature #17378]

Updated by ko1 (Koichi Sasada) about 4 years ago

I have several concerns about the Ractor#receive proposed in #note-6.

  • NoMatchingPatternError can be raised in tasks unexpectedly (maybe by a bug), and that can cause strange behavior (like returning a falsy value in receive_if).
  • The 2nd parameter req is not clear, and I believe people will forget to call it correctly.
  • I am not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.

There is not enough time to discuss this for Ruby 3.0.

Maybe we need a way to write the following code:

Ractor.receive do |msg|
  case msg
  when pat1
    [Ractor.receive_commit]
    task1(msg)
  when pat2
    [Ractor.receive_commit]
    task2(msg)
  end
end

[Ractor.receive_commit] is a pseudo-operation that removes the message from the incoming queue.
The above code means "if the message matches a pattern, remove it from the incoming queue and do the corresponding task; otherwise the message remains in the incoming queue and the next message is checked".

With Ractor.receive_if, roughly similar code would be:

Ractor.receive_if do |msg|
  case msg
  when pat1
    break -> { task1(msg) }
  when pat2
    break -> { task2(msg) }
  end
end.call

But nobody wants to write such code.
I hope a macro will translate something into it.
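
To see why breaking out with a lambda separates choosing a message from working on it, here is a rough model over a plain Array. The receive_if below is a hypothetical stand-in, and it assumes that leaving the block with break consumes the current message, which is what the snippet above relies on.

```ruby
# Hypothetical stand-in for Ractor.receive_if over a plain Array.
# Assumption: any non-local exit from the block (such as break)
# consumes the current message.
def receive_if(queue)
  queue.each_with_index do |msg, index|
    consumed = true # stays true if the block exits without returning
    begin
      consumed = yield(msg)
    ensure
      queue.delete_at(index) if consumed
    end
    return msg if consumed
  end
  nil # simplification: no waiting for new messages in this model
end

log = []
queue = [[:noise], [:cmd1, 10], [:cmd2, 20]]

work = receive_if(queue) do |msg|
  case msg
  in [:cmd1, n] then break -> { log << "task1(#{n})" }
  in [:cmd2, n] then break -> { log << "task2(#{n})" }
  else false # not interested: leave the message queued
  end
end

# The task runs only here, after the message has left the queue.
work.call
```

The block itself does nothing but filtering; the returned lambda carries the chosen task out of the block so it can run afterwards.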


#note-6's approach with similar notation is:

Ractor.receive do |msg|
  [Ractor.receive_commit]
  
  case msg
  in pat1
    task1(msg)
  in pat2
    task2(msg)
  else
    [Ractor.receive_reject] # or call `req.reject` explicitly
  end
end

[Ractor.receive_reject] re-inserts the message into the incoming queue.

Updated by marcandre (Marc-Andre Lafortune) about 4 years ago

ko1 (Koichi Sasada) wrote in #note-8:

I have several concerns about the Ractor#receive proposed in #note-6.

  • NoMatchingPatternError can be raised in tasks unexpectedly (maybe by a bug), and that can cause strange behavior (like returning a falsy value in receive_if).

I agree that is an issue. It would be much better if we had NoMatchingPatternError#depth (#17406). We could intercept only depth 0.

  • The 2nd parameter req is not clear, and I believe people will forget to call it correctly.

I think this API would be less used, but people can shoot themselves in the foot.

  • I am not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.

I don't understand this issue. It seems the same as returning false from receive_if.

There is not enough time to discuss this for Ruby 3.0.

😢

Updated by Eregon (Benoit Daloze) about 4 years ago

I think a good way to fix this would be to have some variant of pattern matching that automatically wraps the body of each in clause in a lambda.
That would allow matching patterns, executing some code, and then executing the body.
This was actually already proposed by @pitr.ch (Petr Chalupa) in https://bugs.ruby-lang.org/issues/14912#change-78398
Such a case expression would also return an object, which could be useful for many other cases too, and seems a much cleaner design than #17406.

Removing eagerly seems like it will inherently be problematic if there are multiple Threads calling Ractor.receive {}.
Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.

Updated by marcandre (Marc-Andre Lafortune) about 4 years ago

Eregon (Benoit Daloze) wrote in #note-10:

Removing eagerly seems like it will inherently be problematic if there are multiple Threads calling Ractor.receive {}.

It is imperative that a message is not processed twice, though. You can change "removing" to "reserving" if that helps. The effect is the same.

Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.

Not a bug. Ordering can not be made deterministic with multi-thread & filtering:

Message B is sent for thread B.
Independently, message A-1 and then A-2 are sent to be consumed by thread A.

Thread A calls receive_if, B is reserved and yielded.
Thread B calls receive_if, A-1 is reserved and yielded.
Thread C calls receive_if, A-2 is reserved and yielded.
Threads A and C reject their messages => message A-2 is yielded to and accepted by A (before A-1).

I believe this is unavoidable and not a real issue. If order is important, it will be up to the programmers to design their protocol accordingly, for example by passing a not_before_id parameter (that can be used in filtering).

That being said, replace "Re-adding" by "putting back in its original place" if it helps. Order would be more often maintained, but can not always be done.

Updated by ko1 (Koichi Sasada) about 4 years ago

marcandre (Marc-Andre Lafortune) wrote in #note-9:

  • The 2nd parameter req is not clear, and I believe people will forget to call it correctly.

I think this API would be less used, but people can shoot themselves in the foot.

I think it will be called in most cases if people use send/receive in various situations.
To determine that, we need to observe actual usage.

  • I am not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.

I don't understand this issue. It seems the same as returning false from receive_if.

No problem because receive_if doesn't remove a message.
