aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore7
-rw-r--r--Gemfile4
-rw-r--r--Gemfile.lock20
-rw-r--r--app/controllers/sessions_controller.rb16
-rw-r--r--app/controllers/users_controller.rb12
-rw-r--r--app/views/shared/_tweets.haml2
-rw-r--r--client/Gemfile6
-rw-r--r--client/Gemfile.lock20
-rw-r--r--client/logger.rb44
-rw-r--r--client/settings.rb5
-rw-r--r--client/settings.yml.default7
-rw-r--r--client/worker.rb257
-rw-r--r--config/database.yml23
-rw-r--r--config/locales/en.yml29
-rw-r--r--config/settings.yml14
-rw-r--r--config/unicorn.rb33
-rw-r--r--db/migrate/20130225123010_create_accounts.rb7
-rw-r--r--db/migrate/20130226150329_create_tweets.rb10
-rw-r--r--db/migrate/20130226150829_create_favorites.rb7
-rw-r--r--db/migrate/20130226151042_create_retweets.rb6
-rw-r--r--db/schema.rb28
-rw-r--r--lib/receiver/worker.rb378
-rwxr-xr-xworker.sh5
23 files changed, 625 insertions, 315 deletions
diff --git a/.gitignore b/.gitignore
index 538e5ca..6adb1fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,7 +22,8 @@
/config/environments/*.local.yml
/config/initializers/secret_token.local.rb
-# dotcloud
-/dotcloud.yml
-/supervisord.conf
+# local scripts
+/client/worker.sh
+/client/settings.yml
+/server.sh
diff --git a/Gemfile b/Gemfile
index 14ee69c..242f552 100644
--- a/Gemfile
+++ b/Gemfile
@@ -7,7 +7,8 @@ gem 'mysql2'
gem 'unicorn'
gem 'rails_config'
-gem 'will_paginate', :github => 're4k/will_paginate'
+gem 'kaminari'
+gem 'em-work_queue'
group :assets do
gem 'sass-rails', '~> 4.0.0.beta1'
@@ -15,7 +16,6 @@ group :assets do
gem 'uglifier', '>= 1.0.3'
end
-gem 'therubyracer'
gem 'haml-rails'
gem 'yajl-ruby', :require => "yajl"
diff --git a/Gemfile.lock b/Gemfile.lock
index c38d712..6020296 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -6,12 +6,6 @@ GIT
multi_json (~> 1.3)
omniauth-oauth (~> 1.0)
-GIT
- remote: git://github.com/re4k/will_paginate.git
- revision: fcbbd91ced44aaabb1f369875ee9f4f651bc74fb
- specs:
- will_paginate (3.0.4)
-
GEM
remote: https://rubygems.org/
specs:
@@ -53,6 +47,8 @@ GEM
eventmachine (~> 1.0)
http_parser.rb (~> 0.5)
simple_oauth (~> 0.1)
+ em-work_queue (0.0.1)
+ eventmachine
erubis (2.7.0)
eventmachine (1.0.1)
execjs (1.4.0)
@@ -71,8 +67,10 @@ GEM
http_parser.rb (0.5.3)
i18n (0.6.4)
json (1.7.7)
+ kaminari (0.14.1)
+ actionpack (>= 3.0.0)
+ activesupport (>= 3.0.0)
kgio (2.8.0)
- libv8 (3.11.8.13)
mail (2.5.3)
i18n (>= 0.4.0)
mime-types (~> 1.16)
@@ -113,7 +111,6 @@ GEM
rake (10.0.3)
rdoc (3.12.2)
json (~> 1.4)
- ref (1.0.2)
sass (3.2.6)
sass-rails (4.0.0.beta1)
railties (>= 4.0.0.beta, < 5.0)
@@ -130,9 +127,6 @@ GEM
actionpack (>= 3.0)
activesupport (>= 3.0)
sprockets (~> 2.8)
- therubyracer (0.11.4)
- libv8 (~> 3.11.8.12)
- ref
thor (0.17.0)
thread_safe (0.1.0)
atomic
@@ -160,15 +154,15 @@ PLATFORMS
DEPENDENCIES
coffee-rails (~> 4.0.0.beta1)
em-twitter
+ em-work_queue
haml-rails
+ kaminari
mysql2
omniauth-twitter!
rails (= 4.0.0.beta1)
rails_config
sass-rails (~> 4.0.0.beta1)
- therubyracer
twitter
uglifier (>= 1.0.3)
unicorn
- will_paginate!
yajl-ruby
diff --git a/app/controllers/sessions_controller.rb b/app/controllers/sessions_controller.rb
index a93fb6a..8cc8f50 100644
--- a/app/controllers/sessions_controller.rb
+++ b/app/controllers/sessions_controller.rb
@@ -1,18 +1,16 @@
+require "socket"
+
class SessionsController < ApplicationController
def callback
auth = request.env["omniauth.auth"]
- user = Account.find_or_initialize_by(:id => auth["uid"])
+ user = Account.find_or_initialize_by(:user_id => auth["uid"])
user.update_attributes(:oauth_token => auth["credentials"]["token"],
:oauth_token_secret => auth["credentials"]["secret"])
- session[:user_id] = user.id
+ session[:user_id] = user.user_id
session[:screen_name] = auth["info"]["nickname"]
- EM.defer do
- EM.connect("127.0.0.1", Settings.worker_port) do |client|
- def client.post_init
- p data = [:REGISTER, user.id].map(&:to_s).join(" ")
- send_data(data)
- end
- end
+
+ UNIXSocket.open(Settings.register_server_path) do |socket|
+ socket.write "REGISTER #{user.id}\r\n"
end
redirect_to root_url
diff --git a/app/controllers/users_controller.rb b/app/controllers/users_controller.rb
index 5b5995f..1a944f7 100644
--- a/app/controllers/users_controller.rb
+++ b/app/controllers/users_controller.rb
@@ -7,7 +7,8 @@ class UsersController < ApplicationController
@items = user.tweets
.where("favorites_count > 0 or retweets_count > 0")
.order("COALESCE(favorites_count, 0) + COALESCE(retweets_count, 0) DESC")
- .paginate(:page => page, :per_page => Settings.page_per)
+ .page(page)
+ .per(Settings.page_per)
else
@items = []
end
@@ -23,7 +24,8 @@ class UsersController < ApplicationController
@items = user.tweets
.where("favorites_count > 0 or retweets_count > 0")
.order("id DESC")
- .paginate(:page => page, :per_page => Settings.page_per)
+ .page(page)
+ .per(Settings.page_per)
else
@items = []
end
@@ -38,7 +40,8 @@ class UsersController < ApplicationController
if user
@items = user.tweets
.order("id DESC")
- .paginate(:page => page, :per_page => Settings.page_per)
+ .page(page)
+ .per(Settings.page_per)
else
@items = []
end
@@ -58,7 +61,8 @@ class UsersController < ApplicationController
"SELECT tweet_id FROM retweets WHERE user_id = #{user.id}" +
") AS rf)")
.order("id DESC")
- .paginate(:page => page, :per_page => Settings.page_per)
+ .page(page)
+ .per(Settings.page_per)
else
@items = []
diff --git a/app/views/shared/_tweets.haml b/app/views/shared/_tweets.haml
index 57149ed..46ffe37 100644
--- a/app/views/shared/_tweets.haml
+++ b/app/views/shared/_tweets.haml
@@ -1,4 +1,4 @@
.items
= render :partial => "shared/tweet", :collection => items, :as => :item, :locals => {:user_cache => @user_cache}
- if items.size > 0
- = will_paginate items
+ = paginate items
diff --git a/client/Gemfile b/client/Gemfile
new file mode 100644
index 0000000..1763098
--- /dev/null
+++ b/client/Gemfile
@@ -0,0 +1,6 @@
+ruby '1.9.3'
+source 'https://rubygems.org'
+
+gem 'settingslogic'
+gem 'em-twitter'
+gem 'yajl-ruby', :require => "yajl"
diff --git a/client/Gemfile.lock b/client/Gemfile.lock
new file mode 100644
index 0000000..cf106cc
--- /dev/null
+++ b/client/Gemfile.lock
@@ -0,0 +1,20 @@
+GEM
+ remote: https://rubygems.org/
+ specs:
+ em-twitter (0.2.1)
+ eventmachine (~> 1.0)
+ http_parser.rb (~> 0.5)
+ simple_oauth (~> 0.1)
+ eventmachine (1.0.3)
+ http_parser.rb (0.5.3)
+ settingslogic (2.0.9)
+ simple_oauth (0.2.0)
+ yajl-ruby (1.1.0)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ em-twitter
+ settingslogic
+ yajl-ruby
diff --git a/client/logger.rb b/client/logger.rb
new file mode 100644
index 0000000..eb09894
--- /dev/null
+++ b/client/logger.rb
@@ -0,0 +1,44 @@
+module Aclog
+ class Logger
+ def debug(msg)
+ if @level == :debug
+ log(@out, "DEBUG", msg)
+ end
+ end
+
+ def info(msg)
+ unless @level == :none
+ @level == :error
+ @level == :warn
+ log(@out, "INFO", msg)
+ end
+ end
+
+ def warn(msg)
+ unless @level == :none
+ @level == :error
+ log(@err, "WARN", msg)
+ end
+ end
+
+ def error(msg)
+ unless @level == :none
+ log(@err, "ERROR", msg)
+ end
+ end
+
+ def fatal(msg)
+ log(@err, "FATAL", msg)
+ end
+
+ def log(out, type, msg)
+ out.puts Time.now.utc.iso8601(3) + " " + type + ": " + msg.to_s
+ end
+
+ def initialize(level = :warn)
+ @out = STDOUT
+ @err = STDERR
+ @level = level
+ end
+ end
+end
diff --git a/client/settings.rb b/client/settings.rb
new file mode 100644
index 0000000..5c299dc
--- /dev/null
+++ b/client/settings.rb
@@ -0,0 +1,5 @@
+require "settingslogic"
+
+class Settings < Settingslogic
+ source "settings.yml"
+end
diff --git a/client/settings.yml.default b/client/settings.yml.default
new file mode 100644
index 0000000..2f8e787
--- /dev/null
+++ b/client/settings.yml.default
@@ -0,0 +1,7 @@
+consumer_key:
+consumer_secret:
+secret_key:
+worker_count:
+worker_number:
+db_proxy_host:
+db_proxy_port:
diff --git a/client/worker.rb b/client/worker.rb
new file mode 100644
index 0000000..2d0cd25
--- /dev/null
+++ b/client/worker.rb
@@ -0,0 +1,257 @@
+require "time"
+require "cgi"
+require "em-twitter"
+require "yajl"
+require "./settings"
+require "./logger"
+
+class Worker
+ class DBProxyClient < EM::Connection
+ def initialize
+ super
+ @connections = []
+ @receive_buf = ""
+ end
+
+ def format_text_from_hash(hash)
+ text = hash[:text]
+ entities = hash[:entities]
+
+ return text unless entities
+
+ gaps = {}
+ replace = -> ents, bl do
+ ents.each do |entity|
+ starts = entity[:indices].first
+ ends = entity[:indices].last
+ rep = bl.call(entity)
+ gaps[starts] = rep.size - (ends - starts)
+ bgap = gaps.select{|k, v| k < starts}.values.inject(0){|s, m| s += m}
+ text[starts + bgap...ends + bgap] = rep
+ end
+ end
+
+ replace.call((entities[:media] || []) + (entities[:urls] || []),
+ -> entity {"<url:#{CGI.escapeHTML(entity[:expanded_url])}:#{CGI.escapeHTML(entity[:display_url])}>"})
+ replace.call(entities[:hashtags] || [],
+ -> entity {"<hashtag:#{CGI.escapeHTML(URI.encode(entity[:text]))}>"})
+ replace.call(entities[:user_mentions] || [],
+ -> entity {"<mention:#{CGI.escapeHTML(URI.encode(entity[:screen_name]))}>"})
+
+ return text
+ end
+
+ def format_source(source)
+ source
+ end
+
+ def send_user(hash)
+ out = {:id => hash[:id],
+ :screen_name => hash[:screen_name],
+ :name => hash[:name],
+ :profile_image_url => hash[:profile_image_url_https]}
+ send_data("USER #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def send_tweet(hash)
+ send_user(hash[:user])
+ out = {:id => hash[:id],
+ :text => format_text_from_hash(hash),
+ :source => format_source(hash[:source]),
+ :tweeted_at => hash[:created_at],
+ :user_id => hash[:user][:id]}
+ send_data("TWEET #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def send_favorite(hash)
+ send_tweet(hash[:target_object])
+ send_user(hash[:source])
+ out = {:tweet_id => hash[:target_object][:id],
+ :user_id => hash[:source][:id]}
+ send_data("FAVORITE #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def send_unfavorite(hash)
+ send_tweet(hash[:target_object])
+ send_user(hash[:source])
+ out = {:tweet_id => hash[:target_object][:id],
+ :user_id => hash[:source][:id]}
+ send_data("UNFAVORITE #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def send_retweet(hash)
+ send_tweet(hash[:retweeted_status])
+ out = {:id => hash[:id],
+ :tweet_id => hash[:id],
+ :user_id => hash[:user][:id]}
+ send_data("RETWEET #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def send_delete(hash)
+ out = {:tweet_id => hash[:delete][:status][:id],
+ :user_id => hash[:delete][:status][:user_id]}
+ send_data("DELETE #{Yajl::Encoder.encode(out)}\r\n")
+ end
+
+ def post_init
+ send_data("CONNECT #{Settings.secret_key}&#{Settings.worker_number}&#{Settings.worker_count}\r\n")
+ end
+
+ def unbind
+ $logger.info("Connection closed")
+ reconnect(@options[:host], @options[:port])
+ end
+
+ def receive_data(data)
+ @receive_buf << data
+ while line = @receive_buf.slice!(/.+?\r\n/)
+ line.chomp!
+ next if line == ""
+ arg = line.split(/ /, 2)
+ case arg.first
+ when "OK"
+ $logger.info("Connected to DB Proxy")
+ when "ERROR"
+ $logger.error("Error: #{arg.last}")
+ when "ACCOUNT"
+ begin
+ p arg
+ hash = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true)
+ con = EM::Twitter::Client.connect({
+ :host => "userstream.twitter.com",
+ :path => "/1.1/user.json",
+ :oauth => {:consumer_key => Settings.consumer_key,
+ :consumer_secret => Settings.consumer_secret,
+ :token => hash[:oauth_token],
+ :token_secret => hash[:oauth_token_secret]},
+ :method => "GET",
+ # user data
+ :user_id => hash[:user_id]
+ })
+
+ con.on_reconnect do |timeout, count|
+ $logger.warn("Reconnected: #{con.options[:user_id]}/#{count}")
+ end
+
+ con.on_max_reconnects do |timeout, count|
+ $logger.error("Reached Max Reconnects: #{con.options[:user_id]}")
+ end
+
+ con.on_unauthorized do
+ $logger.error("Unauthorized: #{con.options[:user_id]}")
+ @connections.delete(con)
+ con.stop
+ end
+
+ con.on_forbidden do
+ $logger.error("Forbidden: #{con.options[:user_id]}")
+ @connections.delete(con)
+ end
+
+ con.on_not_found do
+ $logger.error("Not Found: #{con.options[:user_id]}")
+ @connections.delete(con)
+ end
+
+ con.on_not_acceptable do
+ $logger.error("Not Acceptable: #{con.options[:user_id]}")
+ end
+
+ con.on_too_long do
+ $logger.error("Too Long: #{con.options[:user_id]}")
+ end
+
+ con.on_range_unacceptable do
+ $logger.error("Range Unacceptable: #{con.options[:user_id]}")
+ end
+
+ con.on_enhance_your_calm do
+ $logger.error("Enhance Your Calm: #{con.options[:user_id]}")
+ @connections.delete(con)
+ end
+
+ con.on_error do |message|
+ $logger.error("Unknown: #{con.options[:user_id]}/#{message}")
+ end
+
+ con.each do |chunk|
+ begin # convert error
+ begin
+ status = ::Yajl::Parser.parse(chunk, :symbolize_keys => true)
+ rescue ::Yajl::ParseError
+ $logger.warn("::Yajl::ParseError in stream: #{chunk}")
+ next
+ end
+
+ if status.is_a?(::Hash)
+ if status.key?(:user)
+ if status[:user][:id] == con.options[:user_id] &&
+ !status.key?(:retweeted_status)
+ send_tweet(status)
+ $logger.debug("Created Tweet")
+ elsif status.key?(:retweeted_status) &&
+ (status[:retweeted_status][:user][:id] == con.options[:user_id] ||
+ status[:user][:id] == con.options[:user_id])
+ send_retweet(status)
+ $logger.debug("Created Retweet")
+ end
+ elsif status[:event] == "favorite"
+ if status[:target_object][:user] &&
+ (!status[:target_object][:user][:protected] ||
+ status[:target_object][:user][:id] == con.options[:user_id])
+ send_favorite(status)
+ $logger.debug("Created Favorite")
+ end
+ elsif status[:event] == "unfavorite"
+ send_unfavorite(status)
+ $logger.debug("Destroyed Favorite")
+ elsif status.key?(:delete) && status[:delete].key?(:status)
+ send_delete(status)
+ $logger.debug("Destroyed Tweet: #{status[:delete][:status][:id]}/#{status[:delete][:status][:user_id]}")
+ else
+ # monyo
+ end
+ else
+ $logger.warn("Unexpected object in stream: #{status}")
+ next
+ end
+ rescue # debug
+ $logger.error($!)
+ $logger.error($@)
+ end
+ end
+
+ $logger.info("User connected: #{con.options[:user_id]}")
+ @connections << con
+ rescue ::Yajl::ParseError
+ $logger.error("JSON Parse Error: #{json}")
+ end
+ end
+ end
+ end
+
+ def stop_all
+ @connections.map(&:stop)
+ send_data("QUIT\r\n")
+ end
+ end
+
+ def initialize
+ $logger = Aclog::Logger.new(:debug)
+ end
+
+ def start
+ $logger.info("Worker ##{Settings.worker_number} started")
+ EM.run do
+ stop = Proc.new do
+ EM.stop
+ end
+ Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
+
+ EM.connect(Settings.db_proxy_host, Settings.db_proxy_port, DBProxyClient)
+ end
+ end
+end
+
+
diff --git a/config/database.yml b/config/database.yml
index b2b0606..d9682a5 100644
--- a/config/database.yml
+++ b/config/database.yml
@@ -1,16 +1,13 @@
<% #development:
-
#test:
+ #production:
+ # adapter: mysql2
+ # encoding: utf8
+ # reconnect: true
+ # database: production
+ # pool: 5
+ # username: <%= ENV["DOTCLOUD_DB_MYSQL_LOGIN"] %>
+ # password: <%= ENV["DOTCLOUD_DB_MYSQL_PASSWORD"] %>
+ # host: <%= ENV["DOTCLOUD_DB_MYSQL_HOST"] %>
+ # port: <%= ENV["DOTCLOUD_DB_MYSQL_PORT"] %>
%>
-
-production:
- adapter: mysql2
- encoding: utf8
- reconnect: true
- database: production
- pool: 5
- username: <%= ENV["DOTCLOUD_DB_MYSQL_LOGIN"] %>
- password: <%= ENV["DOTCLOUD_DB_MYSQL_PASSWORD"] %>
- host: <%= ENV["DOTCLOUD_DB_MYSQL_HOST"] %>
- port: <%= ENV["DOTCLOUD_DB_MYSQL_PORT"] %>
-
diff --git a/config/locales/en.yml b/config/locales/en.yml
index 0653957..8513e9c 100644
--- a/config/locales/en.yml
+++ b/config/locales/en.yml
@@ -1,23 +1,8 @@
-# Files in the config/locales directory are used for internationalization
-# and are automatically loaded by Rails. If you want to use locales other
-# than English, add the necessary files in this directory.
-#
-# To use the locales, use `I18n.t`:
-#
-# I18n.t 'hello'
-#
-# In views, this is aliased to just `t`:
-#
-# <%= t('hello') %>
-#
-# To use a different locale, set it with `I18n.locale`:
-#
-# I18n.locale = :es
-#
-# This would use the information in config/locales/es.yml.
-#
-# To learn more, please read the Rails Internationalization guide
-# available at http://guides.rubyonrails.org/i18n.html.
-
en:
- hello: "Hello world"
+ views:
+ pagination:
+ first: "&#171; First"
+ last: "Last &#187;"
+ previous: "&#8249; Prev"
+ next: "Next &#8250;"
+ truncate: "&#8230;"
diff --git a/config/settings.yml b/config/settings.yml
index b93d958..1081845 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -1,12 +1,6 @@
-<%
-# dotCloud
-env_file = "/home/dotcloud/environment.json"
-if File.exists?(env_file)
- ENV.update(Hash[JSON.parse(File.read(env_file)).map{|k, v| [k.to_s, v.to_s]}])
-end
-%>
consumer_key: <%= ENV["CONSUMER_KEY"] %>
consumer_secret: <%= ENV["CONSUMER_SECRET"] %>
-worker_count: <%= ENV["WORKER_COUNT"] %>
-dotcloud_service_name: <%= ENV["DOTCLOUD_SERVICE_NAME"] %>
-page_per: 10
+page_per: <%= ENV["ACLOG_PAGE_PER"] %>
+secret_key: <%= ENV["ACLOG_SECRET_KEY"] %>
+db_proxy_port: <%= ENV["ACLOG_DB_PROXY_PORT"] %>
+register_server_path: <%= ENV["ACLOG_REGISTER_SERVER_PATH"] %>
diff --git a/config/unicorn.rb b/config/unicorn.rb
new file mode 100644
index 0000000..d83c1c0
--- /dev/null
+++ b/config/unicorn.rb
@@ -0,0 +1,33 @@
+worker_processes 2
+
+working_directory File.expand_path("../../", __FILE__)
+
+listen "/tmp/aclog-unicorn.sock"
+
+log = "/var/log/rails/unicorn.log"
+stderr_path File.expand_path("log/unicorn.log", ENV["RAILS_ROOT"])
+stdout_path File.expand_path("log/unicorn.log", ENV["RAILS_ROOT"])
+
+preload_app true
+
+before_fork do |server, worker|
+ if defined?(ActiveRecord::Base)
+ ActiveRecord::Base.connection.disconnect!
+ end
+
+ old_pid = "#{server.config[:pid]}.old"
+ unless old_pid == server.pid
+ begin
+ Process.kill :QUIT, File.read(old_pid).to_i
+ rescue Errno::ENOENT, Errno::ESRCH
+ end
+ end
+end
+
+after_fork do |server, worker|
+ if defined?(ActiveRecord::Base)
+ ActiveRecord::Base.establish_connection
+ end
+end
+
+
diff --git a/db/migrate/20130225123010_create_accounts.rb b/db/migrate/20130225123010_create_accounts.rb
index 6856539..9e96020 100644
--- a/db/migrate/20130225123010_create_accounts.rb
+++ b/db/migrate/20130225123010_create_accounts.rb
@@ -1,10 +1,13 @@
class CreateAccounts < ActiveRecord::Migration
def change
create_table :accounts do |t|
- t.string :oauth_token
- t.string :oauth_token_secret
+ t.integer :user_id, :limit => 8, :null => false
+ t.string :oauth_token, :null => false
+ t.string :oauth_token_secret, :null => false
t.timestamps
end
+
+ add_index :accounts, :user_id, :unique => true
end
end
diff --git a/db/migrate/20130226150329_create_tweets.rb b/db/migrate/20130226150329_create_tweets.rb
index 393b26e..3c7138c 100644
--- a/db/migrate/20130226150329_create_tweets.rb
+++ b/db/migrate/20130226150329_create_tweets.rb
@@ -1,13 +1,15 @@
class CreateTweets < ActiveRecord::Migration
def change
create_table :tweets do |t|
- t.text :text
+ t.text :text, :null => false
t.text :source
- t.references :user, :limit => 8
+ t.references :user, :limit => 8, :null => false
t.datetime :tweeted_at
- t.integer :favorites_count
- t.integer :retweets_count
+ t.integer :favorites_count, :default => 0
+ t.integer :retweets_count, :default => 0
end
+
+ add_index :tweets, :user_id
end
end
diff --git a/db/migrate/20130226150829_create_favorites.rb b/db/migrate/20130226150829_create_favorites.rb
index d8b34fb..c972b92 100644
--- a/db/migrate/20130226150829_create_favorites.rb
+++ b/db/migrate/20130226150829_create_favorites.rb
@@ -1,8 +1,11 @@
class CreateFavorites < ActiveRecord::Migration
def change
create_table :favorites do |t|
- t.references :tweet, :limit => 8
- t.references :user, :limit => 8
+ t.references :tweet, :limit => 8, :null => false
+ t.references :user, :limit => 8, :null => false
end
+
+ add_index :favorites, [:tweet_id, :user_id], :unique => true
+ add_index :favorites, :tweet_id
end
end
diff --git a/db/migrate/20130226151042_create_retweets.rb b/db/migrate/20130226151042_create_retweets.rb
index a2eb3cf..6de664e 100644
--- a/db/migrate/20130226151042_create_retweets.rb
+++ b/db/migrate/20130226151042_create_retweets.rb
@@ -1,8 +1,10 @@
class CreateRetweets < ActiveRecord::Migration
def change
create_table :retweets do |t|
- t.references :tweet, :limit => 8
- t.references :user, :limit => 8
+ t.references :tweet, :limit => 8, :null => false
+ t.references :user, :limit => 8, :null => false
end
+
+ add_index :retweets, :tweet_id
end
end
diff --git a/db/schema.rb b/db/schema.rb
index 72fb1e1..4ec571b 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -14,29 +14,37 @@
ActiveRecord::Schema.define(version: 20130226151042) do
create_table "accounts", force: true do |t|
- t.string "oauth_token"
- t.string "oauth_token_secret"
+ t.integer "user_id", limit: 8, null: false
+ t.string "oauth_token", null: false
+ t.string "oauth_token_secret", null: false
t.datetime "created_at"
t.datetime "updated_at"
end
+ add_index "accounts", ["user_id"], name: "index_accounts_on_user_id", unique: true
+
create_table "favorites", force: true do |t|
- t.integer "tweet_id", limit: 8
- t.integer "user_id", limit: 8
+ t.integer "tweet_id", limit: 8, null: false
+ t.integer "user_id", limit: 8, null: false
end
+ add_index "favorites", ["tweet_id", "user_id"], name: "index_favorites_on_tweet_id_and_user_id", unique: true
+ add_index "favorites", ["tweet_id"], name: "index_favorites_on_tweet_id"
+
create_table "retweets", force: true do |t|
- t.integer "tweet_id", limit: 8
- t.integer "user_id", limit: 8
+ t.integer "tweet_id", limit: 8, null: false
+ t.integer "user_id", limit: 8, null: false
end
+ add_index "retweets", ["tweet_id"], name: "index_retweets_on_tweet_id"
+
create_table "tweets", force: true do |t|
- t.text "text"
+ t.text "text", null: false
t.text "source"
- t.integer "user_id", limit: 8
+ t.integer "user_id", limit: 8, null: false
t.datetime "tweeted_at"
- t.integer "favorites_count"
- t.integer "retweets_count"
+ t.integer "favorites_count", default: 0
+ t.integer "retweets_count", default: 0
end
create_table "users", force: true do |t|
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb
index 5d5f60b..01a73cd 100644
--- a/lib/receiver/worker.rb
+++ b/lib/receiver/worker.rb
@@ -1,252 +1,204 @@
require "time"
-class Receiver::Worker
- def initialize
- @logger = Receiver::Logger.new(:info)
- @connections = []
-
- _tm = Settings.dotcloud_service_name.split(/_/).last
- if _tm =~ /^[0-9]+$/
- @worker_number = Integer(_tm)
- else
- exit(0)
+module EM
+ class Connection
+ def send_chunk(data)
+ send_data(data + "\r\n")
end
end
+end
- # Create Aclog format text from Twitter Status Hash
- def format_text_from_hash(hash)
- text = hash[:text]
- entities = hash[:entities]
-
- return text unless entities
-
- gaps = {}
- replace = -> ents, bl do
- ents.each do |entity|
- starts = entity[:indices].first
- ends = entity[:indices].last
- rep = bl.call(entity)
- gaps[starts] = rep.size - (ends - starts)
- bgap = gaps.select{|k, v| k < starts}.values.inject(0){|s, m| s += m}
- text[starts + bgap...ends + bgap] = rep
+class Receiver::Worker
+ class DBProxyServer < EM::Connection
+ $worker_count = nil
+ @@wq = EM::WorkQueue::WorkQueue.new do |arg|
+ begin
+ json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true)
+ rescue ::Yajl::ParseError
+ # JSON parse error....??
+ p $!
end
- end
- replace.call((entities[:media] || []) + (entities[:urls] || []),
- -> entity {"<url:#{CGI.escapeHTML(entity[:expanded_url])}:#{CGI.escapeHTML(entity[:display_url])}>"})
- replace.call(entities[:hashtags] || [],
- -> entity {"<hashtag:#{CGI.escapeHTML(URI.encode(entity[:text]))}>"})
- replace.call(entities[:user_mentions] || [],
- -> entity {"<mention:#{CGI.escapeHTML(URI.encode(entity[:screen_name]))}>"})
-
- return text
- end
-
- def rescue_duplicate(&proc)
- begin
- return proc.call
- rescue ActiveRecord::StatementInvalid => e
- if e.to_s =~ /^Mysql2::Error: Duplicate entry /
- @logger.info("Duplicate entry..")
+ case arg.first
+ when "USER"
+ $logger.debug("Received User")
+ rec = User.find_or_initialize_by(:id => json[:id])
+ rec.screen_name = json[:screen_name]
+ rec.name = json[:name]
+ rec.profile_image_url = json[:profile_image_url]
+ rec.save! if rec.changed?
+ when "TWEET"
+ $logger.debug("Received Tweet")
+ begin
+ Tweet.create!(:id => json[:id],
+ :text => json[:text],
+ :source => json[:source],
+ :tweeted_at => Time.parse(json[:tweeted_at]),
+ :user_id => json[:user_id])
+ $logger.debug("Saved Tweet")
+ rescue ActiveRecord::RecordNotUnique
+ $logger.info("Can't Save Tweet: Duplicate")
+ end
+ when "FAVORITE"
+ $logger.debug("Received Favorite")
+ begin
+ Favorite.create!(:tweet_id => json[:tweet_id],
+ :user_id => json[:user_id])
+ $logger.debug("Saved Favorite")
+ rescue ActiveRecord::RecordNotUnique
+ $logger.info("Can't Save Tweet: Duplicate")
+ end
+ when "UNFAVORITE"
+ Favorite.delete_all("tweet_id = #{json[:tweet_id]} AND user_id = #{json[:user_id]}")
+ when "RETWEET"
+ $logger.debug("Received Retweet")
+ begin
+ Retweet.create!(:id => json[:id],
+ :tweet_id => json[:tweet_id],
+ :user_id => json[:user_id])
+ $logger.debug("Saved Retweet")
+ rescue ActiveRecord::RecordNotUnique
+ $logger.info("Can't Save Retweet: Duplicate")
+ end
+ when "DELETE"
+ tweet = Tweet.find_by(:id => json[:id]) || Retweet.find_by(:id => json[:id])
+ if tweet
+ tweet.destroy
+ end
else
- raise e
+ # ???
+ puts "???????"
end
end
- end
+ @@wq.start
- # Create or Update user by Twitter User Hash
- def create_user_from_hash(user)
- rescue_duplicate do
- rec = User.find_or_initialize_by(:id => user[:id])
- rec.screen_name = user[:screen_name]
- rec.name = user[:name]
- rec.profile_image_url = user[:profile_image_url_https]
- rec.save! if rec.changed?
-
- return rec
+ def initialize
+ @worker_number = nil
+ @receive_buf = ""
end
- end
-
- # Create tweet by Twitter Status Hash
- def create_tweet_from_hash(status)
- rescue_duplicate do
- Tweet.find_by(:id => status[:id]) ||
- Tweet.create!(:id => status[:id],
- :text => format_text_from_hash(status),
- :source => status[:source],
- :tweeted_at => Time.parse(status[:created_at]),
- :user => create_user_from_hash(status[:user]))
- end
- end
-
- def destroy_tweet_from_hash(status)
- Tweet.delete(status[:delete][:status][:id]) ||
- Retweet.delete(status[:delete][:status][:id])
- end
- # Create Retweet by Twitter Status Hash
- def create_retweet_from_hash(status)
- rescue_duplicate do
- Retweet.find_by(:id => status[:id]) ||
- Retweet.create!(:id => status[:id],
- :tweet => create_tweet_from_hash(status[:retweeted_status]),
- :user => create_user_from_hash(status[:user]))
+ def post_init
+ # なにもしない。クライアントが
end
- end
- # Create Favorite by Streaming Event Hash
- def create_favorite_from_hash(status)
- rescue_duplicate do
- user = create_user_from_hash(status[:source])
- user.favorites.find_by(:tweet_id => status[:target_object][:id]) ||
- user.favorites.create!(:tweet => create_tweet_from_hash(status[:target_object]))
+ def unbind
+ $connections.delete_if{|k, v| v == self}
+ $logger.info("Connection closed")
end
- end
-
- def destroy_favorite_from_hash(status)
- create_tweet_from_hash(status[:target_object])
- .favorites.where(:user_id => status[:source][:id]).delete_all
- end
- def start
- @logger.info("Worker ##{@worker_number} started")
- EM.run do
- stop = Proc.new do
- @connections.map(&:stop)
- EM.stop
+ def send_account_all
+ Account.where("user_id % ? = ?", $worker_count, @worker_number).each do |account|
+ puts "Sent #{account.id}/#{account.user_id}"
+ send_account(account)
end
- Signal.trap(:INT, &stop)
- Signal.trap(:TERM, &stop)
-
- register = -> account do
- con = EM::Twitter::Client.connect({
- :host => "userstream.twitter.com",
- :path => "/1.1/user.json",
- :oauth => {:consumer_key => Settings.consumer_key,
- :consumer_secret => Settings.consumer_secret,
- :token => account.oauth_token,
- :token_secret => account.oauth_token_secret},
- :method => "GET",
- # user data
- :user_id => account.id
- })
-
- con.on_reconnect do |timeout, count|
- @logger.warn("Reconnected: #{con.options[:user_id]}/#{count}")
- end
-
- con.on_max_reconnects do |timeout, count|
- @logger.error("Reached Max Reconnects: #{con.options[:user_id]}")
- end
-
- con.on_unauthorized do
- @logger.error("Unauthorized: #{con.options[:user_id]}")
- @connections.delete(con)
- end
-
- con.on_forbidden do
- @logger.error("Forbidden: #{con.options[:user_id]}")
- @connections.delete(con)
- end
-
- con.on_not_found do
- @logger.error("Not Found: #{con.options[:user_id]}")
- @connections.delete(con)
- end
-
- con.on_not_acceptable do
- @logger.error("Not Acceptable: #{con.options[:user_id]}")
- end
-
- con.on_too_long do
- @logger.error("Too Long: #{con.options[:user_id]}")
- end
+ end
- con.on_range_unacceptable do
- @logger.error("Range Unacceptable: #{con.options[:user_id]}")
- end
+ def send_account(account)
+ send_chunk("ACCOUNT #{Yajl::Encoder.encode(account.attributes)}")
+ end
- con.on_enhance_your_calm do
- @logger.error("Enhance Your Calm: #{con.options[:user_id]}")
- @connections.delete(con)
+ def receive_data(data)
+ @receive_buf << data
+ while line = @receive_buf.slice!(/.+?\r\n/)
+ line.chomp!
+ next if line == ""
+ p line
+ arg = line.split(/ /, 2)
+ case arg.first
+ when "CONNECT"
+ ff = arg.last.split(/&/)
+ secret_token = ff[0]
+ worker_number = ff[1].to_i
+ worker_count = ff[2].to_i
+ if secret_token == Settings.secret_key
+ if $worker_count != worker_count && $connections.size > 0
+ $logger.error("Error: Worker Count Difference: $worker_count=#{$worker_count}, worker_count=#{worker_count}")
+ send_chunk("ERROR Invalid Worker Count")
+ close_connection_after_writing
+ else
+ $worker_count = worker_count
+ $connections[worker_number] = self
+ @worker_number = worker_number
+ $logger.info("Connected: #{worker_number}")
+ send_chunk("OK Connected")
+ send_account_all
+ end
+ else
+ $logger.error("Error: Invalid Secret Key")
+ send_chunk("ERROR Invalid Secret Token")
+ close_connection_after_writing
+ end
+ when "QUIT"
+ send_chunk("BYE")
+ close_connection_after_writing
+ else
+ @@wq.push arg
end
+ end
+ end
+ end
- con.on_error do |message|
- @logger.error("Unknown: #{con.options[:user_id]}/#{message}")
- end
+ class RegisterServer < EM::Connection
+ def initialize
+ @receive_buf = ""
+ end
- con.each do |json|
- begin # convert error
- begin
- status = ::Yajl::Parser.parse(json, :symbolize_keys => true)
- rescue ::Yajl::ParseError
- @logger.warn("::Yajl::ParseError in stream: #{json}")
- next
- end
+ def post_init
+ p "connected"
+ end
- if status.is_a?(::Hash)
- if status.key?(:user)
- # Tweet or Retweet
- if status[:user][:id] == con.options[:user_id] &&
- !status.key?(:retweeted_status)
- # Tweet
- create_tweet_from_hash(status)
- @logger.debug("Created Tweet")
- elsif status.key?(:retweeted_status) &&
- (status[:retweeted_status][:user][:id] == con.options[:user_id] ||
- status[:user][:id] == con.options[:user_id])
- # Retweet
- create_retweet_from_hash(status)
- @logger.debug("Created Retweet")
- end
- elsif status[:event] == "favorite"
- # Favorite
- create_favorite_from_hash(status)
- @logger.debug("Created Favorite")
- elsif status[:event] == "unfavorite"
- # Unfavorite
- destroy_favorite_from_hash(status)
- @logger.debug("Destroyed Favorite")
- elsif status.key?(:delete) && status[:delete].key?(:status)
- # Delete
- destroy_tweet_from_hash(status)
- @logger.debug("Destroyed Tweet")
+ def receive_data(data)
+ @receive_buf << data
+ while line = @receive_buf.slice!(/.+?\r\n/)
+ line.chomp!
+ next if line == ""
+ p line
+ sp = line.split(/ /, 2)
+ if sp.first == "REGISTER"
+ if sp.last =~ /^[0-9]+$/
+ account = Account.find_by(:id => sp.last.to_i)
+ if account
+ if con = $connections[account.id % $worker_count]
+ con.send_account(account)
+ send_chunk("OK Registered")
else
- # Else - do nothing
- # p status
+ send_chunk("OK Worker not found")
end
else
- @logger.warn("Unexpected object in stream: #{status}")
- next
+ $logger.error("Unknown account: #{sp.last}")
+ send_chunk("ERROR Unknown Account")
end
- rescue # debug
- @logger.error($!)
- @logger.error($@)
- end
- end
-
- @logger.info("User connected: #{con.options[:user_id]}")
- @connections << con
- end
-
- reconnect = -> do
- Account.where("id % #{Settings.worker_count} = #{@worker_number}").each do |account|
- #Account.find_by_sql("SELECT * FROM accounts WHERE id % #{Settings.worker_count} = #{@worker_number}").each do |account|
- if con = @connections.find{|m| m.options[:user_id] == account.id}
- con.immediate_reconnect
else
- register.call(account)
+ $logger.error("Invalid User ID")
+ send_chunk("ERROR Invalid User ID")
end
+ else
+ $logger.error("Unknown Command: #{sp})")
+ send_chunk("ERROR Unknown command")
end
+ close_connection_after_writing
+ return
end
+ end
+ end
+
+ def initialize
+ $logger = Receiver::Logger.new(:info)
+ $connections = {}
+ end
- EM.add_periodic_timer(30 * 60) do
- reconnect.call
+ def start
+ $logger.info("Database Proxy Started")
+ EM.run do
+ stop = Proc.new do
+ EM.stop
end
+ Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
- reconnect.call
+ EM.start_server("0.0.0.0", Settings.db_proxy_port, DBProxyServer)
+ EM.start_unix_domain_server(Settings.register_server_path, RegisterServer)
end
end
end
-
diff --git a/worker.sh b/worker.sh
deleted file mode 100755
index 837ab11..0000000
--- a/worker.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-bundle exec rails runner Receiver::Worker.new.start
-
-