Index: lib/rinda/ring.rb =================================================================== --- lib/rinda/ring.rb (revision 39620) +++ lib/rinda/ring.rb (working copy) @@ -4,6 +4,7 @@ require 'drb/drb' require 'rinda/rinda' require 'thread' +require 'ipaddr' module Rinda @@ -27,25 +28,90 @@ module Rinda include DRbUndumped ## + # Special renewer for the RingServer to allow shutdown + + class Renewer # :nodoc: + include DRbUndumped + + ## + # Set to false to shutdown future requests using this Renewer + + attr_accessor :renew + + def initialize # :nodoc: + @renew = true + end + + def renew # :nodoc: + @renew ? 1 : true + end + end + + ## # Advertises +ts+ on the UDP broadcast address at +port+. - def initialize(ts, port=Ring_PORT) + def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) + @port = port + + if Integer === addresses then + addresses, @port = [Socket::INADDR_ANY], addresses + end + + @renewer = Renewer.new + @ts = ts - @soc = UDPSocket.open - @soc.bind('', port) - @w_service = write_service - @r_service = reply_service + @sockets = addresses.map do |address| + make_socket address + end + + @w_services = write_services + @r_service = reply_service end ## - # Creates a thread that picks up UDP packets and passes them to do_write - # for decoding. + # Creates a socket at +address+ - def write_service - Thread.new do - loop do - msg = @soc.recv(1024) - do_write(msg) + def make_socket address + addrinfo = Addrinfo.udp address, @port + + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, + addrinfo.protocol) + + if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then + if Socket.const_defined? :SO_REUSEPORT then + socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) + else + socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) + end + + if addrinfo.ipv4_multicast? then + mreq = IPAddr.new(addrinfo.ip_address).hton + + IPAddr.new('0.0.0.0').hton + + socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) + else + mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') + + socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) + end + end + + socket.bind(addrinfo) + + socket + end + + ## + # Creates threads that pick up UDP packets and passes them to do_write for + # decoding. + + def write_services + @sockets.map do |socket| + Thread.new do + loop do + msg = socket.recv(1024) + do_write(msg) + end end end end @@ -69,7 +135,10 @@ module Rinda def reply_service Thread.new do - loop do + thread = Thread.current + thread[:continue] = true + + while thread[:continue] do do_reply end end @@ -80,11 +149,28 @@ module Rinda # address of the local TupleSpace. def do_reply - tuple = @ts.take([:lookup_ring, DRbObject]) + tuple = @ts.take([:lookup_ring, DRbObject], @renewer) Thread.new { tuple[1].call(@ts) rescue nil} rescue end + ## + # Shuts down the RingServer + + def shutdown + @renewer.renew = false + + @w_services.each do |thread| + thread.kill + end + + @sockets.each do |socket| + socket.close + end + + @r_service[:continue] = false + end + end ## @@ -131,6 +217,18 @@ module Rinda attr_accessor :broadcast_list ## + # Maximum number of hops for sent multicast packets (if using a multicast + # address in the broadcast list). The default is 1 (same as UDP + # broadcast). + + attr_accessor :multicast_hops + + ## + # The interface index to send IPv6 multicast packets from. + + attr_accessor :multicast_interface + + ## # The port that RingFinger will send query packets to. attr_accessor :port @@ -149,6 +247,9 @@ module Rinda @port = port @primary = nil @rings = [] + + @multicast_hops = 1 + @multicast_interface = 0 end ## @@ -178,15 +279,7 @@ module Rinda msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) @broadcast_list.each do |it| - soc = UDPSocket.open - begin - soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) - soc.send(msg, 0, it, @port) - rescue - nil - ensure - soc.close - end + send_message it, msg end sleep(timeout) end @@ -217,6 +310,44 @@ module Rinda @primary end + ## + # Creates a socket for +address+ with the appropriate multicast options + # for multicast addresses. + + def make_socket address # :nodoc: + addrinfo = Addrinfo.udp address, @port + + soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + + if addrinfo.ipv4_multicast? then + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL, + [@multicast_hops].pack('c')) + elsif addrinfo.ipv6_multicast? then + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, + [@multicast_hops].pack('I')) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, + [@multicast_interface].pack('I')) + else + soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) + end + + soc.connect addrinfo + + soc + end + + def send_message address, message # :nodoc: + soc = make_socket(address) + + soc.send(message, 0) + rescue + nil + ensure + soc.close if soc + end + end ## Index: test/rinda/test_rinda.rb =================================================================== --- test/rinda/test_rinda.rb (revision 39620) +++ test/rinda/test_rinda.rb (working copy) @@ -2,6 +2,7 @@ require 'test/unit' require 'drb/drb' require 'drb/eq' +require 'rinda/ring' require 'rinda/tuplespace' require 'singleton' @@ -480,5 +481,104 @@ class TupleSpaceProxyTest < Test::Unit:: @server = DRb.primary_server || DRb.start_service end +class TestRingServer < Test::Unit::TestCase + + def setup + @port = Rinda::Ring_PORT + + @ts = Rinda::TupleSpace.new + @rs = Rinda::RingServer.new @ts, [], @port + end + + def teardown + @rs.shutdown + end + + def test_make_socket_unicast + v4 = @rs.make_socket '127.0.0.1' + + assert_equal '127.0.0.1', v4.local_address.ip_address + assert_equal @port, v4.local_address.ip_port + end + + def test_make_socket_ipv4_multicast + v4mc = @rs.make_socket '239.0.0.1' + + if Socket.const_defined? :SO_REUSEPORT then + assert v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool + else + assert v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool + end + + assert_equal '239.0.0.1', v4mc.local_address.ip_address + assert_equal @port, v4mc.local_address.ip_port + end + + def test_make_socket_ipv6_multicast + begin + v6mc = @rs.make_socket 'ff02::1' + rescue Errno::EADDRNOTAVAIL + return # IPv6 address for multicast not available + end + + if Socket.const_defined? :SO_REUSEPORT then + assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool + else + assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool + end + + assert_equal 'ff02::1', v6mc.local_address.ip_address + assert_equal @port, v6mc.local_address.ip_port + end + + def test_shutdown + @rs.shutdown + + assert_nil @rs.do_reply, 'otherwise should hang forever' + end + +end + +class TestRingFinger < Test::Unit::TestCase + + def setup + @rf = Rinda::RingFinger.new + @rf.multicast_interface = 1 + end + + def test_make_socket_unicast + v4 = @rf.make_socket '127.0.0.1' + + assert v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool + end + + def test_make_socket_ipv4_multicast + v4mc = @rf.make_socket '239.0.0.1' + + assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int + assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int + end + + def test_make_socket_ipv6_multicast + v6mc = @rf.make_socket 'ff02::1' + + assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int + assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int + end + + def test_make_socket_multicast_hops + @rf.multicast_hops = 2 + + v4mc = @rf.make_socket '239.0.0.1' + + assert_equal 2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int + + v6mc = @rf.make_socket 'ff02::1' + + assert_equal 2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int + end + +end + end