aboutsummaryrefslogtreecommitdiffstats
path: root/collector_proxy/lib/worker_node_connection.rb
diff options
context:
space:
mode:
Diffstat (limited to 'collector_proxy/lib/worker_node_connection.rb')
-rw-r--r--collector_proxy/lib/worker_node_connection.rb117
1 files changed, 117 insertions, 0 deletions
diff --git a/collector_proxy/lib/worker_node_connection.rb b/collector_proxy/lib/worker_node_connection.rb
new file mode 100644
index 0000000..204b364
--- /dev/null
+++ b/collector_proxy/lib/worker_node_connection.rb
@@ -0,0 +1,117 @@
+class WorkerNodeConnection < EM::Connection
+ attr_reader :connection_id
+ attr_accessor :activated_time
+
+ @@_id = 0
+
+ def initialize
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @connection_id = (@@_id += 1)
+ @authenticated = false
+ @closing = false
+ @heartbeats = Set.new
+ @activated_time = nil
+ end
+
+ def post_init
+ # do nothing
+ end
+
+ def unbind
+ @heartbeat_timer.cancel if @heartbeat_timer
+ if @closing
+ log(:info, "Connection was closed.")
+ else
+ if @authenticated
+ log(:warn, "Connection was closed unexpectedly.")
+ NodeManager.unregister(self)
+ end
+ end
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:event]
+ log(:warn, "Unknown message: #{msg}")
+ send_message(event: :error, data: "Unknown message.")
+ close_connection_after_writing
+ return
+ end
+
+ parse_message(msg)
+ end
+ end
+
+ def register_account(account)
+ send_message(event: "register", data: account)
+ end
+
+ def unregister_account(account)
+ send_message(event: "unregister", data: account)
+ end
+
+ private
+ def parse_message(msg)
+ unless @authenticated
+ if msg[:event] == "auth"
+ authenticate_node(msg[:data])
+ else
+ log(:error, "Unauthenticated client: #{msg}")
+ send_message(event: :error, data: "You aren't authenticated.")
+ close_connection_after_writing
+ end
+ return
+ end
+
+ case msg[:event]
+ when "exit"
+ log(:info, "Closing this connection...")
+ @closing = true
+ NodeManager.unregister(self)
+ when "heartbeat"
+ log(:debug, "Heartbeat reply: #{msg[:data]}")
+ @heartbeats.delete(msg[:data])
+ else
+ EventChannel << msg
+ end
+ end
+
+ def authenticate_node(data)
+ if data.key?(:secret_key) && Settings.secret_key == data[:secret_key]
+ log(:info, "Connection authenticated.")
+ send_message(event: :auth, data: nil)
+ @authenticated = true
+ @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat))
+ NodeManager.register(self)
+ else
+ log(:warn, "Invalid secret_key: #{data[:secret_key].inspect}")
+ send_message(event: :error, data: "Invalid secret_key.")
+ @closing = true
+ close_connection_after_writing
+ end
+ end
+
+ def heartbeat
+ if @heartbeats.size > 2 # 30 sec
+ log(:warn, "Node is dead.")
+ NodeManager.unregister(self)
+ @heartbeat_timer.cancel
+ @heartbeat_timer = nil
+ @closing = true
+ close_connection_after_writing
+ return
+ end
+
+ id = Time.now.to_i
+ @heartbeats << id
+ send_message(event: :heartbeat, data: id)
+ end
+
+ def send_message(data)
+ send_data(data.to_msgpack)
+ end
+
+ def log(level, message)
+ CollectorProxy.logger.__send__(level, "WorkerNodeConnection(##{@connection_id})") { message }
+ end
+end