From b49780e801e27adb4f3063686f3dde57ff6e2df3 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 19 Apr 2015 12:10:03 +0900 Subject: worker_node/collector: implement heartbeat --- lib/collector/node_connection.rb | 24 +++++++++++++++++++++--- worker_node/lib/collector_connection.rb | 3 +++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb index d12f567..3059577 100644 --- a/lib/collector/node_connection.rb +++ b/lib/collector/node_connection.rb @@ -1,3 +1,5 @@ +require "set" + module Collector class NodeConnection < EM::Connection attr_reader :connection_id @@ -6,15 +8,13 @@ module Collector @@_id = 0 def initialize(queue) - # comm_inactivity_timeout exceed -> heatbeat -> (when alive) -> continue - # -> (when not) -> unbind - self.comm_inactivity_timeout = 10 @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) @connection_id = (@@_id += 1) @authenticated = false @closing = false @activated_time = nil @queue = queue + @heartbeats = Set.new end def unbind @@ -96,6 +96,9 @@ module Collector log(:info, "Closing this connection...") @closing = true NodeManager.unregister(self) + when "heartbeat" + log(:debug, "Heartbeat reply: #{msg[:data]}") + @heartbeats.delete(msg[:data]) else log(:warn, "Unknown message: #{msg.inspect}") send_message(event: :error, data: "Unknown message.") @@ -108,6 +111,7 @@ module Collector log(:info, "Connection authenticated.") send_message(event: :auth, data: nil) NodeManager.register(self) + @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat)) else log(:warn, "Invalid secret_key: #{secret_key.inspect}") send_message(event: :error, data: "Invalid secret_key.") @@ -120,6 +124,20 @@ module Collector send_data(data.to_msgpack) end + def heartbeat + if @heartbeats.size > 2 # 30 sec + log(:warn, "Node is dead.") + @heartbeat_timer.cancel + @closing = true + close_connection_after_writing + return + end + + id = Time.now.to_i + @heartbeats << id + send_message(event: :heartbeat, data: id) + end + def log(level, message) Rails.logger.__send__(level, "Node(#{@connection_id})") { message } end diff --git a/worker_node/lib/collector_connection.rb b/worker_node/lib/collector_connection.rb index e19383f..9a968f8 100644 --- a/worker_node/lib/collector_connection.rb +++ b/worker_node/lib/collector_connection.rb @@ -52,6 +52,9 @@ class CollectorConnection < EM::Connection register_account(msg[:data]) when "unregister" unregister_account(msg[:data]) + when "heartbeat" + log(:debug, "Heartbeat: #{msg[:data]}") + send_message(msg) else log(:warn, "Unknown message: #{msg.inspect}") end -- cgit v1.2.3