diff options
-rw-r--r-- | app/assets/stylesheets/_tweets.css.sass | 3 | ||||
-rw-r--r-- | app/models/account.rb | 1 | ||||
-rw-r--r-- | app/models/favorite.rb | 1 | ||||
-rw-r--r-- | app/models/retweet.rb | 1 | ||||
-rw-r--r-- | app/models/tweet.rb | 1 | ||||
-rw-r--r-- | app/models/user.rb | 1 | ||||
-rw-r--r-- | app/views/shared/_tweet.haml | 4 | ||||
-rw-r--r-- | client/Gemfile | 3 | ||||
-rw-r--r-- | client/Gemfile.lock | 20 | ||||
-rw-r--r-- | client/worker.rb | 160 | ||||
-rw-r--r-- | db/migrate/20130226151042_create_retweets.rb | 1 | ||||
-rw-r--r-- | lib/receiver/worker.rb | 22 | ||||
-rw-r--r-- | public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css | 116 | ||||
-rw-r--r-- | public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css.gz | bin | 0 -> 793 bytes | |||
-rw-r--r-- | public/assets/manifest-0b26a376c9cfe81af432b634628ee75d.json | 2 | ||||
-rw-r--r-- | script/start.rb (renamed from lib/receiver/start.rb) | 0 |
16 files changed, 227 insertions, 109 deletions
diff --git a/app/assets/stylesheets/_tweets.css.sass b/app/assets/stylesheets/_tweets.css.sass index a1f0cb3..818d099 100644 --- a/app/assets/stylesheets/_tweets.css.sass +++ b/app/assets/stylesheets/_tweets.css.sass @@ -61,7 +61,8 @@ //:border-radius 0 0 4px 4px :padding 15px //:background #edf0f4 - .type_row + .favs_row, + .retweets_row :margin-bottom 3px .info :width 60px diff --git a/app/models/account.rb b/app/models/account.rb index 1220cee..50229f9 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -1,3 +1,2 @@ class Account < ActiveRecord::Base -# attr_accessible :id, :oauth_token, :oauth_token_secret end diff --git a/app/models/favorite.rb b/app/models/favorite.rb index 56dbf76..c764d08 100644 --- a/app/models/favorite.rb +++ b/app/models/favorite.rb @@ -1,5 +1,4 @@ class Favorite < ActiveRecord::Base -# attr_accessible :tweet, :user belongs_to :tweet, :counter_cache => true belongs_to :user end diff --git a/app/models/retweet.rb b/app/models/retweet.rb index 2f23ae0..34da286 100644 --- a/app/models/retweet.rb +++ b/app/models/retweet.rb @@ -1,5 +1,4 @@ class Retweet < ActiveRecord::Base -# attr_accessible :id, :tweet, :user belongs_to :tweet, :counter_cache => true belongs_to :user end diff --git a/app/models/tweet.rb b/app/models/tweet.rb index a6862eb..a1124f9 100644 --- a/app/models/tweet.rb +++ b/app/models/tweet.rb @@ -1,5 +1,4 @@ class Tweet < ActiveRecord::Base -# attr_accessible :id, :text, :source, :tweeted_at, :user belongs_to :user has_many :favorites, :dependent => :delete_all has_many :retweets, :dependent => :delete_all diff --git a/app/models/user.rb b/app/models/user.rb index ea1bb26..9a090c8 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -1,5 +1,4 @@ class User < ActiveRecord::Base -# attr_accessible :id, :screen_name, :name, :profile_image_url has_many :tweets, :dependent => :delete_all has_many :favorites, :dependent => :delete_all has_many :retweets, :dependent => :delete_all diff --git a/app/views/shared/_tweet.haml b/app/views/shared/_tweet.haml index 9b9bd2f..d46e69f 100644 --- a/app/views/shared/_tweet.haml +++ b/app/views/shared/_tweet.haml @@ -18,8 +18,8 @@ %span.source = raw format_source_text(item.source) .stats - - [["FAVS", item.favorites], ["RETWEETS", item.retweets]].each do |type, data| - .type_row.clearfix + - [["FAVS", item.favorites, "favs_row"], ["RETWEETS", item.retweets, "retweets_row"]].each do |type, data, cssclass| + %div{:class => "#{cssclass} clearfix"} .info %span.count= data.length %span.type= type diff --git a/client/Gemfile b/client/Gemfile index 5503903..58a9039 100644 --- a/client/Gemfile +++ b/client/Gemfile @@ -4,5 +4,6 @@ source 'https://rubygems.org' gem 'foreman' gem 'settingslogic' -gem 'tweetstream' +gem 'em-twitter' +gem 'twitter' gem 'yajl-ruby', :require => "yajl" diff --git a/client/Gemfile.lock b/client/Gemfile.lock index e747bfa..c52698d 100644 --- a/client/Gemfile.lock +++ b/client/Gemfile.lock @@ -1,17 +1,6 @@ GEM remote: https://rubygems.org/ specs: - addressable (2.3.3) - cookiejar (0.3.0) - daemons (1.1.9) - em-http-request (1.0.3) - addressable (>= 2.2.3) - cookiejar - em-socksify - eventmachine (>= 1.0.0.beta.4) - http_parser.rb (>= 0.5.3) - em-socksify (0.2.1) - eventmachine (>= 1.0.0.beta.4) em-twitter (0.2.1) eventmachine (~> 1.0) http_parser.rb (~> 0.5) @@ -27,12 +16,6 @@ GEM settingslogic (2.0.9) simple_oauth (0.2.0) thor (0.17.0) - tweetstream (2.4.0) - daemons (~> 1.1) - em-http-request (~> 1.0.2) - em-twitter (~> 0.2) - twitter (~> 4.0) - yajl-ruby (~> 1.1) twitter (4.6.0) faraday (~> 0.8, < 0.10) multi_json (~> 1.0) @@ -43,7 +26,8 @@ PLATFORMS ruby DEPENDENCIES + em-twitter foreman settingslogic - tweetstream + twitter yajl-ruby diff --git a/client/worker.rb b/client/worker.rb index d9230ce..7e9c518 100644 --- a/client/worker.rb +++ b/client/worker.rb @@ -7,18 +7,21 @@ require "./logger" module EM class Connection def send_chunk(data) + puts data send_data(data + "\r\n") end end end -module TweetStream - class Client - attr_reader :user_id, :row_id +module EM + module Twitter + class Client + attr_reader :user_id, :row_id - def _set_aclog(user_id, row_id) - @user_id = user_id - @row_id = row_id + def _set_aclog(user_id, row_id) + @user_id = user_id + @row_id = row_id + end end end end @@ -33,18 +36,15 @@ class Worker def format_text(status) chars = status.text.to_s.split(//) - entities = status.attrs[:entities].values.flatten.map do |entity| - entity[:hashtag] = entity[:text] if entity[:text] - entity - end.sort_by{|entity| entity[:indices].first} + entities = status.attrs[:entities].values.flatten.sort_by{|entity| entity[:indices].first} result = [] last_index = entities.inject(0) do |last_index, entity| result << chars[last_index...entity[:indices].first] result << if entity[:url] "<url:#{CGI.escape(entity[:expanded_url])}:#{CGI.escape(entity[:display_url])}>" - elsif entity[:hashtag] - "<hashtag:#{CGI.escape(entity[:hashtag])}>" + elsif entity[:text] + "<hashtag:#{CGI.escape(entity[:text])}>" elsif entity[:screen_name] "<mention:#{CGI.escape(entity[:screen_name])}>" elsif entity[:cashtag] @@ -120,12 +120,15 @@ class Worker end def post_init - send_chunk("CONNECT #{Settings.secret_key}&#{Settings.worker_number}&#{Settings.worker_count}") + out = {:secret_key => Settings.secret_key, + :worker_number => Settings.worker_number, + :worker_count => Settings.worker_count} + send_chunk("CONNECT #{Yajl::Encoder.encode(out)}") end def unbind $logger.info("Connection closed") - EM.add_timer(5) do + EM.add_timer(10) do reconnect(Settings.db_proxy_host, Settings.db_proxy_port) post_init end @@ -145,97 +148,106 @@ class Worker when "ACCOUNT" begin hash = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) + rescue Yajl::ParseError + $logger.error("JSON Parse Error: #{json}") + next + end - @clients << client = TweetStream::Client.new( + @clients << client = EM::Twitter::Client.connect({ + :host => "userstream.twitter.com", + :path => "/1.1/user.json", + :oauth => { :consumer_key => Settings.consumer_key, :consumer_secret => Settings.consumer_secret, - :oauth_token => hash[:oauth_token], - :oauth_token_secret => hash[:oauth_token_secret], - :auth_method => :oauth) - client._set_aclog(hash[:user_id], hash[:id]) - - client.on_error do |message| - $logger.warn("UserStreams Error(##{client.user_id}): #{message}") - end - - client.on_limit do |discarded_count| - $logger.warn("UserStreams Limit Event(##{client.user_id}): #{discarded_count}") - end + :token => hash[:oauth_token], + :token_secret => hash[:oauth_token_secret]}, + :method => "GET"}) + client._set_aclog(hash[:user_id], hash[:id]) - client.on_unauthorized do - # revoked? - $logger.warn("Unauthorized(##{client.user_id})") - send_chunk("UNAUTHORIZED #{client.row_id}&#{client.user_id}") - client.stop_stream - @clients.delete(client) - end + client.on_error do |message| + $logger.warn("Unknown Error(##{client.user_id}): #{message}") + end - client.on_enhance_your_calm do - # limit? - $logger.warn("Enhance your calm(##{client.user_id})") - end + client.on_unauthorized do + # revoked? + $logger.warn("Unauthorized(##{client.user_id})") + send_chunk("UNAUTHORIZED #{client.row_id}&#{client.user_id}") + client.connection.stop + @clients.delete(client) + end - client.on_no_data_received do - # (?) - $logger.warn("No data received(##{client.user_id})") - client.close_connection - end + client.on_enhance_your_calm do + # limit? + $logger.warn("Enhance your calm(##{client.user_id})") + end - client.on_reconnect do |timeout, retries| - $logger.warn("Reconnected(##{client.user_id}): #{retries}") - end + client.on_no_data_received do + # (?) + $logger.warn("No data received(##{client.user_id})") + client.close_connection + end - client.on_stall_warning do |warning| - $logger.info("Stall warning(##{client.user_id}): #{warning}") + client.each do |chunk| + begin + hash = Yajl::Parser.parse(chunk, :symbolize_keys => true) + rescue Yajl::ParseError + $logger.warn("Unexpected chunk(##{client.user_id}): #{chunk}") + next end - client.on_timeline_status do |status| - # tweets. includes retweets + if hash[:warning] + $logger.info("Stall warning(##{client.user_id}): #{hash[:warning]}") + elsif hash[:delete] && hash[:delete][:status] + send_delete(hash[:delete][:status][:id], hash[:delete][:status][:user_id]) + elsif hash[:limit] + $logger.warn("UserStreams Limit Event(##{client.user_id}): #{hash[:limit][:track]}") + elsif hash[:event] + case hash[:event] + when "favorite" + source = Twitter::User.new(hash[:source]) + target_object = Twitter::Tweet.new(hash[:target_object]) + unless target_object.user.protected && target_object.user.id != client.user_id + send_favorite(source, target_object) + end + when "unfavorite" + send_unfavorite(Twitter::User.new(hash[:source]), Twitter::Tweet.new(hash[:target_object])) + end + elsif hash[:text] && hash[:user] + # tweet + status = Twitter::Tweet.new(hash) if status.retweeted_status && (status.retweeted_status.user.id == client.user_id || status.user.id == client.user_id) + $logger.debug("Retweet(##{client.user_id})") send_retweet(status) elsif status.user.id == client.user_id send_tweet(status) end end + end - client.on_event(:favorite) do |event| - source = Twitter::User.new(event[:source]) - target_object = Twitter::Tweet.new(event[:target_object]) - unless target_object.user.protected && target_object.user.id != client.user_id - send_favorite(source, target_object) - end - end - - client.on_event(:unfavorite) do |event| - send_unfavorite(Twitter::User.new(event[:source]), Twitter::Tweet.new(event[:target_object])) - end - - client.on_delete do |status_id, user_id| - send_delete(status_id, user_id) - end + client.on_reconnect do |timeout, retries| + $logger.warn("Reconnected(##{client.user_id}): #{retries}") + end - client.userstream - $logger.info("Connected(##{client.user_id})") - rescue ::Yajl::ParseError - $logger.error("JSON Parse Error: #{json}") - rescue TweetStream::ReconnectError - $logger.warn("TweetStream::ReconnectError: #{client.row_id}/#{client.user_id}") - client.stop_stream + client.on_max_reconnects do |timeout, retries| + $logger.warn("Max reconnects: #{client.row_id}/#{client.user_id}") + client.connection.stop @clients.delete(client) end + + $logger.info("Connected(##{client.user_id})") end end end def stop_all - @clients.map(&:stop_stream) + @clients.map{|c| c.connection.stop} send_chunk("QUIT") end end def initialize - $logger = Aclog::Logger.new(:debug) + $logger = Aclog::Logger.new(:warn) end def start diff --git a/db/migrate/20130226151042_create_retweets.rb b/db/migrate/20130226151042_create_retweets.rb index 6de664e..10ea615 100644 --- a/db/migrate/20130226151042_create_retweets.rb +++ b/db/migrate/20130226151042_create_retweets.rb @@ -5,6 +5,7 @@ class CreateRetweets < ActiveRecord::Migration t.references :user, :limit => 8, :null => false end + add_index :retweets, [:tweet_id, :user_id], :unique => true add_index :retweets, :tweet_id end end diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb index 7c2c11e..860d890 100644 --- a/lib/receiver/worker.rb +++ b/lib/receiver/worker.rb @@ -112,11 +112,16 @@ class Receiver::Worker < DaemonSpawn::Base arg = line.split(/ /, 2) case arg.first when "CONNECT" - ff = arg.last.split(/&/) - secret_token = ff[0] - worker_number = ff[1].to_i - worker_count = ff[2].to_i - if secret_token == Settings.secret_key + begin + json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) + rescue ::Yajl::ParseError + # JSON parse error....?? + p $! + end + secret_key = json[:secret_key] + worker_number = json[:worker_number] + worker_count = json[:worker_count] + if secret_key == Settings.secret_key if $worker_count != worker_count && $connections.size > 0 $logger.error("Error: Worker Count Difference: $worker_count=#{$worker_count}, worker_count=#{worker_count}") send_chunk("ERROR Invalid Worker Count") @@ -125,13 +130,14 @@ class Receiver::Worker < DaemonSpawn::Base $worker_count = worker_count $connections[worker_number] = self @worker_number = worker_number + @authorized = true $logger.info("Connected: #{worker_number}") send_chunk("OK Connected") send_account_all end else $logger.error("Error: Invalid Secret Key") - send_chunk("ERROR Invalid Secret Token") + send_chunk("ERROR Invalid Secret Key") close_connection_after_writing end when "UNAUTHORIZED" @@ -142,7 +148,9 @@ class Receiver::Worker < DaemonSpawn::Base send_chunk("BYE") close_connection_after_writing else - @@wq.push arg + if @authorized + @@wq.push arg + end end end end diff --git a/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css b/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css new file mode 100644 index 0000000..80f7d9c --- /dev/null +++ b/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css @@ -0,0 +1,116 @@ +.items { + width: 572px; + margin: 15px 0 15px 18px; } + .items .item { + margin: 15px 0; } + .items .item .tweet { + overflow: hidden; + background: white; + border: 1px solid #c1c5cb; + border-width: 1px 0; + padding: 15px; } + .items .item .tweet .avatar { + width: 60px; + float: left; } + .items .item .tweet .avatar img { + width: 48px; + height: 48px; } + .items .item .tweet .tweet_content_fix { + float: left; + width: 0; + height: 85px; } + .items .item .tweet .tweet_content { + float: left; + width: 473px; } + .items .item .tweet .tweet_content .user { + padding: 0 5px 15px; + font-weight: bold; } + .items .item .tweet .tweet_content .user .name { + font-size: 14px; } + .items .item .tweet .tweet_content .user .screen_name { + font-size: 12px; } + .items .item .tweet .tweet_content .user a { + color: #666666; } + .items .item .tweet .tweet_content .text { + font-size: 18px; + line-height: 25px; + padding: 0 5px 15px; + word-wrap: break-word; } + .items .item .tweet .tweet_content .meta { + padding: 0 5px 15px; + color: #666666; + font-size: 12px; } + .items .item .tweet .tweet_content .meta .twitter_bird { + display: block; + float: left; } + .items .item .tweet .tweet_content .meta .twitter_bird img { + vertical-align: top; + margin-right: 2px; + margin-top: 1px; } + .items .item .tweet .tweet_content .meta .created_at { + display: block; + float: left; } + .items .item .tweet .tweet_content .meta .source { + display: block; + float: right; } + .items .item .stats { + clear: both; + padding: 15px; } + .items .item .stats .favs_row, + .items .item .stats .retweets_row { + margin-bottom: 3px; } + .items .item .stats .favs_row .info, + .items .item .stats .retweets_row .info { + width: 60px; + float: left; } + .items .item .stats .favs_row .info .count, + .items .item .stats .favs_row .info .type, + .items .item .stats .retweets_row .info .count, + .items .item .stats .retweets_row .info .type { + color: #666666; + display: block; } + .items .item .stats .favs_row .info .count, + .items .item .stats .retweets_row .info .count { + font-weight: bold; + font-size: 14px; } + .items .item .stats .favs_row .info .type, + .items .item .stats .retweets_row .info .type { + font-size: 10px; } + .items .item .stats .favs_row .users_content_fix, + .items .item .stats .retweets_row .users_content_fix { + float: left; + height: 48px; + width: 0; } + .items .item .stats .favs_row .users_content, + .items .item .stats .retweets_row .users_content { + float: left; } + .items .item .stats .favs_row .users_content ul.users_row, + .items .item .stats .retweets_row .users_content ul.users_row { + list-style: none; } + .items .item .stats .favs_row .users_content ul.users_row li, + .items .item .stats .retweets_row .users_content ul.users_row li { + float: left; } + .items .item .stats .favs_row .users_content ul.users_row li img, + .items .item .stats .retweets_row .users_content ul.users_row li img { + width: 48px; + height: 48px; + vertical-align: bottom; } + +* { + margin: 0; + padding: 0; + font-family: "Ubuntu", sans-serif; } + +a { + text-decoration: none; + color: #3b5998; } + +img { + border: none; } + +.clearfix:after { + content: "."; + visibility: hidden; + display: block; + height: 0; + clear: both; } diff --git a/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css.gz b/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css.gz Binary files differnew file mode 100644 index 0000000..e08ad90 --- /dev/null +++ b/public/assets/application-7be762e280e0e5656d2ea3a02cbf372c.css.gz diff --git a/public/assets/manifest-0b26a376c9cfe81af432b634628ee75d.json b/public/assets/manifest-0b26a376c9cfe81af432b634628ee75d.json index 8aabeda..3d637cf 100644 --- a/public/assets/manifest-0b26a376c9cfe81af432b634628ee75d.json +++ b/public/assets/manifest-0b26a376c9cfe81af432b634628ee75d.json @@ -1 +1 @@ -{"files":{"bird_gray_16-93f51980875017e26744056b62aaac22.png":{"logical_path":"bird_gray_16.png","mtime":"2013-03-09T14:41:39+09:00","size":1106,"digest":"93f51980875017e26744056b62aaac22"},"rails-bc7d436ef8afbf0f88829742a43ba3a4.png":{"logical_path":"rails.png","mtime":"2013-02-25T21:20:52+09:00","size":6646,"digest":"bc7d436ef8afbf0f88829742a43ba3a4"},"application-47a9a60feaec5625a9c66b1c985a1179.js":{"logical_path":"application.js","mtime":"2013-03-09T01:51:23+09:00","size":743,"digest":"47a9a60feaec5625a9c66b1c985a1179"},"application-37cb32f627581ac8bac91345987e7f10.css":{"logical_path":"application.css","mtime":"2013-03-09T15:10:20+09:00","size":3294,"digest":"37cb32f627581ac8bac91345987e7f10"}},"assets":{"bird_gray_16.png":"bird_gray_16-93f51980875017e26744056b62aaac22.png","rails.png":"rails-bc7d436ef8afbf0f88829742a43ba3a4.png","application.js":"application-47a9a60feaec5625a9c66b1c985a1179.js","application.css":"application-37cb32f627581ac8bac91345987e7f10.css"}}
\ No newline at end of file +{"files":{"bird_gray_16-93f51980875017e26744056b62aaac22.png":{"logical_path":"bird_gray_16.png","mtime":"2013-03-09T14:41:39+09:00","size":1106,"digest":"93f51980875017e26744056b62aaac22"},"rails-bc7d436ef8afbf0f88829742a43ba3a4.png":{"logical_path":"rails.png","mtime":"2013-02-25T21:20:52+09:00","size":6646,"digest":"bc7d436ef8afbf0f88829742a43ba3a4"},"application-47a9a60feaec5625a9c66b1c985a1179.js":{"logical_path":"application.js","mtime":"2013-03-09T01:51:23+09:00","size":743,"digest":"47a9a60feaec5625a9c66b1c985a1179"},"application-37cb32f627581ac8bac91345987e7f10.css":{"logical_path":"application.css","mtime":"2013-03-09T15:10:20+09:00","size":3294,"digest":"37cb32f627581ac8bac91345987e7f10"},"application-7be762e280e0e5656d2ea3a02cbf372c.css":{"logical_path":"application.css","mtime":"2013-03-14T22:27:42+09:00","size":3969,"digest":"7be762e280e0e5656d2ea3a02cbf372c"}},"assets":{"bird_gray_16.png":"bird_gray_16-93f51980875017e26744056b62aaac22.png","rails.png":"rails-bc7d436ef8afbf0f88829742a43ba3a4.png","application.js":"application-47a9a60feaec5625a9c66b1c985a1179.js","application.css":"application-7be762e280e0e5656d2ea3a02cbf372c.css"}}
\ No newline at end of file diff --git a/lib/receiver/start.rb b/script/start.rb index 476c866..476c866 100644 --- a/lib/receiver/start.rb +++ b/script/start.rb |