From ab491f0ceb52dff4224d329e29034fc6cac22938 Mon Sep 17 00:00:00 2001 From: Hiroshi Shirosaki Date: Sat, 6 Apr 2013 15:13:21 +0900 Subject: [PATCH] Fix multicast of rinda Bind a socket to another address before joining a multicast group. Add a socket to @sockets in make_socket() to be closed when shutdown. Add optional arguments to specify the inbound interface of multicast. --- lib/rinda/ring.rb | 60 ++++++++++++++++++++++++++++++++++++++++------ test/rinda/test_rinda.rb | 42 +++++++++++++++++++++++++++++--- 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb index 9b3e273..a56484d 100644 --- a/lib/rinda/ring.rb +++ b/lib/rinda/ring.rb @@ -67,6 +67,29 @@ module Rinda # +addresses+ can contain multiple addresses. If a multicast address is # given in +addresses+ then the RingServer will listen for multicast # queries. + # + # If you use IPv4 multicast you may need to set an address of the inbound + # interface which joins a multicast group. + # + # ts = Rinda::TupleSpace.new + # rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']]) + # + # You can set addresses as an Array Object. The first element of the + # Array is a multicast address and the second is an inbound interface + # address. If the second is omitted then '0.0.0.0' is used. + # + # If you use IPv6 multicast you may need to set both the local interface + # address and the inbound interface index: + # + # rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]]) + # + # The first element is a multicast address and the second is an inbound + # interface address. The third is an inbound interface index. + # + # At this time there is no easy way to get an interface index by name. + # + # If the second is omitted then '::1' is used. + # If the third is omitted then 0 (default interface) is used. def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) @port = port @@ -78,8 +101,13 @@ module Rinda @renewer = Renewer.new @ts = ts - @sockets = addresses.map do |address| - make_socket(address) + @sockets = [] + addresses.each do |address| + if Array === address + make_socket(*address) + else + make_socket(address) + end end @w_services = write_services @@ -88,8 +116,18 @@ module Rinda ## # Creates a socket at +address+ + # + # If +address+ is multicast address then +interface_address+ and + # +multicast_interface+ can be set as optional. + # + # A created socket is bound to +interface_address+. If you use IPv4 + # multicast then the interface of +interface_address+ is used as the + # inbound interface. + # + # If you use IPv6 multicast then +multicast_interface+ is used as the + # inbound interface. +multicast_interface+ is a network interface index. - def make_socket(address) + def make_socket(address, interface_address=nil, multicast_interface=0) addrinfo = Addrinfo.udp(address, @port) socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, @@ -103,19 +141,27 @@ module Rinda end if addrinfo.ipv4_multicast? then + interface_address = '0.0.0.0' if interface_address.nil? + socket.bind(Addrinfo.udp(interface_address, @port)) + mreq = IPAddr.new(addrinfo.ip_address).hton + - IPAddr.new('0.0.0.0').hton + IPAddr.new(interface_address).hton socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) else - mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') + interface_address = '::1' if interface_address.nil? + socket.bind(Addrinfo.udp(interface_address, @port)) + + mreq = IPAddr.new(addrinfo.ip_address).hton + + [multicast_interface].pack('I') socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) end + else + socket.bind(addrinfo) end - socket.bind(addrinfo) - + @sockets << socket socket end diff --git a/test/rinda/test_rinda.rb b/test/rinda/test_rinda.rb index 7791c7e..c259482 100644 --- a/test/rinda/test_rinda.rb +++ b/test/rinda/test_rinda.rb @@ -570,8 +570,8 @@ class TestRingServer < Test::Unit::TestCase assert(v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool) end - assert_equal('239.0.0.1', v4mc.local_address.ip_address) - assert_equal(@port, v4mc.local_address.ip_port) + assert_equal('0.0.0.0', v4mc.local_address.ip_address) + assert_equal(@port, v4mc.local_address.ip_port) end def test_make_socket_ipv6_multicast @@ -590,7 +590,43 @@ class TestRingServer < Test::Unit::TestCase assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool end - assert_equal('ff02::1', v6mc.local_address.ip_address) + assert_equal('::1', v6mc.local_address.ip_address) + assert_equal(@port, v6mc.local_address.ip_port) + end + + def test_ring_server_ipv4_multicast + @rs = Rinda::RingServer.new(@ts, [['239.0.0.1', '0.0.0.0']], @port) + v4mc = @rs.instance_variable_get('@sockets').first + + if Socket.const_defined?(:SO_REUSEPORT) then + assert(v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool) + else + assert(v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool) + end + + assert_equal('0.0.0.0', v4mc.local_address.ip_address) + assert_equal(@port, v4mc.local_address.ip_port) + end + + def test_ring_server_ipv6_multicast + skip 'IPv6 not available' unless + Socket.ip_address_list.any? { |addrinfo| addrinfo.ipv6? } + + begin + @rs = Rinda::RingServer.new(@ts, [['ff02::1', '::1', 0]], @port) + rescue Errno::EADDRNOTAVAIL + return # IPv6 address for multicast not available + end + + v6mc = @rs.instance_variable_get('@sockets').first + + if Socket.const_defined?(:SO_REUSEPORT) then + assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool + else + assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool + end + + assert_equal('::1', v6mc.local_address.ip_address) assert_equal(@port, v6mc.local_address.ip_port) end -- 1.7.10.4