Feature #5461 ยป net.http.rb.pipeline.patch
| lib/net/http.rb (working copy) | ||
|---|---|---|
|
# See Net::HTTP::Proxy for further details and examples such as proxies that
|
||
|
# require a username and password.
|
||
|
#
|
||
|
# === Pipelining
|
||
|
#
|
||
|
# On HTTP/1.1 servers requests can be pipelined which can reduce response
|
||
|
# time for a series of requests. When pipelining Net::HTTP sends requests
|
||
|
# in the list without waiting for a response then retrieves the responses
|
||
|
# from the server after all requests are issued. The server will return the
|
||
|
# responses in the same order they were issued.
|
||
|
#
|
||
|
# Example:
|
||
|
#
|
||
|
# requests = []
|
||
|
# requests << Net::HTTP::Get.new('/images/bug.png')
|
||
|
# requests << Net::HTTP::Get.new('/images/date.png')
|
||
|
# requests << Net::HTTP::Get.new('/images/find.png')
|
||
|
#
|
||
|
# http = Net::HTTP.start 'localhost' do
|
||
|
# http.pipeline requests do |req, res|
|
||
|
# # ...
|
||
|
# end
|
||
|
# end
|
||
|
#
|
||
|
# Before attempting to pipeline a sequence of requests, Net::HTTP will
|
||
|
# consume one request to check if the server is pipelining-capable. To avoid
|
||
|
# the check set #pipelining to true:
|
||
|
#
|
||
|
# http = Net::HTTP.start 'localhost' do
|
||
|
# http.pipelining = true
|
||
|
# responses = http.pipeline requests
|
||
|
# # ...
|
||
|
#
|
||
|
# For more details see Net::HTTP#pipeline
|
||
|
#
|
||
|
# == HTTP Request Classes
|
||
|
#
|
||
|
# Here is the HTTP request class hierarchy.
|
||
| ... | ... | |
|
end
|
||
|
# :startdoc:
|
||
|
class Error < StandardError; end
|
||
|
class PipelineError < Error
|
||
|
##
|
||
|
# Remaining requests that have not been sent to the HTTP server
|
||
|
attr_reader :requests
|
||
|
##
|
||
|
# Retrieved responses up to the error point
|
||
|
attr_reader :responses
|
||
|
##
|
||
|
# Creates a new Error with +message+, a list of +requests+ that have not
|
||
|
# been sent to the server and a list of +responses+ that have been
|
||
|
# retrieved from the server.
|
||
|
def initialize message, requests, responses
|
||
|
super message
|
||
|
@requests = requests
|
||
|
@responses = responses
|
||
|
end
|
||
|
end
|
||
|
# Raised when the server appears to not support persistent connections
|
||
|
# which are required for pipelining requests.
|
||
|
class PipelinePersistenceError < PipelineError
|
||
|
# Creates a new PipelinePersistenceError with a list of +requests+ that
|
||
|
# have not been sent to the server and a list of +responses+ that have
|
||
|
# been retrieved from the server.
|
||
|
def initialize requests, responses
|
||
|
super 'persistent connections required', requests, responses
|
||
|
end
|
||
|
end
|
||
|
# Raised when the server appears to not support pipelining requests
|
||
|
class PipelineUnsupportedError < PipelineError
|
||
|
# Creates a new PipelineUnsupportedError with a list of +requests+ that
|
||
|
# have not been sent to the server and a list of +responses+ that have
|
||
|
# been retrieved from the server.
|
||
|
def initialize reason = nil, requests, responses
|
||
|
message = 'pipeline connections are not supported'
|
||
|
message << " (#{reason})" if reason
|
||
|
super message, requests, responses
|
||
|
end
|
||
|
end
|
||
|
# Raised if an error occurs while reading responses.
|
||
|
class PipelineResponseError < PipelineError
|
||
|
# The original exception
|
||
|
attr_accessor :original
|
||
|
# Creates a new PipelineResponseError with an original +exception+, a
|
||
|
# list of +requests+ that were in-flight and a list of +responses+ that
|
||
|
# have been retrieved from the server.
|
||
|
def initialize exception, requests, responses
|
||
|
@original = exception
|
||
|
message = "error reading responses: #{original} (#{original.class})"
|
||
|
super message, requests, responses
|
||
|
end
|
||
|
end
|
||
|
# Turns on net/http 1.2 (ruby 1.8) features.
|
||
|
# Defaults to ON in ruby 1.8 or later.
|
||
|
def HTTP.version_1_2
|
||
| ... | ... | |
|
@address = address
|
||
|
@port = (port || HTTP.default_port)
|
||
|
@curr_http_version = HTTPVersion
|
||
|
@pipelining = nil # nil means unknown
|
||
|
@no_keepalive_server = false
|
||
|
@close_on_empty_response = false
|
||
|
@socket = nil
|
||
| ... | ... | |
|
@enable_post_connection_check = true
|
||
|
@compression = nil
|
||
|
@sspi_enabled = false
|
||
|
@socket_class = BufferedIO
|
||
|
if defined?(SSL_ATTRIBUTES)
|
||
|
SSL_ATTRIBUTES.each do |name|
|
||
|
instance_variable_set "@#{name}", nil
|
||
| ... | ... | |
|
attr_accessor :close_on_empty_response
|
||
|
# Set to true if this server supports pipelining, false if it does not.
|
||
|
# If unset, #pipeline will figure it out
|
||
|
attr_accessor :pipelining
|
||
|
# Returns true if SSL/TLS is being used with HTTP.
|
||
|
def use_ssl?
|
||
|
@use_ssl
|
||
| ... | ... | |
|
s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context)
|
||
|
s.sync_close = true
|
||
|
end
|
||
|
@socket = BufferedIO.new(s)
|
||
|
@socket = @socket_class.new(s)
|
||
|
@socket.read_timeout = @read_timeout
|
||
|
@socket.continue_timeout = @continue_timeout
|
||
|
@socket.debug_output = @debug_output
|
||
| ... | ... | |
|
end
|
||
|
private :do_finish
|
||
|
# Closes the connection and rescues any IOErrors this may cause
|
||
|
def reset_pipeline requests, responses
|
||
|
begin
|
||
|
finish
|
||
|
rescue IOError
|
||
|
end
|
||
|
start
|
||
|
rescue Errno::ECONNREFUSED
|
||
|
raise PipelineError.new("connection refused: #{address}:#{port}",
|
||
|
requests, responses)
|
||
|
rescue Errno::EHOSTDOWN
|
||
|
raise PipelineError.new("host down: #{address}:#{port}",
|
||
|
requests, responses)
|
||
|
end
|
||
|
private :reset_pipeline
|
||
|
#
|
||
|
# proxy
|
||
|
#
|
||
| ... | ... | |
|
res
|
||
|
end
|
||
|
# Pipelines +requests+ to the HTTP server yielding responses if a block is
|
||
|
# given. Returns all responses received.
|
||
|
#
|
||
|
# Only idempotent sequences of requests will be pipelined. If a
|
||
|
# non-idempotent request (like a POST) is included in a request sequence
|
||
|
# #pipeline will wait for a response before proceeding with further
|
||
|
# sequences of requests.
|
||
|
#
|
||
|
# The Net::HTTP connection must be started (#start must be called) before
|
||
|
# calling #pipeline.
|
||
|
#
|
||
|
# Raises a subclass of the PipelineError exception if the connection is
|
||
|
# not pipeline-capable, if the HTTP session has not been started, or their
|
||
|
# was a problem sending or receiving the +requests+. The remaining
|
||
|
# outstanding requests and returned responses can be retrieved from the
|
||
|
# PipelineError exception.
|
||
|
#
|
||
|
# Example:
|
||
|
#
|
||
|
# requests = []
|
||
|
# requests << Net::HTTP::Get.new('/images/bug.png')
|
||
|
# requests << Net::HTTP::Get.new('/images/date.png')
|
||
|
# requests << Net::HTTP::Get.new('/images/find.png')
|
||
|
#
|
||
|
# http = Net::HTTP.new 'localhost'
|
||
|
# http.start do
|
||
|
# http.pipeline requests do |req, res|
|
||
|
# open File.basename(req.path), 'wb' do |io|
|
||
|
# io.write res.body
|
||
|
# end
|
||
|
# end
|
||
|
# end
|
||
|
def pipeline requests, &block # :yields: response
|
||
|
requests = requests.dup
|
||
|
responses = []
|
||
|
raise PipelineError.new('Net::HTTP not started', requests, responses) unless
|
||
|
started?
|
||
|
pipeline_check requests, responses, &block
|
||
|
retried = responses.length
|
||
|
until requests.empty? do
|
||
|
begin
|
||
|
in_flight = pipeline_send requests
|
||
|
pipeline_receive in_flight, responses, &block
|
||
|
rescue PipelineResponseError => e
|
||
|
e.requests.reverse_each do |request|
|
||
|
requests.unshift request
|
||
|
end
|
||
|
raise if responses.length == retried or not requests.first.idempotent?
|
||
|
retried = responses.length
|
||
|
reset_pipeline requests, responses
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
responses
|
||
|
end
|
||
|
private
|
||
|
# Executes a request which uses a representation
|
||
| ... | ... | |
|
raise HTTPAuthenticationError.new('HTTP authentication failed', err)
|
||
|
end
|
||
|
# Ensures this connection supports pipelining.
|
||
|
#
|
||
|
# If the server has not been tested for pipelining support one of the
|
||
|
# +requests+ will be consumed and placed in +responses+.
|
||
|
#
|
||
|
# A PersistenceError will be raised if the server does not support
|
||
|
# persistent connections. (The server is HTTP/1.1, but closed the
|
||
|
# connection.)
|
||
|
#
|
||
|
# A PipelineUnsupportedError will be raised if the server does not support
|
||
|
# pipelining. (The server is not HTTP/1.1.)
|
||
|
def pipeline_check requests, responses
|
||
|
unless @pipelining.nil? then # tri-state
|
||
|
return if @pipelining
|
||
|
raise PipelineUnsupportedError.new(requests, responses) unless
|
||
|
@pipelining
|
||
|
else
|
||
|
@pipelining = false
|
||
|
end
|
||
|
if '1.1' > @curr_http_version then
|
||
|
@pipelining = false
|
||
|
reason = "server is HTTP/#{@curr_http_version}"
|
||
|
raise PipelineUnsupportedError.new(reason, requests, responses)
|
||
|
end
|
||
|
req = requests.shift
|
||
|
tried_once = false
|
||
|
begin
|
||
|
res = request(req)
|
||
|
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
|
||
|
Errno::EPIPE, Net::HTTPBadResponse => e
|
||
|
if tried_once then
|
||
|
requests.unshift req
|
||
|
raise PipelineResponseError.new(e, requests, responses)
|
||
|
end
|
||
|
tried_once = true
|
||
|
reset_pipeline requests, responses
|
||
|
retry
|
||
|
end
|
||
|
responses << res
|
||
|
yield req, res if block_given?
|
||
|
@pipelining = keep_alive? req, res
|
||
|
if '1.1' > @curr_http_version then
|
||
|
@pipelining = false
|
||
|
reason = "server is HTTP/#{@curr_http_version}"
|
||
|
raise PipelineUnsupportedError.new(reason, requests, responses)
|
||
|
elsif not @pipelining then
|
||
|
raise PipelinePersistenceError.new(requests, responses)
|
||
|
end
|
||
|
@close_on_empty_response = false
|
||
|
end
|
||
|
# Receives HTTP responses for the +in_flight+ requests and adds them to
|
||
|
# +responses+
|
||
|
def pipeline_receive in_flight, responses
|
||
|
while req = in_flight.shift do
|
||
|
begin
|
||
|
begin
|
||
|
res = Net::HTTPResponse.read_new @socket
|
||
|
end while res.kind_of? Net::HTTPContinue
|
||
|
res.reading_body @socket, req.response_body_permitted? do
|
||
|
responses << res
|
||
|
yield req, res if block_given?
|
||
|
end
|
||
|
end_transport req, res
|
||
|
rescue StandardError, Timeout::Error
|
||
|
in_flight.unshift req
|
||
|
raise
|
||
|
end
|
||
|
end
|
||
|
responses
|
||
|
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
|
||
|
Errno::EPIPE, Net::HTTPBadResponse => e
|
||
|
finish
|
||
|
raise PipelineResponseError.new(e, in_flight, responses)
|
||
|
end
|
||
|
# Sends +requests+ to the HTTP server and removes them from the +requests+
|
||
|
# list. Returns the requests that have been pipelined and are in-flight.
|
||
|
#
|
||
|
# If a non-idempotent request is first in +requests+ it will be sent and no
|
||
|
# further requests will be pipelined.
|
||
|
#
|
||
|
# If a non-idempotent request is encountered after an idempotent request it
|
||
|
# will not be sent.
|
||
|
#
|
||
|
# After the response for a non-idempotent request is received another
|
||
|
# series of pipelined requests will be issued via #pipeline.
|
||
|
def pipeline_send requests
|
||
|
in_flight = []
|
||
|
while req = requests.shift do
|
||
|
idempotent = req.idempotent?
|
||
|
unless idempotent or in_flight.empty? then
|
||
|
requests.unshift req
|
||
|
break
|
||
|
end
|
||
|
begin_transport req
|
||
|
req.exec @socket, @curr_http_version, edit_path(req.path)
|
||
|
in_flight << req
|
||
|
break unless idempotent
|
||
|
end
|
||
|
in_flight
|
||
|
end
|
||
|
#
|
||
|
# utils
|
||
|
#
|
||
| ... | ... | |
|
"\#<#{self.class} #{@method}>"
|
||
|
end
|
||
|
##
|
||
|
# Is this request idempotent according to RFC 2616?
|
||
|
def idempotent?
|
||
|
case self
|
||
|
when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
|
||
|
Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
|
||
|
true
|
||
|
end
|
||
|
end
|
||
|
def request_body_permitted?
|
||
|
@request_has_body
|
||
|
end
|
||
| test/net/http/test_http.rb (working copy) | ||
|---|---|---|
|
end
|
||
|
end
|
||
|
module TestNetHTTP_version_1_2_methods
|
||
|
def test_request
|
||
| ... | ... | |
|
assert_not_match(/HTTP\/1.1 100 continue/, @debug.string)
|
||
|
end
|
||
|
end
|
||
|
class TestNetHttp < Test::Unit::TestCase
|
||
|
def test_idempotent_eh
|
||
|
assert Net::HTTP::Delete.new('/').idempotent?
|
||
|
assert Net::HTTP::Get.new('/').idempotent?
|
||
|
assert Net::HTTP::Head.new('/').idempotent?
|
||
|
assert Net::HTTP::Options.new('/').idempotent?
|
||
|
assert Net::HTTP::Put.new('/').idempotent?
|
||
|
assert Net::HTTP::Trace.new('/').idempotent?
|
||
|
refute Net::HTTP::Post.new('/').idempotent?
|
||
|
end
|
||
|
end
|
||
| test/net/http/utils.rb (working copy) | ||
|---|---|---|
|
end
|
||
|
require 'webrick/httpservlet/abstract'
|
||
|
class Net::HTTP
|
||
|
attr_accessor :socket_class
|
||
|
end
|
||
|
module TestNetHTTPUtils
|
||
|
##
|
||
|
# Records the data Net::BufferedIO sends and receives in write_io and read_io
|
||
|
class TestIO < Net::BufferedIO
|
||
|
attr_reader :read_io
|
||
|
attr_reader :write_io
|
||
|
def self.ios
|
||
|
@ios
|
||
|
end
|
||
|
def self.new(io)
|
||
|
io = super
|
||
|
@ios << io
|
||
|
io
|
||
|
end
|
||
|
def self.reset
|
||
|
@ios = []
|
||
|
end
|
||
|
reset
|
||
|
def initialize(io)
|
||
|
super
|
||
|
@read_io = StringIO.new
|
||
|
@write_io = StringIO.new
|
||
|
end
|
||
|
def rbuf_consume(len)
|
||
|
s = super len
|
||
|
@read_io << s
|
||
|
s
|
||
|
end
|
||
|
def write0(str)
|
||
|
@write_io << str
|
||
|
super
|
||
|
end
|
||
|
end
|
||
|
##
|
||
|
# Raises Errno::ECONNRESET for every request
|
||
|
class ErrorAlways < TestIO
|
||
|
def readline
|
||
|
raise Errno::ECONNRESET
|
||
|
end
|
||
|
end
|
||
|
##
|
||
|
# Raises Errno::ECONNRESET for every request after the first
|
||
|
class ErrorAfterOne < TestIO
|
||
|
def self.reset
|
||
|
@@count = 0
|
||
|
super
|
||
|
end
|
||
|
def readline
|
||
|
@@count += 1
|
||
|
raise Errno::ECONNRESET if @@count >= 2
|
||
|
super
|
||
|
end
|
||
|
end
|
||
|
##
|
||
|
# Raises Errno::ECONNRESET upon the second request
|
||
|
class ErrorAfterOneOnce < TestIO
|
||
|
def self.reset
|
||
|
@@count = 0
|
||
|
super
|
||
|
end
|
||
|
def readline
|
||
|
@@count += 1
|
||
|
raise Errno::ECONNRESET if @@count == 2
|
||
|
super
|
||
|
end
|
||
|
end
|
||
|
##
|
||
|
# Raises Errno::ECONNRESET upon every even request 2, 4, 6
|
||
|
class ErrorEven < TestIO
|
||
|
def self.reset
|
||
|
@@count = 0
|
||
|
super
|
||
|
end
|
||
|
def readline
|
||
|
@@count += 1
|
||
|
raise Errno::ECONNRESET if @@count % 2 == 0
|
||
|
super
|
||
|
end
|
||
|
end
|
||
|
def start(&block)
|
||
|
new().start(&block)
|
||
|
end
|
||
| ... | ... | |
|
klass = Net::HTTP::Proxy(config('proxy_host'), config('proxy_port'))
|
||
|
http = klass.new(config('host'), config('port'))
|
||
|
http.set_debug_output logfile()
|
||
|
http.socket_class = @net_http_io
|
||
|
http
|
||
|
end
|
||
| ... | ... | |
|
end
|
||
|
def setup
|
||
|
TestIO.reset
|
||
|
ErrorAlways.reset
|
||
|
ErrorAfterOne.reset
|
||
|
ErrorAfterOneOnce.reset
|
||
|
ErrorEven.reset
|
||
|
BadOnce.reset
|
||
|
Counter.reset
|
||
|
spawn_server
|
||
|
@net_http_io = TestIO
|
||
|
end
|
||
|
def teardown
|
||
| ... | ... | |
|
:ServerType => Thread,
|
||
|
}
|
||
|
server_config[:OutputBufferSize] = 4 if config('chunked')
|
||
|
server_config[:HTTPVersion] = config('http_version') if
|
||
|
config('http_version')
|
||
|
if defined?(OpenSSL) and config('ssl_enable')
|
||
|
server_config.update({
|
||
|
:SSLEnable => true,
|
||
| ... | ... | |
|
})
|
||
|
end
|
||
|
@server = WEBrick::HTTPServer.new(server_config)
|
||
|
@server.mount('/', Servlet, config('chunked'))
|
||
|
@server.mount('/', Servlet, config('chunked'))
|
||
|
@server.mount('/bad_once', BadOnce)
|
||
|
@server.mount('/close', Closer)
|
||
|
@server.mount('/count', Counter)
|
||
|
@server.mount('/reset_after', ResetAfter)
|
||
|
@server.start
|
||
|
n_try_max = 5
|
||
|
begin
|
||
| ... | ... | |
|
end
|
||
|
end
|
||
|
class BadOnce < WEBrick::HTTPServlet::AbstractServlet
|
||
|
@instance = nil
|
||
|
def self.get_instance(server, *options)
|
||
|
@instance ||= super
|
||
|
end
|
||
|
def self.reset
|
||
|
@instance = nil
|
||
|
end
|
||
|
def initialize(server)
|
||
|
@count = 1
|
||
|
end
|
||
|
def do_GET(req, res)
|
||
|
def res.status_line
|
||
|
"bogus"
|
||
|
end if @count == 1
|
||
|
res.body = "Was bad. Now #{@count}"
|
||
|
@count += 1
|
||
|
end
|
||
|
alias do_POST do_GET
|
||
|
end
|
||
|
class Counter < WEBrick::HTTPServlet::AbstractServlet
|
||
|
@instance = nil
|
||
|
def self.get_instance(server, *options)
|
||
|
@instance ||= super
|
||
|
end
|
||
|
def self.reset
|
||
|
@instance = nil
|
||
|
end
|
||
|
def initialize(server)
|
||
|
@count = 1
|
||
|
end
|
||
|
def do_GET(req, res)
|
||
|
res['Content-Type'] = 'text/plain'
|
||
|
res.body = "Worked #{@count}!"
|
||
|
@count += 1
|
||
|
end
|
||
|
alias do_POST do_GET
|
||
|
end
|
||
|
class Closer < WEBrick::HTTPServlet::AbstractServlet
|
||
|
def do_GET(req, res)
|
||
|
res['Content-Type'] = 'text/plain'
|
||
|
res.body = "closing this connection"
|
||
|
res.close
|
||
|
end
|
||
|
alias do_POST do_GET
|
||
|
end
|
||
|
class ResetAfter < WEBrick::HTTPServlet::AbstractServlet
|
||
|
def do_GET(req, res)
|
||
|
def res._write_data(socket, data)
|
||
|
socket.close_read
|
||
|
socket << data
|
||
|
end
|
||
|
res.body = "Reading has been shut down"
|
||
|
end
|
||
|
alias do_POST do_GET
|
||
|
end
|
||
|
class NullWriter
|
||
|
def <<(s) end
|
||
|
def puts(*args) end
|
||
| test/net/http/test_pipeline.rb (revision 0) | ||
|---|---|---|
|
require 'test/unit'
|
||
|
require 'net/http'
|
||
|
require 'stringio'
|
||
|
require 'uri'
|
||
|
require_relative 'utils'
|
||
|
class TestNetHttpPipeline < Test::Unit::TestCase
|
||
|
CONFIG = {
|
||
|
'host' => '127.0.0.1',
|
||
|
'port' => 10083,
|
||
|
'proxy_host' => nil,
|
||
|
'proxy_port' => nil,
|
||
|
'chunked' => true,
|
||
|
}
|
||
|
include TestNetHTTPUtils
|
||
|
def setup
|
||
|
super
|
||
|
@get1 = Net::HTTP::Get.new '/count'
|
||
|
@get2 = Net::HTTP::Get.new '/count'
|
||
|
@get3 = Net::HTTP::Get.new '/count'
|
||
|
@post = Net::HTTP::Post.new '/count'
|
||
|
end
|
||
|
def test_pipeline
|
||
|
requests = [@get1, @get2]
|
||
|
responses = start do |http|
|
||
|
http.pipeline requests
|
||
|
end
|
||
|
assert_equal 'Worked 1!', responses.first.body
|
||
|
assert_equal 'Worked 2!', responses.last.body
|
||
|
refute_empty requests
|
||
|
end
|
||
|
def test_pipeline_block
|
||
|
requests = [@get1, @get2]
|
||
|
responses = []
|
||
|
start do |http|
|
||
|
http.pipeline requests do |req, res| responses << [req, res] end
|
||
|
end
|
||
|
refute_empty requests
|
||
|
assert_equal requests, responses.map { |req,| req }
|
||
|
responses = responses.map { |_, res| res }
|
||
|
assert_equal 'Worked 1!', responses.first.body
|
||
|
assert_equal 'Worked 2!', responses.last.body
|
||
|
end
|
||
|
def test_pipeline_non_idempotent
|
||
|
responses = start do |http|
|
||
|
http.pipelining = true
|
||
|
http.pipeline [@get1, @get2, @post, @get3]
|
||
|
end
|
||
|
assert_equal 'Worked 1!', responses.shift.body
|
||
|
assert_equal 'Worked 2!', responses.shift.body
|
||
|
assert_equal 'Worked 3!', responses.shift.body
|
||
|
assert_equal 'Worked 4!', responses.shift.body
|
||
|
assert responses.empty?
|
||
|
end
|
||
|
def test_pipeline_not_started
|
||
|
@started = false
|
||
|
e = assert_raises Net::HTTP::PipelineError do
|
||
|
http = new
|
||
|
http.pipeline []
|
||
|
end
|
||
|
assert_equal 'Net::HTTP not started', e.message
|
||
|
end
|
||
|
def test_pipeline_retry
|
||
|
requests = [@get1, @get2, @get3]
|
||
|
@net_http_io = ErrorAfterOneOnce
|
||
|
responses = start do |http|
|
||
|
http.pipelining = true
|
||
|
http.pipeline requests
|
||
|
end
|
||
|
assert_equal 'Worked 1!', responses.shift.body
|
||
|
assert_equal 'Worked 3!', responses.shift.body # response 2 was lost
|
||
|
assert_equal 'Worked 4!', responses.shift.body
|
||
|
assert_empty responses
|
||
|
refute_empty requests
|
||
|
end
|
||
|
def test_pipeline_retry_fail_post
|
||
|
@net_http_io = ErrorAlways
|
||
|
requests = [@post]
|
||
|
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
|
start do |http|
|
||
|
http.pipelining = true
|
||
|
http.pipeline requests
|
||
|
end
|
||
|
end
|
||
|
assert_empty e.responses
|
||
|
assert_equal [@post], e.requests
|
||
|
end
|
||
|
def test_pipeline_retry_fail_different
|
||
|
@net_http_io = ErrorEven
|
||
|
requests = [@get1, @get2, @get3]
|
||
|
responses = start do |http|
|
||
|
http.pipelining = true
|
||
|
http.pipeline requests
|
||
|
end
|
||
|
assert_equal 'Worked 1!', responses.shift.body
|
||
|
assert_equal 'Worked 3!', responses.shift.body
|
||
|
assert_equal 'Worked 5!', responses.shift.body
|
||
|
assert_empty responses
|
||
|
refute_empty requests
|
||
|
end
|
||
|
def test_pipeline_retry_fail_same
|
||
|
@net_http_io = ErrorAfterOne
|
||
|
requests = [@get1, @get2, @get3]
|
||
|
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
|
start do |http|
|
||
|
http.pipelining = true
|
||
|
http.pipeline requests
|
||
|
end
|
||
|
end
|
||
|
responses = e.responses
|
||
|
assert_equal 'Worked 1!', responses.shift.body
|
||
|
assert_empty responses
|
||
|
assert_equal [@get2, @get3], e.requests
|
||
|
end
|
||
|
# end #pipeline tests
|
||
|
def test_pipeline_check
|
||
|
requests = [@get1, @get2]
|
||
|
responses = []
|
||
|
start do |http|
|
||
|
http.send :pipeline_check, requests, responses
|
||
|
assert http.pipelining
|
||
|
end
|
||
|
assert_equal [@get2], requests
|
||
|
assert_equal 1, responses.length
|
||
|
assert_equal 'Worked 1!', responses.first.body
|
||
|
end
|
||
|
def test_pipeline_check_again
|
||
|
start do |http|
|
||
|
http.pipelining = false
|
||
|
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
|
http.send :pipeline_check, [@get1, @get2], []
|
||
|
end
|
||
|
assert_equal [@get1, @get2], e.requests
|
||
|
assert_empty e.responses
|
||
|
refute http.pipelining
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_check_bad_response
|
||
|
bad_once = Net::HTTP::Get.new '/bad_once'
|
||
|
start do |http|
|
||
|
requests = [bad_once, @get2]
|
||
|
responses = []
|
||
|
http.send :pipeline_check, requests, responses
|
||
|
assert_equal [@get2], requests
|
||
|
assert_equal 1, responses.length
|
||
|
assert_equal 'Was bad. Now 2', responses.first.body
|
||
|
assert http.pipelining
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_check_non_persistent
|
||
|
start do |http|
|
||
|
get1 = Net::HTTP::Get.new '/close'
|
||
|
e = assert_raises Net::HTTP::PipelinePersistenceError do
|
||
|
http.send :pipeline_check, [get1, @get2], []
|
||
|
end
|
||
|
refute http.pipelining
|
||
|
assert_equal [@get2], e.requests
|
||
|
assert_equal 1, e.responses.length
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_check_pipelining
|
||
|
start do |http|
|
||
|
http.pipelining = true
|
||
|
requests = [@get1, @get2]
|
||
|
responses = []
|
||
|
http.send :pipeline_check, requests, responses
|
||
|
assert_equal [@get1, @get2], requests
|
||
|
assert_empty responses
|
||
|
assert http.pipelining
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_receive
|
||
|
responses = []
|
||
|
r = start do |http|
|
||
|
in_flight = http.send :pipeline_send, [@get1, @get2]
|
||
|
assert_equal 2, in_flight.length
|
||
|
http.send :pipeline_receive, in_flight, responses
|
||
|
end
|
||
|
assert_equal 'Worked 1!', responses.first.body
|
||
|
assert_equal 'Worked 2!', responses.last.body
|
||
|
assert_same r, responses
|
||
|
end
|
||
|
def test_pipeline_receive_bad_response
|
||
|
bad_once = Net::HTTP::Get.new '/reset_after'
|
||
|
responses = []
|
||
|
start do |http|
|
||
|
in_flight = http.send :pipeline_send, [bad_once, @get2]
|
||
|
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
|
http.send :pipeline_receive, in_flight, responses
|
||
|
end
|
||
|
assert_equal [@get2], e.requests
|
||
|
assert_equal 1, e.responses.length
|
||
|
assert_equal 'Reading has been shut down', e.responses.first.body
|
||
|
assert_kind_of EOFError, e.original
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_send
|
||
|
requests = [@get1, @get2, @post, @get3]
|
||
|
in_flight = start do |http|
|
||
|
http.send :pipeline_send, requests
|
||
|
end
|
||
|
assert_equal [@get1, @get2], in_flight
|
||
|
assert_equal [@post, @get3], requests
|
||
|
end
|
||
|
def test_pipeline_send_non_idempotent
|
||
|
requests = [@post, @get3]
|
||
|
in_flight = start do |http|
|
||
|
http.send :pipeline_send, requests
|
||
|
end
|
||
|
assert_equal [@post], in_flight
|
||
|
assert_equal [@get3], requests
|
||
|
end
|
||
|
end
|
||
|
class TestNetHttpPipeline_1_0 < Test::Unit::TestCase
|
||
|
CONFIG = {
|
||
|
'host' => '127.0.0.1',
|
||
|
'port' => 10084,
|
||
|
'proxy_host' => nil,
|
||
|
'proxy_port' => nil,
|
||
|
'chunked' => false,
|
||
|
'http_version' => '1.0'
|
||
|
}
|
||
|
include TestNetHTTPUtils
|
||
|
def setup
|
||
|
super
|
||
|
@get1 = Net::HTTP::Get.new '/count'
|
||
|
@get2 = Net::HTTP::Get.new '/count'
|
||
|
end
|
||
|
def test_pipeline_1_0
|
||
|
start do |http|
|
||
|
http.instance_variable_set :@curr_http_version, '1.0'
|
||
|
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
|
http.pipeline [@get1, @get2]
|
||
|
end
|
||
|
assert_equal [@get1, @get2], e.requests
|
||
|
assert_empty e.responses
|
||
|
end
|
||
|
end
|
||
|
def test_pipeline_check_1_0
|
||
|
start do |http|
|
||
|
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
|
http.send :pipeline_check, [@get1, @get2], []
|
||
|
end
|
||
|
assert_match %r%server is HTTP/1\.0%, e.message
|
||
|
assert_equal [@get2], e.requests
|
||
|
assert_equal 1, e.responses.length
|
||
|
assert_equal 'Worked 1!', e.responses.first.body
|
||
|
refute http.pipelining
|
||
|
end
|
||
|
end
|
||
|
end
|
||