aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum
diff options
context:
space:
mode:
Diffstat (limited to 'lib/plum')
-rw-r--r--lib/plum/client.rb67
-rw-r--r--lib/plum/client/client_session.rb20
-rw-r--r--lib/plum/client/legacy_client_session.rb19
-rw-r--r--lib/plum/client/response.rb47
4 files changed, 65 insertions, 88 deletions
diff --git a/lib/plum/client.rb b/lib/plum/client.rb
index 959b445..d0e6e56 100644
--- a/lib/plum/client.rb
+++ b/lib/plum/client.rb
@@ -3,8 +3,7 @@
module Plum
class Client
DEFAULT_CONFIG = {
- http2: true,
- scheme: "https",
+ https: true,
hostname: nil,
verify_mode: OpenSSL::SSL::VERIFY_PEER,
ssl_context: nil,
@@ -33,9 +32,9 @@ module Plum
else
@socket = nil
@host = host
- @port = port || (config[:scheme] == "https" ? 443 : 80)
+ @port = port || (config[:https] ? 443 : 80)
end
- @config = DEFAULT_CONFIG.merge(hostname: host).merge(config)
+ @config = DEFAULT_CONFIG.merge(hostname: host).merge!(config)
@started = false
end
@@ -56,16 +55,9 @@ module Plum
self
end
- # Resume communication with the server, until the specified (or all running) requests are complete.
- # @param response [Response] if specified, waits only for the response
- # @return [Response] if parameter response is specified
- def resume(response = nil)
- if response
- @session.succ until response.failed? || response.finished?
- response
- else
- @session.succ until @session.empty?
- end
+ # Resume communication with the server until all running requests are complete.
+ def resume
+ @session.succ until @session.empty?
end
# Closes the connection immediately.
@@ -85,14 +77,6 @@ module Plum
@session.request(headers, body, @config.merge(options), &block)
end
- # @!method get!
- # @!method head!
- # @!method delete!
- # @param path [String] the absolute path to request (translated into :path header)
- # @param options [Hash<Symbol, Object>] the request options
- # @param block [Proc] if specified, calls the block when finished
- # Shorthand method for `Client#resume(Client#request(*args))`
-
# @!method get
# @!method head
# @!method delete
@@ -101,20 +85,10 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request`
%w(GET HEAD DELETE).each { |method|
- define_method(:"#{method.downcase}!") do |path, options = {}, &block|
- resume _request_helper(method, path, nil, options, &block)
- end
define_method(:"#{method.downcase}") do |path, options = {}, &block|
_request_helper(method, path, nil, options, &block)
end
}
- # @!method post!
- # @!method put!
- # @param path [String] the absolute path to request (translated into :path header)
- # @param body [String] the request body
- # @param options [Hash<Symbol, Object>] the request options
- # @param block [Proc] if specified, calls the block when finished
- # Shorthand method for `Client#resume(Client#request(*args))`
# @!method post
# @!method put
@@ -124,9 +98,6 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request`
%w(POST PUT).each { |method|
- define_method(:"#{method.downcase}!") do |path, body, options = {}, &block|
- resume _request_helper(method, path, body, options, &block)
- end
define_method(:"#{method.downcase}") do |path, body, options = {}, &block|
_request_helper(method, path, body, options, &block)
end
@@ -137,13 +108,15 @@ module Plum
def _connect
@socket = TCPSocket.open(@host, @port)
- if @config[:scheme] == "https"
+ if @config[:https]
ctx = @config[:ssl_context] || new_ssl_ctx
@socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx)
- @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=)
+ @socket.hostname = @config[:hostname]
@socket.sync_close = true
@socket.connect
- @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
+ if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
+ @socket.post_connection_check(@config[:hostname])
+ end
@socket.alpn_protocol == "h2"
end
@@ -151,18 +124,12 @@ module Plum
def _start
@started = true
-
- klass = @config[:http2] ? ClientSession : LegacyClientSession
nego = @socket || _connect
- if @config[:http2]
- if @config[:scheme] == "https"
- klass = nego ? ClientSession : LegacyClientSession
- else
- klass = UpgradeClientSession
- end
+ if @config[:https]
+ klass = nego ? ClientSession : LegacyClientSession
else
- klass = LegacyClientSession
+ klass = UpgradeClientSession
end
@session = klass.new(@socket, @config)
@@ -175,10 +142,8 @@ module Plum
cert_store = OpenSSL::X509::Store.new
cert_store.set_default_paths
ctx.cert_store = cert_store
- if @config[:http2]
- ctx.ciphers = "ALL:!" + SSLSocketServerConnection::CIPHER_BLACKLIST.join(":!")
- ctx.alpn_protocols = ["h2", "http/1.1"]
- end
+ ctx.ciphers = "ALL:!" + SSLSocketServerConnection::CIPHER_BLACKLIST.join(":!")
+ ctx.alpn_protocols = ["h2", "http/1.1"]
ctx
end
diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb
index 10276d4..81d2341 100644
--- a/lib/plum/client/client_session.rb
+++ b/lib/plum/client/client_session.rb
@@ -31,7 +31,7 @@ module Plum
def close
@closed = true
- @responses.each(&:_fail)
+ @responses.each { |res| res.send(:fail) }
@responses.clear
@plum.close
end
@@ -40,34 +40,34 @@ module Plum
headers = { ":method" => nil,
":path" => nil,
":authority" => @config[:hostname],
- ":scheme" => @config[:scheme]
+ ":scheme" => @config[:https] ? "https" : "http",
}.merge(headers)
- response = Response.new(**options)
+ response = Response.new(self, **options, &headers_cb)
@responses << response
stream = @plum.open_stream
stream.send_headers(headers, end_stream: !body)
stream.send_data(body, end_stream: true) if body
stream.on(:headers) { |resp_headers_raw|
- response._headers(resp_headers_raw)
- headers_cb.call(response) if headers_cb
+ response.send(:set_headers, resp_headers_raw.to_h)
}
stream.on(:data) { |chunk|
- response._chunk(chunk)
+ response.send(:add_chunk, chunk)
check_window(stream)
}
stream.on(:end_stream) {
- response._finish
+ response.send(:finish)
@responses.delete(response)
}
stream.on(:stream_error) { |ex|
- response._fail
+ response.send(:fail, ex)
raise ex
}
stream.on(:local_stream_error) { |type|
- response.fail
- raise LocalStreamError.new(type)
+ ex = LocalStreamError.new(type)
+ response.send(:fail, ex)
+ raise ex
}
response
end
diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb
index e9ff20b..a12f582 100644
--- a/lib/plum/client/legacy_client_session.rb
+++ b/lib/plum/client/legacy_client_session.rb
@@ -12,7 +12,6 @@ module Plum
@parser = setup_parser
@requests = []
@response = nil
- @headers_callback = nil
end
def succ
@@ -27,7 +26,7 @@ module Plum
def close
@closed = true
- @response._fail if @response
+ @response.send(:fail) if @response
end
def request(headers, body, options, &headers_cb)
@@ -41,8 +40,8 @@ module Plum
end
end
- response = Response.new(**options)
- @requests << [response, headers, body, chunked, headers_cb]
+ response = Response.new(self, **options, &headers_cb)
+ @requests << [response, headers, body, chunked]
consume_queue
response
end
@@ -56,9 +55,8 @@ module Plum
def consume_queue
return if @response || @requests.empty?
- response, headers, body, chunked, cb = @requests.shift
+ response, headers, body, chunked = @requests.shift
@response = response
- @headers_callback = cb
@socket << construct_request(headers)
@@ -96,19 +94,18 @@ module Plum
def setup_parser
parser = HTTP::Parser.new
parser.on_headers_complete = proc {
+ # FIXME: duplicate header name?
resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h
- @response._headers({ ":status" => parser.status_code.to_s }.merge(resp_headers))
- @headers_callback.call(@response) if @headers_callback
+ @response.send(:set_headers, { ":status" => parser.status_code.to_s }.merge(resp_headers))
}
parser.on_body = proc { |chunk|
- @response._chunk(chunk)
+ @response.send(:add_chunk, chunk)
}
parser.on_message_complete = proc { |env|
- @response._finish
+ @response.send(:finish)
@response = nil
- @headers_callback = nil
close unless parser.keep_alive?
consume_queue
}
diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb
index ce263f7..e937fa6 100644
--- a/lib/plum/client/response.rb
+++ b/lib/plum/client/response.rb
@@ -7,19 +7,21 @@ module Plum
attr_reader :headers
# @api private
- def initialize(auto_decode: true, **options)
- @body = Queue.new
+ def initialize(session, auto_decode: true, **options, &on_headers)
+ @session = session
+ @headers = nil
@finished = false
@failed = false
@body = []
@auto_decode = auto_decode
+ @on_headers = on_headers
@on_chunk = @on_finish = nil
end
# Returns the HTTP status code.
# @return [String] the HTTP status code
def status
- @headers && @headers[":status"]
+ @headers&.fetch(":status")
end
# Returns the header value that correspond to the header name.
@@ -41,7 +43,16 @@ module Plum
@failed
end
- # Set callback tha called when received a chunk of response body.
+ # Set callback that will be called when the response headers arrive
+ # @yield [self]
+ def on_headers(&block)
+ raise ArgumentError, "block must be given" unless block_given?
+ @on_headers = block
+ yield self if @headers
+ self
+ end
+
+ # Set callback that will be called when received a chunk of response body.
# @yield [chunk] A chunk of the response body.
def on_chunk(&block)
raise "Body already read" if @on_chunk
@@ -51,6 +62,7 @@ module Plum
@body.each(&block)
@body.clear
end
+ self
end
# Set callback that will be called when the response finished.
@@ -61,6 +73,7 @@ module Plum
else
@on_finish = block
end
+ self
end
# Returns the complete response body. Use #each_body instead if the body can be very large.
@@ -71,15 +84,20 @@ module Plum
@body.join
end
- # @api private
- def _headers(raw_headers)
- # response headers should not have duplicates
- @headers = raw_headers.to_h.freeze
+ def join
+ @session.succ until (@finished || @failed)
+ self
+ end
+
+ private
+ # internal: set headers and setup decoder
+ def set_headers(headers)
+ @headers = headers.freeze
+ @on_headers.call(self) if @on_headers
@decoder = setup_decoder
end
- # @api private
- def _chunk(encoded)
+ def add_chunk(encoded)
chunk = @decoder.decode(encoded)
if @on_chunk
@on_chunk.call(chunk)
@@ -88,19 +106,16 @@ module Plum
end
end
- # @api private
- def _finish
+ def finish
@finished = true
@decoder.finish
@on_finish.call if @on_finish
end
- # @api private
- def _fail
- @failed = true
+ def fail(ex = nil)
+ @failed = ex || true # FIXME
end
- private
def setup_decoder
if @auto_decode
klass = Decoders::DECODERS[@headers["content-encoding"]]