diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-06-19 01:42:24 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-06-19 01:42:24 +0900 |
commit | 9b3bb776594c67c92fb457f89218dfd1586e2f7b (patch) | |
tree | 553fc8547bac40aecda146e7a281e563f66ed43a | |
parent | 4512b0bc9af5cdc616d843c4c837d939c0f16294 (diff) | |
download | aclog-9b3bb776594c67c92fb457f89218dfd1586e2f7b.tar.gz |
WorkerNode: refactor around user stream
-rw-r--r-- | worker_node/lib/user_connection.rb | 8 | ||||
-rw-r--r-- | worker_node/lib/user_stream/client.rb | 64 |
2 files changed, 42 insertions, 30 deletions
diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index 83c030a..2457d4f 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -12,11 +12,10 @@ class UserConnection end def update(hash) - if hash[:oauth_token] == @client.options[:oauth_token] - log(:debug, "Token is not changed") - else - @client.update(hash) + if @client.update_if_necessary(hash) log(:info, "Updated connection") + else + log(:debug, "Token is not changed") end end @@ -129,7 +128,6 @@ class UserConnection end def on_delete(json, timestamp = nil) - timestamp ||= json[:timestamp_ms] log(:debug, "Delete: #{json[:delete][:status]}") EventChannel << { event: :delete, identifier: "delete-#{json[:delete][:status][:id]}", diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb index 4044292..e9e54f8 100644 --- a/worker_node/lib/user_stream/client.rb +++ b/worker_node/lib/user_stream/client.rb @@ -4,17 +4,26 @@ module UserStream class Client attr_reader :options - def initialize(options = {}) - @options = { compression: true }.merge(options).freeze + def initialize(options) + @options = options @callbacks = {} @exiting = false end - def update(options = {}) + def update(options) initialize(options) reconnect end + def update_if_necessary(options) + if options[:oauth_token] == @options[:oauth_token] + update(options) + true + else + false + end + end + def reconnect close connect @@ -31,47 +40,36 @@ module UserStream def connect @buftok = BufferedTokenizer.new("\r\n") + @http = setup_connection - opts = { query: (@options[:params] || {}), - head: { "accept-encoding": @options[:compression] ? "gzip" : "" } } - oauth = { consumer_key: @options[:consumer_key], - consumer_secret: @options[:consumer_secret], - access_token: @options[:oauth_token], - access_token_secret: @options[:oauth_token_secret] } - req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come - req.use(EM::Middleware::OAuth, oauth) - http = req.get(opts) - - http.headers do |headers| + @http.headers do |headers| end - http.stream do |chunk| + @http.stream do |chunk| @buftok.extract(chunk).each do |line| next if line.empty? callback(:item, line) end end - http.callback do - case http.response_header.status + @http.callback do + case @http.response_header.status when 401 - callback(:unauthorized, http.response) + callback(:unauthorized, @http.response) when 420 - callback(:enhance_your_calm, http.response) + callback(:enhance_your_calm, @http.response) when 503 - callback(:service_unavailable, http.response) + callback(:service_unavailable, @http.response) when 200 callback(:disconnected) else - callback(:error, "#{http.response}: #{http.response}") + callback(:error, "#{@http.response}: #{@http.response}") end end - http.errback do - callback(:error, http.error) unless @exiting + @http.errback do + callback(:error, @http.error) unless @exiting end - - @http = http end def method_missing(name, &block) @@ -84,5 +82,21 @@ module UserStream def callback(name, *args) @callbacks.key?(name) && @callbacks[name].call(*args) end + + def setup_connection + opts = { query: {}, head: {} } + opts[:query].merge(@options[:params]) if @options[:params].is_a? Hash + opts[:head]["accept-encoding"] = "gzip" if @options[:compression] + + oauth = { consumer_key: @options[:consumer_key], + consumer_secret: @options[:consumer_secret], + access_token: @options[:oauth_token], + access_token_secret: @options[:oauth_token_secret] } + + req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come + req.use(EM::Middleware::OAuth, oauth) + + req.get(opts) + end end end |