From c3dacc27e2efaa9ded721e71f35824fcb4e6dfc3 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Tue, 3 Nov 2015 16:27:14 +0900 Subject: add Plum::Client --- lib/plum.rb | 3 + lib/plum/client.rb | 161 ++++++++++++++++++++++++++++++++++++++++++ lib/plum/client/connection.rb | 19 +++++ lib/plum/client/response.rb | 48 +++++++++++++ 4 files changed, 231 insertions(+) create mode 100644 lib/plum/client.rb create mode 100644 lib/plum/client/connection.rb create mode 100644 lib/plum/client/response.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index 5248073..bcd72d2 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -22,3 +22,6 @@ require "plum/stream" require "plum/server/connection" require "plum/server/https_connection" require "plum/server/http_connection" +require "plum/client" +require "plum/client/response" +require "plum/client/connection" diff --git a/lib/plum/client.rb b/lib/plum/client.rb new file mode 100644 index 0000000..853b049 --- /dev/null +++ b/lib/plum/client.rb @@ -0,0 +1,161 @@ +# -*- frozen-string-literal: true -*- +module Plum + class Client + DEFAULT_CONFIG = { + https: true + }.freeze + + attr_reader :host, :port, :config + attr_reader :socket + + def self.start(host, port, config = {}, &block) + client = self.new(host, port, config) + client.start(&block) + end + + def initialize(host, port, config = {}) + @host = host + @port = port + @config = DEFAULT_CONFIG.merge(config) + @response_handlers = {} + @responses = {} + end + + def start + raise IOError, "Session already started" if @started + _start + if block_given? + begin + ret = yield(self) + wait + return ret + ensure + close + end + end + self + end + + def wait + while !@responses.empty? + _succ + end + end + + def close + begin + @plum.close if @plum + ensure + @socket.close if @socket + end + end + + def request_async(headers, body = nil, &block) + stream = @plum.open_stream + response = Response.new + @responses[stream] = response + + if body + stream.send_headers(headers, end_stream: false) + stream.send_data(body, end_stream: true) + else + stream.send_headers(headers, end_stream: true) + end + + if block_given? + @response_handlers[stream] = block + end + + response + end + + def request(headers, body = nil) + response = request_async(headers, body) + _succ while !response.finished? + response + end + + %w(GET POST HEAD PUT DELETE).each { |method| + define_method(method.downcase.to_sym) do |headers = {}| + request(headers) + end + + define_method(:"#{method.downcase}_async") do |headers = {}, &block| + request_async(headers, &block) + end + } + + def https? + !!@config[:https] + end + + private + def _start + @started = true + sock = TCPSocket.open(host, port) + if https? + ctx = @config[:ssl_context] || new_ssl_ctx + sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) + sock.sync_close = true + sock.connect + if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE + sock.post_connection_check(@config[:hostname] || @host) + end + end + + @socket = sock + @plum = setup_plum(sock) + end + + def setup_plum(sock) + local_settings = { + enable_push: 0, + initial_window_size: (1 << 30) - 1, + } + plum = ClientConnection.new(sock.method(:write), local_settings) + plum.on(:protocol_error) { |ex| raise ex } + plum.on(:stream_error) { |stream, ex| raise ex } + plum.on(:headers) { |stream, headers| + response = @responses[stream] + response._headers(headers) + } + plum.on(:data) { |stream, chunk| + response = @responses[stream] + response._chunk(chunk) + } + plum.on(:end_stream) { |stream| + response = @responses.delete(stream) + response._finish + if handler = @response_handlers.delete(stream) + handler.call(response) + end + } + plum + end + + def _succ + @plum << @socket.readpartial(1024) + end + + def new_ssl_ctx + ctx = OpenSSL::SSL::SSLContext.new + ctx.ssl_version = :TLSv1_2 + + if ctx.respond_to?(:hostname=) + ctx.hostname = @config[:hostname] || @host + end + + if ctx.respond_to?(:alpn_protocols) + ctx.alpn_protocols = ["h2", "http/1.1"] + end + + if ctx.respond_to?(:npn_select_cb) + ctx.alpn_select_cb = -> protocols { + protocols.include?("h2") ? "h2" : protocols.first + } + end + + ctx + end + end +end diff --git a/lib/plum/client/connection.rb b/lib/plum/client/connection.rb new file mode 100644 index 0000000..fc96a9a --- /dev/null +++ b/lib/plum/client/connection.rb @@ -0,0 +1,19 @@ +# -*- frozen-string-literal: true -*- +using Plum::BinaryString +module Plum + class ClientConnection < Connection + def initialize(writer, local_settings = {}) + super(writer, local_settings) + + writer.call(CLIENT_CONNECTION_PREFACE) + settings(local_settings) + @state = :waiting_settings + end + + def open_stream(**args) + next_id = @max_odd_stream_id > 0 ? @max_odd_stream_id + 2 : 1 + stream = new_stream(next_id, state: :open, **args) + stream + end + end +end diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb new file mode 100644 index 0000000..0756546 --- /dev/null +++ b/lib/plum/client/response.rb @@ -0,0 +1,48 @@ +# -*- frozen-string-literal: true -*- +module Plum + class Response + attr_reader :headers + + def initialize + @body = Queue.new + @finished = false + @body_read = false + end + + def status + @headers && @headers[":status"] + end + + def finished? + @finished + end + + def each_body(&block) + raise "Body already read" if @body_read + @body_read = true + while chunk = @body.pop + yield chunk + end + end + + def body + body = String.new + each_body { |chunk| body << chunk } + body + end + + def _headers(raw_headers) + # response headers should not have duplicates + @headers = raw_headers.to_h + end + + def _chunk(chunk) + @body << chunk + end + + def _finish + @finished = true + @body << nil # @body.close is not implemented in Ruby 2.2 + end + end +end -- cgit v1.2.3