aboutsummaryrefslogtreecommitdiffstats
path: root/core/plugin/streaming/streamer.rb
blob: 60b4c5a4e8d9c436b90c75538094aa8747d00b57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
require 'thread'
require File.expand_path File.join(File.dirname(__FILE__), 'streamer_error')

module ::Plugin::Streaming
  class Streamer
    attr_reader :thread, :service

    # イベントを登録する
    # ==== Args
    # [name] イベント名
    # [many] オブジェクトをまとめて配列で受け取るかどうか
    # [&proc] イベントを受け取るオブジェクト。
    def self.defevent(name, many=false, &proc)
      speed_key = "#{name}_queue_delay".to_sym
      define_method("_event_#{name}", &proc)
      if many
        define_method("event_#{name}"){ |json|
          @queue[name] ||= TimeLimitedQueue.new(HYDE, everytime{ (UserConfig[speed_key] || 100).to_f / 1000 }){ |data|
            begin
              __send__("_event_#{name}", data)
            rescue Exception => e
              warn e end }
          @threads[name] ||= everytime{ @queue[name].thread }
          @queue[name].push json }
      else
        define_method("event_#{name}"){ |json|
          @queue[name] ||= Queue.new
          @threads[name] ||= Thread.new{
            loop{
              begin
                sleep((UserConfig[speed_key] || 100).to_f / 1000)
                __send__("_event_#{name}", @queue[name].pop)
              rescue Exception => e
                warn e end } }
          queue_push(name, json) } end end

    # ==== Args
    # [service] 接続するService
    # [on_connect] 接続されたら呼ばれる
    def initialize(service, &on_connect)
      @service = service
      @thread = Thread.new(&method(:mainloop))
      @on_connect = on_connect
      @threads = {}
      @queue = {} end

    def mainloop
      service.streaming(:userstream, include_followings_activity: true){ |q|
        if q and not q.empty?
          parsed = JSON.parse(q) rescue nil
          event_factory parsed if parsed end }
    rescue Net::ReadTimeout
      raise
    rescue => exception
      error exception
      raise end

    # UserStreamを終了する
    def kill
      @thread.kill
      @threads.each{ |event, thread|
        thread.kill }
      @threads.clear
      @queue.clear end

    private

    # イベント _name_ のキューに値 _data_ を追加する。
    # ==== Args
    # [name] イベント名
    # [data] キューに入れる値
    # ==== Exception
    # キューを処理するスレッドが正常終了している場合、 Plugin::Streaming::StreamerError を発生させる。
    # 異常終了している場合は、その例外をそのまま発生させる。
    def queue_push(name, data)
      if @threads[name] && @threads[name]
        if @threads[name].alive?
          @queue[name].push data
        else
          if @threads[name].status.nil?
            @queue[name].thread.status.join
          else
            raise Plugin::Streaming::StreamerError, "event '#{name}' thread is dead." end end end end

    # UserStreamで流れてきた情報を処理する
    # ==== Args
    # [parsed] パースされたJSONオブジェクト
    def event_factory(json)
      json.freeze
      case
      when json['friends']
        if @on_connect
          @on_connect.call(json)
          @on_connect = nil end
      when respond_to?("event_#{json['event']}")
        __send__(:"event_#{json['event']}", json)
      when json['direct_message']
        event_direct_message(json['direct_message'])
      when json['delete']
        # if Mopt.debug
        #   Plugin.activity :system, YAML.dump(json)
        # end
      when !json.has_key?('event')
        event_update(json)
      when Mopt.debug
        Plugin.activity :system, YAML.dump(json)
      else
        if Mopt.debug
          Plugin.activity :system, "unsupported event:\n" + YAML.dump(json) end end end

    defevent(:update, true) do |data|
      events = {update: Messages.new, mention: Messages.new, mypost: Messages.new}
      data.each { |json|
        msg = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json.symbolize)
        events[:update] << msg
        events[:mention] << msg if msg.to_me?
        events[:mypost] << msg if msg.from_me? }
      events.each{ |event_name, data|
        Plugin.call(event_name, @service, data.freeze) } end

    defevent(:direct_message, true) do |data|
      Plugin.call(:direct_messages, @service, data.map{ |datum| MikuTwitter::ApiCallSupport::Request::Parser.direct_message(datum.symbolize) }) end

    defevent(:favorite) do |json|
      by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize)
      to = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize)
      if(to.respond_to?(:add_favorited_by))
        to.add_favorited_by(by, Time.parse(json['created_at'])) end end

    defevent(:unfavorite) do |json|
      by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize)
      to = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize)
      if(to.respond_to?(:remove_favorited_by))
        to.remove_favorited_by(by) end end

    defevent(:favorited_retweet) do |json|
      by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize)
      to = Message.findbyid(json['target_object']['id'].to_i)
      if to.is_a? Message
        source_message = to.retweet_source
        if(to.respond_to?(:add_favorited_by))
          source_message.add_favorited_by(by, Time.parse(json['created_at'])) end end end

    defevent(:retweeted_retweet) do |json|
      by = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize)
      #to = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize)
      target_object = MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize)
      source_object = target_object.retweet_source
      source_object.add_retweet_user(by, Time.parse(json['created_at'])) end

    defevent(:quoted_tweet) do |json|
      MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(json['target_object'].symbolize) end

    defevent(:follow) do |json|
      source = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize)
      target = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize)
      if target.me?(@service)
        Plugin.call(:followers_created, @service, Users.new([source]))
      elsif source.me?(@service)
        Plugin.call(:followings_created, @service, Users.new([target])) end end

    defevent(:list_member_added) do |json|
      target_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) # リストに追加されたユーザ
      list = MikuTwitter::ApiCallSupport::Request::Parser.list(json['target_object'].symbolize) # リスト
      source_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) # 追加したユーザ
      list.add_member(target_user)
      Plugin.call(:list_member_added, @service, target_user, list, source_user) end

    defevent(:list_member_removed) do |json|
      target_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['target'].symbolize) # リストに追加されたユーザ
      list = MikuTwitter::ApiCallSupport::Request::Parser.list(json['target_object'].symbolize) # リスト
      source_user = MikuTwitter::ApiCallSupport::Request::Parser.user(json['source'].symbolize) # 追加したユーザ
      list.remove_member(target_user)
      Plugin.call(:list_member_removed, @service, target_user, list, source_user) end

  end
end