Feature #5461 ยป net.http.rb.pipeline.patch
lib/net/http.rb (working copy) | ||
---|---|---|
# See Net::HTTP::Proxy for further details and examples such as proxies that
|
||
# require a username and password.
|
||
#
|
||
# === Pipelining
|
||
#
|
||
# On HTTP/1.1 servers requests can be pipelined which can reduce response
|
||
# time for a series of requests. When pipelining Net::HTTP sends requests
|
||
# in the list without waiting for a response then retrieves the responses
|
||
# from the server after all requests are issued. The server will return the
|
||
# responses in the same order they were issued.
|
||
#
|
||
# Example:
|
||
#
|
||
# requests = []
|
||
# requests << Net::HTTP::Get.new('/images/bug.png')
|
||
# requests << Net::HTTP::Get.new('/images/date.png')
|
||
# requests << Net::HTTP::Get.new('/images/find.png')
|
||
#
|
||
# http = Net::HTTP.start 'localhost' do
|
||
# http.pipeline requests do |req, res|
|
||
# # ...
|
||
# end
|
||
# end
|
||
#
|
||
# Before attempting to pipeline a sequence of requests, Net::HTTP will
|
||
# consume one request to check if the server is pipelining-capable. To avoid
|
||
# the check set #pipelining to true:
|
||
#
|
||
# http = Net::HTTP.start 'localhost' do
|
||
# http.pipelining = true
|
||
# responses = http.pipeline requests
|
||
# # ...
|
||
#
|
||
# For more details see Net::HTTP#pipeline
|
||
#
|
||
# == HTTP Request Classes
|
||
#
|
||
# Here is the HTTP request class hierarchy.
|
||
... | ... | |
end
|
||
# :startdoc:
|
||
class Error < StandardError; end
|
||
class PipelineError < Error
|
||
##
|
||
# Remaining requests that have not been sent to the HTTP server
|
||
attr_reader :requests
|
||
##
|
||
# Retrieved responses up to the error point
|
||
attr_reader :responses
|
||
##
|
||
# Creates a new Error with +message+, a list of +requests+ that have not
|
||
# been sent to the server and a list of +responses+ that have been
|
||
# retrieved from the server.
|
||
def initialize message, requests, responses
|
||
super message
|
||
@requests = requests
|
||
@responses = responses
|
||
end
|
||
end
|
||
# Raised when the server appears to not support persistent connections
|
||
# which are required for pipelining requests.
|
||
class PipelinePersistenceError < PipelineError
|
||
# Creates a new PipelinePersistenceError with a list of +requests+ that
|
||
# have not been sent to the server and a list of +responses+ that have
|
||
# been retrieved from the server.
|
||
def initialize requests, responses
|
||
super 'persistent connections required', requests, responses
|
||
end
|
||
end
|
||
# Raised when the server appears to not support pipelining requests
|
||
class PipelineUnsupportedError < PipelineError
|
||
# Creates a new PipelineUnsupportedError with a list of +requests+ that
|
||
# have not been sent to the server and a list of +responses+ that have
|
||
# been retrieved from the server.
|
||
def initialize reason = nil, requests, responses
|
||
message = 'pipeline connections are not supported'
|
||
message << " (#{reason})" if reason
|
||
super message, requests, responses
|
||
end
|
||
end
|
||
# Raised if an error occurs while reading responses.
|
||
class PipelineResponseError < PipelineError
|
||
# The original exception
|
||
attr_accessor :original
|
||
# Creates a new PipelineResponseError with an original +exception+, a
|
||
# list of +requests+ that were in-flight and a list of +responses+ that
|
||
# have been retrieved from the server.
|
||
def initialize exception, requests, responses
|
||
@original = exception
|
||
message = "error reading responses: #{original} (#{original.class})"
|
||
super message, requests, responses
|
||
end
|
||
end
|
||
# Turns on net/http 1.2 (ruby 1.8) features.
|
||
# Defaults to ON in ruby 1.8 or later.
|
||
def HTTP.version_1_2
|
||
... | ... | |
@address = address
|
||
@port = (port || HTTP.default_port)
|
||
@curr_http_version = HTTPVersion
|
||
@pipelining = nil # nil means unknown
|
||
@no_keepalive_server = false
|
||
@close_on_empty_response = false
|
||
@socket = nil
|
||
... | ... | |
@enable_post_connection_check = true
|
||
@compression = nil
|
||
@sspi_enabled = false
|
||
@socket_class = BufferedIO
|
||
if defined?(SSL_ATTRIBUTES)
|
||
SSL_ATTRIBUTES.each do |name|
|
||
instance_variable_set "@#{name}", nil
|
||
... | ... | |
attr_accessor :close_on_empty_response
|
||
# Set to true if this server supports pipelining, false if it does not.
|
||
# If unset, #pipeline will figure it out
|
||
attr_accessor :pipelining
|
||
# Returns true if SSL/TLS is being used with HTTP.
|
||
def use_ssl?
|
||
@use_ssl
|
||
... | ... | |
s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context)
|
||
s.sync_close = true
|
||
end
|
||
@socket = BufferedIO.new(s)
|
||
@socket = @socket_class.new(s)
|
||
@socket.read_timeout = @read_timeout
|
||
@socket.continue_timeout = @continue_timeout
|
||
@socket.debug_output = @debug_output
|
||
... | ... | |
end
|
||
private :do_finish
|
||
# Closes the connection and rescues any IOErrors this may cause
|
||
def reset_pipeline requests, responses
|
||
begin
|
||
finish
|
||
rescue IOError
|
||
end
|
||
start
|
||
rescue Errno::ECONNREFUSED
|
||
raise PipelineError.new("connection refused: #{address}:#{port}",
|
||
requests, responses)
|
||
rescue Errno::EHOSTDOWN
|
||
raise PipelineError.new("host down: #{address}:#{port}",
|
||
requests, responses)
|
||
end
|
||
private :reset_pipeline
|
||
#
|
||
# proxy
|
||
#
|
||
... | ... | |
res
|
||
end
|
||
# Pipelines +requests+ to the HTTP server yielding responses if a block is
|
||
# given. Returns all responses received.
|
||
#
|
||
# Only idempotent sequences of requests will be pipelined. If a
|
||
# non-idempotent request (like a POST) is included in a request sequence
|
||
# #pipeline will wait for a response before proceeding with further
|
||
# sequences of requests.
|
||
#
|
||
# The Net::HTTP connection must be started (#start must be called) before
|
||
# calling #pipeline.
|
||
#
|
||
# Raises a subclass of the PipelineError exception if the connection is
|
||
# not pipeline-capable, if the HTTP session has not been started, or their
|
||
# was a problem sending or receiving the +requests+. The remaining
|
||
# outstanding requests and returned responses can be retrieved from the
|
||
# PipelineError exception.
|
||
#
|
||
# Example:
|
||
#
|
||
# requests = []
|
||
# requests << Net::HTTP::Get.new('/images/bug.png')
|
||
# requests << Net::HTTP::Get.new('/images/date.png')
|
||
# requests << Net::HTTP::Get.new('/images/find.png')
|
||
#
|
||
# http = Net::HTTP.new 'localhost'
|
||
# http.start do
|
||
# http.pipeline requests do |req, res|
|
||
# open File.basename(req.path), 'wb' do |io|
|
||
# io.write res.body
|
||
# end
|
||
# end
|
||
# end
|
||
def pipeline requests, &block # :yields: response
|
||
requests = requests.dup
|
||
responses = []
|
||
raise PipelineError.new('Net::HTTP not started', requests, responses) unless
|
||
started?
|
||
pipeline_check requests, responses, &block
|
||
retried = responses.length
|
||
until requests.empty? do
|
||
begin
|
||
in_flight = pipeline_send requests
|
||
pipeline_receive in_flight, responses, &block
|
||
rescue PipelineResponseError => e
|
||
e.requests.reverse_each do |request|
|
||
requests.unshift request
|
||
end
|
||
raise if responses.length == retried or not requests.first.idempotent?
|
||
retried = responses.length
|
||
reset_pipeline requests, responses
|
||
retry
|
||
end
|
||
end
|
||
responses
|
||
end
|
||
private
|
||
# Executes a request which uses a representation
|
||
... | ... | |
raise HTTPAuthenticationError.new('HTTP authentication failed', err)
|
||
end
|
||
# Ensures this connection supports pipelining.
|
||
#
|
||
# If the server has not been tested for pipelining support one of the
|
||
# +requests+ will be consumed and placed in +responses+.
|
||
#
|
||
# A PersistenceError will be raised if the server does not support
|
||
# persistent connections. (The server is HTTP/1.1, but closed the
|
||
# connection.)
|
||
#
|
||
# A PipelineUnsupportedError will be raised if the server does not support
|
||
# pipelining. (The server is not HTTP/1.1.)
|
||
def pipeline_check requests, responses
|
||
unless @pipelining.nil? then # tri-state
|
||
return if @pipelining
|
||
raise PipelineUnsupportedError.new(requests, responses) unless
|
||
@pipelining
|
||
else
|
||
@pipelining = false
|
||
end
|
||
if '1.1' > @curr_http_version then
|
||
@pipelining = false
|
||
reason = "server is HTTP/#{@curr_http_version}"
|
||
raise PipelineUnsupportedError.new(reason, requests, responses)
|
||
end
|
||
req = requests.shift
|
||
tried_once = false
|
||
begin
|
||
res = request(req)
|
||
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
|
||
Errno::EPIPE, Net::HTTPBadResponse => e
|
||
if tried_once then
|
||
requests.unshift req
|
||
raise PipelineResponseError.new(e, requests, responses)
|
||
end
|
||
tried_once = true
|
||
reset_pipeline requests, responses
|
||
retry
|
||
end
|
||
responses << res
|
||
yield req, res if block_given?
|
||
@pipelining = keep_alive? req, res
|
||
if '1.1' > @curr_http_version then
|
||
@pipelining = false
|
||
reason = "server is HTTP/#{@curr_http_version}"
|
||
raise PipelineUnsupportedError.new(reason, requests, responses)
|
||
elsif not @pipelining then
|
||
raise PipelinePersistenceError.new(requests, responses)
|
||
end
|
||
@close_on_empty_response = false
|
||
end
|
||
# Receives HTTP responses for the +in_flight+ requests and adds them to
|
||
# +responses+
|
||
def pipeline_receive in_flight, responses
|
||
while req = in_flight.shift do
|
||
begin
|
||
begin
|
||
res = Net::HTTPResponse.read_new @socket
|
||
end while res.kind_of? Net::HTTPContinue
|
||
res.reading_body @socket, req.response_body_permitted? do
|
||
responses << res
|
||
yield req, res if block_given?
|
||
end
|
||
end_transport req, res
|
||
rescue StandardError, Timeout::Error
|
||
in_flight.unshift req
|
||
raise
|
||
end
|
||
end
|
||
responses
|
||
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
|
||
Errno::EPIPE, Net::HTTPBadResponse => e
|
||
finish
|
||
raise PipelineResponseError.new(e, in_flight, responses)
|
||
end
|
||
# Sends +requests+ to the HTTP server and removes them from the +requests+
|
||
# list. Returns the requests that have been pipelined and are in-flight.
|
||
#
|
||
# If a non-idempotent request is first in +requests+ it will be sent and no
|
||
# further requests will be pipelined.
|
||
#
|
||
# If a non-idempotent request is encountered after an idempotent request it
|
||
# will not be sent.
|
||
#
|
||
# After the response for a non-idempotent request is received another
|
||
# series of pipelined requests will be issued via #pipeline.
|
||
def pipeline_send requests
|
||
in_flight = []
|
||
while req = requests.shift do
|
||
idempotent = req.idempotent?
|
||
unless idempotent or in_flight.empty? then
|
||
requests.unshift req
|
||
break
|
||
end
|
||
begin_transport req
|
||
req.exec @socket, @curr_http_version, edit_path(req.path)
|
||
in_flight << req
|
||
break unless idempotent
|
||
end
|
||
in_flight
|
||
end
|
||
#
|
||
# utils
|
||
#
|
||
... | ... | |
"\#<#{self.class} #{@method}>"
|
||
end
|
||
##
|
||
# Is this request idempotent according to RFC 2616?
|
||
def idempotent?
|
||
case self
|
||
when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
|
||
Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
|
||
true
|
||
end
|
||
end
|
||
def request_body_permitted?
|
||
@request_has_body
|
||
end
|
test/net/http/test_http.rb (working copy) | ||
---|---|---|
end
|
||
end
|
||
module TestNetHTTP_version_1_2_methods
|
||
def test_request
|
||
... | ... | |
assert_not_match(/HTTP\/1.1 100 continue/, @debug.string)
|
||
end
|
||
end
|
||
class TestNetHttp < Test::Unit::TestCase
|
||
def test_idempotent_eh
|
||
assert Net::HTTP::Delete.new('/').idempotent?
|
||
assert Net::HTTP::Get.new('/').idempotent?
|
||
assert Net::HTTP::Head.new('/').idempotent?
|
||
assert Net::HTTP::Options.new('/').idempotent?
|
||
assert Net::HTTP::Put.new('/').idempotent?
|
||
assert Net::HTTP::Trace.new('/').idempotent?
|
||
refute Net::HTTP::Post.new('/').idempotent?
|
||
end
|
||
end
|
||
test/net/http/utils.rb (working copy) | ||
---|---|---|
end
|
||
require 'webrick/httpservlet/abstract'
|
||
class Net::HTTP
|
||
attr_accessor :socket_class
|
||
end
|
||
module TestNetHTTPUtils
|
||
##
|
||
# Records the data Net::BufferedIO sends and receives in write_io and read_io
|
||
class TestIO < Net::BufferedIO
|
||
attr_reader :read_io
|
||
attr_reader :write_io
|
||
def self.ios
|
||
@ios
|
||
end
|
||
def self.new(io)
|
||
io = super
|
||
@ios << io
|
||
io
|
||
end
|
||
def self.reset
|
||
@ios = []
|
||
end
|
||
reset
|
||
def initialize(io)
|
||
super
|
||
@read_io = StringIO.new
|
||
@write_io = StringIO.new
|
||
end
|
||
def rbuf_consume(len)
|
||
s = super len
|
||
@read_io << s
|
||
s
|
||
end
|
||
def write0(str)
|
||
@write_io << str
|
||
super
|
||
end
|
||
end
|
||
##
|
||
# Raises Errno::ECONNRESET for every request
|
||
class ErrorAlways < TestIO
|
||
def readline
|
||
raise Errno::ECONNRESET
|
||
end
|
||
end
|
||
##
|
||
# Raises Errno::ECONNRESET for every request after the first
|
||
class ErrorAfterOne < TestIO
|
||
def self.reset
|
||
@@count = 0
|
||
super
|
||
end
|
||
def readline
|
||
@@count += 1
|
||
raise Errno::ECONNRESET if @@count >= 2
|
||
super
|
||
end
|
||
end
|
||
##
|
||
# Raises Errno::ECONNRESET upon the second request
|
||
class ErrorAfterOneOnce < TestIO
|
||
def self.reset
|
||
@@count = 0
|
||
super
|
||
end
|
||
def readline
|
||
@@count += 1
|
||
raise Errno::ECONNRESET if @@count == 2
|
||
super
|
||
end
|
||
end
|
||
##
|
||
# Raises Errno::ECONNRESET upon every even request 2, 4, 6
|
||
class ErrorEven < TestIO
|
||
def self.reset
|
||
@@count = 0
|
||
super
|
||
end
|
||
def readline
|
||
@@count += 1
|
||
raise Errno::ECONNRESET if @@count % 2 == 0
|
||
super
|
||
end
|
||
end
|
||
def start(&block)
|
||
new().start(&block)
|
||
end
|
||
... | ... | |
klass = Net::HTTP::Proxy(config('proxy_host'), config('proxy_port'))
|
||
http = klass.new(config('host'), config('port'))
|
||
http.set_debug_output logfile()
|
||
http.socket_class = @net_http_io
|
||
http
|
||
end
|
||
... | ... | |
end
|
||
def setup
|
||
TestIO.reset
|
||
ErrorAlways.reset
|
||
ErrorAfterOne.reset
|
||
ErrorAfterOneOnce.reset
|
||
ErrorEven.reset
|
||
BadOnce.reset
|
||
Counter.reset
|
||
spawn_server
|
||
@net_http_io = TestIO
|
||
end
|
||
def teardown
|
||
... | ... | |
:ServerType => Thread,
|
||
}
|
||
server_config[:OutputBufferSize] = 4 if config('chunked')
|
||
server_config[:HTTPVersion] = config('http_version') if
|
||
config('http_version')
|
||
if defined?(OpenSSL) and config('ssl_enable')
|
||
server_config.update({
|
||
:SSLEnable => true,
|
||
... | ... | |
})
|
||
end
|
||
@server = WEBrick::HTTPServer.new(server_config)
|
||
@server.mount('/', Servlet, config('chunked'))
|
||
@server.mount('/', Servlet, config('chunked'))
|
||
@server.mount('/bad_once', BadOnce)
|
||
@server.mount('/close', Closer)
|
||
@server.mount('/count', Counter)
|
||
@server.mount('/reset_after', ResetAfter)
|
||
@server.start
|
||
n_try_max = 5
|
||
begin
|
||
... | ... | |
end
|
||
end
|
||
class BadOnce < WEBrick::HTTPServlet::AbstractServlet
|
||
@instance = nil
|
||
def self.get_instance(server, *options)
|
||
@instance ||= super
|
||
end
|
||
def self.reset
|
||
@instance = nil
|
||
end
|
||
def initialize(server)
|
||
@count = 1
|
||
end
|
||
def do_GET(req, res)
|
||
def res.status_line
|
||
"bogus"
|
||
end if @count == 1
|
||
res.body = "Was bad. Now #{@count}"
|
||
@count += 1
|
||
end
|
||
alias do_POST do_GET
|
||
end
|
||
class Counter < WEBrick::HTTPServlet::AbstractServlet
|
||
@instance = nil
|
||
def self.get_instance(server, *options)
|
||
@instance ||= super
|
||
end
|
||
def self.reset
|
||
@instance = nil
|
||
end
|
||
def initialize(server)
|
||
@count = 1
|
||
end
|
||
def do_GET(req, res)
|
||
res['Content-Type'] = 'text/plain'
|
||
res.body = "Worked #{@count}!"
|
||
@count += 1
|
||
end
|
||
alias do_POST do_GET
|
||
end
|
||
class Closer < WEBrick::HTTPServlet::AbstractServlet
|
||
def do_GET(req, res)
|
||
res['Content-Type'] = 'text/plain'
|
||
res.body = "closing this connection"
|
||
res.close
|
||
end
|
||
alias do_POST do_GET
|
||
end
|
||
class ResetAfter < WEBrick::HTTPServlet::AbstractServlet
|
||
def do_GET(req, res)
|
||
def res._write_data(socket, data)
|
||
socket.close_read
|
||
socket << data
|
||
end
|
||
res.body = "Reading has been shut down"
|
||
end
|
||
alias do_POST do_GET
|
||
end
|
||
class NullWriter
|
||
def <<(s) end
|
||
def puts(*args) end
|
test/net/http/test_pipeline.rb (revision 0) | ||
---|---|---|
require 'test/unit'
|
||
require 'net/http'
|
||
require 'stringio'
|
||
require 'uri'
|
||
require_relative 'utils'
|
||
class TestNetHttpPipeline < Test::Unit::TestCase
|
||
CONFIG = {
|
||
'host' => '127.0.0.1',
|
||
'port' => 10083,
|
||
'proxy_host' => nil,
|
||
'proxy_port' => nil,
|
||
'chunked' => true,
|
||
}
|
||
include TestNetHTTPUtils
|
||
def setup
|
||
super
|
||
@get1 = Net::HTTP::Get.new '/count'
|
||
@get2 = Net::HTTP::Get.new '/count'
|
||
@get3 = Net::HTTP::Get.new '/count'
|
||
@post = Net::HTTP::Post.new '/count'
|
||
end
|
||
def test_pipeline
|
||
requests = [@get1, @get2]
|
||
responses = start do |http|
|
||
http.pipeline requests
|
||
end
|
||
assert_equal 'Worked 1!', responses.first.body
|
||
assert_equal 'Worked 2!', responses.last.body
|
||
refute_empty requests
|
||
end
|
||
def test_pipeline_block
|
||
requests = [@get1, @get2]
|
||
responses = []
|
||
start do |http|
|
||
http.pipeline requests do |req, res| responses << [req, res] end
|
||
end
|
||
refute_empty requests
|
||
assert_equal requests, responses.map { |req,| req }
|
||
responses = responses.map { |_, res| res }
|
||
assert_equal 'Worked 1!', responses.first.body
|
||
assert_equal 'Worked 2!', responses.last.body
|
||
end
|
||
def test_pipeline_non_idempotent
|
||
responses = start do |http|
|
||
http.pipelining = true
|
||
http.pipeline [@get1, @get2, @post, @get3]
|
||
end
|
||
assert_equal 'Worked 1!', responses.shift.body
|
||
assert_equal 'Worked 2!', responses.shift.body
|
||
assert_equal 'Worked 3!', responses.shift.body
|
||
assert_equal 'Worked 4!', responses.shift.body
|
||
assert responses.empty?
|
||
end
|
||
def test_pipeline_not_started
|
||
@started = false
|
||
e = assert_raises Net::HTTP::PipelineError do
|
||
http = new
|
||
http.pipeline []
|
||
end
|
||
assert_equal 'Net::HTTP not started', e.message
|
||
end
|
||
def test_pipeline_retry
|
||
requests = [@get1, @get2, @get3]
|
||
@net_http_io = ErrorAfterOneOnce
|
||
responses = start do |http|
|
||
http.pipelining = true
|
||
http.pipeline requests
|
||
end
|
||
assert_equal 'Worked 1!', responses.shift.body
|
||
assert_equal 'Worked 3!', responses.shift.body # response 2 was lost
|
||
assert_equal 'Worked 4!', responses.shift.body
|
||
assert_empty responses
|
||
refute_empty requests
|
||
end
|
||
def test_pipeline_retry_fail_post
|
||
@net_http_io = ErrorAlways
|
||
requests = [@post]
|
||
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
start do |http|
|
||
http.pipelining = true
|
||
http.pipeline requests
|
||
end
|
||
end
|
||
assert_empty e.responses
|
||
assert_equal [@post], e.requests
|
||
end
|
||
def test_pipeline_retry_fail_different
|
||
@net_http_io = ErrorEven
|
||
requests = [@get1, @get2, @get3]
|
||
responses = start do |http|
|
||
http.pipelining = true
|
||
http.pipeline requests
|
||
end
|
||
assert_equal 'Worked 1!', responses.shift.body
|
||
assert_equal 'Worked 3!', responses.shift.body
|
||
assert_equal 'Worked 5!', responses.shift.body
|
||
assert_empty responses
|
||
refute_empty requests
|
||
end
|
||
def test_pipeline_retry_fail_same
|
||
@net_http_io = ErrorAfterOne
|
||
requests = [@get1, @get2, @get3]
|
||
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
start do |http|
|
||
http.pipelining = true
|
||
http.pipeline requests
|
||
end
|
||
end
|
||
responses = e.responses
|
||
assert_equal 'Worked 1!', responses.shift.body
|
||
assert_empty responses
|
||
assert_equal [@get2, @get3], e.requests
|
||
end
|
||
# end #pipeline tests
|
||
def test_pipeline_check
|
||
requests = [@get1, @get2]
|
||
responses = []
|
||
start do |http|
|
||
http.send :pipeline_check, requests, responses
|
||
assert http.pipelining
|
||
end
|
||
assert_equal [@get2], requests
|
||
assert_equal 1, responses.length
|
||
assert_equal 'Worked 1!', responses.first.body
|
||
end
|
||
def test_pipeline_check_again
|
||
start do |http|
|
||
http.pipelining = false
|
||
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
http.send :pipeline_check, [@get1, @get2], []
|
||
end
|
||
assert_equal [@get1, @get2], e.requests
|
||
assert_empty e.responses
|
||
refute http.pipelining
|
||
end
|
||
end
|
||
def test_pipeline_check_bad_response
|
||
bad_once = Net::HTTP::Get.new '/bad_once'
|
||
start do |http|
|
||
requests = [bad_once, @get2]
|
||
responses = []
|
||
http.send :pipeline_check, requests, responses
|
||
assert_equal [@get2], requests
|
||
assert_equal 1, responses.length
|
||
assert_equal 'Was bad. Now 2', responses.first.body
|
||
assert http.pipelining
|
||
end
|
||
end
|
||
def test_pipeline_check_non_persistent
|
||
start do |http|
|
||
get1 = Net::HTTP::Get.new '/close'
|
||
e = assert_raises Net::HTTP::PipelinePersistenceError do
|
||
http.send :pipeline_check, [get1, @get2], []
|
||
end
|
||
refute http.pipelining
|
||
assert_equal [@get2], e.requests
|
||
assert_equal 1, e.responses.length
|
||
end
|
||
end
|
||
def test_pipeline_check_pipelining
|
||
start do |http|
|
||
http.pipelining = true
|
||
requests = [@get1, @get2]
|
||
responses = []
|
||
http.send :pipeline_check, requests, responses
|
||
assert_equal [@get1, @get2], requests
|
||
assert_empty responses
|
||
assert http.pipelining
|
||
end
|
||
end
|
||
def test_pipeline_receive
|
||
responses = []
|
||
r = start do |http|
|
||
in_flight = http.send :pipeline_send, [@get1, @get2]
|
||
assert_equal 2, in_flight.length
|
||
http.send :pipeline_receive, in_flight, responses
|
||
end
|
||
assert_equal 'Worked 1!', responses.first.body
|
||
assert_equal 'Worked 2!', responses.last.body
|
||
assert_same r, responses
|
||
end
|
||
def test_pipeline_receive_bad_response
|
||
bad_once = Net::HTTP::Get.new '/reset_after'
|
||
responses = []
|
||
start do |http|
|
||
in_flight = http.send :pipeline_send, [bad_once, @get2]
|
||
e = assert_raises Net::HTTP::PipelineResponseError do
|
||
http.send :pipeline_receive, in_flight, responses
|
||
end
|
||
assert_equal [@get2], e.requests
|
||
assert_equal 1, e.responses.length
|
||
assert_equal 'Reading has been shut down', e.responses.first.body
|
||
assert_kind_of EOFError, e.original
|
||
end
|
||
end
|
||
def test_pipeline_send
|
||
requests = [@get1, @get2, @post, @get3]
|
||
in_flight = start do |http|
|
||
http.send :pipeline_send, requests
|
||
end
|
||
assert_equal [@get1, @get2], in_flight
|
||
assert_equal [@post, @get3], requests
|
||
end
|
||
def test_pipeline_send_non_idempotent
|
||
requests = [@post, @get3]
|
||
in_flight = start do |http|
|
||
http.send :pipeline_send, requests
|
||
end
|
||
assert_equal [@post], in_flight
|
||
assert_equal [@get3], requests
|
||
end
|
||
end
|
||
class TestNetHttpPipeline_1_0 < Test::Unit::TestCase
|
||
CONFIG = {
|
||
'host' => '127.0.0.1',
|
||
'port' => 10084,
|
||
'proxy_host' => nil,
|
||
'proxy_port' => nil,
|
||
'chunked' => false,
|
||
'http_version' => '1.0'
|
||
}
|
||
include TestNetHTTPUtils
|
||
def setup
|
||
super
|
||
@get1 = Net::HTTP::Get.new '/count'
|
||
@get2 = Net::HTTP::Get.new '/count'
|
||
end
|
||
def test_pipeline_1_0
|
||
start do |http|
|
||
http.instance_variable_set :@curr_http_version, '1.0'
|
||
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
http.pipeline [@get1, @get2]
|
||
end
|
||
assert_equal [@get1, @get2], e.requests
|
||
assert_empty e.responses
|
||
end
|
||
end
|
||
def test_pipeline_check_1_0
|
||
start do |http|
|
||
e = assert_raises Net::HTTP::PipelineUnsupportedError do
|
||
http.send :pipeline_check, [@get1, @get2], []
|
||
end
|
||
assert_match %r%server is HTTP/1\.0%, e.message
|
||
assert_equal [@get2], e.requests
|
||
assert_equal 1, e.responses.length
|
||
assert_equal 'Worked 1!', e.responses.first.body
|
||
refute http.pipelining
|
||
end
|
||
end
|
||
end
|
||