aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRhenium <rhenium@rhe.jp>2014-03-23 12:04:46 +0900
committerRhenium <rhenium@rhe.jp>2014-03-23 12:04:46 +0900
commitb20e7e0641760bb07c22ba457f53661b21cff904 (patch)
treeed4643152431010a83a67bb8cfaceeae7af87b41
parent35b89e4e1e976ec03e209eab3d28aef957e847fe (diff)
downloadaclog-b20e7e0641760bb07c22ba457f53661b21cff904.tar.gz
control puma and collector with Rake
-rw-r--r--Gemfile3
-rw-r--r--Gemfile.lock15
-rw-r--r--README.md10
-rw-r--r--bin/start.rb8
-rw-r--r--lib/aclog/receiver/collector_connection.rb141
-rw-r--r--lib/aclog/receiver/register_server.rb25
-rw-r--r--lib/aclog/receiver/worker.rb46
-rw-r--r--lib/collector/collector_connection.rb138
-rw-r--r--lib/collector/daemon.rb37
-rw-r--r--lib/collector/register_server.rb22
-rw-r--r--lib/tasks/collector.rake68
-rw-r--r--lib/tasks/web.rake56
-rwxr-xr-xstart_receiver.sh22
-rwxr-xr-xstart_webserver.sh36
14 files changed, 334 insertions, 293 deletions
diff --git a/Gemfile b/Gemfile
index 4493a2c..ed9a1e9 100644
--- a/Gemfile
+++ b/Gemfile
@@ -7,7 +7,7 @@ gem "yajl-ruby", require: "yajl"
gem "grape", github: "intridea/grape"
gem "grape-rabl"
gem "haml-rails"
-gem "sass-rails"
+gem "sass-rails", "~> 4.0.2"
gem "uglifier"
gem "jquery-rails"
gem "bootstrap-sass"
@@ -20,7 +20,6 @@ gem "connection_pool"
gem "omniauth-twitter"
gem "twitter"
gem "twitter-text"
-gem "daemon-spawn", require: "daemon_spawn"
gem "msgpack"
gem "msgpack-rpc"
gem "em-work_queue"
diff --git a/Gemfile.lock b/Gemfile.lock
index 42d268e..9401ba4 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -79,7 +79,6 @@ GEM
thor
crack (0.4.2)
safe_yaml (~> 1.0.0)
- daemon-spawn (0.4.2)
dalli (2.7.0)
descendants_tracker (0.0.3)
diff-lcs (1.2.5)
@@ -232,10 +231,11 @@ GEM
rspec-support (= 3.0.0.beta2)
rspec-support (3.0.0.beta2)
safe_yaml (1.0.1)
- sass (3.3.3)
- sass-rails (4.0.1)
+ sass (3.2.18)
+ sass-rails (4.0.2)
railties (>= 4.0.0, < 5.0)
- sass (>= 3.1.10)
+ sass (~> 3.2.0)
+ sprockets (~> 2.8, <= 2.11.0)
sprockets-rails (~> 2.0.0)
settingslogic (2.0.9)
simple_oauth (0.2.0)
@@ -248,7 +248,7 @@ GEM
spring (1.1.2)
spring-commands-rspec (1.0.1)
spring (>= 0.9.1)
- sprockets (2.12.0)
+ sprockets (2.11.0)
hike (~> 1.2)
multi_json (~> 1.0)
rack (~> 1.0)
@@ -259,7 +259,7 @@ GEM
sprockets (~> 2.8)
term-ansicolor (1.3.0)
tins (~> 1.0)
- thor (0.18.1)
+ thor (0.19.0)
thread_safe (0.1.3)
atomic
tilt (1.4.1)
@@ -308,7 +308,6 @@ DEPENDENCIES
bootstrap-sass
connection_pool
coveralls
- daemon-spawn
dalli
em-work_queue
factory_girl_rails
@@ -327,7 +326,7 @@ DEPENDENCIES
rails (~> 4.1.0.rc1)
rspec (~> 3.0.0.beta2)
rspec-rails (~> 3.0.0.beta2)
- sass-rails
+ sass-rails (~> 4.0.2)
settingslogic
simplecov
spring
diff --git a/README.md b/README.md
index 0caaed8..6302a40 100644
--- a/README.md
+++ b/README.md
@@ -60,14 +60,14 @@ Collects favs and retweets in real time by UserStreams.
* Start your aclog
$ # Start Puma (Web server)
- $ ./start_webserver.sh start
- $ # Start Background server
- $ ./start_receiver.sh start
+ $ rake web:start
+ $ # Start Background worker
+ $ rake collector:start
-### Aclog (Collector Clusters)
+### Aclog (Collector worker nodes)
* Chdir
- $ cd /var/webapps/collector
+ $ cd /var/webapps/worker_nodes
* Configure it
diff --git a/bin/start.rb b/bin/start.rb
deleted file mode 100644
index 98ede1f..0000000
--- a/bin/start.rb
+++ /dev/null
@@ -1,8 +0,0 @@
-Aclog::Receiver::Worker.spawn!(
- working_dir: Rails.root,
- pid_file: Rails.root.join("tmp", "pids", "receiver.pid").to_s,
- log_file: Rails.root.join("log", "receiver.log").to_s,
- sync_log: true,
- singleton: true
-)
-
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb
deleted file mode 100644
index 9a6128f..0000000
--- a/lib/aclog/receiver/collector_connection.rb
+++ /dev/null
@@ -1,141 +0,0 @@
-module Aclog
- module Receiver
- class CollectorConnection < EM::Connection
- def initialize(channel, connections)
- @channel = channel
- @connections = connections
-
- @worker_number = nil
- @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
- end
-
- def send_account(account)
- send_object(type: "account",
- id: account.id,
- consumer_key: Settings.consumer.key,
- consumer_secret: Settings.consumer.secret,
- oauth_token: account.oauth_token,
- oauth_token_secret: account.oauth_token_secret,
- user_id: account.user_id)
- log(:debug, "send: #{account.id}/#{account.user_id}")
- end
-
- def send_stop_account(account)
- send_object(type: "stop",
- id: account.id)
- log(:debug, "send stop: #{account.id}/#{account.user_id}")
- end
-
- def post_init
- end
-
- def unbind
- @connections.reject! {|k, v| v == self }
- log(:info, "connection closed")
- end
-
- def receive_data(data)
- @unpacker.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg[:type]
- log(:error, "unknown data: #{msg}")
- send_object(type: "fatal", message: "unknown data")
- close_connection_after_writing
- return
- end
-
- unless @authorized
- if msg[:type] == "auth"
- auth(msg)
- else
- log(:warn, "not authorized client: #{msg}")
- send_object(type: "fatal", message: "You aren't authorized")
- close_connection_after_writing
- end
- return
- end
-
- case msg[:type]
- when "unauthorized"
- @channel << -> {
- log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}")
- }
- when "tweet"
- @channel << -> {
- log(:debug, "receive tweet: #{msg[:id]}")
- Tweet.create_from_json(msg)
- }
- when "favorite"
- @channel << -> {
- log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- f = Favorite.create_from_json(msg)
- Notification.notify_favorites_count(f.tweet)
- }
- when "unfavorite"
- @channel << -> {
- log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- Favorite.destroy_from_json(msg)
- }
- when "retweet"
- @channel << -> {
- log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
- Retweet.create_from_json(msg)
- }
- when "delete"
- @channel << -> {
- log(:debug, "receive delete: #{msg[:delete][:status][:id]}")
- Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg)
- }
- when "quit"
- log(:info, "receive quit: #{msg[:reason]}")
- send_data(type: "quit", message: "Bye")
- close_connection_after_writing
- else
- log(:warn, "unknown message: #{msg[:type]}")
- send_object(type: "error", message: "Unknown message type")
- end
- end
- end
-
- private
- def log(level, message)
- text = "[RECEIVER"
- text << ":#{@worker_number}" if @worker_number
- text << "] #{message}"
- Rails.logger.__send__(level, text)
- end
-
- def send_object(data)
- send_data(data.to_msgpack)
- end
-
- def auth(msg)
- secret_key = msg[:secret_key]
- unless secret_key == Settings.collector.secret_key
- log(:warn, "Invalid secret_key: \"#{secret_key}\"")
- send_object(type: "fatal", message: "invalid secret_key")
- close_connection_after_writing
- return
- end
-
- worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first
- if worker_number == nil
- log(:warn, "all connection alive")
- send_object(type: "error", message: "all connection alive")
- close_connection_after_writing
- return
- end
-
- @connections[worker_number] = self
- @worker_number = worker_number
- @authorized = true
- log(:info, "connect")
- send_object(type: "ok", message: "connected")
-
- Account.set_of_collector(@worker_number).each do |account|
- send_account(account)
- end
- end
- end
- end
-end
-
diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb
deleted file mode 100644
index f5b6060..0000000
--- a/lib/aclog/receiver/register_server.rb
+++ /dev/null
@@ -1,25 +0,0 @@
-module Aclog
- module Receiver
- class RegisterServer
- def initialize(connections)
- @connections = connections
- end
-
- def register(account_)
- account = Marshal.load(account_)
- con_num = account.id % Settings.collector.count
- con = @connections[con_num]
- if con
- if account.active?
- con.send_account(account)
- else
- con.send_stop_account(account)
- end
- else
- Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}")
- end
- end
- end
- end
-end
-
diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb
deleted file mode 100644
index db60951..0000000
--- a/lib/aclog/receiver/worker.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-require "msgpack/rpc/transport/unix"
-
-module Aclog
- module Receiver
- class Worker < DaemonSpawn::Base
- def initialize(opts = {})
- super(opts) unless opts.empty?
- _logger = Logger.new(STDOUT)
- _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG
- ActiveRecord::Base.logger = Rails.logger = _logger
- end
-
- def start(args)
- _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock")
-
- Rails.logger.info("Receiver started")
- File.delete(_sock_path) if File.exists?(_sock_path)
- EM.run do
- channel = EM::Channel.new
- EM.defer { channel.subscribe(&:call) }
-
- connections = {}
-
- collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections)
-
- reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path)
- register_server = MessagePack::RPC::Server.new
- register_server.listen(reg_svr_listener, RegisterServer.new(connections))
- EM.defer { register_server.run }
-
- stop = Proc.new do
- EM.stop_server(collector_server)
- register_server.close
- EM.stop
- end
- Signal.trap(:INT, &stop)
- Signal.trap(:TERM, &stop)
- end
- end
-
- def stop
- end
- end
- end
-end
-
diff --git a/lib/collector/collector_connection.rb b/lib/collector/collector_connection.rb
new file mode 100644
index 0000000..1af298b
--- /dev/null
+++ b/lib/collector/collector_connection.rb
@@ -0,0 +1,138 @@
+module Collector
+ class CollectorConnection < EM::Connection
+ def initialize(channel, connections)
+ @channel = channel
+ @connections = connections
+
+ @worker_number = nil
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ end
+
+ def send_account(account)
+ send_object(type: "account",
+ id: account.id,
+ consumer_key: Settings.consumer.key,
+ consumer_secret: Settings.consumer.secret,
+ oauth_token: account.oauth_token,
+ oauth_token_secret: account.oauth_token_secret,
+ user_id: account.user_id)
+ log(:debug, "send: #{account.id}/#{account.user_id}")
+ end
+
+ def send_stop_account(account)
+ send_object(type: "stop",
+ id: account.id)
+ log(:debug, "send stop: #{account.id}/#{account.user_id}")
+ end
+
+ def post_init
+ end
+
+ def unbind
+ @connections.reject! {|k, v| v == self }
+ log(:info, "connection closed")
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:type]
+ log(:error, "unknown data: #{msg}")
+ send_object(type: "fatal", message: "unknown data")
+ close_connection_after_writing
+ return
+ end
+
+ unless @authorized
+ if msg[:type] == "auth"
+ auth(msg)
+ else
+ log(:warn, "not authorized client: #{msg}")
+ send_object(type: "fatal", message: "You aren't authorized")
+ close_connection_after_writing
+ end
+ return
+ end
+
+ case msg[:type]
+ when "unauthorized"
+ @channel << -> {
+ log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}")
+ }
+ when "tweet"
+ @channel << -> {
+ log(:debug, "receive tweet: #{msg[:id]}")
+ Tweet.create_from_json(msg)
+ }
+ when "favorite"
+ @channel << -> {
+ log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
+ f = Favorite.create_from_json(msg)
+ Notification.notify_favorites_count(f.tweet)
+ }
+ when "unfavorite"
+ @channel << -> {
+ log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
+ Favorite.destroy_from_json(msg)
+ }
+ when "retweet"
+ @channel << -> {
+ log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
+ Retweet.create_from_json(msg)
+ }
+ when "delete"
+ @channel << -> {
+ log(:debug, "receive delete: #{msg[:delete][:status][:id]}")
+ Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg)
+ }
+ when "quit"
+ log(:info, "receive quit: #{msg[:reason]}")
+ send_data(type: "quit", message: "Bye")
+ close_connection_after_writing
+ else
+ log(:warn, "unknown message: #{msg[:type]}")
+ send_object(type: "error", message: "Unknown message type")
+ end
+ end
+ end
+
+ private
+ def log(level, message)
+ text = "[RECEIVER"
+ text << ":#{@worker_number}" if @worker_number
+ text << "] #{message}"
+ Rails.logger.__send__(level, text)
+ end
+
+ def send_object(data)
+ send_data(data.to_msgpack)
+ end
+
+ def auth(msg)
+ secret_key = msg[:secret_key]
+ unless secret_key == Settings.collector.secret_key
+ log(:warn, "Invalid secret_key: \"#{secret_key}\"")
+ send_object(type: "fatal", message: "invalid secret_key")
+ close_connection_after_writing
+ return
+ end
+
+ worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first
+ if worker_number == nil
+ log(:warn, "all connection alive")
+ send_object(type: "error", message: "all connection alive")
+ close_connection_after_writing
+ return
+ end
+
+ @connections[worker_number] = self
+ @worker_number = worker_number
+ @authorized = true
+ log(:info, "connect")
+ send_object(type: "ok", message: "connected")
+
+ Account.set_of_collector(@worker_number).each do |account|
+ send_account(account)
+ end
+ end
+ end
+end
diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb
new file mode 100644
index 0000000..b9a8b3e
--- /dev/null
+++ b/lib/collector/daemon.rb
@@ -0,0 +1,37 @@
+require "msgpack/rpc/transport/unix"
+
+module Collector
+ class Daemon
+ def self.start
+ _logger = Logger.new(STDOUT)
+ _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG
+ ActiveRecord::Base.logger = Rails.logger = _logger
+
+ _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock")
+
+ Rails.logger.info("Receiver started")
+ File.delete(_sock_path) if File.exists?(_sock_path)
+ EM.run do
+ channel = EM::Channel.new
+ EM.defer { channel.subscribe(&:call) }
+
+ connections = {}
+
+ collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections)
+
+ reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path)
+ register_server = MessagePack::RPC::Server.new
+ register_server.listen(reg_svr_listener, RegisterServer.new(connections))
+ EM.defer { register_server.run }
+
+ stop = Proc.new do
+ EM.stop_server(collector_server)
+ register_server.close
+ EM.stop
+ end
+ Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
+ end
+ end
+ end
+end
diff --git a/lib/collector/register_server.rb b/lib/collector/register_server.rb
new file mode 100644
index 0000000..21114ab
--- /dev/null
+++ b/lib/collector/register_server.rb
@@ -0,0 +1,22 @@
+module Collector
+ class RegisterServer
+ def initialize(connections)
+ @connections = connections
+ end
+
+ def register(account_)
+ account = Marshal.load(account_)
+ con_num = account.id % Settings.collector.count
+ con = @connections[con_num]
+ if con
+ if account.active?
+ con.send_account(account)
+ else
+ con.send_stop_account(account)
+ end
+ else
+ Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}")
+ end
+ end
+ end
+end
diff --git a/lib/tasks/collector.rake b/lib/tasks/collector.rake
new file mode 100644
index 0000000..3030e88
--- /dev/null
+++ b/lib/tasks/collector.rake
@@ -0,0 +1,68 @@
+namespace :collector do
+ @pid_file = Rails.root.join("tmp", "pids", "collector.pid").to_s
+ @log_file = Rails.root.join("log", "collector.log").to_s
+
+ def read_pid
+ Integer(File.read(@pid_file)) rescue nil
+ end
+
+ def process_alive?(pid)
+ Process.kill(0, pid) rescue false
+ end
+
+ desc "Start aclog collector (master) in the foreground"
+ task run: :environment do
+ Collector::Daemon.start
+ end
+
+ desc "Start aclog collector (master)"
+ task start: :environment do
+ pid = read_pid
+ if pid && process_alive?(pid)
+ STDERR.puts "Collector daemon is already started (PID: #{pid})"
+ next
+ end
+
+ Process.daemon
+ File.open(@pid_file, "w").write(Process.pid)
+
+ log = File.open(@log_file, "a")
+ log.sync = true
+ STDOUT.reopen(log)
+ STDERR.reopen(STDOUT)
+
+ Collector::Daemon.start
+ end
+
+ desc "Stop aclog collector (master)"
+ task :stop do
+ pid = read_pid
+ unless process_alive?(pid)
+ puts "Collector daemon is not started."
+ next
+ end
+
+ Process.kill("TERM", pid)
+ while process_alive?(pid)
+ sleep 0.1
+ end
+
+ File.delete(@pid_file)
+ end
+
+ desc "Retart aclog collector (master)"
+ task :restart do
+ Rake::Task["collector:stop"].invoke
+ Rake::Task["collector:start"].invoke
+ end
+
+ desc "Show status of running aclog collector (master)"
+ task :status do
+ pid = read_pid
+ if pid && process_alive?(pid)
+ puts "Collector is running."
+ else
+ puts "Collector is not running."
+ end
+ end
+end
diff --git a/lib/tasks/web.rake b/lib/tasks/web.rake
new file mode 100644
index 0000000..43f6937
--- /dev/null
+++ b/lib/tasks/web.rake
@@ -0,0 +1,56 @@
+namespace :web do
+ @pid_file = Rails.root.join("tmp", "pids", "puma.pid").to_s
+
+ def read_pid
+ Integer(File.read(@pid_file)) rescue nil
+ end
+
+ def process_alive?(pid)
+ Process.kill(0, pid) rescue false
+ end
+
+ desc "Start web server (puma)"
+ task :start do
+ pid = read_pid
+ if pid && process_alive?(pid)
+ STDERR.puts "Puma is already started (PID: #{pid})"
+ next
+ end
+ echo `puma -d -e #{Rails.env} -C #{Rails.root}/config/puma.rb --pidfile #{@pid_file}`
+ end
+
+ desc "Stop web server (puma)"
+ task :stop do
+ pid = read_pid
+ unless process_alive?(pid)
+ STDERR.puts "Puma is not running."
+ next
+ end
+
+ Process.kill("TERM", pid)
+ while process_alive?(pid)
+ sleep 0.1
+ end
+ end
+
+ desc "Retart web server (puma)"
+ task :restart do
+ pid = read_pid
+ unless process_alive?(pid)
+ STDERR.puts "Puma is not running."
+ Rake::Task["web:start"].invoke
+ end
+
+ Process.kill("USR2", pid)
+ end
+
+ desc "Show status of web server (puma)"
+ task :status do
+ pid = read_pid
+ if pid && process_alive?(pid)
+ STDOUT.puts "Puma is running."
+ else
+ STDOUT.puts "Puma is not running."
+ end
+ end
+end
diff --git a/start_receiver.sh b/start_receiver.sh
deleted file mode 100755
index 9da74b0..0000000
--- a/start_receiver.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/sh
-
-export RAILS_ENV=production
-receiver="bundle exec rails runner bin/start.rb"
-
-case "$1" in
- start)
- $receiver start
- ;;
- stop)
- $receiver stop
- ;;
- restart)
- $receiver restart
- ;;
- status)
- $receiver status
- ;;
- *)
- echo "usage: $0 {start|stop|restart|status}"
- exit 1
-esac
diff --git a/start_webserver.sh b/start_webserver.sh
deleted file mode 100755
index 4c60a63..0000000
--- a/start_webserver.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/bin/sh
-
-export RAILS_ENV=production
-export RAILS_ROOT=$(readlink -f `dirname $0`)
-
-PID="$RAILS_ROOT/tmp/pids/puma.pid"
-
-sig () {
- test -s $PID && kill -$1 `cat $PID`
-}
-
-error () {
- echo $1
- exit 1
-}
-
-puma_start="bundle exec puma -d -e $RAILS_ENV -C $RAILS_ROOT/config/puma.rb --pidfile $PID"
-
-case "$1" in
- start)
- sig 0 && error "Puma already started!"
- $puma_start && echo "Puma started!" || error "Puma failed to start!"
- ;;
- stop)
- sig TERM && echo "Puma stopped!" || error "Puma not started!"
- ;;
- restart)
- sig 0 || error "Puma not started!"
- sig USR2 && echo "Puma restarted!" `cat $PID` || echo "Puma failed to restart!"
- ;;
- status)
- sig 0 && echo "Puma is started!" || echo "Puma is stopped!"
- ;;
- *)
- error "usage: $0 {start|stop|restart|status}"
-esac