Feature #8073 » rinda.multicast.3.patch
| lib/rinda/ring.rb (working copy) | ||
|---|---|---|
|
require 'drb/drb'
|
||
|
require 'rinda/rinda'
|
||
|
require 'thread'
|
||
|
require 'ipaddr'
|
||
|
module Rinda
|
||
| ... | ... | |
|
include DRbUndumped
|
||
|
##
|
||
|
# Special renewer for the RingServer to allow shutdown
|
||
|
class Renewer # :nodoc:
|
||
|
include DRbUndumped
|
||
|
##
|
||
|
# Set to false to shutdown future requests using this Renewer
|
||
|
attr_accessor :renew
|
||
|
def initialize # :nodoc:
|
||
|
@renew = true
|
||
|
end
|
||
|
def renew # :nodoc:
|
||
|
@renew ? 1 : true
|
||
|
end
|
||
|
end
|
||
|
##
|
||
|
# Advertises +ts+ on the UDP broadcast address at +port+.
|
||
|
def initialize(ts, port=Ring_PORT)
|
||
|
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
|
||
|
@port = port
|
||
|
if Integer === addresses then
|
||
|
addresses, @port = [Socket::INADDR_ANY], addresses
|
||
|
end
|
||
|
@renewer = Renewer.new
|
||
|
@ts = ts
|
||
|
@soc = UDPSocket.open
|
||
|
@soc.bind('', port)
|
||
|
@w_service = write_service
|
||
|
@r_service = reply_service
|
||
|
@sockets = addresses.map do |address|
|
||
|
make_socket(address)
|
||
|
end
|
||
|
@w_services = write_services
|
||
|
@r_service = reply_service
|
||
|
end
|
||
|
##
|
||
|
# Creates a thread that picks up UDP packets and passes them to do_write
|
||
|
# for decoding.
|
||
|
# Creates a socket at +address+
|
||
|
def write_service
|
||
|
Thread.new do
|
||
|
loop do
|
||
|
msg = @soc.recv(1024)
|
||
|
do_write(msg)
|
||
|
def make_socket(address)
|
||
|
addrinfo = Addrinfo.udp(address, @port)
|
||
|
socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
|
||
|
addrinfo.protocol)
|
||
|
if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
|
||
|
if Socket.const_defined?(:SO_REUSEPORT) then
|
||
|
socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
|
||
|
else
|
||
|
socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
|
||
|
end
|
||
|
if addrinfo.ipv4_multicast? then
|
||
|
mreq = IPAddr.new(addrinfo.ip_address).hton +
|
||
|
IPAddr.new('0.0.0.0').hton
|
||
|
socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
|
||
|
else
|
||
|
mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I')
|
||
|
socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
|
||
|
end
|
||
|
end
|
||
|
socket.bind(addrinfo)
|
||
|
socket
|
||
|
end
|
||
|
##
|
||
|
# Creates threads that pick up UDP packets and passes them to do_write for
|
||
|
# decoding.
|
||
|
def write_services
|
||
|
@sockets.map do |s|
|
||
|
Thread.new(s) do |socket|
|
||
|
loop do
|
||
|
msg = socket.recv(1024)
|
||
|
do_write(msg)
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
end
|
||
| ... | ... | |
|
def reply_service
|
||
|
Thread.new do
|
||
|
loop do
|
||
|
thread = Thread.current
|
||
|
thread[:continue] = true
|
||
|
while thread[:continue] do
|
||
|
do_reply
|
||
|
end
|
||
|
end
|
||
| ... | ... | |
|
# address of the local TupleSpace.
|
||
|
def do_reply
|
||
|
tuple = @ts.take([:lookup_ring, DRbObject])
|
||
|
tuple = @ts.take([:lookup_ring, DRbObject], @renewer)
|
||
|
Thread.new { tuple[1].call(@ts) rescue nil}
|
||
|
rescue
|
||
|
end
|
||
|
##
|
||
|
# Shuts down the RingServer
|
||
|
def shutdown
|
||
|
@renewer.renew = false
|
||
|
@w_services.each do |thread|
|
||
|
thread.kill
|
||
|
end
|
||
|
@sockets.each do |socket|
|
||
|
socket.close
|
||
|
end
|
||
|
@r_service[:continue] = false
|
||
|
end
|
||
|
end
|
||
|
##
|
||
| ... | ... | |
|
attr_accessor :broadcast_list
|
||
|
##
|
||
|
# Maximum number of hops for sent multicast packets (if using a multicast
|
||
|
# address in the broadcast list). The default is 1 (same as UDP
|
||
|
# broadcast).
|
||
|
attr_accessor :multicast_hops
|
||
|
##
|
||
|
# The interface index to send IPv6 multicast packets from.
|
||
|
attr_accessor :multicast_interface
|
||
|
##
|
||
|
# The port that RingFinger will send query packets to.
|
||
|
attr_accessor :port
|
||
| ... | ... | |
|
@port = port
|
||
|
@primary = nil
|
||
|
@rings = []
|
||
|
@multicast_hops = 1
|
||
|
@multicast_interface = 0
|
||
|
end
|
||
|
##
|
||
| ... | ... | |
|
msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
|
||
|
@broadcast_list.each do |it|
|
||
|
soc = UDPSocket.open
|
||
|
begin
|
||
|
soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
|
||
|
soc.send(msg, 0, it, @port)
|
||
|
rescue
|
||
|
nil
|
||
|
ensure
|
||
|
soc.close
|
||
|
end
|
||
|
send_message(it, msg)
|
||
|
end
|
||
|
sleep(timeout)
|
||
|
end
|
||
| ... | ... | |
|
@primary
|
||
|
end
|
||
|
##
|
||
|
# Creates a socket for +address+ with the appropriate multicast options
|
||
|
# for multicast addresses.
|
||
|
def make_socket(address) # :nodoc:
|
||
|
addrinfo = Addrinfo.udp(address, @port)
|
||
|
soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
|
||
|
if addrinfo.ipv4_multicast? then
|
||
|
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true)
|
||
|
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL,
|
||
|
[@multicast_hops].pack('c'))
|
||
|
elsif addrinfo.ipv6_multicast? then
|
||
|
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
|
||
|
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
|
||
|
[@multicast_hops].pack('I'))
|
||
|
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
|
||
|
[@multicast_interface].pack('I'))
|
||
|
else
|
||
|
soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
|
||
|
end
|
||
|
soc.connect(addrinfo)
|
||
|
soc
|
||
|
end
|
||
|
def send_message(address, message) # :nodoc:
|
||
|
soc = make_socket(address)
|
||
|
soc.send(message, 0)
|
||
|
rescue
|
||
|
nil
|
||
|
ensure
|
||
|
soc.close if soc
|
||
|
end
|
||
|
end
|
||
|
##
|
||
| test/rinda/test_rinda.rb (working copy) | ||
|---|---|---|
|
require 'drb/drb'
|
||
|
require 'drb/eq'
|
||
|
require 'rinda/ring'
|
||
|
require 'rinda/tuplespace'
|
||
|
require 'singleton'
|
||
| ... | ... | |
|
@server = DRb.primary_server || DRb.start_service
|
||
|
end
|
||
|
class TestRingServer < Test::Unit::TestCase
|
||
|
def setup
|
||
|
@port = Rinda::Ring_PORT
|
||
|
@ts = Rinda::TupleSpace.new
|
||
|
@rs = Rinda::RingServer.new(@ts, [], @port)
|
||
|
end
|
||
|
def teardown
|
||
|
@rs.shutdown
|
||
|
end
|
||
|
def test_make_socket_unicast
|
||
|
v4 = @rs.make_socket('127.0.0.1')
|
||
|
assert_equal('127.0.0.1', v4.local_address.ip_address)
|
||
|
assert_equal(@port, v4.local_address.ip_port)
|
||
|
end
|
||
|
def test_make_socket_ipv4_multicast
|
||
|
v4mc = @rs.make_socket('239.0.0.1')
|
||
|
if Socket.const_defined?(:SO_REUSEPORT) then
|
||
|
assert(v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool)
|
||
|
else
|
||
|
assert(v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool)
|
||
|
end
|
||
|
assert_equal('239.0.0.1', v4mc.local_address.ip_address)
|
||
|
assert_equal(@port, v4mc.local_address.ip_port)
|
||
|
end
|
||
|
def test_make_socket_ipv6_multicast
|
||
|
begin
|
||
|
v6mc = @rs.make_socket('ff02::1')
|
||
|
rescue Errno::EADDRNOTAVAIL
|
||
|
return # IPv6 address for multicast not available
|
||
|
end
|
||
|
if Socket.const_defined?(:SO_REUSEPORT) then
|
||
|
assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool
|
||
|
else
|
||
|
assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool
|
||
|
end
|
||
|
assert_equal('ff02::1', v6mc.local_address.ip_address)
|
||
|
assert_equal(@port, v6mc.local_address.ip_port)
|
||
|
end
|
||
|
def test_shutdown
|
||
|
@rs.shutdown
|
||
|
assert_nil(@rs.do_reply, 'otherwise should hang forever')
|
||
|
end
|
||
|
end
|
||
|
class TestRingFinger < Test::Unit::TestCase
|
||
|
def setup
|
||
|
@rf = Rinda::RingFinger.new
|
||
|
@rf.multicast_interface = 1
|
||
|
end
|
||
|
def test_make_socket_unicast
|
||
|
v4 = @rf.make_socket('127.0.0.1')
|
||
|
assert(v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool)
|
||
|
end
|
||
|
def test_make_socket_ipv4_multicast
|
||
|
v4mc = @rf.make_socket('239.0.0.1')
|
||
|
assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int)
|
||
|
assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int)
|
||
|
end
|
||
|
def test_make_socket_ipv6_multicast
|
||
|
v6mc = @rf.make_socket('ff02::1')
|
||
|
assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int)
|
||
|
assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int)
|
||
|
end
|
||
|
def test_make_socket_multicast_hops
|
||
|
@rf.multicast_hops = 2
|
||
|
v4mc = @rf.make_socket('239.0.0.1')
|
||
|
assert_equal(2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int)
|
||
|
v6mc = @rf.make_socket('ff02::1')
|
||
|
assert_equal(2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int)
|
||
|
end
|
||
|
end
|
||
|
end
|
||
- « Previous
- 1
- 2
- 3
- Next »