diff options
-rw-r--r-- | Gemfile | 3 | ||||
-rw-r--r-- | Gemfile.lock | 15 | ||||
-rw-r--r-- | README.md | 10 | ||||
-rw-r--r-- | bin/start.rb | 8 | ||||
-rw-r--r-- | lib/aclog/receiver/collector_connection.rb | 141 | ||||
-rw-r--r-- | lib/aclog/receiver/register_server.rb | 25 | ||||
-rw-r--r-- | lib/aclog/receiver/worker.rb | 46 | ||||
-rw-r--r-- | lib/collector/collector_connection.rb | 138 | ||||
-rw-r--r-- | lib/collector/daemon.rb | 37 | ||||
-rw-r--r-- | lib/collector/register_server.rb | 22 | ||||
-rw-r--r-- | lib/tasks/collector.rake | 68 | ||||
-rw-r--r-- | lib/tasks/web.rake | 56 | ||||
-rwxr-xr-x | start_receiver.sh | 22 | ||||
-rwxr-xr-x | start_webserver.sh | 36 |
14 files changed, 334 insertions, 293 deletions
@@ -7,7 +7,7 @@ gem "yajl-ruby", require: "yajl" gem "grape", github: "intridea/grape" gem "grape-rabl" gem "haml-rails" -gem "sass-rails" +gem "sass-rails", "~> 4.0.2" gem "uglifier" gem "jquery-rails" gem "bootstrap-sass" @@ -20,7 +20,6 @@ gem "connection_pool" gem "omniauth-twitter" gem "twitter" gem "twitter-text" -gem "daemon-spawn", require: "daemon_spawn" gem "msgpack" gem "msgpack-rpc" gem "em-work_queue" diff --git a/Gemfile.lock b/Gemfile.lock index 42d268e..9401ba4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -79,7 +79,6 @@ GEM thor crack (0.4.2) safe_yaml (~> 1.0.0) - daemon-spawn (0.4.2) dalli (2.7.0) descendants_tracker (0.0.3) diff-lcs (1.2.5) @@ -232,10 +231,11 @@ GEM rspec-support (= 3.0.0.beta2) rspec-support (3.0.0.beta2) safe_yaml (1.0.1) - sass (3.3.3) - sass-rails (4.0.1) + sass (3.2.18) + sass-rails (4.0.2) railties (>= 4.0.0, < 5.0) - sass (>= 3.1.10) + sass (~> 3.2.0) + sprockets (~> 2.8, <= 2.11.0) sprockets-rails (~> 2.0.0) settingslogic (2.0.9) simple_oauth (0.2.0) @@ -248,7 +248,7 @@ GEM spring (1.1.2) spring-commands-rspec (1.0.1) spring (>= 0.9.1) - sprockets (2.12.0) + sprockets (2.11.0) hike (~> 1.2) multi_json (~> 1.0) rack (~> 1.0) @@ -259,7 +259,7 @@ GEM sprockets (~> 2.8) term-ansicolor (1.3.0) tins (~> 1.0) - thor (0.18.1) + thor (0.19.0) thread_safe (0.1.3) atomic tilt (1.4.1) @@ -308,7 +308,6 @@ DEPENDENCIES bootstrap-sass connection_pool coveralls - daemon-spawn dalli em-work_queue factory_girl_rails @@ -327,7 +326,7 @@ DEPENDENCIES rails (~> 4.1.0.rc1) rspec (~> 3.0.0.beta2) rspec-rails (~> 3.0.0.beta2) - sass-rails + sass-rails (~> 4.0.2) settingslogic simplecov spring @@ -60,14 +60,14 @@ Collects favs and retweets in real time by UserStreams. * Start your aclog $ # Start Puma (Web server) - $ ./start_webserver.sh start - $ # Start Background server - $ ./start_receiver.sh start + $ rake web:start + $ # Start Background worker + $ rake collector:start -### Aclog (Collector Clusters) +### Aclog (Collector worker nodes) * Chdir - $ cd /var/webapps/collector + $ cd /var/webapps/worker_nodes * Configure it diff --git a/bin/start.rb b/bin/start.rb deleted file mode 100644 index 98ede1f..0000000 --- a/bin/start.rb +++ /dev/null @@ -1,8 +0,0 @@ -Aclog::Receiver::Worker.spawn!( - working_dir: Rails.root, - pid_file: Rails.root.join("tmp", "pids", "receiver.pid").to_s, - log_file: Rails.root.join("log", "receiver.log").to_s, - sync_log: true, - singleton: true -) - diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb deleted file mode 100644 index 9a6128f..0000000 --- a/lib/aclog/receiver/collector_connection.rb +++ /dev/null @@ -1,141 +0,0 @@ -module Aclog - module Receiver - class CollectorConnection < EM::Connection - def initialize(channel, connections) - @channel = channel - @connections = connections - - @worker_number = nil - @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) - end - - def send_account(account) - send_object(type: "account", - id: account.id, - consumer_key: Settings.consumer.key, - consumer_secret: Settings.consumer.secret, - oauth_token: account.oauth_token, - oauth_token_secret: account.oauth_token_secret, - user_id: account.user_id) - log(:debug, "send: #{account.id}/#{account.user_id}") - end - - def send_stop_account(account) - send_object(type: "stop", - id: account.id) - log(:debug, "send stop: #{account.id}/#{account.user_id}") - end - - def post_init - end - - def unbind - @connections.reject! {|k, v| v == self } - log(:info, "connection closed") - end - - def receive_data(data) - @unpacker.feed_each(data) do |msg| - unless msg.is_a?(Hash) && msg[:type] - log(:error, "unknown data: #{msg}") - send_object(type: "fatal", message: "unknown data") - close_connection_after_writing - return - end - - unless @authorized - if msg[:type] == "auth" - auth(msg) - else - log(:warn, "not authorized client: #{msg}") - send_object(type: "fatal", message: "You aren't authorized") - close_connection_after_writing - end - return - end - - case msg[:type] - when "unauthorized" - @channel << -> { - log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}") - } - when "tweet" - @channel << -> { - log(:debug, "receive tweet: #{msg[:id]}") - Tweet.create_from_json(msg) - } - when "favorite" - @channel << -> { - log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - f = Favorite.create_from_json(msg) - Notification.notify_favorites_count(f.tweet) - } - when "unfavorite" - @channel << -> { - log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - Favorite.destroy_from_json(msg) - } - when "retweet" - @channel << -> { - log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}") - Retweet.create_from_json(msg) - } - when "delete" - @channel << -> { - log(:debug, "receive delete: #{msg[:delete][:status][:id]}") - Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg) - } - when "quit" - log(:info, "receive quit: #{msg[:reason]}") - send_data(type: "quit", message: "Bye") - close_connection_after_writing - else - log(:warn, "unknown message: #{msg[:type]}") - send_object(type: "error", message: "Unknown message type") - end - end - end - - private - def log(level, message) - text = "[RECEIVER" - text << ":#{@worker_number}" if @worker_number - text << "] #{message}" - Rails.logger.__send__(level, text) - end - - def send_object(data) - send_data(data.to_msgpack) - end - - def auth(msg) - secret_key = msg[:secret_key] - unless secret_key == Settings.collector.secret_key - log(:warn, "Invalid secret_key: \"#{secret_key}\"") - send_object(type: "fatal", message: "invalid secret_key") - close_connection_after_writing - return - end - - worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first - if worker_number == nil - log(:warn, "all connection alive") - send_object(type: "error", message: "all connection alive") - close_connection_after_writing - return - end - - @connections[worker_number] = self - @worker_number = worker_number - @authorized = true - log(:info, "connect") - send_object(type: "ok", message: "connected") - - Account.set_of_collector(@worker_number).each do |account| - send_account(account) - end - end - end - end -end - diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb deleted file mode 100644 index f5b6060..0000000 --- a/lib/aclog/receiver/register_server.rb +++ /dev/null @@ -1,25 +0,0 @@ -module Aclog - module Receiver - class RegisterServer - def initialize(connections) - @connections = connections - end - - def register(account_) - account = Marshal.load(account_) - con_num = account.id % Settings.collector.count - con = @connections[con_num] - if con - if account.active? - con.send_account(account) - else - con.send_stop_account(account) - end - else - Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}") - end - end - end - end -end - diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb deleted file mode 100644 index db60951..0000000 --- a/lib/aclog/receiver/worker.rb +++ /dev/null @@ -1,46 +0,0 @@ -require "msgpack/rpc/transport/unix" - -module Aclog - module Receiver - class Worker < DaemonSpawn::Base - def initialize(opts = {}) - super(opts) unless opts.empty? - _logger = Logger.new(STDOUT) - _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG - ActiveRecord::Base.logger = Rails.logger = _logger - end - - def start(args) - _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock") - - Rails.logger.info("Receiver started") - File.delete(_sock_path) if File.exists?(_sock_path) - EM.run do - channel = EM::Channel.new - EM.defer { channel.subscribe(&:call) } - - connections = {} - - collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections) - - reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path) - register_server = MessagePack::RPC::Server.new - register_server.listen(reg_svr_listener, RegisterServer.new(connections)) - EM.defer { register_server.run } - - stop = Proc.new do - EM.stop_server(collector_server) - register_server.close - EM.stop - end - Signal.trap(:INT, &stop) - Signal.trap(:TERM, &stop) - end - end - - def stop - end - end - end -end - diff --git a/lib/collector/collector_connection.rb b/lib/collector/collector_connection.rb new file mode 100644 index 0000000..1af298b --- /dev/null +++ b/lib/collector/collector_connection.rb @@ -0,0 +1,138 @@ +module Collector + class CollectorConnection < EM::Connection + def initialize(channel, connections) + @channel = channel + @connections = connections + + @worker_number = nil + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + end + + def send_account(account) + send_object(type: "account", + id: account.id, + consumer_key: Settings.consumer.key, + consumer_secret: Settings.consumer.secret, + oauth_token: account.oauth_token, + oauth_token_secret: account.oauth_token_secret, + user_id: account.user_id) + log(:debug, "send: #{account.id}/#{account.user_id}") + end + + def send_stop_account(account) + send_object(type: "stop", + id: account.id) + log(:debug, "send stop: #{account.id}/#{account.user_id}") + end + + def post_init + end + + def unbind + @connections.reject! {|k, v| v == self } + log(:info, "connection closed") + end + + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg[:type] + log(:error, "unknown data: #{msg}") + send_object(type: "fatal", message: "unknown data") + close_connection_after_writing + return + end + + unless @authorized + if msg[:type] == "auth" + auth(msg) + else + log(:warn, "not authorized client: #{msg}") + send_object(type: "fatal", message: "You aren't authorized") + close_connection_after_writing + end + return + end + + case msg[:type] + when "unauthorized" + @channel << -> { + log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}") + } + when "tweet" + @channel << -> { + log(:debug, "receive tweet: #{msg[:id]}") + Tweet.create_from_json(msg) + } + when "favorite" + @channel << -> { + log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") + f = Favorite.create_from_json(msg) + Notification.notify_favorites_count(f.tweet) + } + when "unfavorite" + @channel << -> { + log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") + Favorite.destroy_from_json(msg) + } + when "retweet" + @channel << -> { + log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}") + Retweet.create_from_json(msg) + } + when "delete" + @channel << -> { + log(:debug, "receive delete: #{msg[:delete][:status][:id]}") + Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg) + } + when "quit" + log(:info, "receive quit: #{msg[:reason]}") + send_data(type: "quit", message: "Bye") + close_connection_after_writing + else + log(:warn, "unknown message: #{msg[:type]}") + send_object(type: "error", message: "Unknown message type") + end + end + end + + private + def log(level, message) + text = "[RECEIVER" + text << ":#{@worker_number}" if @worker_number + text << "] #{message}" + Rails.logger.__send__(level, text) + end + + def send_object(data) + send_data(data.to_msgpack) + end + + def auth(msg) + secret_key = msg[:secret_key] + unless secret_key == Settings.collector.secret_key + log(:warn, "Invalid secret_key: \"#{secret_key}\"") + send_object(type: "fatal", message: "invalid secret_key") + close_connection_after_writing + return + end + + worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first + if worker_number == nil + log(:warn, "all connection alive") + send_object(type: "error", message: "all connection alive") + close_connection_after_writing + return + end + + @connections[worker_number] = self + @worker_number = worker_number + @authorized = true + log(:info, "connect") + send_object(type: "ok", message: "connected") + + Account.set_of_collector(@worker_number).each do |account| + send_account(account) + end + end + end +end diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb new file mode 100644 index 0000000..b9a8b3e --- /dev/null +++ b/lib/collector/daemon.rb @@ -0,0 +1,37 @@ +require "msgpack/rpc/transport/unix" + +module Collector + class Daemon + def self.start + _logger = Logger.new(STDOUT) + _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG + ActiveRecord::Base.logger = Rails.logger = _logger + + _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock") + + Rails.logger.info("Receiver started") + File.delete(_sock_path) if File.exists?(_sock_path) + EM.run do + channel = EM::Channel.new + EM.defer { channel.subscribe(&:call) } + + connections = {} + + collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections) + + reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path) + register_server = MessagePack::RPC::Server.new + register_server.listen(reg_svr_listener, RegisterServer.new(connections)) + EM.defer { register_server.run } + + stop = Proc.new do + EM.stop_server(collector_server) + register_server.close + EM.stop + end + Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) + end + end + end +end diff --git a/lib/collector/register_server.rb b/lib/collector/register_server.rb new file mode 100644 index 0000000..21114ab --- /dev/null +++ b/lib/collector/register_server.rb @@ -0,0 +1,22 @@ +module Collector + class RegisterServer + def initialize(connections) + @connections = connections + end + + def register(account_) + account = Marshal.load(account_) + con_num = account.id % Settings.collector.count + con = @connections[con_num] + if con + if account.active? + con.send_account(account) + else + con.send_stop_account(account) + end + else + Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}") + end + end + end +end diff --git a/lib/tasks/collector.rake b/lib/tasks/collector.rake new file mode 100644 index 0000000..3030e88 --- /dev/null +++ b/lib/tasks/collector.rake @@ -0,0 +1,68 @@ +namespace :collector do + @pid_file = Rails.root.join("tmp", "pids", "collector.pid").to_s + @log_file = Rails.root.join("log", "collector.log").to_s + + def read_pid + Integer(File.read(@pid_file)) rescue nil + end + + def process_alive?(pid) + Process.kill(0, pid) rescue false + end + + desc "Start aclog collector (master) in the foreground" + task run: :environment do + Collector::Daemon.start + end + + desc "Start aclog collector (master)" + task start: :environment do + pid = read_pid + if pid && process_alive?(pid) + STDERR.puts "Collector daemon is already started (PID: #{pid})" + next + end + + Process.daemon + File.open(@pid_file, "w").write(Process.pid) + + log = File.open(@log_file, "a") + log.sync = true + STDOUT.reopen(log) + STDERR.reopen(STDOUT) + + Collector::Daemon.start + end + + desc "Stop aclog collector (master)" + task :stop do + pid = read_pid + unless process_alive?(pid) + puts "Collector daemon is not started." + next + end + + Process.kill("TERM", pid) + while process_alive?(pid) + sleep 0.1 + end + + File.delete(@pid_file) + end + + desc "Retart aclog collector (master)" + task :restart do + Rake::Task["collector:stop"].invoke + Rake::Task["collector:start"].invoke + end + + desc "Show status of running aclog collector (master)" + task :status do + pid = read_pid + if pid && process_alive?(pid) + puts "Collector is running." + else + puts "Collector is not running." + end + end +end diff --git a/lib/tasks/web.rake b/lib/tasks/web.rake new file mode 100644 index 0000000..43f6937 --- /dev/null +++ b/lib/tasks/web.rake @@ -0,0 +1,56 @@ +namespace :web do + @pid_file = Rails.root.join("tmp", "pids", "puma.pid").to_s + + def read_pid + Integer(File.read(@pid_file)) rescue nil + end + + def process_alive?(pid) + Process.kill(0, pid) rescue false + end + + desc "Start web server (puma)" + task :start do + pid = read_pid + if pid && process_alive?(pid) + STDERR.puts "Puma is already started (PID: #{pid})" + next + end + echo `puma -d -e #{Rails.env} -C #{Rails.root}/config/puma.rb --pidfile #{@pid_file}` + end + + desc "Stop web server (puma)" + task :stop do + pid = read_pid + unless process_alive?(pid) + STDERR.puts "Puma is not running." + next + end + + Process.kill("TERM", pid) + while process_alive?(pid) + sleep 0.1 + end + end + + desc "Retart web server (puma)" + task :restart do + pid = read_pid + unless process_alive?(pid) + STDERR.puts "Puma is not running." + Rake::Task["web:start"].invoke + end + + Process.kill("USR2", pid) + end + + desc "Show status of web server (puma)" + task :status do + pid = read_pid + if pid && process_alive?(pid) + STDOUT.puts "Puma is running." + else + STDOUT.puts "Puma is not running." + end + end +end diff --git a/start_receiver.sh b/start_receiver.sh deleted file mode 100755 index 9da74b0..0000000 --- a/start_receiver.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/sh - -export RAILS_ENV=production -receiver="bundle exec rails runner bin/start.rb" - -case "$1" in - start) - $receiver start - ;; - stop) - $receiver stop - ;; - restart) - $receiver restart - ;; - status) - $receiver status - ;; - *) - echo "usage: $0 {start|stop|restart|status}" - exit 1 -esac diff --git a/start_webserver.sh b/start_webserver.sh deleted file mode 100755 index 4c60a63..0000000 --- a/start_webserver.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/sh - -export RAILS_ENV=production -export RAILS_ROOT=$(readlink -f `dirname $0`) - -PID="$RAILS_ROOT/tmp/pids/puma.pid" - -sig () { - test -s $PID && kill -$1 `cat $PID` -} - -error () { - echo $1 - exit 1 -} - -puma_start="bundle exec puma -d -e $RAILS_ENV -C $RAILS_ROOT/config/puma.rb --pidfile $PID" - -case "$1" in - start) - sig 0 && error "Puma already started!" - $puma_start && echo "Puma started!" || error "Puma failed to start!" - ;; - stop) - sig TERM && echo "Puma stopped!" || error "Puma not started!" - ;; - restart) - sig 0 || error "Puma not started!" - sig USR2 && echo "Puma restarted!" `cat $PID` || echo "Puma failed to restart!" - ;; - status) - sig 0 && echo "Puma is started!" || echo "Puma is stopped!" - ;; - *) - error "usage: $0 {start|stop|restart|status}" -esac |