diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-04-19 12:10:03 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-04-19 12:10:03 +0900 |
commit | b49780e801e27adb4f3063686f3dde57ff6e2df3 (patch) | |
tree | ebb0b32573fd752b5d821ec8d0fa33b8a50c4571 | |
parent | 63f7f67f14e838503448ab7185106705aff0abf6 (diff) | |
download | aclog-b49780e801e27adb4f3063686f3dde57ff6e2df3.tar.gz |
worker_node/collector: implement heartbeat
-rw-r--r-- | lib/collector/node_connection.rb | 24 | ||||
-rw-r--r-- | worker_node/lib/collector_connection.rb | 3 |
2 files changed, 24 insertions, 3 deletions
diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb index d12f567..3059577 100644 --- a/lib/collector/node_connection.rb +++ b/lib/collector/node_connection.rb @@ -1,3 +1,5 @@ +require "set" + module Collector class NodeConnection < EM::Connection attr_reader :connection_id @@ -6,15 +8,13 @@ module Collector @@_id = 0 def initialize(queue) - # comm_inactivity_timeout exceed -> heatbeat -> (when alive) -> continue - # -> (when not) -> unbind - self.comm_inactivity_timeout = 10 @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) @connection_id = (@@_id += 1) @authenticated = false @closing = false @activated_time = nil @queue = queue + @heartbeats = Set.new end def unbind @@ -96,6 +96,9 @@ module Collector log(:info, "Closing this connection...") @closing = true NodeManager.unregister(self) + when "heartbeat" + log(:debug, "Heartbeat reply: #{msg[:data]}") + @heartbeats.delete(msg[:data]) else log(:warn, "Unknown message: #{msg.inspect}") send_message(event: :error, data: "Unknown message.") @@ -108,6 +111,7 @@ module Collector log(:info, "Connection authenticated.") send_message(event: :auth, data: nil) NodeManager.register(self) + @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat)) else log(:warn, "Invalid secret_key: #{secret_key.inspect}") send_message(event: :error, data: "Invalid secret_key.") @@ -120,6 +124,20 @@ module Collector send_data(data.to_msgpack) end + def heartbeat + if @heartbeats.size > 2 # 30 sec + log(:warn, "Node is dead.") + @heartbeat_timer.cancel + @closing = true + close_connection_after_writing + return + end + + id = Time.now.to_i + @heartbeats << id + send_message(event: :heartbeat, data: id) + end + def log(level, message) Rails.logger.__send__(level, "Node(#{@connection_id})") { message } end diff --git a/worker_node/lib/collector_connection.rb b/worker_node/lib/collector_connection.rb index e19383f..9a968f8 100644 --- a/worker_node/lib/collector_connection.rb +++ b/worker_node/lib/collector_connection.rb @@ -52,6 +52,9 @@ class CollectorConnection < EM::Connection register_account(msg[:data]) when "unregister" unregister_account(msg[:data]) + when "heartbeat" + log(:debug, "Heartbeat: #{msg[:data]}") + send_message(msg) else log(:warn, "Unknown message: #{msg.inspect}") end |