Project

General

Profile

Feature #5461 » net.http.rb.pipeline.patch

drbrain (Eric Hodel), 10/19/2011 07:37 AM

View differences:

lib/net/http.rb (working copy)
# See Net::HTTP::Proxy for further details and examples such as proxies that
# require a username and password.
#
# === Pipelining
#
# On HTTP/1.1 servers requests can be pipelined which can reduce response
# time for a series of requests. When pipelining Net::HTTP sends requests
# in the list without waiting for a response then retrieves the responses
# from the server after all requests are issued. The server will return the
# responses in the same order they were issued.
#
# Example:
#
# requests = []
# requests << Net::HTTP::Get.new('/images/bug.png')
# requests << Net::HTTP::Get.new('/images/date.png')
# requests << Net::HTTP::Get.new('/images/find.png')
#
# Net::HTTP.start 'localhost' do |http|
#   http.pipeline requests do |req, res|
#     # ...
#   end
# end
#
# Before attempting to pipeline a sequence of requests, Net::HTTP will
# consume one request to check if the server is pipelining-capable. To avoid
# the check set #pipelining to true:
#
# Net::HTTP.start 'localhost' do |http|
#   http.pipelining = true
#   responses = http.pipeline requests
#   # ...
# end
#
# For more details see Net::HTTP#pipeline
#
# == HTTP Request Classes
#
# Here is the HTTP request class hierarchy.
......
end
# :startdoc:
# Root of the error classes declared below; PipelineError derives from it.
class Error < StandardError
end
##
# Error raised for pipelining failures. Carries the requests that were not
# sent and the responses collected before the failure.
class PipelineError < Error
  ##
  # Remaining requests that have not been sent to the HTTP server, and the
  # responses retrieved up to the error point
  attr_reader :requests, :responses

  ##
  # Creates a new Error with +message+, the +requests+ that have not been
  # sent to the server and the +responses+ already retrieved from it.
  def initialize(message, requests, responses)
    super(message)
    @requests, @responses = requests, responses
  end
end
# Raised when the server appears to not support persistent connections
# which are required for pipelining requests.
class PipelinePersistenceError < PipelineError
  ##
  # Builds the error with its fixed message, the +requests+ that have not
  # been sent to the server and the +responses+ retrieved so far.
  def initialize(requests, responses)
    super('persistent connections required', requests, responses)
  end
end
# Raised when the server appears to not support pipelining requests
class PipelineUnsupportedError < PipelineError
  ##
  # Creates a new PipelineUnsupportedError with a list of +requests+ that
  # have not been sent to the server and a list of +responses+ that have
  # been retrieved from the server.
  #
  # +reason+ is optional; when given it is appended to the message in
  # parentheses. Because it is a leading optional parameter, callers may
  # pass either (requests, responses) or (reason, requests, responses).
  def initialize reason = nil, requests, responses
    message = 'pipeline connections are not supported'
    # Build a new string instead of appending to the literal, which would
    # raise FrozenError when string literals are frozen.
    message = "#{message} (#{reason})" if reason
    super message, requests, responses
  end
end
# Raised if an error occurs while reading responses.
class PipelineResponseError < PipelineError
  ##
  # The exception that interrupted reading the responses
  attr_accessor :original

  ##
  # Wraps +exception+ together with the +requests+ that were in-flight and
  # the +responses+ already retrieved from the server. The message embeds
  # the wrapped exception's text and class.
  def initialize(exception, requests, responses)
    @original = exception
    super("error reading responses: #{exception} (#{exception.class})",
          requests, responses)
  end
end
# Turns on net/http 1.2 (ruby 1.8) features.
# Defaults to ON in ruby 1.8 or later.
def HTTP.version_1_2
......
@address = address
@port = (port || HTTP.default_port)
@curr_http_version = HTTPVersion
@pipelining = nil # nil means unknown
@no_keepalive_server = false
@close_on_empty_response = false
@socket = nil
......
@enable_post_connection_check = true
@compression = nil
@sspi_enabled = false
@socket_class = BufferedIO
if defined?(SSL_ATTRIBUTES)
SSL_ATTRIBUTES.each do |name|
instance_variable_set "@#{name}", nil
......
attr_accessor :close_on_empty_response
# Set to true if this server supports pipelining, false if it does not.
# If unset, #pipeline will figure it out
attr_accessor :pipelining
# Returns true if SSL/TLS is being used with HTTP.
def use_ssl?
@use_ssl
......
s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context)
s.sync_close = true
end
@socket = BufferedIO.new(s)
@socket = @socket_class.new(s)
@socket.read_timeout = @read_timeout
@socket.continue_timeout = @continue_timeout
@socket.debug_output = @debug_output
......
end
private :do_finish
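# Closes the connection, rescuing any IOError this may cause, then starts it
# again. Raises PipelineError (carrying +requests+ and +responses+) if the
# connection is refused or the host is down.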
def reset_pipeline requests, responses
  begin
    finish
  rescue IOError
    # Closing failed with an IOError; ignore it and reconnect anyway.
  end

  start
rescue Errno::ECONNREFUSED, Errno::EHOSTDOWN => failure
  # Translate connection failures into a PipelineError that carries the
  # outstanding requests and the responses gathered so far.
  message = if failure.is_a?(Errno::ECONNREFUSED)
              "connection refused: #{address}:#{port}"
            else
              "host down: #{address}:#{port}"
            end
  raise PipelineError.new(message, requests, responses)
end
private :reset_pipeline
#
# proxy
#
......
res
end
# Pipelines +requests+ to the HTTP server yielding responses if a block is
# given. Returns all responses received.
#
# Only idempotent sequences of requests will be pipelined. If a
# non-idempotent request (like a POST) is included in a request sequence
# #pipeline will wait for a response before proceeding with further
# sequences of requests.
#
# The Net::HTTP connection must be started (#start must be called) before
# calling #pipeline.
#
# Raises a subclass of the PipelineError exception if the connection is
# not pipeline-capable, if the HTTP session has not been started, or there
# was a problem sending or receiving the +requests+. The remaining
# outstanding requests and returned responses can be retrieved from the
# PipelineError exception.
#
# Example:
#
# requests = []
# requests << Net::HTTP::Get.new('/images/bug.png')
# requests << Net::HTTP::Get.new('/images/date.png')
# requests << Net::HTTP::Get.new('/images/find.png')
#
# http = Net::HTTP.new 'localhost'
# http.start do
# http.pipeline requests do |req, res|
# open File.basename(req.path), 'wb' do |io|
# io.write res.body
# end
# end
# end
def pipeline requests, &block # :yields: request, response
  # Work on a copy so the caller's list is left untouched.
  requests = requests.dup
  responses = []

  unless started?
    raise PipelineError.new('Net::HTTP not started', requests, responses)
  end

  pipeline_check requests, responses, &block

  # Number of responses held at the last (re)start; used to detect a retry
  # that made no progress.
  retried = responses.length

  until requests.empty?
    begin
      in_flight = pipeline_send requests
      pipeline_receive in_flight, responses, &block
    rescue PipelineResponseError => e
      # Put the unanswered requests back at the front, preserving order.
      requests.unshift(*e.requests)

      # Give up when no response arrived since the last attempt, or when the
      # next request is not idempotent and so is not retried.
      raise if responses.length == retried || !requests.first.idempotent?

      retried = responses.length
      reset_pipeline requests, responses
      retry
    end
  end

  responses
end
private
# Executes a request which uses a representation
......
raise HTTPAuthenticationError.new('HTTP authentication failed', err)
end
# Ensures this connection supports pipelining.
#
# If the server has not been tested for pipelining support one of the
# +requests+ will be consumed and placed in +responses+.
#
# A PipelinePersistenceError will be raised if the server does not support
# persistent connections. (The server is HTTP/1.1, but closed the
# connection.)
#
# A PipelineUnsupportedError will be raised if the server does not support
# pipelining. (The server is not HTTP/1.1.)
# Verifies that the connection can be used for pipelining, probing the
# server with the first of +requests+ when support is still unknown.
#
# +requests+ is the mutable list of pending requests; the probe request is
# shifted off it. +responses+ receives the probe's response. If a block is
# given it is yielded the probe request and its response.
#
# Returns without side effects when pipelining is already known to work, or
# when support is unknown, the connection is HTTP/1.1 and +requests+ is
# empty (nothing to probe with).
#
# Raises PipelineUnsupportedError when pipelining is known to be
# unsupported or the connection is older than HTTP/1.1,
# PipelinePersistenceError when the probe shows the connection is not kept
# alive, and PipelineResponseError when the probe fails twice in a row.
def pipeline_check requests, responses
  unless @pipelining.nil? # tri-state: true, false or nil (unknown)
    return if @pipelining
    raise PipelineUnsupportedError.new(requests, responses)
  end

  if '1.1' > @curr_http_version
    @pipelining = false
    reason = "server is HTTP/#{@curr_http_version}"
    raise PipelineUnsupportedError.new(reason, requests, responses)
  end

  # Without a request there is nothing to probe with; leave the state
  # unknown rather than handing nil to #request.
  return if requests.empty?

  # Assume the worst until the probe proves otherwise.
  @pipelining = false

  req = requests.shift
  tried_once = false

  begin
    res = request(req)
  rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
         Errno::EPIPE, Net::HTTPBadResponse => e
    if tried_once
      # Second failure: hand the probe request back and give up.
      requests.unshift req
      raise PipelineResponseError.new(e, requests, responses)
    end

    tried_once = true
    reset_pipeline requests, responses
    retry
  end

  responses << res
  yield req, res if block_given?

  @pipelining = keep_alive? req, res

  # Checked again because the probe has now run; the first check only saw
  # the version recorded before it.
  if '1.1' > @curr_http_version
    @pipelining = false
    reason = "server is HTTP/#{@curr_http_version}"
    raise PipelineUnsupportedError.new(reason, requests, responses)
  elsif not @pipelining
    raise PipelinePersistenceError.new(requests, responses)
  end

  @close_on_empty_response = false
end
# Receives HTTP responses for the +in_flight+ requests and adds them to
# +responses+
def pipeline_receive in_flight, responses
  while (request = in_flight.shift)
    begin
      # Skip over any interim "100 Continue" responses.
      response = Net::HTTPResponse.read_new @socket
      response = Net::HTTPResponse.read_new @socket while
        response.kind_of? Net::HTTPContinue

      response.reading_body @socket, request.response_body_permitted? do
        responses << response
        yield request, response if block_given?
      end

      end_transport request, response
    rescue StandardError, Timeout::Error
      # The request was not fully answered; put it back so the caller can
      # see it among the outstanding requests.
      in_flight.unshift request
      raise
    end
  end

  responses
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
       Errno::EPIPE, Net::HTTPBadResponse => e
  # Transport-level failure: close the connection and report what is left.
  finish
  raise PipelineResponseError.new(e, in_flight, responses)
end
# Sends +requests+ to the HTTP server and removes them from the +requests+
# list. Returns the requests that have been pipelined and are in-flight.
#
# If a non-idempotent request is first in +requests+ it will be sent and no
# further requests will be pipelined.
#
# If a non-idempotent request is encountered after an idempotent request it
# will not be sent.
#
# After the response for a non-idempotent request is received another
# series of pipelined requests will be issued via #pipeline.
def pipeline_send requests
  sent = []

  while (request = requests.shift)
    safe = request.idempotent?

    # A non-idempotent request never follows others in the same batch: put
    # it back and let the next batch start with it.
    if !safe && !sent.empty?
      requests.unshift request
      break
    end

    begin_transport request
    request.exec @socket, @curr_http_version, edit_path(request.path)
    sent.push request

    # A non-idempotent request is sent alone; stop the batch after it.
    break unless safe
  end

  sent
end
#
# utils
#
......
"\#<#{self.class} #{@method}>"
end
##
# Is this request idempotent according to RFC 2616?
def idempotent?
  # Request classes treated as idempotent; any other class yields nil.
  idempotent_classes = [
    Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
    Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
  ]

  true if idempotent_classes.any? { |request_class| is_a?(request_class) }
end
# Returns the value of @request_has_body.
def request_body_permitted?
@request_has_body
end
test/net/http/test_http.rb (working copy)
end
end
module TestNetHTTP_version_1_2_methods
def test_request
......
assert_not_match(/HTTP\/1.1 100 continue/, @debug.string)
end
end
class TestNetHttp < Test::Unit::TestCase
  # Delete, Get, Head, Options, Put and Trace requests must report
  # themselves as idempotent; Post must not.
  def test_idempotent_eh
    idempotent_classes = [
      Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
      Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
    ]

    idempotent_classes.each do |request_class|
      assert request_class.new('/').idempotent?
    end

    refute Net::HTTP::Post.new('/').idempotent?
  end
end
test/net/http/utils.rb (working copy)
end
require 'webrick/httpservlet/abstract'
# Reopens Net::HTTP to add a +socket_class+ accessor, which the test helpers
# use to assign the IO class for a connection.
class Net::HTTP
attr_accessor :socket_class
end
module TestNetHTTPUtils
##
# Records the data Net::BufferedIO sends and receives in write_io and read_io
class TestIO < Net::BufferedIO
  # StringIO copies of everything consumed from the read buffer (read_io)
  # and everything written (write_io).
  attr_reader :read_io, :write_io

  class << self
    # Instances created through .new since the last .reset
    attr_reader :ios

    # Creates the instance as usual and remembers it in .ios.
    def new(io)
      super.tap { |instance| @ios << instance }
    end

    # Forgets all remembered instances.
    def reset
      @ios = []
    end
  end

  reset

  def initialize(io)
    super
    @read_io = StringIO.new
    @write_io = StringIO.new
  end

  # Records the consumed chunk before handing it back.
  def rbuf_consume(len)
    super(len).tap { |chunk| @read_io << chunk }
  end

  # Records +str+ before it is written.
  def write0(str)
    @write_io << str
    super
  end
end
##
# Raises Errno::ECONNRESET for every request
class ErrorAlways < TestIO
# Always fails: every call raises Errno::ECONNRESET.
def readline
raise Errno::ECONNRESET
end
end
##
# Raises Errno::ECONNRESET for every request after the first
class ErrorAfterOne < TestIO
  # Restarts the readline call counter, then performs the inherited reset.
  def self.reset
    @@count = 0
    super
  end

  # Counts readline calls across all instances; the first call goes through
  # and every later one raises Errno::ECONNRESET.
  def readline
    @@count += 1
    raise Errno::ECONNRESET if @@count > 1
    super
  end
end
##
# Raises Errno::ECONNRESET upon the second request
class ErrorAfterOneOnce < TestIO
  # Restarts the readline call counter, then performs the inherited reset.
  def self.reset
    @@count = 0
    super
  end

  # Counts readline calls across all instances; only the second call raises
  # Errno::ECONNRESET, all others go through.
  def readline
    @@count += 1
    second_call = @@count == 2
    raise Errno::ECONNRESET if second_call
    super
  end
end
##
# Raises Errno::ECONNRESET upon every even request 2, 4, 6
class ErrorEven < TestIO
  # Restarts the readline call counter, then performs the inherited reset.
  def self.reset
    @@count = 0
    super
  end

  # Counts readline calls across all instances; every even-numbered call
  # (2, 4, 6, ...) raises Errno::ECONNRESET.
  def readline
    @@count += 1
    raise Errno::ECONNRESET if @@count.even?
    super
  end
end
# Calls #new and invokes #start on the result, forwarding +block+.
def start(&block)
new().start(&block)
end
......
klass = Net::HTTP::Proxy(config('proxy_host'), config('proxy_port'))
http = klass.new(config('host'), config('port'))
http.set_debug_output logfile()
http.socket_class = @net_http_io
http
end
......
end
def setup
  # Clear the state kept by the IO recorders and the singleton servlets.
  resettables = [
    TestIO, ErrorAlways, ErrorAfterOne, ErrorAfterOneOnce, ErrorEven,
    BadOnce, Counter
  ]
  resettables.each { |resettable| resettable.reset }

  spawn_server

  # Connections use the plain recording IO unless a test picks another.
  @net_http_io = TestIO
end
def teardown
......
:ServerType => Thread,
}
server_config[:OutputBufferSize] = 4 if config('chunked')
server_config[:HTTPVersion] = config('http_version') if
config('http_version')
if defined?(OpenSSL) and config('ssl_enable')
server_config.update({
:SSLEnable => true,
......
})
end
@server = WEBrick::HTTPServer.new(server_config)
@server.mount('/', Servlet, config('chunked'))
@server.mount('/', Servlet, config('chunked'))
@server.mount('/bad_once', BadOnce)
@server.mount('/close', Closer)
@server.mount('/count', Counter)
@server.mount('/reset_after', ResetAfter)
@server.start
n_try_max = 5
begin
......
end
end
# Servlet whose first response carries a bogus status line; later responses
# are left intact. A single instance is shared until .reset is called.
class BadOnce < WEBrick::HTTPServlet::AbstractServlet
  @instance = nil

  class << self
    # Returns the shared instance, creating it on first use.
    def get_instance(server, *options)
      @instance ||= super
    end

    # Drops the shared instance so the next request starts from scratch.
    def reset
      @instance = nil
    end
  end

  def initialize(server)
    @count = 1
  end

  def do_GET(req, res)
    # Only the very first response gets its status line replaced.
    res.define_singleton_method(:status_line) { "bogus" } if @count == 1
    res.body = "Was bad. Now #{@count}"
    @count += 1
  end

  alias do_POST do_GET
end
# Servlet that numbers its responses ("Worked 1!", "Worked 2!", ...). A
# single instance is shared until .reset is called.
class Counter < WEBrick::HTTPServlet::AbstractServlet
  @instance = nil

  class << self
    # Returns the shared instance, creating it on first use.
    def get_instance(server, *options)
      @instance ||= super
    end

    # Drops the shared instance so counting restarts at 1.
    def reset
      @instance = nil
    end
  end

  def initialize(server)
    @count = 1
  end

  def do_GET(req, res)
    res['Content-Type'] = 'text/plain'
    res.body = "Worked #{@count}!"
    @count += 1
  end

  alias do_POST do_GET
end
# Servlet that answers with a short plain-text body and then calls #close
# on the response. GET and POST share the same handler.
class Closer < WEBrick::HTTPServlet::AbstractServlet
def do_GET(req, res)
res['Content-Type'] = 'text/plain'
res.body = "closing this connection"
# NOTE(review): presumably meant to make the server drop the connection;
# confirm the response object actually responds to #close.
res.close
end
alias do_POST do_GET
end
# Servlet that overrides the response's _write_data so the socket's read
# side is closed before each chunk of data is written. GET and POST share
# the same handler.
class ResetAfter < WEBrick::HTTPServlet::AbstractServlet
  def do_GET(req, res)
    res.define_singleton_method(:_write_data) do |socket, data|
      socket.close_read
      socket << data
    end

    res.body = "Reading has been shut down"
  end

  alias do_POST do_GET
end
class NullWriter
def <<(s) end
def puts(*args) end
test/net/http/test_pipeline.rb (revision 0)
require 'test/unit'
require 'net/http'
require 'stringio'
require 'uri'
require_relative 'utils'
# Tests for Net::HTTP#pipeline and its private helpers (pipeline_check,
# pipeline_send, pipeline_receive), run against the server configured by
# CONFIG. The /count servlet numbers its responses, so the expected bodies
# show how many requests reached the server.
class TestNetHttpPipeline < Test::Unit::TestCase
CONFIG = {
'host' => '127.0.0.1',
'port' => 10083,
'proxy_host' => nil,
'proxy_port' => nil,
'chunked' => true,
}
include TestNetHTTPUtils
# Three GETs and one POST to the counting servlet, shared by the tests.
def setup
super
@get1 = Net::HTTP::Get.new '/count'
@get2 = Net::HTTP::Get.new '/count'
@get3 = Net::HTTP::Get.new '/count'
@post = Net::HTTP::Post.new '/count'
end
# Two requests are answered in order and the caller's array is left intact.
def test_pipeline
requests = [@get1, @get2]
responses = start do |http|
http.pipeline requests
end
assert_equal 'Worked 1!', responses.first.body
assert_equal 'Worked 2!', responses.last.body
refute_empty requests
end
# The block receives each request together with its response, in order.
def test_pipeline_block
requests = [@get1, @get2]
responses = []
start do |http|
http.pipeline requests do |req, res| responses << [req, res] end
end
refute_empty requests
assert_equal requests, responses.map { |req,| req }
responses = responses.map { |_, res| res }
assert_equal 'Worked 1!', responses.first.body
assert_equal 'Worked 2!', responses.last.body
end
# A POST in the middle of the list still yields four ordered responses.
def test_pipeline_non_idempotent
responses = start do |http|
http.pipelining = true
http.pipeline [@get1, @get2, @post, @get3]
end
assert_equal 'Worked 1!', responses.shift.body
assert_equal 'Worked 2!', responses.shift.body
assert_equal 'Worked 3!', responses.shift.body
assert_equal 'Worked 4!', responses.shift.body
assert responses.empty?
end
# Calling #pipeline on a connection that was never started raises.
def test_pipeline_not_started
@started = false
e = assert_raises Net::HTTP::PipelineError do
http = new
http.pipeline []
end
assert_equal 'Net::HTTP not started', e.message
end
# A single read failure is retried; the server counter shows that the lost
# request had already been processed once.
def test_pipeline_retry
requests = [@get1, @get2, @get3]
@net_http_io = ErrorAfterOneOnce
responses = start do |http|
http.pipelining = true
http.pipeline requests
end
assert_equal 'Worked 1!', responses.shift.body
assert_equal 'Worked 3!', responses.shift.body # response 2 was lost
assert_equal 'Worked 4!', responses.shift.body
assert_empty responses
refute_empty requests
end
# A failing POST is not retried and is reported as still outstanding.
def test_pipeline_retry_fail_post
@net_http_io = ErrorAlways
requests = [@post]
e = assert_raises Net::HTTP::PipelineResponseError do
start do |http|
http.pipelining = true
http.pipeline requests
end
end
assert_empty e.responses
assert_equal [@post], e.requests
end
# Failures that each follow a successful response keep being retried.
def test_pipeline_retry_fail_different
@net_http_io = ErrorEven
requests = [@get1, @get2, @get3]
responses = start do |http|
http.pipelining = true
http.pipeline requests
end
assert_equal 'Worked 1!', responses.shift.body
assert_equal 'Worked 3!', responses.shift.body
assert_equal 'Worked 5!', responses.shift.body
assert_empty responses
refute_empty requests
end
# A retry that makes no progress raises with the unanswered requests.
def test_pipeline_retry_fail_same
@net_http_io = ErrorAfterOne
requests = [@get1, @get2, @get3]
e = assert_raises Net::HTTP::PipelineResponseError do
start do |http|
http.pipelining = true
http.pipeline requests
end
end
responses = e.responses
assert_equal 'Worked 1!', responses.shift.body
assert_empty responses
assert_equal [@get2, @get3], e.requests
end
# end #pipeline tests
# With unknown support, the check consumes one request and enables
# pipelining.
def test_pipeline_check
requests = [@get1, @get2]
responses = []
start do |http|
http.send :pipeline_check, requests, responses
assert http.pipelining
end
assert_equal [@get2], requests
assert_equal 1, responses.length
assert_equal 'Worked 1!', responses.first.body
end
# With pipelining already false, the check raises without sending anything.
def test_pipeline_check_again
start do |http|
http.pipelining = false
e = assert_raises Net::HTTP::PipelineUnsupportedError do
http.send :pipeline_check, [@get1, @get2], []
end
assert_equal [@get1, @get2], e.requests
assert_empty e.responses
refute http.pipelining
end
end
# A bad first response is retried once; the second attempt succeeds.
def test_pipeline_check_bad_response
bad_once = Net::HTTP::Get.new '/bad_once'
start do |http|
requests = [bad_once, @get2]
responses = []
http.send :pipeline_check, requests, responses
assert_equal [@get2], requests
assert_equal 1, responses.length
assert_equal 'Was bad. Now 2', responses.first.body
assert http.pipelining
end
end
# A connection that is not kept alive raises PipelinePersistenceError after
# the probe request has been consumed.
def test_pipeline_check_non_persistent
start do |http|
get1 = Net::HTTP::Get.new '/close'
e = assert_raises Net::HTTP::PipelinePersistenceError do
http.send :pipeline_check, [get1, @get2], []
end
refute http.pipelining
assert_equal [@get2], e.requests
assert_equal 1, e.responses.length
end
end
# With pipelining already true, the check sends nothing.
def test_pipeline_check_pipelining
start do |http|
http.pipelining = true
requests = [@get1, @get2]
responses = []
http.send :pipeline_check, requests, responses
assert_equal [@get1, @get2], requests
assert_empty responses
assert http.pipelining
end
end
# pipeline_receive fills and returns the very array it was given.
def test_pipeline_receive
responses = []
r = start do |http|
in_flight = http.send :pipeline_send, [@get1, @get2]
assert_equal 2, in_flight.length
http.send :pipeline_receive, in_flight, responses
end
assert_equal 'Worked 1!', responses.first.body
assert_equal 'Worked 2!', responses.last.body
assert_same r, responses
end
# After /reset_after the second response cannot be read: the error carries
# the first response, the outstanding request and the original EOFError.
def test_pipeline_receive_bad_response
bad_once = Net::HTTP::Get.new '/reset_after'
responses = []
start do |http|
in_flight = http.send :pipeline_send, [bad_once, @get2]
e = assert_raises Net::HTTP::PipelineResponseError do
http.send :pipeline_receive, in_flight, responses
end
assert_equal [@get2], e.requests
assert_equal 1, e.responses.length
assert_equal 'Reading has been shut down', e.responses.first.body
assert_kind_of EOFError, e.original
end
end
# Sending stops in front of the first non-idempotent request.
def test_pipeline_send
requests = [@get1, @get2, @post, @get3]
in_flight = start do |http|
http.send :pipeline_send, requests
end
assert_equal [@get1, @get2], in_flight
assert_equal [@post, @get3], requests
end
# A leading non-idempotent request is sent on its own.
def test_pipeline_send_non_idempotent
requests = [@post, @get3]
in_flight = start do |http|
http.send :pipeline_send, requests
end
assert_equal [@post], in_flight
assert_equal [@get3], requests
end
end
# Pipelining tests with the test server configured for HTTP version '1.0'
# (see CONFIG), where pipelining must be refused.
class TestNetHttpPipeline_1_0 < Test::Unit::TestCase
CONFIG = {
'host' => '127.0.0.1',
'port' => 10084,
'proxy_host' => nil,
'proxy_port' => nil,
'chunked' => false,
'http_version' => '1.0'
}
include TestNetHTTPUtils
# Two GETs to the counting servlet, shared by the tests.
def setup
super
@get1 = Net::HTTP::Get.new '/count'
@get2 = Net::HTTP::Get.new '/count'
end
# With the connection already marked HTTP/1.0, #pipeline raises before any
# request is consumed.
def test_pipeline_1_0
start do |http|
http.instance_variable_set :@curr_http_version, '1.0'
e = assert_raises Net::HTTP::PipelineUnsupportedError do
http.pipeline [@get1, @get2]
end
assert_equal [@get1, @get2], e.requests
assert_empty e.responses
end
end
# Without prior knowledge the check consumes one request, then raises with
# the HTTP/1.0 reason and leaves pipelining off.
def test_pipeline_check_1_0
start do |http|
e = assert_raises Net::HTTP::PipelineUnsupportedError do
http.send :pipeline_check, [@get1, @get2], []
end
assert_match %r%server is HTTP/1\.0%, e.message
assert_equal [@get2], e.requests
assert_equal 1, e.responses.length
assert_equal 'Worked 1!', e.responses.first.body
refute http.pipelining
end
end
end
    (1-1/1)