aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Gemfile1
-rw-r--r--Gemfile.lock6
-rw-r--r--app/jobs/tweet_response_notification_job.rb56
-rw-r--r--app/models/notification.rb71
-rwxr-xr-xbin/delayed_job5
-rw-r--r--config/application.rb4
-rw-r--r--config/initializers/delayed_jobs.rb4
-rw-r--r--db/migrate/20150618164547_create_delayed_jobs.rb22
-rw-r--r--db/schema.rb28
-rw-r--r--lib/collector/event_queue.rb10
-rw-r--r--spec/jobs/tweet_response_notification_job_spec.rb5
-rw-r--r--worker_node/lib/user_stream/client.rb2
12 files changed, 131 insertions, 83 deletions
diff --git a/Gemfile b/Gemfile
index 3807468..85fcc01 100644
--- a/Gemfile
+++ b/Gemfile
@@ -19,6 +19,7 @@ gem "bootstrap-sass"
gem "puma"
gem "dalli"
gem "connection_pool"
+gem "delayed_job_active_record"
gem "omniauth-twitter"
gem "twitter"
diff --git a/Gemfile.lock b/Gemfile.lock
index 756a0b4..4c3ac76 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -82,6 +82,11 @@ GEM
daemons (1.2.2)
dalli (2.7.4)
debug_inspector (0.0.2)
+ delayed_job (4.0.6)
+ activesupport (>= 3.0, < 5.0)
+ delayed_job_active_record (4.0.3)
+ activerecord (>= 3.0, < 5.0)
+ delayed_job (>= 3.0, < 4.1)
descendants_tracker (0.0.4)
thread_safe (~> 0.3, >= 0.3.1)
diff-lcs (1.2.5)
@@ -369,6 +374,7 @@ DEPENDENCIES
connection_pool
coveralls
dalli
+ delayed_job_active_record
eventmachine
factory_girl_rails
grape
diff --git a/app/jobs/tweet_response_notification_job.rb b/app/jobs/tweet_response_notification_job.rb
new file mode 100644
index 0000000..1fa9c06
--- /dev/null
+++ b/app/jobs/tweet_response_notification_job.rb
@@ -0,0 +1,56 @@
+class TweetResponseNotificationJob < ActiveJob::Base
+ queue_as :default
+
+ # Notifies the count of favovorites for the tweet with tweeting a reply from notification account.
+ # Notification will be send only when the count reached the specified number in settings.yml.
+ # THIS METHOD IS NOT THREAD SAFE
+ #
+ # @param [Hash, Tweet] hash_or_tweet the target tweet.
+ def perform(tweet)
+ return unless Settings.notification.enabled
+
+ last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count")
+ Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max)
+
+ if last_count
+ t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last
+ else
+ t_count = Settings.notification.favorites.include?(tweet.favorites_count) || tweet.favorites_count
+ end
+
+ if t_count
+ notify(tweet, "#{ t_count }favs!")
+ end
+ end
+
+ private
+ def notify(tweet, text)
+ user = tweet.user
+ account = user.account
+
+ if account && account.active? && account.notification_enabled?
+ url = Rails.application.routes.url_helpers.tweet_url(host: Settings.base_url, id: tweet.id)
+ post("@#{ user.screen_name } #{ text } #{ url }", tweet.id)
+ end
+ end
+
+ def post(text, reply_to = 0)
+ Settings.notification.accounts.each do |hash|
+ begin
+ client(hash).update(text, in_reply_to_status_id: reply_to)
+ break
+ rescue Twitter::Error::Forbidden => e
+ raise e unless e.message = "User is over daily status update limit."
+ end
+ end
+ end
+
+ def client(acc)
+ @_client ||= {}
+ @_client[acc] ||=
+ Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
+ consumer_secret: Settings.notification.consumer.secret,
+ access_token: acc.token,
+ access_token_secret: acc.secret)
+ end
+end
diff --git a/app/models/notification.rb b/app/models/notification.rb
deleted file mode 100644
index b99610e..0000000
--- a/app/models/notification.rb
+++ /dev/null
@@ -1,71 +0,0 @@
-class Notification
- # Notifies the count of favovorites for the tweet with tweeting a reply from notification account.
- # Notification will be send only when the count reached the specified number in settings.yml.
- # THIS METHOD IS NOT THREAD SAFE
- #
- # @param [Hash, Tweet] hash_or_tweet the target tweet.
- def self.try_notify_favorites(tweet)
- return unless Settings.notification.enabled
-
- notify_favs = -> c do
- account = Account.includes(:user).where(users: { id: tweet.user_id }).first
- if account && account.active? && account.notification_enabled?
- notify(account.user, "#{ c }favs!", tweet.id)
- end
- end
-
- last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count")
- if last_count
- t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last
- if t_count
- notify_favs.(t_count)
- end
- else
- if Settings.notification.favorites.include?(tweet.favorites_count)
- notify_favs.(tweet.favorites_count)
- end
- end
-
- Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max)
- end
-
- private
- def self.notify(user, text, id)
- url = Rails.application.routes.url_helpers.tweet_url(host: Settings.base_url, id: id)
- tweet("@#{ user.screen_name } #{ text } #{ url }", id)
- end
-
- def self.tweet(text, reply_to = 0)
- defer do
- begin
- Settings.notification.accounts.each do |hash|
- begin
- client(hash).update(text, in_reply_to_status_id: reply_to)
- break
- rescue Twitter::Error::Forbidden => e
- raise e unless e.message = "User is over daily status update limit."
- end
- end
- rescue => e
- Rails.logger.error("NOTIFICATION: #{ e.class }: #{ e.message }")
- end
- end
- end
-
- def self.client(acc)
- @_client ||= {}
- @_client[acc] ||=
- Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
- consumer_secret: Settings.notification.consumer.secret,
- access_token: acc.token,
- access_token_secret: acc.secret)
- end
-
- def self.defer(&blk)
- if EM.reactor_running?
- EM.defer &blk
- else
- Thread.new &blk
- end
- end
-end
diff --git a/bin/delayed_job b/bin/delayed_job
new file mode 100755
index 0000000..edf1959
--- /dev/null
+++ b/bin/delayed_job
@@ -0,0 +1,5 @@
+#!/usr/bin/env ruby
+
+require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment'))
+require 'delayed/command'
+Delayed::Command.new(ARGV).daemonize
diff --git a/config/application.rb b/config/application.rb
index 388e12d..4b39b7c 100644
--- a/config/application.rb
+++ b/config/application.rb
@@ -2,7 +2,7 @@ require File.expand_path('../boot', __FILE__)
# Pick the frameworks you want:
require "active_model/railtie"
-# require "active_job/railtie"
+require "active_job/railtie"
require "active_record/railtie"
require "action_controller/railtie"
# require "action_mailer/railtie"
@@ -39,6 +39,8 @@ module Aclog
# Do not swallow errors in after_commit/after_rollback callbacks.
config.active_record.raise_in_transactional_callbacks = true
+ config.active_job.queue_adapter = :delayed_job
+
config.generators do |g|
g.test_framework :rspec
g.fixture_replacement :factory_girl
diff --git a/config/initializers/delayed_jobs.rb b/config/initializers/delayed_jobs.rb
new file mode 100644
index 0000000..935b971
--- /dev/null
+++ b/config/initializers/delayed_jobs.rb
@@ -0,0 +1,4 @@
+Delayed::Worker.logger = Logger.new(STDOUT)
+Delayed::Worker.logger.level =
+ Rails.env.production? ? Logger::INFO : Logger::DEBUG
+ActiveRecord::Base.logger = Delayed::Worker.logger
diff --git a/db/migrate/20150618164547_create_delayed_jobs.rb b/db/migrate/20150618164547_create_delayed_jobs.rb
new file mode 100644
index 0000000..27fdcf6
--- /dev/null
+++ b/db/migrate/20150618164547_create_delayed_jobs.rb
@@ -0,0 +1,22 @@
+class CreateDelayedJobs < ActiveRecord::Migration
+ def self.up
+ create_table :delayed_jobs, force: true do |table|
+ table.integer :priority, default: 0, null: false # Allows some jobs to jump to the front of the queue
+ table.integer :attempts, default: 0, null: false # Provides for retries, but still fail eventually.
+ table.text :handler, null: false # YAML-encoded string of the object that will do work
+ table.text :last_error # reason for last failure (See Note below)
+ table.datetime :run_at # When to run. Could be Time.zone.now for immediately, or sometime in the future.
+ table.datetime :locked_at # Set when a client is working on this object
+ table.datetime :failed_at # Set when all retries have failed (actually, by default, the record is deleted instead)
+ table.string :locked_by # Who is working on this object (if locked)
+ table.string :queue # The name of the queue this job is in
+ table.timestamps null: true
+ end
+
+ add_index :delayed_jobs, [:priority, :run_at], name: "delayed_jobs_priority"
+ end
+
+ def self.down
+ drop_table :delayed_jobs
+ end
+end
diff --git a/db/schema.rb b/db/schema.rb
index 3756e17..e23f0bc 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -11,9 +11,9 @@
#
# It's strongly recommended that you check this file into your version control system.
-ActiveRecord::Schema.define(version: 20141220025331) do
+ActiveRecord::Schema.define(version: 20150618164547) do
- create_table "accounts", force: true do |t|
+ create_table "accounts", force: :cascade do |t|
t.integer "user_id", limit: 8, null: false
t.string "oauth_token", limit: 255, null: false
t.string "oauth_token_secret", limit: 255, null: false
@@ -26,7 +26,23 @@ ActiveRecord::Schema.define(version: 20141220025331) do
add_index "accounts", ["status"], name: "index_accounts_on_status", using: :btree
add_index "accounts", ["user_id"], name: "index_accounts_on_user_id", unique: true, using: :btree
- create_table "favorites", force: true do |t|
+ create_table "delayed_jobs", force: :cascade do |t|
+ t.integer "priority", limit: 4, default: 0, null: false
+ t.integer "attempts", limit: 4, default: 0, null: false
+ t.text "handler", limit: 65535, null: false
+ t.text "last_error", limit: 65535
+ t.datetime "run_at"
+ t.datetime "locked_at"
+ t.datetime "failed_at"
+ t.string "locked_by", limit: 255
+ t.string "queue", limit: 255
+ t.datetime "created_at"
+ t.datetime "updated_at"
+ end
+
+ add_index "delayed_jobs", ["priority", "run_at"], name: "delayed_jobs_priority", using: :btree
+
+ create_table "favorites", force: :cascade do |t|
t.integer "tweet_id", limit: 8, null: false
t.integer "user_id", limit: 8, null: false
end
@@ -34,7 +50,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do
add_index "favorites", ["tweet_id"], name: "index_favorites_on_tweet_id", using: :btree
add_index "favorites", ["user_id", "tweet_id"], name: "index_favorites_on_user_id_and_tweet_id", unique: true, using: :btree
- create_table "retweets", force: true do |t|
+ create_table "retweets", force: :cascade do |t|
t.integer "tweet_id", limit: 8, null: false
t.integer "user_id", limit: 8, null: false
end
@@ -42,7 +58,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do
add_index "retweets", ["tweet_id"], name: "index_retweets_on_tweet_id", using: :btree
add_index "retweets", ["user_id"], name: "index_retweets_on_user_id", using: :btree
- create_table "tweets", force: true do |t|
+ create_table "tweets", force: :cascade do |t|
t.text "text", limit: 65535, null: false
t.text "source", limit: 65535, null: false
t.integer "user_id", limit: 8, null: false
@@ -57,7 +73,7 @@ ActiveRecord::Schema.define(version: 20141220025331) do
add_index "tweets", ["reactions_count"], name: "index_tweets_on_reactions_count", using: :btree
add_index "tweets", ["user_id", "reactions_count"], name: "index_tweets_on_user_id_and_reactions_count", using: :btree
- create_table "users", force: true do |t|
+ create_table "users", force: :cascade do |t|
t.string "screen_name", limit: 20, null: false
t.string "name", limit: 64, null: false
t.string "profile_image_url", limit: 255, null: false
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb
index 936263b..2686cdd 100644
--- a/lib/collector/event_queue.rb
+++ b/lib/collector/event_queue.rb
@@ -37,10 +37,12 @@ module Collector
Retweet.delete_bulk_from_json(deletes)
end
- tweet_ids = favorites.map {|f| f[:target_object][:id] }
- if tweet_ids.size > 0
- Tweet.where(id: tweet_ids).each do |tweet|
- Notification.try_notify_favorites(tweet)
+ if Settings.notification.enabled?
+ tweet_ids = favorites.map {|f| f[:target_object][:id] }
+ if tweet_ids.size > 0
+ Tweet.where(id: tweet_ids).each do |tweet|
+ TweetResponseNotificationJob.perform_later(tweet)
+ end
end
end
diff --git a/spec/jobs/tweet_response_notification_job_spec.rb b/spec/jobs/tweet_response_notification_job_spec.rb
new file mode 100644
index 0000000..2f848d2
--- /dev/null
+++ b/spec/jobs/tweet_response_notification_job_spec.rb
@@ -0,0 +1,5 @@
+require 'rails_helper'
+
+RSpec.describe TweetResponseNotificationJob, type: :job do
+ pending "add some examples to (or delete) #{__FILE__}"
+end
diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb
index e9e54f8..66c5261 100644
--- a/worker_node/lib/user_stream/client.rb
+++ b/worker_node/lib/user_stream/client.rb
@@ -85,7 +85,7 @@ module UserStream
def setup_connection
opts = { query: {}, head: {} }
- opts[:query].merge(@options[:params]) if @options[:params].is_a? Hash
+ opts[:query].merge!(@options[:params]) if @options[:params].is_a? Hash
opts[:head]["accept-encoding"] = "gzip" if @options[:compression]
oauth = { consumer_key: @options[:consumer_key],