diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-06-19 01:44:27 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-06-19 01:44:27 +0900 |
commit | a41781c281e79aa79524a7d3aa3a3ced36a78eaf (patch) | |
tree | 05a355b63499408efd108eed55934b1d5e8c1701 | |
parent | bbe05db50cc521c25a57dd9cafb7f7bd50410554 (diff) | |
parent | 3f6d626e67ab827dc8c18d26c0fd9334fc9a4499 (diff) | |
download | aclog-a41781c281e79aa79524a7d3aa3a3ced36a78eaf.tar.gz |
Merge branch 'master' into collector-proxy
-rw-r--r-- | Gemfile.lock | 144 | ||||
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | app/api/api.rb | 17 | ||||
-rw-r--r-- | app/assets/javascripts/parts/sidebar_user_stats.coffee | 2 | ||||
-rw-r--r-- | app/assets/javascripts/tweets.coffee.erb | 2 | ||||
-rw-r--r-- | app/controllers/apidocs_controller.rb | 12 | ||||
-rw-r--r-- | app/controllers/application_controller.rb | 7 | ||||
-rw-r--r-- | app/controllers/sessions_controller.rb | 16 | ||||
-rw-r--r-- | app/controllers/tweets_controller.rb | 2 | ||||
-rw-r--r-- | app/controllers/users_controller.rb | 11 | ||||
-rw-r--r-- | app/models/favorite.rb | 41 | ||||
-rw-r--r-- | app/models/retweet.rb | 40 | ||||
-rw-r--r-- | app/views/tweets/i_responses.json.jbuilder (renamed from app/views/tweets/responses.json.jbuilder) | 0 | ||||
-rw-r--r-- | config/routes.rb | 3 | ||||
-rw-r--r-- | lib/settings.rb (renamed from app/models/settings.rb) | 0 | ||||
-rw-r--r-- | worker_node/Gemfile.lock | 2 | ||||
-rw-r--r-- | worker_node/lib/user_connection.rb | 8 | ||||
-rw-r--r-- | worker_node/lib/user_stream/client.rb | 64 | ||||
-rw-r--r-- | worker_node/lib/worker_node.rb | 2 | ||||
-rw-r--r-- | worker_node/settings.yml.example | 2 |
20 files changed, 201 insertions, 175 deletions
diff --git a/Gemfile.lock b/Gemfile.lock index c501ebd..756a0b4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,38 +1,38 @@ GEM remote: https://rubygems.org/ specs: - actionmailer (4.2.1) - actionpack (= 4.2.1) - actionview (= 4.2.1) - activejob (= 4.2.1) + actionmailer (4.2.2) + actionpack (= 4.2.2) + actionview (= 4.2.2) + activejob (= 4.2.2) mail (~> 2.5, >= 2.5.4) rails-dom-testing (~> 1.0, >= 1.0.5) - actionpack (4.2.1) - actionview (= 4.2.1) - activesupport (= 4.2.1) + actionpack (4.2.2) + actionview (= 4.2.2) + activesupport (= 4.2.2) rack (~> 1.6) rack-test (~> 0.6.2) rails-dom-testing (~> 1.0, >= 1.0.5) rails-html-sanitizer (~> 1.0, >= 1.0.1) - actionview (4.2.1) - activesupport (= 4.2.1) + actionview (4.2.2) + activesupport (= 4.2.2) builder (~> 3.1) erubis (~> 2.7.0) rails-dom-testing (~> 1.0, >= 1.0.5) rails-html-sanitizer (~> 1.0, >= 1.0.1) - activejob (4.2.1) - activesupport (= 4.2.1) + activejob (4.2.2) + activesupport (= 4.2.2) globalid (>= 0.3.0) - activemodel (4.2.1) - activesupport (= 4.2.1) + activemodel (4.2.2) + activesupport (= 4.2.2) builder (~> 3.1) - activerecord (4.2.1) - activemodel (= 4.2.1) - activesupport (= 4.2.1) + activerecord (4.2.2) + activemodel (= 4.2.2) + activesupport (= 4.2.2) arel (~> 6.0) - activerecord-import (0.7.0) + activerecord-import (0.8.0) activerecord (>= 3.0) - activesupport (4.2.1) + activesupport (4.2.2) i18n (~> 0.7) json (~> 1.7, >= 1.7.7) minitest (~> 5.1) @@ -40,7 +40,7 @@ GEM tzinfo (~> 1.1) addressable (2.3.8) arel (6.0.0) - autoprefixer-rails (5.1.11) + autoprefixer-rails (5.2.0.1) execjs json axiom-types (0.1.1) @@ -49,12 +49,12 @@ GEM thread_safe (~> 0.3, >= 0.3.1) binding_of_caller (0.7.2) debug_inspector (>= 0.0.1) - bootstrap-sass (3.3.4.1) + bootstrap-sass (3.3.5) autoprefixer-rails (>= 5.0.0.1) sass (>= 3.2.19) buftok (0.2.0) builder (3.2.2) - byebug (4.0.5) + byebug (5.0.0) columnize (= 0.9.0) celluloid (0.16.0) timers (~> 4.0.0) @@ -103,7 +103,7 @@ GEM formatador (0.2.5) globalid (0.3.5) activesupport (>= 4.1.0) - grape (0.11.0) + grape (0.12.0) activesupport builder hashie (>= 2.1.0) @@ -118,7 +118,7 @@ GEM i18n rabl tilt - guard (2.12.5) + guard (2.12.6) formatador (>= 0.2.4) listen (~> 2.7) lumberjack (~> 1.0) @@ -128,7 +128,7 @@ GEM shellany (~> 0.0) thor (>= 0.18.1) guard-compat (1.2.1) - guard-rspec (4.5.0) + guard-rspec (4.5.2) guard (~> 2.1) guard-compat (~> 1.1) rspec (>= 2.99.0, < 4.0) @@ -140,7 +140,7 @@ GEM haml (>= 4.0.6, < 5.0) html2haml (>= 1.0.1) railties (>= 4.0.1) - hashie (3.4.1) + hashie (3.4.2) hitimes (1.2.2) hodel_3000_compliant_logger (0.1.1) html2haml (2.0.0) @@ -155,15 +155,15 @@ GEM http_parser.rb (0.6.0) i18n (0.7.0) ice_nine (0.11.1) - jbuilder (2.2.13) + jbuilder (2.3.0) activesupport (>= 3.0.0, < 5) multi_json (~> 1.2) - jquery-rails (4.0.3) + jquery-rails (4.0.4) rails-dom-testing (~> 1.0) railties (>= 4.2.0) thor (>= 0.14, < 2.0) - json (1.8.2) - listen (2.10.0) + json (1.8.3) + listen (2.10.1) celluloid (~> 0.16.0) rb-fsevent (>= 0.9.3) rb-inotify (>= 0.9) @@ -175,14 +175,14 @@ GEM memoizable (0.4.2) thread_safe (~> 0.3, >= 0.3.1) method_source (0.8.2) - mime-types (2.5) + mime-types (2.6.1) mini_portile (0.6.2) - minitest (5.6.1) - msgpack (0.5.11) + minitest (5.7.0) + msgpack (0.5.12) msgpack-rpc (0.5.3) cool.io (~> 1.2.4) msgpack (~> 0.5.10) - multi_json (1.11.0) + multi_json (1.11.1) multi_xml (0.5.5) multipart-post (2.0.0) mysql2 (0.3.18) @@ -213,29 +213,29 @@ GEM slop (~> 3.4) pry-rails (0.3.4) pry (>= 0.9.10) - puma (2.11.2) + puma (2.11.3) rack (>= 1.1, < 2.0) quiet_assets (1.1.0) railties (>= 3.1, < 5.0) rabl (0.11.6) activesupport (>= 2.3.14) - rack (1.6.1) + rack (1.6.2) rack-accept (0.4.5) rack (>= 0.4) rack-mount (0.8.3) rack (>= 1.0.0) rack-test (0.6.3) rack (>= 1.0) - rails (4.2.1) - actionmailer (= 4.2.1) - actionpack (= 4.2.1) - actionview (= 4.2.1) - activejob (= 4.2.1) - activemodel (= 4.2.1) - activerecord (= 4.2.1) - activesupport (= 4.2.1) + rails (4.2.2) + actionmailer (= 4.2.2) + actionpack (= 4.2.2) + actionview (= 4.2.2) + activejob (= 4.2.2) + activemodel (= 4.2.2) + activerecord (= 4.2.2) + activesupport (= 4.2.2) bundler (>= 1.3.0, < 2.0) - railties (= 4.2.1) + railties (= 4.2.2) sprockets-rails rails-deprecated_sanitizer (1.0.3) activesupport (>= 4.2.0.alpha) @@ -245,44 +245,44 @@ GEM rails-deprecated_sanitizer (>= 1.0.1) rails-html-sanitizer (1.0.2) loofah (~> 2.0) - railties (4.2.1) - actionpack (= 4.2.1) - activesupport (= 4.2.1) + railties (4.2.2) + actionpack (= 4.2.2) + activesupport (= 4.2.2) rake (>= 0.8.7) thor (>= 0.18.1, < 2.0) rake (10.4.2) - rb-fsevent (0.9.4) + rb-fsevent (0.9.5) rb-inotify (0.9.5) ffi (>= 0.5.0) rest-client (1.8.0) http-cookie (>= 1.0.2, < 2.0) mime-types (>= 1.16, < 3.0) netrc (~> 0.7) - rspec (3.2.0) - rspec-core (~> 3.2.0) - rspec-expectations (~> 3.2.0) - rspec-mocks (~> 3.2.0) - rspec-core (3.2.3) - rspec-support (~> 3.2.0) - rspec-expectations (3.2.1) + rspec (3.3.0) + rspec-core (~> 3.3.0) + rspec-expectations (~> 3.3.0) + rspec-mocks (~> 3.3.0) + rspec-core (3.3.1) + rspec-support (~> 3.3.0) + rspec-expectations (3.3.0) diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.2.0) - rspec-mocks (3.2.1) + rspec-support (~> 3.3.0) + rspec-mocks (3.3.0) diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.2.0) - rspec-rails (3.2.1) + rspec-support (~> 3.3.0) + rspec-rails (3.3.2) actionpack (>= 3.0, < 4.3) activesupport (>= 3.0, < 4.3) railties (>= 3.0, < 4.3) - rspec-core (~> 3.2.0) - rspec-expectations (~> 3.2.0) - rspec-mocks (~> 3.2.0) - rspec-support (~> 3.2.0) - rspec-support (3.2.2) - ruby_parser (3.6.6) + rspec-core (~> 3.3.0) + rspec-expectations (~> 3.3.0) + rspec-mocks (~> 3.3.0) + rspec-support (~> 3.3.0) + rspec-support (3.3.0) + ruby_parser (3.7.0) sexp_processor (~> 4.1) safe_yaml (1.0.4) - sass (3.4.13) + sass (3.4.14) sass-rails (5.0.3) railties (>= 4.0.0, < 5.0) sass (~> 3.1) @@ -290,7 +290,7 @@ GEM sprockets-rails (>= 2.0, < 4.0) tilt (~> 1.1) settingslogic (2.0.9) - sexp_processor (4.5.1) + sexp_processor (4.6.0) shellany (0.0.1) simple_oauth (0.3.1) simplecov (0.10.0) @@ -302,13 +302,13 @@ GEM spring (1.3.6) spring-commands-rspec (1.0.4) spring (>= 0.9.1) - sprockets (3.1.0) + sprockets (3.2.0) rack (~> 1.0) sprockets-rails (2.3.1) actionpack (>= 3.0) activesupport (>= 3.0) sprockets (>= 2.8, < 4.0) - term-ansicolor (1.3.0) + term-ansicolor (1.3.1) tins (~> 1.0) thin (1.6.3) daemons (~> 1.0, >= 1.0.9) @@ -319,7 +319,7 @@ GEM tilt (1.4.1) timers (4.0.1) hitimes - tins (1.5.1) + tins (1.5.2) twitter (5.14.0) addressable (~> 2.3) buftok (~> 0.2.0) @@ -335,7 +335,7 @@ GEM unf (~> 0.1.0) tzinfo (1.2.2) thread_safe (~> 0.1) - tzinfo-data (1.2015.4) + tzinfo-data (1.2015.5) tzinfo (>= 1.0.0) uglifier (2.7.1) execjs (>= 0.3.0) @@ -348,7 +348,7 @@ GEM coercible (~> 1.0) descendants_tracker (~> 0.0, >= 0.0.3) equalizer (~> 0.0, >= 0.0.9) - web-console (2.1.2) + web-console (2.1.3) activemodel (>= 4.0) binding_of_caller (>= 0.7.2) railties (>= 4.0) @@ -29,6 +29,7 @@ Collects favs and retweets in real time by UserStreams. * Atom feed ## Requirements +* Linux (WorkerNode optionally needs epoll) * Ruby 2.2+ * MySQL/MariaDB 5.5.14+ (needs utf8mb4 support) * memcached diff --git a/app/api/api.rb b/app/api/api.rb index 588e755..3183271 100644 --- a/app/api/api.rb +++ b/app/api/api.rb @@ -50,4 +50,21 @@ class Api < Grape::API route :any, "*path", ignore: true do raise Aclog::Exceptions::NotFound end + + class << self + def docs + Rails.cache.fetch("apidocs") do + {}.tap do |h| + Api.routes.each {|route| + next if route.route_ignore + next if route.route_method == "HEAD" + method = route.route_method + namespace = route.route_namespace.sub(/^\//, "") + path = route.route_path.split("/", 3).last.sub(/\(\.:format\)$/, "") + ((h[method] ||= {})[namespace] ||= {})[path] = route + } + end + end + end + end end diff --git a/app/assets/javascripts/parts/sidebar_user_stats.coffee b/app/assets/javascripts/parts/sidebar_user_stats.coffee index cc880d5..2b3f078 100644 --- a/app/assets/javascripts/parts/sidebar_user_stats.coffee +++ b/app/assets/javascripts/parts/sidebar_user_stats.coffee @@ -10,7 +10,7 @@ Parts.sidebar_user_stats = -> Math.round(this.stats.reactions_count / this.stats.tweets_count * 100) / 100 superagent - .get "/" + Helpers.user_screen_name() + "/stats" + .get "/i/api/users/stats?screen_name=" + Helpers.user_screen_name() .accept "json" .end (err, res) -> vm.stats = res.body diff --git a/app/assets/javascripts/tweets.coffee.erb b/app/assets/javascripts/tweets.coffee.erb index cd40568..87f333c 100644 --- a/app/assets/javascripts/tweets.coffee.erb +++ b/app/assets/javascripts/tweets.coffee.erb @@ -42,7 +42,7 @@ Views.tweets = if status.allowed && status.reactions_count > 0 status.loading = true superagent - .get "/i/" + status.id_str + "/responses" + .get "/i/api/tweets/responses?id=" + status.id_str .accept "json" .end (rerr, rres) -> rjson = rres.body diff --git a/app/controllers/apidocs_controller.rb b/app/controllers/apidocs_controller.rb index b22ecf3..3057405 100644 --- a/app/controllers/apidocs_controller.rb +++ b/app/controllers/apidocs_controller.rb @@ -12,17 +12,7 @@ class ApidocsController < ApplicationController private def set_apidocs - @apidocs = Rails.cache.fetch("apidocs", expired_in: 1.days) do - h = {} - Api.routes.reject {|r| r.route_ignore }.each {|route| - next if route.route_method == "HEAD" - method = route.route_method - namespace = route.route_namespace.sub(/^\//, "") - path = route.route_path.split("/", 3).last.sub(/\(\.:format\)$/, "") - ((h[method] ||= {})[namespace] ||= {})[path] = route - } - h - end + @apidocs = Api.docs end def set_sidebar diff --git a/app/controllers/application_controller.rb b/app/controllers/application_controller.rb index 7700a2c..15cc108 100644 --- a/app/controllers/application_controller.rb +++ b/app/controllers/application_controller.rb @@ -17,11 +17,10 @@ class ApplicationController < ActionController::Base end def current_user - @_current_user ||= begin + @_current_user ||= if logged_in? User.find(session[:user_id]) end - end end def authorized_to_show_user?(user) @@ -39,4 +38,8 @@ class ApplicationController < ActionController::Base end object end + + def safe_redirect?(to) + to[0] == "/" && !to.include?("//") + end end diff --git a/app/controllers/sessions_controller.rb b/app/controllers/sessions_controller.rb index f5d609c..335d84e 100644 --- a/app/controllers/sessions_controller.rb +++ b/app/controllers/sessions_controller.rb @@ -5,11 +5,6 @@ class SessionsController < ApplicationController account = Account.register(user_id: auth.uid, oauth_token: auth.credentials.token, oauth_token_secret: auth.credentials.secret) - begin - WorkerManager.update_account(account) - rescue Aclog::Exceptions::WorkerConnectionError - end - User.create_or_update_from_json( { id: account.user_id, screen_name: auth.extra.raw_info.screen_name, @@ -17,13 +12,18 @@ class SessionsController < ApplicationController profile_image_url_https: auth.extra.raw_info.profile_image_url_https, protected: auth.extra.raw_info.protected }) + begin + WorkerManager.update_account(account) + rescue Aclog::Exceptions::WorkerConnectionError + end + session[:user_id] = account.user_id to = request.env["omniauth.params"]["redirect_after_login"].to_s - if to == "/" || to[0] != "/" || to.include?("//") - redirect_to user_path(auth.extra.raw_info.screen_name) - else + if safe_redirect?(to) redirect_to to + else + redirect_to user_path(auth.extra.raw_info.screen_name) end end diff --git a/app/controllers/tweets_controller.rb b/app/controllers/tweets_controller.rb index d994a0f..8708898 100644 --- a/app/controllers/tweets_controller.rb +++ b/app/controllers/tweets_controller.rb @@ -15,7 +15,7 @@ class TweetsController < ApplicationController redirect_to tweet end - def responses + def i_responses authorize! @tweet = Tweet.find(params[:id]) end diff --git a/app/controllers/users_controller.rb b/app/controllers/users_controller.rb index fa0de47..3e95fb6 100644 --- a/app/controllers/users_controller.rb +++ b/app/controllers/users_controller.rb @@ -17,15 +17,14 @@ class UsersController < ApplicationController @sidebars = [:user] end + def i_stats + user = User.find(screen_name: params[:screen_name]) + render json: user.stats.to_h + end + def i_suggest_screen_name - sleep 1 if Rails.env.development? users = User.suggest_screen_name(params[:head].to_s).limit(10) filtered = users.map {|user| { name: user.name, screen_name: user.screen_name, profile_image_url: user.profile_image_url(:mini) } } render json: filtered end - - def stats - user = User.find(screen_name: params[:screen_name]) - render json: user.stats.to_h - end end diff --git a/app/models/favorite.rb b/app/models/favorite.rb index 701b8ca..5962f1c 100644 --- a/app/models/favorite.rb +++ b/app/models/favorite.rb @@ -2,30 +2,31 @@ class Favorite < ActiveRecord::Base belongs_to :tweet belongs_to :user - # Registers favorite event in bulk from an array of Streaming API events. - # This method doesn't update Tweet#reactions_count. - # - # @param [Array] array An array of Streaming API events. - def self.create_bulk_from_json(array) - return if array.empty? + class << self + # Registers favorite event in bulk from an array of Streaming API events. + # This method doesn't update Tweet#reactions_count. + # + # @param [Array] array An array of Streaming API events. + def create_bulk_from_json(array) + return if array.empty? - objects = array.map do |json| - { - user_id: json[:source][:id], - tweet_id: json[:target_object][:id] + keys = [:user_id, :tweet_id] + objects = array.map {|json| + [json[:source][:id], json[:target_object][:id]] } - end - self.import(objects.first.keys, objects.map(&:values), ignore: true) - end + import(keys, objects, ignore: true) + end - # Unregisters favorite event in bulk from an array of Streaming API 'unfavorite' events. - # This method doesn't update Tweet#reactions_count. - # - # @param [Array] array An array of Streaming API events. - def self.delete_bulk_from_json(array) - array.each do |json| - self.delete_all(user_id: json[:source][:id], tweet_id: json[:target_object][:id]) + # Unregisters favorite event in bulk from an array of Streaming API 'unfavorite' events. + # This method doesn't update Tweet#reactions_count. + # + # @param [Array] array An array of Streaming API events. + def delete_bulk_from_json(array) + array.each do |json| + delete_all(user_id: json[:source][:id], + tweet_id: json[:target_object][:id]) + end end end end diff --git a/app/models/retweet.rb b/app/models/retweet.rb index cefd901..b67bd72 100644 --- a/app/models/retweet.rb +++ b/app/models/retweet.rb @@ -2,29 +2,29 @@ class Retweet < ActiveRecord::Base belongs_to :tweet belongs_to :user - # Registers retweet event in bulk from an array of Streaming API messages. - # This doesn't update Tweet#reactions_count. - # - # @param [Array] array An array of Streaming API messages. - def self.create_bulk_from_json(array) - return if array.empty? + class << self + # Registers retweet event in bulk from an array of Streaming API messages. + # This doesn't update Tweet#reactions_count. + # + # @param [Array] array An array of Streaming API messages. + def create_bulk_from_json(array) + return if array.empty? - objects = array.map do |json| - { - id: json[:id], - user_id: json[:user][:id], - tweet_id: json[:retweeted_status][:id] + keys = [:id, :user_id, :tweet_id] + objects = array.map {|json| + [json[:id], json[:user][:id], json[:retweeted_status][:id]] } - end - self.import(objects.first.keys, objects.map(&:values), ignore: true) - end + import(keys, objects, ignore: true) + end - # Unregisters retweet events in bulk from array of Streaming API's delete events. - # This doesn't update Tweet#reactions_count. - # - # @param [Array] array An array of Streaming API delete events. - def self.delete_bulk_from_json(array) - self.where(id: array.map {|json| json[:delete][:status][:id] }).delete_all + # Unregisters retweet events in bulk from array of Streaming API's delete events. + # This doesn't update Tweet#reactions_count. + # + # @param [Array] array An array of Streaming API delete events. + def delete_bulk_from_json(array) + ids = array.map {|json| json[:delete][:status][:id] } + where(id: ids).delete_all + end end end diff --git a/app/views/tweets/responses.json.jbuilder b/app/views/tweets/i_responses.json.jbuilder index 62de683..62de683 100644 --- a/app/views/tweets/responses.json.jbuilder +++ b/app/views/tweets/i_responses.json.jbuilder diff --git a/config/routes.rb b/config/routes.rb index 2acd4d3..d8279b0 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -10,7 +10,6 @@ Rails.application.routes.draw do get "/i/logout" => "sessions#destroy", as: "logout" get "/i/:id" => "tweets#show", as: "tweet", constraints: { id: /\d+/ } - get "/i/:id/responses" => "tweets#responses", as: "responses", constraints: { id: /\d+/ } post "/i/:id/import" => "tweets#import", as: "import", constraints: { id: /\d+/ } get "/i/settings" => "settings#index", as: "settings" @@ -22,6 +21,7 @@ Rails.application.routes.draw do get "/i/timeline" => "tweets#all_timeline", as: "timeline" get "/i/filter" => "tweets#filter", as: "filter" + get "/i/api/tweets/responses" => "tweets#i_responses", as: "responses" get "/i/api/users/suggest_screen_name" => "users#i_suggest_screen_name" get "/i/api/users/stats" => "users#i_stats" @@ -38,7 +38,6 @@ Rails.application.routes.draw do get "/discovered_by" => "users#discovered_by", as: "user_discovered_by" get "/discovered_users" => "users#discovered_users", as: "user_discovered_users" - get "/stats" => "users#stats", as: "user_stats" end get "*unmatched_route" => "application#routing_error" diff --git a/app/models/settings.rb b/lib/settings.rb index b3cf53b..b3cf53b 100644 --- a/app/models/settings.rb +++ b/lib/settings.rb diff --git a/worker_node/Gemfile.lock b/worker_node/Gemfile.lock index e45cbaf..cef11f3 100644 --- a/worker_node/Gemfile.lock +++ b/worker_node/Gemfile.lock @@ -14,7 +14,7 @@ GEM eventmachine (>= 1.0.0.beta.4) eventmachine (1.0.7) http_parser.rb (0.6.0) - msgpack (0.5.11) + msgpack (0.6.0) rake (10.4.2) simple_oauth (0.3.1) yajl-ruby (1.2.1) diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index 83c030a..2457d4f 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -12,11 +12,10 @@ class UserConnection end def update(hash) - if hash[:oauth_token] == @client.options[:oauth_token] - log(:debug, "Token is not changed") - else - @client.update(hash) + if @client.update_if_necessary(hash) log(:info, "Updated connection") + else + log(:debug, "Token is not changed") end end @@ -129,7 +128,6 @@ class UserConnection end def on_delete(json, timestamp = nil) - timestamp ||= json[:timestamp_ms] log(:debug, "Delete: #{json[:delete][:status]}") EventChannel << { event: :delete, identifier: "delete-#{json[:delete][:status][:id]}", diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb index 4044292..e9e54f8 100644 --- a/worker_node/lib/user_stream/client.rb +++ b/worker_node/lib/user_stream/client.rb @@ -4,17 +4,26 @@ module UserStream class Client attr_reader :options - def initialize(options = {}) - @options = { compression: true }.merge(options).freeze + def initialize(options) + @options = options @callbacks = {} @exiting = false end - def update(options = {}) + def update(options) initialize(options) reconnect end + def update_if_necessary(options) + if options[:oauth_token] == @options[:oauth_token] + update(options) + true + else + false + end + end + def reconnect close connect @@ -31,47 +40,36 @@ module UserStream def connect @buftok = BufferedTokenizer.new("\r\n") + @http = setup_connection - opts = { query: (@options[:params] || {}), - head: { "accept-encoding": @options[:compression] ? "gzip" : "" } } - oauth = { consumer_key: @options[:consumer_key], - consumer_secret: @options[:consumer_secret], - access_token: @options[:oauth_token], - access_token_secret: @options[:oauth_token_secret] } - req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come - req.use(EM::Middleware::OAuth, oauth) - http = req.get(opts) - - http.headers do |headers| + @http.headers do |headers| end - http.stream do |chunk| + @http.stream do |chunk| @buftok.extract(chunk).each do |line| next if line.empty? callback(:item, line) end end - http.callback do - case http.response_header.status + @http.callback do + case @http.response_header.status when 401 - callback(:unauthorized, http.response) + callback(:unauthorized, @http.response) when 420 - callback(:enhance_your_calm, http.response) + callback(:enhance_your_calm, @http.response) when 503 - callback(:service_unavailable, http.response) + callback(:service_unavailable, @http.response) when 200 callback(:disconnected) else - callback(:error, "#{http.response}: #{http.response}") + callback(:error, "#{@http.response}: #{@http.response}") end end - http.errback do - callback(:error, http.error) unless @exiting + @http.errback do + callback(:error, @http.error) unless @exiting end - - @http = http end def method_missing(name, &block) @@ -84,5 +82,21 @@ module UserStream def callback(name, *args) @callbacks.key?(name) && @callbacks[name].call(*args) end + + def setup_connection + opts = { query: {}, head: {} } + opts[:query].merge(@options[:params]) if @options[:params].is_a? Hash + opts[:head]["accept-encoding"] = "gzip" if @options[:compression] + + oauth = { consumer_key: @options[:consumer_key], + consumer_secret: @options[:consumer_secret], + access_token: @options[:oauth_token], + access_token_secret: @options[:oauth_token_secret] } + + req = EM::HttpRequest.new("https://userstream.twitter.com/1.1/user.json", inactivity_timeout: 100) # at least one line per 90 seconds will come + req.use(EM::Middleware::OAuth, oauth) + + req.get(opts) + end end end diff --git a/worker_node/lib/worker_node.rb b/worker_node/lib/worker_node.rb index 6ede74e..cc4e9b2 100644 --- a/worker_node/lib/worker_node.rb +++ b/worker_node/lib/worker_node.rb @@ -13,6 +13,8 @@ class WorkerNode def run EventChannel.setup + EM.epoll if Settings.epoll + EM.set_descriptor_table_size(Settings.descriptor_table_size || 1024) EM.run do connection = EM.connect(Settings.collector_host, Settings.collector_port, CollectorConnection) diff --git a/worker_node/settings.yml.example b/worker_node/settings.yml.example index b49d64d..56a9d29 100644 --- a/worker_node/settings.yml.example +++ b/worker_node/settings.yml.example @@ -3,6 +3,8 @@ collector_host: localhost collector_port: 42106 log_level: info memcached: "127.0.0.1:11211" +epoll: false +descriptor_table_size: 4096 user_stream_compression: true user_stream_params: replies: "all" |