1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
require "em-http/middleware/oauth"
module UserStream
  # Event-driven client for the Twitter user stream endpoint, built on
  # em-http-request with the OAuth middleware.
  #
  # Handlers are registered through dynamic +on_<event>+ methods, e.g.
  #
  #   client.on_item  { |line| ... }
  #   client.on_error { |reason| ... }
  #
  # Events emitted by this class:
  # * +:item+                - one non-empty line received from the stream
  # * +:unauthorized+        - request finished with HTTP 401 (response body)
  # * +:enhance_your_calm+   - request finished with HTTP 420 (response body)
  # * +:service_unavailable+ - request finished with HTTP 503 (response body)
  # * +:disconnected+        - request finished with HTTP 200 (no arguments)
  # * +:error+               - any other status ("<status>: <body>") or a
  #                            connection failure (the error reported by em-http)
  class Client
    STREAM_URL = "https://userstream.twitter.com/1.1/user.json".freeze
    # Seconds of silence tolerated before the connection is considered dead;
    # at least one line per 90 seconds is expected from the stream.
    INACTIVITY_TIMEOUT = 100
    LINE_DELIMITER = "\r\n".freeze
    HANDLER_PREFIX = "on_".freeze

    # @return [Hash] frozen options currently in effect
    attr_reader :options

    # @param options [Hash] connection settings. Keys read by this class:
    #   +:consumer_key+, +:consumer_secret+, +:oauth_token+,
    #   +:oauth_token_secret+, +:params+ (query parameters) and
    #   +:compression+ (defaults to true).
    def initialize(options = {})
      @options = normalize_options(options)
      @callbacks = {}
      @closing = false
    end

    # Replaces the options and reconnects with them.
    #
    # Registered handlers are kept: only the options change. (Re-running
    # +initialize+ here would silently drop every handler.)
    #
    # @param options [Hash] same keys as {#initialize}
    def update(options = {})
      @options = normalize_options(options)
      reconnect
    end

    # Closes the current connection (if any) and opens a new one.
    def reconnect
      close
      connect
    end

    # Closes the current connection, suppressing the +:error+ event that the
    # deliberate close would otherwise produce. Safe to call before #connect.
    def close
      @closing = true
      @http.close if @http
    end

    # Opens the streaming request and wires the stream/completion/failure
    # handlers to the registered callbacks.
    #
    # @return [Object] the em-http client object for the new request
    def connect
      # A fresh connection must report its own failures again; without this
      # reset every error after the first #close would be swallowed.
      # NOTE(review): relies on em-http firing the errback of the closed
      # request before this point - confirm against the em-http version in use.
      @closing = false
      @buftok = BufferedTokenizer.new(LINE_DELIMITER)

      request = EM::HttpRequest.new(STREAM_URL, inactivity_timeout: INACTIVITY_TIMEOUT)
      request.use(EM::Middleware::OAuth, oauth_credentials)
      http = request.get(request_options)

      http.stream { |chunk| emit_items(chunk) }
      http.callback { handle_completion(http) }
      http.errback { callback(:error, http.error) unless @closing }

      @http = http
    end

    # Registers a handler when called as +on_<event> { ... }+; any other
    # unknown method raises NoMethodError as usual.
    #
    # Calling +on_<event>+ without a block removes the handler for that event.
    def method_missing(name, *args, &block)
      event = handler_event(name)
      return super unless event

      unless args.empty?
        raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0)"
      end

      @callbacks[event] = block
    end

    # Keeps +respond_to?+ consistent with the dynamic +on_<event>+ methods.
    def respond_to_missing?(name, include_private = false)
      !handler_event(name).nil? || super
    end

    private

    # Applies defaults and freezes the result so it cannot be mutated later.
    def normalize_options(options)
      { compression: true }.merge(options).freeze
    end

    # Maps a method name such as +:on_item+ to its event (+:item+);
    # returns nil when the name is not a handler registration.
    def handler_event(name)
      text = name.to_s
      return nil unless text.start_with?(HANDLER_PREFIX) && text.length > HANDLER_PREFIX.length

      text[HANDLER_PREFIX.length..-1].to_sym
    end

    # Query string and headers for the streaming request.
    def request_options
      { query: (@options[:params] || {}),
        head: { "accept-encoding": @options[:compression] ? "gzip" : "" } }
    end

    # Credentials hash in the shape handed to the OAuth middleware.
    def oauth_credentials
      { consumer_key: @options[:consumer_key],
        consumer_secret: @options[:consumer_secret],
        access_token: @options[:oauth_token],
        access_token_secret: @options[:oauth_token_secret] }
    end

    # Splits a raw chunk into lines and emits one +:item+ per non-empty line.
    def emit_items(chunk)
      @buftok.extract(chunk).each do |line|
        next if line.empty?

        callback(:item, line)
      end
    end

    # Translates the final HTTP status of a finished request into an event.
    def handle_completion(http)
      status = http.response_header.status
      case status
      when 401 then callback(:unauthorized, http.response)
      when 420 then callback(:enhance_your_calm, http.response)
      when 503 then callback(:service_unavailable, http.response)
      when 200 then callback(:disconnected)
      else
        # Report the status code alongside the body so the failure is diagnosable.
        callback(:error, "#{status}: #{http.response}")
      end
    end

    # Invokes the handler registered for +name+, if there is one.
    def callback(name, *args)
      handler = @callbacks[name]
      handler.call(*args) if handler
    end
  end
end
|