Feature #17378
Closed
Ractor#receive with filtering like other actor languages
Description
In discussion with @marcandre (Marc-Andre Lafortune), we found a good extension for Ractor.receive: calling it with a block.
Ractor.receive do |msg|
  if matches_condition?(msg) # pseudocode: does msg match the condition?
    true
  else
    false
  end
end
The block is called for each value in the incoming queue, and the value is passed as msg.
If the passed value is one you want to process, the block should return true, and the value is removed from the queue.
Otherwise (the block returns a falsy value), the value remains in the queue, and a later Ractor.receive will see it again.
If there are no more values in the queue, the interpreter sleeps until new values are received.
r = Ractor.new do |;r|
  loop do
    r, msg = Ractor.receive
    r << msg
    r << 1
    r << 2
    r << 3
  end
end
r.send([Ractor.current, :ping])
# incoming queue: [:ping, 1, 2, 3] (3 is the last element)
Ractor.receive #=> :ping # without a block
# incoming queue: [1, 2, 3]
p Ractor.receive{|msg| msg == 2}
#=> 2
# incoming queue: [1, 3]
begin
  # an exception in the block doesn't touch the queue
  p Ractor.receive{|msg| raise "err"}
rescue => e
  p e.message #=> "err"
end
# incoming queue: [1, 3]
p Ractor.receive #=> 1
# incoming queue: [3]
p Ractor.receive #=> 3
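The queue states in the comments above can be reproduced with a small pure-Ruby model. This is only a sketch of the semantics described in this proposal, not how Ractor is implemented: ToyMailbox is a hypothetical name, it is single-threaded, and it returns nil where a real ractor would sleep.

```ruby
# Toy model of the proposed filtering semantics (hypothetical class, not Ractor).
class ToyMailbox
  attr_reader :queue

  def initialize(*messages)
    @queue = messages
  end

  # Without a block: take the oldest message.
  # With a block: take the oldest message for which the block is truthy.
  def receive
    return @queue.shift unless block_given?

    @queue.each_with_index do |msg, i|
      # If the block raises, delete_at is never reached, so the queue is untouched.
      return @queue.delete_at(i) if yield(msg)
    end
    nil # a real ractor would sleep here until a new message arrives
  end
end

box = ToyMailbox.new(:ping, 1, 2, 3)
box.receive                     # takes :ping
box.receive { |msg| msg == 2 }  # takes 2 and skips 1
begin
  box.receive { |_msg| raise "err" }
rescue => e
  e.message                     # "err"
end
p box.queue                     # prints [1, 3]
```

The model scans from the oldest message, so unmatched messages keep their relative order.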
With multiple servers, we can write:
morning_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good morning #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end
evening_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good evening #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end
morning_server << [Ractor.current, 'ko1']
morning_server << [Ractor.current, 'ko2']
morning_server << [Ractor.current, 'ko3']
evening_server << [Ractor.current, 'ko1']
evening_server << [Ractor.current, 'ko2']
evening_server << [Ractor.current, 'ko3']
def receive(r)
  # accept only a reply sent by ractor r, then return its payload
  Ractor.receive{|(from, msg)|
    r == from
  }[1]
end
p receive(morning_server) #=> "Good morning ko1"
p receive(evening_server) #=> "Good evening ko1"
p receive(morning_server) #=> "Good morning ko2"
p receive(evening_server) #=> "Good evening ko2"
p receive(morning_server) #=> "Good morning ko3"
p receive(evening_server) #=> "Good evening ko3"
Updated by ko1 (Koichi Sasada) about 4 years ago
Implementation is: https://github.com/ruby/ruby/pull/3862
Updated by Eregon (Benoit Daloze) about 4 years ago
- Related to Feature #17365: Can not respond (well) to a Ractor added
Updated by matz (Yukihiro Matsumoto) about 4 years ago
A receive method with a block suggests that the block takes an incoming message as a block parameter. So I agree with the proposal if the name is receive_if.
Matz.
Updated by marcandre (Marc-Andre Lafortune) about 4 years ago
When discussing with @ko1 (Koichi Sasada) and @Eregon (Benoit Daloze), I had an idea for a better API.
I'd like to be able to call Ractor.receive this way:
def example
  loop do
    Ractor.receive do |message|
      case message
      in [:cmd1, arg] then task1(arg)
      in [:cmd2] then return task2
      end
    end
  end
end
It does not matter what task1 returns; the value of the block is not used to determine filtering.
task1 and task2 can also call Ractor.receive, for example:
def task1(arg)
  other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [:do_something_response, :success, response] then process(response)
    in [:do_something_response, :failure] then raise 'Oh oh'
    end
  end
end
The filtering is implicit in these examples, like in Elixir/Erlang.
If there is no match, the case/in raises a NoMatchingPatternError. This is a way to signal that message was not consumed. Ractor.receive rescues the exception and saves the message.
For example, if some :cmd2 is sent just when task1 is doing its Ractor.receive, it will not match the case, and the message is not consumed. Just after that, the response from other_ractor arrives and task1 finishes; the :cmd2 will then be received in example.
This has the following benefits:
- task1 and task2 can return anything, including nil or false (otherwise this can be error-prone)
- you can return/break out of receive
- implicit handling of the exception case
- very close to Elixir/Erlang semantics
- it would also make case/in more popular!
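To make the :cmd2 scenario above concrete, here is a rough pure-Ruby sketch of a receive that treats NoMatchingPatternError as "not consumed". It is an illustration under assumptions, not the proposed implementation: PatternMailbox, BOX and handle_next are hypothetical names, all messages are already queued, and the model raises where a real ractor would sleep. Any other exception from the block leaves the message consumed in this model.

```ruby
# Toy model (hypothetical): a message is rejected when the block raises
# NoMatchingPatternError, and consumed otherwise.
class PatternMailbox
  attr_reader :queue

  def initialize(*messages)
    @queue = messages
  end

  def receive
    i = 0
    while i < @queue.size
      msg = @queue.delete_at(i) # reserve the message before yielding it
      begin
        return yield(msg)       # the block handled it: the message is consumed
      rescue NoMatchingPatternError
        # rejected: put the message back (clamped in case a nested receive
        # shortened the queue) and try the next one
        @queue.insert([i, @queue.size].min, msg)
        i += 1
      end
    end
    raise "no matching message (a real ractor would sleep here)"
  end
end

BOX = PatternMailbox.new([:cmd1, 5], [:cmd2], [:do_something_response, :success, 42])

def task1(arg)
  # Nested receive: [:cmd2] does not match here, so it stays queued.
  BOX.receive do |message|
    case message
    in [:do_something_response, :success, response] then response + arg
    in [:do_something_response, :failure] then raise "Oh oh"
    end
  end
end

def handle_next
  BOX.receive do |message|
    case message
    in [:cmd1, arg] then task1(arg)
    in [:cmd2] then :task2_done
    end
  end
end

first = handle_next  # runs task1, which skips [:cmd2] and takes the response
second = handle_next # now [:cmd2] is received
p first              # prints 47
p second             # prints :task2_done
```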
There would be an additional way to reject a message, with a second parameter with meta information (client ractor and unique request ID):
Ractor.receive do |message, req|
  cmd, arg = message
  case cmd
  when :cmd1 then task1(arg)
  when :cmd2 then return task2
  else
    req.reject # => same effect as `raise NoMatchingPatternError`
  end
end
The req object would have methods returning the ractor that sent the request, as well as a unique id. This makes responding correctly very easy. Another thread could be communicating with the same ractor, and the responses could not get mixed up:
def task1(arg)
  req = other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [^req, :success, response] then process(response)
    in [^req, :failure] then raise 'Oh oh'
    end
  end
end
Matz, what do you think of this API?
Note: there are remaining questions on how to handle receive/receive_if with a block and multiple threads; hopefully we can keep this issue focused on the API, assuming we have such a method.
Updated by ko1 (Koichi Sasada) about 4 years ago
- Status changed from Open to Closed
Applied in changeset git|a9a7f4d8b8ec30abc7a47ce700edc7209ae12279.
Ractor#receive_if to receive only matched messages
Instead of Ractor.receive, Ractor.receive_if can provide a pattern
by a block and you can choose the receiving message.
[Feature #17378]
Updated by ko1 (Koichi Sasada) about 4 years ago
I have several concerns about Ractor#receive proposed at #note-6.
- NoMatchingPatternError can be raised in tasks unexpectedly (maybe by a bug), and it can cause strange behavior (like a falsy value on receive_if).
- The 2nd parameter req is not clear, and I believe people will forget to call it correctly.
- Not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.
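The first concern can be shown without any ractor at all. In the sketch below, buggy_task and try_consume are hypothetical names; the rescue stands in for what a Ractor.receive that rescues NoMatchingPatternError would do.

```ruby
# A task with a bug: it only handles Integer arguments.
def buggy_task(arg)
  case arg
  in Integer then arg * 2
  end # raises NoMatchingPatternError for anything else
end

# Stand-in for a receive block whose NoMatchingPatternError means "not consumed".
def try_consume(message)
  case message
  in [:cmd1, arg] then buggy_task(arg)
  end
  :consumed
rescue NoMatchingPatternError
  :left_in_queue
end

p try_consume([:cmd1, 21])     # prints :consumed
p try_consume([:other])        # prints :left_in_queue (the intended filtering)
p try_consume([:cmd1, "oops"]) # prints :left_in_queue (the bug in buggy_task is hidden)
```

The last two calls are indistinguishable to the caller, although only the first of them is a genuine filter miss.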
There is not enough time to discuss this for Ruby 3.0.
Maybe we need a way to write the following code:
Ractor.receive do |msg|
  case msg
  when pat1
    [Ractor.receive_commit]
    task1(msg)
  when pat2
    [Ractor.receive_commit]
    task2(msg)
  end # no pattern matched: msg stays in the incoming queue
end
[Ractor.receive_commit] is a pseudo-operation that removes the message from the incoming queue.
The above code means "if the message matches a pattern, remove it from the incoming queue and do the corresponding task; otherwise the message should remain in the incoming queue, and the next message is checked".
To do this with Ractor.receive_if, the following code is roughly equivalent:
Ractor.receive_if do |msg|
  case msg
  when pat1
    break -> { task1(msg) }
  when pat2
    break -> { task2(msg) }
  end
end.call
But nobody wants to write such code.
I hope a macro could translate something into it.
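Without macros, a small helper can hide the select-then-run split. The following is only a sketch under assumptions: ToyMailbox is a hypothetical single-threaded stand-in for the incoming queue (it returns nil instead of sleeping), and receive_case is a hypothetical helper, not a Ractor method.

```ruby
# Toy stand-in for a ractor's incoming queue (hypothetical).
class ToyMailbox
  attr_reader :queue

  def initialize(*messages)
    @queue = messages
  end

  # Remove and return the oldest message for which the block is truthy.
  def receive_if
    index = @queue.index { |msg| yield(msg) }
    index && @queue.delete_at(index)
  end
end

# Hypothetical helper: select with === (as `when` does), let receive_if remove
# the message, and only then run the handler, outside the filtering block.
def receive_case(mailbox, handlers)
  chosen = nil
  msg = mailbox.receive_if do |m|
    chosen = handlers.find { |pattern, _handler| pattern === m }
  end
  chosen && chosen.last.call(msg)
end

box = ToyMailbox.new(:noise, 7, "hello")
handlers = {
  String  => ->(msg) { msg.upcase },
  Integer => ->(msg) { msg + 1 },
}
p receive_case(box, handlers) # prints 8
p receive_case(box, handlers) # prints "HELLO"
p box.queue                   # prints [:noise]
```

Because the handler runs after the filtering block has returned, it is free to receive further messages itself.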
#note-6's approach with similar notation is:
Ractor.receive do |msg|
  [Ractor.receive_commit]
  case msg
  in pat1
    task1(msg)
  in pat2
    task2(msg)
  else
    [Ractor.receive_reject] # or call `req.reject` explicitly
  end
end
[Ractor.receive_reject] re-inserts the message into the incoming queue.
Updated by marcandre (Marc-Andre Lafortune) about 4 years ago
ko1 (Koichi Sasada) wrote in #note-8:
> I have several concerns about Ractor#receive proposed at #note-6.
> - NoMatchingPatternError can be raised in tasks unexpectedly (maybe by a bug), and it can cause strange behavior (like a falsy value on receive_if).
I agree that is an issue. It would be much better if we had NoMatchingPatternError#depth (#17406). We could intercept only depth 0.
> - The 2nd parameter req is not clear, and I believe people will forget to call it correctly.
I think this API would be less used, but people can shoot themselves in the foot.
> - Not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.
I don't understand this issue. It seems the same as returning false from receive_if.
> There is not enough time to discuss this for Ruby 3.0.
😢
Updated by Eregon (Benoit Daloze) about 4 years ago
I think a good way to fix this would be to have some variant of pattern matching that automatically wraps the body of each "in" clause in a lambda.
That would allow matching patterns, executing some code, and then executing the body.
This was actually already proposed by @pitr.ch in https://bugs.ruby-lang.org/issues/14912#change-78398
Such a case expression would also return an object, which could be useful for many other cases too, and seems a much cleaner design than #17406.
Removing eagerly seems like it will inherently be problematic if there are multiple Threads calling Ractor.receive {}.
Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.
Updated by marcandre (Marc-Andre Lafortune) about 4 years ago
Eregon (Benoit Daloze) wrote in #note-10:
> Removing eagerly seems like it will inherently be problematic if there are multiple Threads calling Ractor.receive {}.
It is imperative that a message is not processed twice though. You can change "removing" to "reserving" if that helps. The effect is the same.
> Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.
Not a bug. Ordering cannot be made deterministic with multiple threads and filtering:
- Message B is sent for thread B.
- Independently, message A-1 and then A-2 are sent to be consumed by thread A.
- Thread A calls receive_if, B is reserved and yielded.
- Thread B calls receive_if, A-1 is reserved and yielded.
- Thread C calls receive_if, A-2 is reserved and yielded.
- Threads A and C reject their messages => message A-2 is yielded to and accepted by A.
I believe this is unavoidable and not a real issue. If order is important, it will be up to the programmers to design their protocol accordingly, for example by passing a not_before_id parameter (that can be used in filtering).
That being said, replace "Re-adding" by "putting back in its original place" if it helps. Order would be maintained more often, but this cannot always be done.
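The scenario can be replayed sequentially with a toy queue that reserves messages and puts rejected ones back by arrival order. This is a sketch under assumptions: ReservingQueue and its methods are hypothetical, and the three threads are simulated by plain sequential calls rather than real Thread objects.

```ruby
# Toy model (hypothetical): each message keeps its arrival sequence number,
# so a rejected message can be put back in its original place.
class ReservingQueue
  def initialize(*messages)
    @pending = messages.each_with_index.map { |msg, seq| [seq, msg] }
  end

  # Reserve the oldest message that is not currently reserved.
  def reserve
    @pending.shift
  end

  # Reject: reinsert by sequence number, restoring arrival order.
  def put_back(entry)
    @pending << entry
    @pending.sort_by!(&:first)
  end

  def messages
    @pending.map(&:last)
  end
end

queue = ReservingQueue.new(:B, :A1, :A2)

held_by_a = queue.reserve # thread A is shown B
held_by_b = queue.reserve # thread B is shown A-1
held_by_c = queue.reserve # thread C is shown A-2

queue.put_back(held_by_a) # A rejects B
queue.put_back(held_by_c) # C rejects A-2
p queue.messages          # prints [:B, :A2]

# Thread A scans again: it rejects B and accepts A-2,
# while A-1 is still reserved by thread B.
b_again = queue.reserve
a_gets = queue.reserve
queue.put_back(b_again)
p a_gets.last             # prints :A2, delivered before :A1

queue.put_back(held_by_b) # B finally rejects A-1
p queue.messages          # prints [:B, :A1]
```

Even with messages put back in their original place, thread A sees A-2 before A-1, which is the non-determinism described above.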
Updated by ko1 (Koichi Sasada) about 4 years ago
marcandre (Marc-Andre Lafortune) wrote in #note-9:
> > - The 2nd parameter req is not clear, and I believe people will forget to call it correctly.
> I think this API would be less used, but people can shoot themselves in the foot.
I think in most cases it will be called, if people use send/receive in various cases.
To determine that, we need to observe the usages.
> > - Not sure how to re-insert (req.reject), for example when other threads consume the incoming queue.
> I don't understand this issue. It seems the same as returning false from receive_if.
There is no problem there, because receive_if doesn't remove a message.