Project

General

Profile

Feature #8073 » rinda.multicast.patch

drbrain (Eric Hodel), 03/11/2013 10:25 AM

View differences:

lib/rinda/ring.rb (working copy)
require 'drb/drb'
require 'rinda/rinda'
require 'thread'
require 'ipaddr'
module Rinda
......
include DRbUndumped
##
# Special renewer for the RingServer to allow shutdown
class Renewer # :nodoc:
  include DRbUndumped

  ##
  # Set to false to shutdown future requests using this Renewer.
  #
  # Only the writer is generated here; the reader is defined explicitly
  # below because it does not simply return the stored flag.

  attr_writer :renew

  def initialize # :nodoc:
    @renew = true
  end

  ##
  # Returns 1 while the flag is truthy and +true+ once it has been set to a
  # false value.
  # NOTE(review): presumably the tuple space reads 1 as "renew for one more
  # second" and +true+ as "expire now" -- confirm against Rinda's renewer
  # contract.

  def renew # :nodoc:
    @renew ? 1 : true
  end
end
##
# Advertises +ts+ on every address in +addresses+ at +port+.
#
# +addresses+ defaults to [Socket::INADDR_ANY]. One socket is created per
# entry via make_socket, and one listener thread per socket via
# write_services.
#
# For compatibility with the older (ts, port) call style, an Integer given
# in the +addresses+ position is treated as the port and the default
# address list is used.
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  # Legacy form: RingServer.new(ts, port)
  if Integer === addresses
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new
  @ts = ts

  @sockets = addresses.map { |address| make_socket(address) }

  @w_services = write_services
  @r_service = reply_service
end
##
# Creates a UDP socket bound to +address+ on @port.
#
# When +address+ is an IPv4 or IPv6 multicast address the socket is first
# marked reusable (SO_REUSEPORT when Socket defines it, SO_REUSEADDR
# otherwise) and joined to the multicast group.
#
# Returns the bound Socket. If anything raises after the socket has been
# created, the socket is closed before the exception propagates so a failed
# setup does not leak a file descriptor.
def make_socket(address)
  addrinfo = Addrinfo.udp(address, @port)
  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
  bound = false

  if addrinfo.ipv4_multicast? || addrinfo.ipv6_multicast?
    reuse = Socket.const_defined?(:SO_REUSEPORT) ? :SO_REUSEPORT : :SO_REUSEADDR
    socket.setsockopt(:SOCKET, reuse, true)

    if addrinfo.ipv4_multicast?
      # Membership request: group address followed by the 0.0.0.0 address.
      mreq = IPAddr.new(addrinfo.ip_address).hton +
             IPAddr.new('0.0.0.0').hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      # Membership request: group address followed by interface index 0.
      mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  end

  socket.bind(addrinfo)
  bound = true

  socket
ensure
  # Only reached with an unbound socket when one of the steps above raised.
  socket.close if socket && !bound
end
##
# Creates threads that pick up UDP packets and pass them to do_write for
# decoding.
# Starts one listener thread per entry in @sockets and returns the threads.
# Each thread loops forever, reading up to 1024 bytes from its socket and
# handing the data to do_write.
def write_services
  @sockets.map do |sock|
    Thread.new(sock) do |listener|
      loop { do_write(listener.recv(1024)) }
    end
  end
end
......
def reply_service
Thread.new do
loop do
thread = Thread.current
thread[:continue] = true
while thread[:continue] do
do_reply
end
end
......
# address of the local TupleSpace.
# Takes a single [:lookup_ring, DRbObject] tuple from @ts, passing @renewer
# to the take, then answers it on a new thread by calling the tuple's second
# element with @ts.
#
# Returns the reply Thread, or nil when the take (or thread creation) raises
# a StandardError. Errors raised by the callback itself are swallowed inside
# the reply thread.
def do_reply
  tuple = @ts.take([:lookup_ring, DRbObject], @renewer)

  Thread.new do
    begin
      tuple[1].call(@ts)
    rescue StandardError
      nil
    end
  end
rescue StandardError
  # Deliberate best-effort: a failed take yields nil rather than raising.
  nil
end
##
# Shuts down the RingServer
# Stops the server: turns the renewer off, kills every listener thread,
# closes every socket, and clears the :continue flag on the reply thread.
def shutdown
  @renewer.renew = false

  @w_services.each(&:kill)
  @sockets.each(&:close)

  @r_service[:continue] = false
