aboutsummaryrefslogtreecommitdiffstats
path: root/worker_node/lib/user_stream/client.rb
blob: 1f83914e2c8d064103e0da913661a718af6c8800 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
require "em-http/middleware/oauth"

module UserStream
  # Streaming HTTP client for the user stream endpoint at STREAM_URL.
  #
  # The request is issued through EM::HttpRequest with the OAuth middleware.
  # The response body is split on CRLF, and each non-empty line is handed to
  # the +:item+ handler. Handlers are registered through dynamic +on_<event>+
  # methods, for example:
  #
  #   client.on_item { |line| ... }
  #   client.on_error { |message| ... }
  #
  # Events emitted by this class: +:item+, +:unauthorized+,
  # +:enhance_your_calm+, +:service_unavailable+, +:disconnected+, +:error+.
  class Client
    STREAM_URL = "https://userstream.twitter.com/1.1/user.json"
    # Seconds of silence tolerated before the request is considered dead;
    # at least one line per 90 seconds will come.
    INACTIVITY_TIMEOUT = 100
    DEFAULT_OPTIONS = { compression: true }.freeze
    # Matches the dynamic registration methods; capture 1 is the event name.
    CALLBACK_METHOD = /\Aon_(.+)\z/

    # @return [Hash] frozen options currently in effect
    attr_reader :options

    # @param options [Hash] recognised keys (as read by #connect):
    #   +:consumer_key+, +:consumer_secret+, +:oauth_token+,
    #   +:oauth_token_secret+, +:params+ (query parameters) and
    #   +:compression+ (defaults to true).
    def initialize(options = {})
      @options = DEFAULT_OPTIONS.merge(options).freeze
      @callbacks = {}
      @closing = false
      @http = nil
    end

    # Replaces the options and reconnects. Registered handlers are kept, so
    # the new connection reports to the same callbacks as the old one.
    #
    # @param options [Hash] same keys as #initialize
    def update(options = {})
      @options = DEFAULT_OPTIONS.merge(options).freeze
      reconnect
    end

    # Closes the current request (if any) and opens a new one.
    def reconnect
      close
      connect
    end

    # Closes the current request. The +:error+ handler is not invoked for a
    # failure reported while the client is closing. Safe to call before
    # #connect has ever been called.
    def close
      @closing = true
      @http.close if @http
    end

    # Opens the streaming request and wires its events to the registered
    # handlers.
    def connect
      # A fresh connection must report its own failures again.
      # NOTE(review): assumes the previous request's errback has already run
      # by the time #close returns - confirm against em-http-request.
      @closing = false
      @buftok = BufferedTokenizer.new("\r\n")

      request = EM::HttpRequest.new(STREAM_URL, inactivity_timeout: INACTIVITY_TIMEOUT)
      request.use(EM::Middleware::OAuth, oauth_credentials)
      http = request.get(request_options)

      # Response headers are currently not inspected.
      http.headers do |_headers|
      end

      http.stream do |chunk|
        @buftok.extract(chunk).each do |line|
          next if line.empty?
          callback(:item, line)
        end
      end

      http.callback { dispatch_response(http) }

      http.errback do
        callback(:error, http.error) unless @closing
      end

      @http = http
    end

    # Registers the block as the handler for +on_<event>+ calls, replacing
    # any previous handler for that event. Calling +on_<event>+ without a
    # block removes the handler. Any other unknown method raises
    # NoMethodError as usual.
    def method_missing(name, *args, &block)
      event = event_for(name)
      return super unless event

      @callbacks[event] = block
    end

    def respond_to_missing?(name, include_private = false)
      !event_for(name).nil? || super
    end

    private

    # Invokes the handler registered for +name+, if there is one.
    def callback(name, *args)
      handler = @callbacks[name]
      handler.call(*args) if handler
    end

    # Event symbol for an +on_<event>+ method name, or nil for other names.
    def event_for(method_name)
      event = method_name.to_s[CALLBACK_METHOD, 1]
      event && event.to_sym
    end

    # Query and header options passed to the GET request.
    def request_options
      { query: (@options[:params] || {}),
        head: { "accept-encoding": @options[:compression] ? "gzip" : "" } }
    end

    # Credentials handed to the OAuth middleware.
    def oauth_credentials
      { consumer_key: @options[:consumer_key],
        consumer_secret: @options[:consumer_secret],
        access_token: @options[:oauth_token],
        access_token_secret: @options[:oauth_token_secret] }
    end

    # Maps the final HTTP status of a completed request to an event.
    def dispatch_response(http)
      status = http.response_header.status
      case status
      when 401
        callback(:unauthorized, http.response)
      when 420
        callback(:enhance_your_calm, http.response)
      when 503
        callback(:service_unavailable, http.response)
      when 200
        callback(:disconnected)
      else
        callback(:error, "#{status}: #{http.response}")
      end
    end
  end
end