aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-06-19 01:42:24 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-06-19 01:42:24 +0900
commit9b3bb776594c67c92fb457f89218dfd1586e2f7b (patch)
tree553fc8547bac40aecda146e7a281e563f66ed43a
parent4512b0bc9af5cdc616d843c4c837d939c0f16294 (diff)
downloadaclog-9b3bb776594c67c92fb457f89218dfd1586e2f7b.tar.gz
WorkerNode: refactor around user stream
-rw-r--r--worker_node/lib/user_connection.rb8
-rw-r--r--worker_node/lib/user_stream/client.rb64
2 files changed, 42 insertions, 30 deletions
diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb
index 83c030a..2457d4f 100644
--- a/worker_node/lib/user_connection.rb
+++ b/worker_node/lib/user_connection.rb
@@ -12,11 +12,10 @@ class UserConnection
end
def update(hash)
- if hash[:oauth_token] == @client.options[:oauth_token]
- log(:debug, "Token is not changed")
- else
- @client.update(hash)
+ if @client.update_if_necessary(hash)
log(:info, "Updated connection")
+ else
+ log(:debug, "Token is not changed")
end
end
@@ -129,7 +128,6 @@ class UserConnection
end
def on_delete(json, timestamp = nil)
- timestamp ||= json[:timestamp_ms]
log(:debug, "Delete: #{json[:delete][:status]}")
EventChannel << { event: :delete,
identifier: "delete-#{json[:delete][:status][:id]}",
diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb
index 4044292..e9e54f8 100644
--- a/worker_node/lib/user_stream/client.rb
+++ b/worker_node/lib/user_stream/client.rb
@@ -4,17 +4,26 @@ module UserStream
class Client
attr_reader :options
- def initialize(options = {})
- @options = { compression: true }.merge(options).freeze
+ def initialize(options)
+ @options = options
@callbacks = {}
@exiting = false
end
- def update(options = {})
+ def update(options)
initialize(options)
reconnect
end
+ def update_if_necessary(options)
+ if options[:oauth_token] == @options[:oauth_token]
+ update(options)
+ true
+ else
+ false
+ end
+ end
+
def reconnect
close
connect
@@ -31,47 +40,36 @@ module UserStream
def connect
@buftok = BufferedTokenizer.new("\r\n")
+ @http = setup_connection
- opts = { query: (@options[:params] || {}),
- head: { "accept-encoding": @options[:compression] ? "gzip" : "" } }
- oauth = { consumer_key: @options[:consumer_key],
- consumer_secret: @options[:consumer_secret],
- access_token: @options[:oauth_token],
- access_token_secret: @options[:oauth_token_secret] }
- req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come
- req.use(EM::Middleware::OAuth, oauth)
- http = req.get(opts)
-
- http.headers do |headers|
+ @http.headers do |headers|
end
- http.stream do |chunk|
+ @http.stream do |chunk|
@buftok.extract(chunk).each do |line|
next if line.empty?
callback(:item, line)
end
end
- http.callback do
- case http.response_header.status
+ @http.callback do
+ case @http.response_header.status
when 401
- callback(:unauthorized, http.response)
+ callback(:unauthorized, @http.response)
when 420
- callback(:enhance_your_calm, http.response)
+ callback(:enhance_your_calm, @http.response)
when 503
- callback(:service_unavailable, http.response)
+ callback(:service_unavailable, @http.response)
when 200
callback(:disconnected)
else
- callback(:error, "#{http.response}: #{http.response}")
+ callback(:error, "#{@http.response}: #{@http.response}")
end
end
- http.errback do
- callback(:error, http.error) unless @exiting
+ @http.errback do
+ callback(:error, @http.error) unless @exiting
end
-
- @http = http
end
def method_missing(name, &block)
@@ -84,5 +82,21 @@ module UserStream
def callback(name, *args)
@callbacks.key?(name) && @callbacks[name].call(*args)
end
+
+ def setup_connection
+ opts = { query: {}, head: {} }
+ opts[:query].merge(@options[:params]) if @options[:params].is_a? Hash
+ opts[:head]["accept-encoding"] = "gzip" if @options[:compression]
+
+ oauth = { consumer_key: @options[:consumer_key],
+ consumer_secret: @options[:consumer_secret],
+ access_token: @options[:oauth_token],
+ access_token_secret: @options[:oauth_token_secret] }
+
+ req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come
+ req.use(EM::Middleware::OAuth, oauth)
+
+ req.get(opts)
+ end
end
end