aboutsummaryrefslogtreecommitdiffstats
path: root/lib/collector/node_connection.rb
blob: 83cde045544ec7154351a5776d2db7e8bc381a95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
module Collector
  # One connection from a collector node, speaking a MessagePack-framed
  # protocol in which every message is a Hash carrying a :title key.
  #
  # A node must first send an "auth" message containing the shared
  # :secret_key; until then every other message is rejected and the
  # connection is closed. Once authenticated, stream events sent by the node
  # (tweet / favorite / unfavorite / retweet / delete) are forwarded to the
  # queue object given at construction time.
  class NodeConnection < EM::Connection
    attr_reader :connection_id
    attr_accessor :activated_time

    # Counter used to hand out a distinct connection_id to each instance.
    @@_id = 0

    # @param queue object receiving decoded events; it must respond to
    #   push_tweet, push_favorite, push_unfavorite, push_retweet and
    #   push_delete.
    def initialize(queue)
      @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
      @connection_id = (@@_id += 1)
      @authenticated = false
      @closing = false
      @activated_time = nil
      @queue = queue
    end

    # Connection-closed hook. When the node announced the shutdown with an
    # "exit" message (@closing is set) it has already been unregistered, so
    # only a log line is written; otherwise the close is unexpected and the
    # node is unregistered here.
    def unbind
      if @closing
        log(:info, "Connection was closed.")
      else
        log(:warn, "Connection was closed unexpectedly.")
        NodeManager.unregister(self)
      end
    end

    # Feeds raw bytes into the streaming unpacker and handles every complete
    # message decoded from them. Anything that is not a Hash with a :title is
    # answered with an error and the connection is scheduled to close.
    #
    # @param data [String] bytes received from the socket
    def receive_data(data)
      @unpacker.feed_each(data) do |msg|
        unless msg.is_a?(Hash) && msg[:title]
          log(:warn, "Unknown message: #{msg}")
          send_message(:error, text: "Unknown message.")
          close_connection_after_writing
          next
        end

        parse_message(msg)
      end
    end

    # Sends a :register message telling the node to start working with the
    # given account, including the consumer and OAuth credentials.
    #
    # @param account object responding to id, user_id, oauth_token and
    #   oauth_token_secret
    def register_account(account)
      send_message(:register,
                   id: account.id,
                   consumer_key: Settings.consumer.key,
                   consumer_secret: Settings.consumer.secret,
                   oauth_token: account.oauth_token,
                   oauth_token_secret: account.oauth_token_secret,
                   user_id: account.user_id)
      log(:info, "Registered account ##{account.id}/#{account.user_id}")
    end

    # Sends an :unregister message for the given account.
    #
    # @param account object responding to id and user_id
    def unregister_account(account)
      send_message(:unregister,
                   id: account.id,
                   user_id: account.user_id)
      log(:info, "Unregistered account ##{account.id}/#{account.user_id}")
    end

    private

    # Dispatches one decoded message according to its :title.
    #
    # Before authentication only "auth" is accepted; any other title gets a
    # :fatal reply and the connection is closed after the reply is written.
    #
    # @param msg [Hash] decoded message with symbol keys
    def parse_message(msg)
      unless @authenticated
        if msg[:title] == "auth"
          authenticate_node(msg)
        else
          log(:error, "Unauthenticated client: #{msg}")
          send_message(:fatal, text: "You aren't authenticated.")
          close_connection_after_writing
        end
        return
      end

      case msg[:title]
      when "unauthorized"
        log(:info, "Received unauthorized: ##{msg[:id]}/#{msg[:user_id]}")
      when "tweet"
        log(:debug, "Received tweet: #{msg[:id]}")
        @queue.push_tweet(msg)
      when "favorite"
        log(:debug, "Receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
        @queue.push_favorite(msg)
      when "unfavorite"
        log(:debug, "Receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
        @queue.push_unfavorite(msg)
      when "retweet"
        log(:debug, "Receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
        @queue.push_retweet(msg)
      when "delete"
        log(:debug, "Receive delete: #{msg[:delete][:status][:id]}")
        @queue.push_delete(msg)
      when "exit"
        log(:info, "Closing this connection...")
        # Mark the shutdown as intentional so #unbind does not unregister twice.
        @closing = true
        NodeManager.unregister(self)
      else
        log(:warn, "Unknown message: #{msg[:title]}")
        send_message(:error, text: "Unknown message.")
      end
    end

    # Checks the :secret_key of an "auth" message against
    # Settings.collector.secret_key. On success the connection is marked as
    # authenticated and registered with NodeManager; on failure a :fatal
    # reply is sent and the connection is closed after writing.
    #
    # @param msg [Hash] the "auth" message
    def authenticate_node(msg)
      # The key? guard rejects a message without :secret_key even when the
      # configured secret happens to be nil.
      if msg.key?(:secret_key) && Settings.collector.secret_key == msg[:secret_key]
        @authenticated = true
        log(:info, "Connection authenticated.")
        send_message(:authenticated)
        NodeManager.register(self)
      else
        # Read the offered key from the message: there is no local or method
        # named `secret_key`, so referencing one raised NameError here and the
        # rejection below was never sent.
        log(:warn, "Invalid secret_key: #{msg[:secret_key].inspect}")
        send_message(:fatal, text: "Invalid secret_key.")
        close_connection_after_writing
      end
    end

    # Serializes `hash` plus the given title with MessagePack and writes it to
    # the peer.
    #
    # @param title [Symbol] message title, stored under :title
    # @param hash [Hash] additional payload fields
    def send_message(title, hash = {})
      send_data(hash.merge(title: title).to_msgpack)
    end

    # Writes a message to the Rails logger, prefixed with this connection's id.
    #
    # @param level [Symbol] logger method to call (:debug, :info, :warn, :error)
    # @param message [String] text to log
    def log(level, message)
      Rails.logger.__send__(level, "[Node:#{@connection_id}] #{message}")
    end
  end
end