end
end
##
......
attr_accessor :broadcast_list
##
# Maximum number of hops for sent multicast packets (if using a multicast
# address in the broadcast list). The default is 1 (same as UDP
# broadcast).
attr_accessor :multicast_hops
##
# The interface index to send IPv6 multicast packets from.
attr_accessor :multicast_interface
##
# The port that RingFinger will send query packets to.
attr_accessor :port
......
@port = port
@primary = nil
@rings = []
@multicast_hops = 1
@multicast_interface = 0
end
##
......
msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
@broadcast_list.each do |it|
soc = UDPSocket.open
begin
soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
soc.send(msg, 0, it, @port)
rescue
nil
ensure
soc.close
end
send_message it, msg
end
sleep(timeout)
end
......
@primary
end
##
# Creates a socket for +address+ with the appropriate multicast options
# for multicast addresses.
# Builds a UDP socket connected to +address+ on @port.
#
# IPv4 multicast addresses get multicast loopback enabled and their TTL set
# from @multicast_hops. IPv6 multicast addresses get loopback enabled, their
# hop limit set from @multicast_hops, and their outgoing interface set from
# @multicast_interface. Any other address gets SO_BROADCAST enabled.
#
# Returns the connected Socket. If configuring or connecting raises, the
# socket is closed before the exception propagates; callers never see the
# handle in that case and could not close it themselves.
def make_socket(address) # :nodoc:
  addrinfo = Addrinfo.udp(address, @port)
  soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
  connected = false

  if addrinfo.ipv4_multicast?
    soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true)
    soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL,
                   [@multicast_hops].pack('c'))
  elsif addrinfo.ipv6_multicast?
    soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
    soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
                   [@multicast_hops].pack('I'))
    soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
                   [@multicast_interface].pack('I'))
  else
    soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
  end

  soc.connect(addrinfo)
  connected = true

  soc
ensure
  # Only reached with an unconnected socket when a step above raised.
  soc.close if soc && !connected
end
# Sends +message+ to +address+ through a socket obtained from make_socket.
# Any StandardError raised while creating the socket or sending is swallowed
# and nil is returned; otherwise the result of the send call is returned.
# The socket, if one was created, is always closed.
def send_message(address, message) # :nodoc:
  socket = nil

  begin
    socket = make_socket(address)
    socket.send(message, 0)
  rescue
    nil
  ensure
    socket.close if socket
  end
end
end
##
test/rinda/test_rinda.rb (working copy)
require 'drb/drb'
require 'drb/eq'
require 'rinda/ring'
require 'rinda/tuplespace'
require 'singleton'
......
@server = DRb.primary_server || DRb.start_service
end
# Exercises Rinda::RingServer#make_socket and #shutdown. The server is built
# with an empty address list, so the only sockets involved are the ones each
# test creates itself; every test closes its socket so none outlive the test.
class TestRingServer < Test::Unit::TestCase

  def setup
    @port = Rinda::Ring_PORT

    @ts = Rinda::TupleSpace.new
    @rs = Rinda::RingServer.new @ts, [], @port
  end

  def teardown
    @rs.shutdown
  end

  def test_make_socket_unicast
    v4 = @rs.make_socket '127.0.0.1'

    assert_equal '127.0.0.1', v4.local_address.ip_address
    assert_equal @port, v4.local_address.ip_port
  ensure
    v4.close if v4
  end

  def test_make_socket_ipv4_multicast
    v4mc = @rs.make_socket '239.0.0.1'

    assert_reusable v4mc

    assert_equal '239.0.0.1', v4mc.local_address.ip_address
    assert_equal @port, v4mc.local_address.ip_port
  ensure
    v4mc.close if v4mc
  end

  def test_make_socket_ipv6_multicast
    begin
      v6mc = @rs.make_socket 'ff02::1'
    rescue Errno::EADDRNOTAVAIL
      return # IPv6 address for multicast not available
    end

    assert_reusable v6mc

    assert_equal 'ff02::1', v6mc.local_address.ip_address
    assert_equal @port, v6mc.local_address.ip_port
  ensure
    v6mc.close if v6mc
  end

  def test_shutdown
    @rs.shutdown

    assert_nil @rs.do_reply, 'otherwise should hang forever'
  end

  private

  # Asserts that +socket+ has SO_REUSEPORT set when Socket defines that
  # constant, and SO_REUSEADDR otherwise.
  def assert_reusable(socket)
    option = Socket.const_defined?(:SO_REUSEPORT) ? :SO_REUSEPORT : :SO_REUSEADDR

    assert socket.getsockopt(:SOCKET, option).bool
  end

end
# Exercises Rinda::RingFinger#make_socket option handling. Every socket a
# test creates is closed before the test returns, and the IPv6 cases return
# early when the host cannot provide an IPv6 multicast address, as the
# RingServer IPv6 test does.
class TestRingFinger < Test::Unit::TestCase

  def setup
    @rf = Rinda::RingFinger.new
    @rf.multicast_interface = 1
  end

  def test_make_socket_unicast
    v4 = @rf.make_socket '127.0.0.1'

    assert v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool
  ensure
    v4.close if v4
  end

  def test_make_socket_ipv4_multicast
    v4mc = @rf.make_socket '239.0.0.1'

    assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int
    assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int
  ensure
    v4mc.close if v4mc
  end

  def test_make_socket_ipv6_multicast
    begin
      v6mc = @rf.make_socket 'ff02::1'
    rescue Errno::EADDRNOTAVAIL
      return # IPv6 address for multicast not available
    end

    assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int
    assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int
  ensure
    v6mc.close if v6mc
  end

  def test_make_socket_multicast_hops
    @rf.multicast_hops = 2

    v4mc = @rf.make_socket '239.0.0.1'

    assert_equal 2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int

    begin
      v6mc = @rf.make_socket 'ff02::1'
    rescue Errno::EADDRNOTAVAIL
      return # IPv6 address for multicast not available
    end

    assert_equal 2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int
  ensure
    v4mc.close if v4mc
    v6mc.close if v6mc
  end

end
end
(1-1/3)