aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-03 16:27:14 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-03 16:27:14 +0900
commitc3dacc27e2efaa9ded721e71f35824fcb4e6dfc3 (patch)
tree0dfc25ff8abc3c095be039437653a360fba10e86 /lib
parentedec99d21afea23fc2592c71c3f3a717d3af16d0 (diff)
downloadplum-c3dacc27e2efaa9ded721e71f35824fcb4e6dfc3.tar.gz
add Plum::Client
Diffstat (limited to 'lib')
-rw-r--r--lib/plum.rb3
-rw-r--r--lib/plum/client.rb161
-rw-r--r--lib/plum/client/connection.rb19
-rw-r--r--lib/plum/client/response.rb48
4 files changed, 231 insertions, 0 deletions
diff --git a/lib/plum.rb b/lib/plum.rb
index 5248073..bcd72d2 100644
--- a/lib/plum.rb
+++ b/lib/plum.rb
@@ -22,3 +22,6 @@ require "plum/stream"
require "plum/server/connection"
require "plum/server/https_connection"
require "plum/server/http_connection"
+require "plum/client"
+require "plum/client/response"
+require "plum/client/connection"
diff --git a/lib/plum/client.rb b/lib/plum/client.rb
new file mode 100644
index 0000000..853b049
--- /dev/null
+++ b/lib/plum/client.rb
@@ -0,0 +1,161 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ class Client
+ DEFAULT_CONFIG = {
+ https: true
+ }.freeze
+
+ attr_reader :host, :port, :config
+ attr_reader :socket
+
+ def self.start(host, port, config = {}, &block)
+ client = self.new(host, port, config)
+ client.start(&block)
+ end
+
+ def initialize(host, port, config = {})
+ @host = host
+ @port = port
+ @config = DEFAULT_CONFIG.merge(config)
+ @response_handlers = {}
+ @responses = {}
+ end
+
+ def start
+ raise IOError, "Session already started" if @started
+ _start
+ if block_given?
+ begin
+ ret = yield(self)
+ wait
+ return ret
+ ensure
+ close
+ end
+ end
+ self
+ end
+
+ def wait
+ while !@responses.empty?
+ _succ
+ end
+ end
+
+ def close
+ begin
+ @plum.close if @plum
+ ensure
+ @socket.close if @socket
+ end
+ end
+
+ def request_async(headers, body = nil, &block)
+ stream = @plum.open_stream
+ response = Response.new
+ @responses[stream] = response
+
+ if body
+ stream.send_headers(headers, end_stream: false)
+ stream.send_data(body, end_stream: true)
+ else
+ stream.send_headers(headers, end_stream: true)
+ end
+
+ if block_given?
+ @response_handlers[stream] = block
+ end
+
+ response
+ end
+
+ def request(headers, body = nil)
+ response = request_async(headers, body)
+ _succ while !response.finished?
+ response
+ end
+
+ %w(GET POST HEAD PUT DELETE).each { |method|
+ define_method(method.downcase.to_sym) do |headers = {}|
+ request(headers)
+ end
+
+ define_method(:"#{method.downcase}_async") do |headers = {}, &block|
+ request_async(headers, &block)
+ end
+ }
+
+ def https?
+ !!@config[:https]
+ end
+
+ private
+ def _start
+ @started = true
+ sock = TCPSocket.open(host, port)
+ if https?
+ ctx = @config[:ssl_context] || new_ssl_ctx
+ sock = OpenSSL::SSL::SSLSocket.new(sock, ctx)
+ sock.sync_close = true
+ sock.connect
+ if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
+ sock.post_connection_check(@config[:hostname] || @host)
+ end
+ end
+
+ @socket = sock
+ @plum = setup_plum(sock)
+ end
+
+ def setup_plum(sock)
+ local_settings = {
+ enable_push: 0,
+ initial_window_size: (1 << 30) - 1,
+ }
+ plum = ClientConnection.new(sock.method(:write), local_settings)
+ plum.on(:protocol_error) { |ex| raise ex }
+ plum.on(:stream_error) { |stream, ex| raise ex }
+ plum.on(:headers) { |stream, headers|
+ response = @responses[stream]
+ response._headers(headers)
+ }
+ plum.on(:data) { |stream, chunk|
+ response = @responses[stream]
+ response._chunk(chunk)
+ }
+ plum.on(:end_stream) { |stream|
+ response = @responses.delete(stream)
+ response._finish
+ if handler = @response_handlers.delete(stream)
+ handler.call(response)
+ end
+ }
+ plum
+ end
+
+ def _succ
+ @plum << @socket.readpartial(1024)
+ end
+
+ def new_ssl_ctx
+ ctx = OpenSSL::SSL::SSLContext.new
+ ctx.ssl_version = :TLSv1_2
+
+ if ctx.respond_to?(:hostname=)
+ ctx.hostname = @config[:hostname] || @host
+ end
+
+ if ctx.respond_to?(:alpn_protocols)
+ ctx.alpn_protocols = ["h2", "http/1.1"]
+ end
+
+ if ctx.respond_to?(:npn_select_cb)
+ ctx.alpn_select_cb = -> protocols {
+ protocols.include?("h2") ? "h2" : protocols.first
+ }
+ end
+
+ ctx
+ end
+ end
+end
diff --git a/lib/plum/client/connection.rb b/lib/plum/client/connection.rb
new file mode 100644
index 0000000..fc96a9a
--- /dev/null
+++ b/lib/plum/client/connection.rb
@@ -0,0 +1,19 @@
+# -*- frozen-string-literal: true -*-
+using Plum::BinaryString
+module Plum
+ class ClientConnection < Connection
+ def initialize(writer, local_settings = {})
+ super(writer, local_settings)
+
+ writer.call(CLIENT_CONNECTION_PREFACE)
+ settings(local_settings)
+ @state = :waiting_settings
+ end
+
+ def open_stream(**args)
+ next_id = @max_odd_stream_id > 0 ? @max_odd_stream_id + 2 : 1
+ stream = new_stream(next_id, state: :open, **args)
+ stream
+ end
+ end
+end
diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb
new file mode 100644
index 0000000..0756546
--- /dev/null
+++ b/lib/plum/client/response.rb
@@ -0,0 +1,48 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ class Response
+ attr_reader :headers
+
+ def initialize
+ @body = Queue.new
+ @finished = false
+ @body_read = false
+ end
+
+ def status
+ @headers && @headers[":status"]
+ end
+
+ def finished?
+ @finished
+ end
+
+ def each_body(&block)
+ raise "Body already read" if @body_read
+ @body_read = true
+ while chunk = @body.pop
+ yield chunk
+ end
+ end
+
+ def body
+ body = String.new
+ each_body { |chunk| body << chunk }
+ body
+ end
+
+ def _headers(raw_headers)
+ # response headers should not have duplicates
+ @headers = raw_headers.to_h
+ end
+
+ def _chunk(chunk)
+ @body << chunk
+ end
+
+ def _finish
+ @finished = true
+ @body << nil # @body.close is not implemented in Ruby 2.2
+ end
+ end
+end