diff options
-rw-r--r-- | Gemfile | 1 | ||||
-rw-r--r-- | Gemfile.lock | 6 | ||||
-rw-r--r-- | app/jobs/tweet_response_notification_job.rb | 56 | ||||
-rw-r--r-- | app/models/notification.rb | 71 | ||||
-rwxr-xr-x | bin/delayed_job | 5 | ||||
-rw-r--r-- | config/application.rb | 4 | ||||
-rw-r--r-- | config/initializers/delayed_jobs.rb | 4 | ||||
-rw-r--r-- | db/migrate/20150618164547_create_delayed_jobs.rb | 22 | ||||
-rw-r--r-- | db/schema.rb | 28 | ||||
-rw-r--r-- | lib/collector/event_queue.rb | 10 | ||||
-rw-r--r-- | spec/jobs/tweet_response_notification_job_spec.rb | 5 | ||||
-rw-r--r-- | worker_node/lib/user_stream/client.rb | 2 |
12 files changed, 131 insertions, 83 deletions
@@ -19,6 +19,7 @@ gem "bootstrap-sass" gem "puma" gem "dalli" gem "connection_pool" +gem "delayed_job_active_record" gem "omniauth-twitter" gem "twitter" diff --git a/Gemfile.lock b/Gemfile.lock index 756a0b4..4c3ac76 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -82,6 +82,11 @@ GEM daemons (1.2.2) dalli (2.7.4) debug_inspector (0.0.2) + delayed_job (4.0.6) + activesupport (>= 3.0, < 5.0) + delayed_job_active_record (4.0.3) + activerecord (>= 3.0, < 5.0) + delayed_job (>= 3.0, < 4.1) descendants_tracker (0.0.4) thread_safe (~> 0.3, >= 0.3.1) diff-lcs (1.2.5) @@ -369,6 +374,7 @@ DEPENDENCIES connection_pool coveralls dalli + delayed_job_active_record eventmachine factory_girl_rails grape diff --git a/app/jobs/tweet_response_notification_job.rb b/app/jobs/tweet_response_notification_job.rb new file mode 100644 index 0000000..1fa9c06 --- /dev/null +++ b/app/jobs/tweet_response_notification_job.rb @@ -0,0 +1,56 @@ +class TweetResponseNotificationJob < ActiveJob::Base + queue_as :default + + # Notifies the count of favovorites for the tweet with tweeting a reply from notification account. + # Notification will be send only when the count reached the specified number in settings.yml. + # THIS METHOD IS NOT THREAD SAFE + # + # @param [Hash, Tweet] hash_or_tweet the target tweet. + def perform(tweet) + return unless Settings.notification.enabled + + last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count") + Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max) + + if last_count + t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last + else + t_count = Settings.notification.favorites.include?(tweet.favorites_count) || tweet.favorites_count + end + + if t_count + notify(tweet, "#{ t_count }favs!") + end + end + + private + def notify(tweet, text) + user = tweet.user + account = user.account + + if account && account.active? && account.notification_enabled? + url = Rails.application.routes.url_helpers.tweet_url(host: Settings.base_url, id: tweet.id) + post("@#{ user.screen_name } #{ text } #{ url }", tweet.id) + end + end + + def post(text, reply_to = 0) + Settings.notification.accounts.each do |hash| + begin + client(hash).update(text, in_reply_to_status_id: reply_to) + break + rescue Twitter::Error::Forbidden => e + raise e unless e.message = "User is over daily status update limit." + end + end + end + + def client(acc) + @_client ||= {} + @_client[acc] ||= + Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, + consumer_secret: Settings.notification.consumer.secret, + access_token: acc.token, + access_token_secret: acc.secret) + end +end diff --git a/app/models/notification.rb b/app/models/notification.rb deleted file mode 100644 index b99610e..0000000 --- a/app/models/notification.rb +++ /dev/null @@ -1,71 +0,0 @@ -class Notification - # Notifies the count of favovorites for the tweet with tweeting a reply from notification account. - # Notification will be send only when the count reached the specified number in settings.yml. - # THIS METHOD IS NOT THREAD SAFE - # - # @param [Hash, Tweet] hash_or_tweet the target tweet. - def self.try_notify_favorites(tweet) - return unless Settings.notification.enabled - - notify_favs = -> c do - account = Account.includes(:user).where(users: { id: tweet.user_id }).first - if account && account.active? && account.notification_enabled? - notify(account.user, "#{ c }favs!", tweet.id) - end - end - - last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count") - if last_count - t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last - if t_count - notify_favs.(t_count) - end - else - if Settings.notification.favorites.include?(tweet.favorites_count) - notify_favs.(tweet.favorites_count) - end - end - - Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max) - end - - private - def self.notify(user, text, id) - url = Rails.application.routes.url_helpers.tweet_url(host: Settings.base_url, id: id) - tweet("@#{ user.screen_name } #{ text } #{ url }", id) - end - - def self.tweet(text, reply_to = 0) - defer do - begin - Settings.notification.accounts.each do |hash| - begin - client(hash).update(text, in_reply_to_status_id: reply_to) - break - rescue Twitter::Error::Forbidden => e - raise e unless e.message = "User is over daily status update limit." - end - end - rescue => e - Rails.logger.error("NOTIFICATION: #{ e.class }: #{ e.message }") - end - end - end - - def self.client(acc) - @_client ||= {} - @_client[acc] ||= - Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, - consumer_secret: Settings.notification.consumer.secret, - access_token: acc.token, - access_token_secret: acc.secret) - end - - def self.defer(&blk) - if EM.reactor_running? - EM.defer &blk - else - Thread.new &blk - end - end -end diff --git a/bin/delayed_job b/bin/delayed_job new file mode 100755 index 0000000..edf1959 --- /dev/null +++ b/bin/delayed_job @@ -0,0 +1,5 @@ +#!/usr/bin/env ruby + +require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment')) +require 'delayed/command' +Delayed::Command.new(ARGV).daemonize diff --git a/config/application.rb b/config/application.rb index 388e12d..4b39b7c 100644 --- a/config/application.rb +++ b/config/application.rb @@ -2,7 +2,7 @@ require File.expand_path('../boot', __FILE__) # Pick the frameworks you want: require "active_model/railtie" -# require "active_job/railtie" +require "active_job/railtie" require "active_record/railtie" require "action_controller/railtie" # require "action_mailer/railtie" @@ -39,6 +39,8 @@ module Aclog # Do not swallow errors in after_commit/after_rollback callbacks. config.active_record.raise_in_transactional_callbacks = true + config.active_job.queue_adapter = :delayed_job + config.generators do |g| g.test_framework :rspec g.fixture_replacement :factory_girl diff --git a/config/initializers/delayed_jobs.rb b/config/initializers/delayed_jobs.rb new file mode 100644 index 0000000..935b971 --- /dev/null +++ b/config/initializers/delayed_jobs.rb @@ -0,0 +1,4 @@ +Delayed::Worker.logger = Logger.new(STDOUT) +Delayed::Worker.logger.level = + Rails.env.production? ? Logger::INFO : Logger::DEBUG +ActiveRecord::Base.logger = Delayed::Worker.logger diff --git a/db/migrate/20150618164547_create_delayed_jobs.rb b/db/migrate/20150618164547_create_delayed_jobs.rb new file mode 100644 index 0000000..27fdcf6 --- /dev/null +++ b/db/migrate/20150618164547_create_delayed_jobs.rb @@ -0,0 +1,22 @@ +class CreateDelayedJobs < ActiveRecord::Migration + def self.up + create_table :delayed_jobs, force: true do |table| + table.integer :priority, default: 0, null: false # Allows some jobs to jump to the front of the queue + table.integer :attempts, default: 0, null: false # Provides for retries, but still fail eventually. + table.text :handler, null: false # YAML-encoded string of the object that will do work + table.text :last_error # reason for last failure (See Note below) + table.datetime :run_at # When to run. Could be Time.zone.now for immediately, or sometime in the future. + table.datetime :locked_at # Set when a client is working on this object + table.datetime :failed_at # Set when all retries have failed (actually, by default, the record is deleted instead) + table.string :locked_by # Who is working on this object (if locked) + table.string :queue # The name of the queue this job is in + table.timestamps null: true + end + + add_index :delayed_jobs, [:priority, :run_at], name: "delayed_jobs_priority" + end + + def self.down + drop_table :delayed_jobs + end +end diff --git a/db/schema.rb b/db/schema.rb index 3756e17..e23f0bc 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -11,9 +11,9 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20141220025331) do +ActiveRecord::Schema.define(version: 20150618164547) do - create_table "accounts", force: true do |t| + create_table "accounts", force: :cascade do |t| t.integer "user_id", limit: 8, null: false t.string "oauth_token", limit: 255, null: false t.string "oauth_token_secret", limit: 255, null: false @@ -26,7 +26,23 @@ ActiveRecord::Schema.define(version: 20141220025331) do add_index "accounts", ["status"], name: "index_accounts_on_status", using: :btree add_index "accounts", ["user_id"], name: "index_accounts_on_user_id", unique: true, using: :btree - create_table "favorites", force: true do |t| + create_table "delayed_jobs", force: :cascade do |t| + t.integer "priority", limit: 4, default: 0, null: false + t.integer "attempts", limit: 4, default: 0, null: false + t.text "handler", limit: 65535, null: false + t.text "last_error", limit: 65535 + t.datetime "run_at" + t.datetime "locked_at" + t.datetime "failed_at" + t.string "locked_by", limit: 255 + t.string "queue", limit: 255 + t.datetime "created_at" + t.datetime "updated_at" + end + + add_index "delayed_jobs", ["priority", "run_at"], name: "delayed_jobs_priority", using: :btree + + create_table "favorites", force: :cascade do |t| t.integer "tweet_id", limit: 8, null: false t.integer "user_id", limit: 8, null: false end @@ -34,7 +50,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do add_index "favorites", ["tweet_id"], name: "index_favorites_on_tweet_id", using: :btree add_index "favorites", ["user_id", "tweet_id"], name: "index_favorites_on_user_id_and_tweet_id", unique: true, using: :btree - create_table "retweets", force: true do |t| + create_table "retweets", force: :cascade do |t| t.integer "tweet_id", limit: 8, null: false t.integer "user_id", limit: 8, null: false end @@ -42,7 +58,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do add_index "retweets", ["tweet_id"], name: "index_retweets_on_tweet_id", using: :btree add_index "retweets", ["user_id"], name: "index_retweets_on_user_id", using: :btree - create_table "tweets", force: true do |t| + create_table "tweets", force: :cascade do |t| t.text "text", limit: 65535, null: false t.text "source", limit: 65535, null: false t.integer "user_id", limit: 8, null: false @@ -57,7 +73,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do add_index "tweets", ["reactions_count"], name: "index_tweets_on_reactions_count", using: :btree add_index "tweets", ["user_id", "reactions_count"], name: "index_tweets_on_user_id_and_reactions_count", using: :btree - create_table "users", force: true do |t| + create_table "users", force: :cascade do |t| t.string "screen_name", limit: 20, null: false t.string "name", limit: 64, null: false t.string "profile_image_url", limit: 255, null: false diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb index 936263b..2686cdd 100644 --- a/lib/collector/event_queue.rb +++ b/lib/collector/event_queue.rb @@ -37,10 +37,12 @@ module Collector Retweet.delete_bulk_from_json(deletes) end - tweet_ids = favorites.map {|f| f[:target_object][:id] } - if tweet_ids.size > 0 - Tweet.where(id: tweet_ids).each do |tweet| - Notification.try_notify_favorites(tweet) + if Settings.notification.enabled? + tweet_ids = favorites.map {|f| f[:target_object][:id] } + if tweet_ids.size > 0 + Tweet.where(id: tweet_ids).each do |tweet| + TweetResponseNotificationJob.perform_later(tweet) + end end end diff --git a/spec/jobs/tweet_response_notification_job_spec.rb b/spec/jobs/tweet_response_notification_job_spec.rb new file mode 100644 index 0000000..2f848d2 --- /dev/null +++ b/spec/jobs/tweet_response_notification_job_spec.rb @@ -0,0 +1,5 @@ +require 'rails_helper' + +RSpec.describe TweetResponseNotificationJob, type: :job do + pending "add some examples to (or delete) #{__FILE__}" +end diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb index e9e54f8..66c5261 100644 --- a/worker_node/lib/user_stream/client.rb +++ b/worker_node/lib/user_stream/client.rb @@ -85,7 +85,7 @@ module UserStream def setup_connection opts = { query: {}, head: {} } - opts[:query].merge(@options[:params]) if @options[:params].is_a? Hash + opts[:query].merge!(@options[:params]) if @options[:params].is_a? Hash opts[:head]["accept-encoding"] = "gzip" if @options[:compression] oauth = { consumer_key: @options[:consumer_key], |