aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-04-19 12:10:03 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-04-19 12:10:03 +0900
commitb49780e801e27adb4f3063686f3dde57ff6e2df3 (patch)
treeebb0b32573fd752b5d821ec8d0fa33b8a50c4571
parent63f7f67f14e838503448ab7185106705aff0abf6 (diff)
downloadaclog-b49780e801e27adb4f3063686f3dde57ff6e2df3.tar.gz
worker_node/collector: implement heartbeat
-rw-r--r--lib/collector/node_connection.rb24
-rw-r--r--worker_node/lib/collector_connection.rb3
2 files changed, 24 insertions, 3 deletions
diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb
index d12f567..3059577 100644
--- a/lib/collector/node_connection.rb
+++ b/lib/collector/node_connection.rb
@@ -1,3 +1,5 @@
+require "set"
+
module Collector
class NodeConnection < EM::Connection
attr_reader :connection_id
@@ -6,15 +8,13 @@ module Collector
@@_id = 0
def initialize(queue)
- # comm_inactivity_timeout exceed -> heatbeat -> (when alive) -> continue
- # -> (when not) -> unbind
- self.comm_inactivity_timeout = 10
@unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
@connection_id = (@@_id += 1)
@authenticated = false
@closing = false
@activated_time = nil
@queue = queue
+ @heartbeats = Set.new
end
def unbind
@@ -96,6 +96,9 @@ module Collector
log(:info, "Closing this connection...")
@closing = true
NodeManager.unregister(self)
+ when "heartbeat"
+ log(:debug, "Heartbeat reply: #{msg[:data]}")
+ @heartbeats.delete(msg[:data])
else
log(:warn, "Unknown message: #{msg.inspect}")
send_message(event: :error, data: "Unknown message.")
@@ -108,6 +111,7 @@ module Collector
log(:info, "Connection authenticated.")
send_message(event: :auth, data: nil)
NodeManager.register(self)
+ @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat))
else
log(:warn, "Invalid secret_key: #{secret_key.inspect}")
send_message(event: :error, data: "Invalid secret_key.")
@@ -120,6 +124,20 @@ module Collector
send_data(data.to_msgpack)
end
+ def heartbeat
+ if @heartbeats.size > 2 # 30 sec
+ log(:warn, "Node is dead.")
+ @heartbeat_timer.cancel
+ @closing = true
+ close_connection_after_writing
+ return
+ end
+
+ id = Time.now.to_i
+ @heartbeats << id
+ send_message(event: :heartbeat, data: id)
+ end
+
def log(level, message)
Rails.logger.__send__(level, "Node(#{@connection_id})") { message }
end
diff --git a/worker_node/lib/collector_connection.rb b/worker_node/lib/collector_connection.rb
index e19383f..9a968f8 100644
--- a/worker_node/lib/collector_connection.rb
+++ b/worker_node/lib/collector_connection.rb
@@ -52,6 +52,9 @@ class CollectorConnection < EM::Connection
register_account(msg[:data])
when "unregister"
unregister_account(msg[:data])
+ when "heartbeat"
+ log(:debug, "Heartbeat: #{msg[:data]}")
+ send_message(msg)
else
log(:warn, "Unknown message: #{msg.inspect}")
end