# -*- coding: utf-8 -*- require 'thread' require File.expand_path File.join(File.dirname(__FILE__), 'streamer_error') module ::Plugin::Streaming class Streamer attr_reader :thread, :service # イベントを登録する # ==== Args # [name] イベント名 # [many] オブジェクトをまとめて配列で受け取るかどうか # [&proc] イベントを受け取るオブジェクト。 def self.defevent(name, many=false, &proc) speed_key = "#{name}_queue_delay".to_sym define_method("_event_#{name}", &proc) if many define_method("event_#{name}"){ |json| @queue[name] ||= TimeLimitedQueue.new(HYDE, everytime{ (UserConfig[speed_key] || 100).to_f / 1000 }){ |data| begin __send__("_event_#{name}", data) rescue Exception => e warn e end } @threads[name] ||= everytime{ @queue[name].thread } @queue[name].push json } else define_method("event_#{name}"){ |json| @queue[name] ||= Queue.new @threads[name] ||= Thread.new{ loop{ begin sleep((UserConfig[speed_key] || 100).to_f / 1000) __send__("_event_#{name}", @queue[name].pop) rescue Exception => e warn e end } } queue_push(name, json) } end end # ==== Args # [service] 接続するService # [on_connect] 接続されたら呼ばれる def initialize(service, &on_connect) @service = service @thread = Thread.new(&method(:mainloop)) @on_connect = on_connect @threads = {} @queue = {} end def mainloop service.streaming{ |q| if q and not q.empty? parsed = JSON.parse(q) rescue nil event_factory parsed if parsed end } rescue Net::ReadTimeout raise rescue => exception error exception raise end # UserStreamを終了する def kill @thread.kill @threads.each{ |event, thread| thread.kill } @threads.clear @queue.clear end private # イベント _name_ のキューに値 _data_ を追加する。 # ==== Args # [name] イベント名 # [data] キューに入れる値 # ==== Exception # キューを処理するスレッドが正常終了している場合、 Plugin::Streaming::StreamerError を発生させる。 # 異常終了している場合は、その例外をそのまま発生させる。 def queue_push(name, data) if @threads[name] && @threads[name] if @threads[name].alive? @queue[name].push data else if @threads[name].status.nil? @queue[name].thread.status.join else raise Plugin::Streaming::StreamerError, "event '#{name}' thread is dead." end end end end # UserStreamで流れてきた情報を処理する # ==== Args # [parsed] パースされたJSONオブジェクト def event_factory(json) json.freeze case when json['friends'] if @on_connect @on_connect.call(json) @on_connect = nil end when respond_to?("event_#{json['event']}") __send__(:"event_#{json['event']}", json) when json['direct_message'] event_direct_message(json['direct_message']) when json['delete'] # if Mopt.debug # Plugin.activity :system, YAML.dump(json) # end when !json.has_key?('event') event_update(json) when Mopt.debug Plugin.activity :system, YAML.dump(json) else if Mopt.debug Plugin.activity :system, "unsupported event:\n" + YAML.dump(json) end end end defevent(:update, true) do |data| events = {update: Messages.new, mention: Messages.new, mypost: Messages.new} data.each { |json| msg = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json.symbolize) events[:update] << msg events[:mention] << msg if msg.to_me? events[:mypost] << msg if msg.from_me? } events.each{ |event_name, data| Plugin.call(event_name, @service, data.freeze) } end defevent(:direct_message, true) do |data| Plugin.call(:direct_messages, @service, data.map{ |datum| MikuTwitter::ApiCallSupport::Request::Parser.direct_message(datum.symbolize) }) end defevent(:favorite) do |json| by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) to = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize) if(to.respond_to?(:add_favorited_by)) to.add_favorited_by(by, Time.parse(json['created_at'])) end end defevent(:unfavorite) do |json| by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) to = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize) if(to.respond_to?(:remove_favorited_by)) to.remove_favorited_by(by) end end defevent(:favorited_retweet) do |json| by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) to = Message.findbyid(json['target_object']['id'].to_i) if to.is_a? Message source_message = to.retweet_source if(to.respond_to?(:add_favorited_by)) source_message.add_favorited_by(by, Time.parse(json['created_at'])) end end end defevent(:retweeted_retweet) do |json| by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) #to = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) target_object = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize) source_object = target_object.retweet_source source_object.add_retweet_user(by, Time.parse(json['created_at'])) end defevent(:quoted_tweet) do |json| MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize) end defevent(:follow) do |json| source = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) target = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) if target.me?(@service) Plugin.call(:followers_created, @service, Users.new([source])) elsif source.me?(@service) Plugin.call(:followings_created, @service, Users.new([target])) end end defevent(:list_member_added) do |json| target_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) # リストに追加されたユーザ list = MikuTwitter::ApiCallSupport::Request::Parser.list(json['target_object'].symbolize) # リスト source_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) # 追加したユーザ list.add_member(target_user) Plugin.call(:list_member_added, @service, target_user, list, source_user) end defevent(:list_member_removed) do |json| target_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) # リストに追加されたユーザ list = MikuTwitter::ApiCallSupport::Request::Parser.list(json['target_object'].symbolize) # リスト source_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) # 追加したユーザ list.remove_member(target_user) Plugin.call(:list_member_removed, @service, target_user, list, source_user) end end end