diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2018-08-19 22:41:22 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2018-08-20 18:30:37 +0900 |
commit | 5cf82c8beb85b8ccd1bc3f45fac8a80605db4e6c (patch) | |
tree | b12432ba1d260e0a1c25b4066e20ebed77255320 | |
download | twitter-event-stream-5cf82c8beb85b8ccd1bc3f45fac8a80605db4e6c.tar.gz |
twitter-event-stream
This is still work-in-progress: my application to a Twitter developer
account has not yet been approved.
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | COPYING | 19 | ||||
-rw-r--r-- | Gemfile | 5 | ||||
-rw-r--r-- | Gemfile.lock | 26 | ||||
-rw-r--r-- | Procfile | 1 | ||||
-rw-r--r-- | README.txt | 151 | ||||
-rw-r--r-- | app.json | 19 | ||||
-rw-r--r-- | app.rb | 149 | ||||
-rw-r--r-- | config.ru | 7 | ||||
-rw-r--r-- | service.rb | 180 | ||||
-rw-r--r-- | setup-oauth.rb | 61 | ||||
-rw-r--r-- | setup-webhook.rb | 3 |
12 files changed, 622 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..24fe853 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.bundle @@ -0,0 +1,19 @@ +Copyright (c) 2018 Kazuki Yamaguchi <k@rhe.jp> + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. @@ -0,0 +1,5 @@ +source "https://rubygems.org" + +gem "simple_oauth", "~> 0.3.1" +gem "sinatra", "~> 2.0" +gem "puma" diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..cfdb202 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,26 @@ +GEM + remote: https://rubygems.org/ + specs: + mustermann (1.0.3) + puma (3.12.0) + rack (2.0.5) + rack-protection (2.0.3) + rack + simple_oauth (0.3.1) + sinatra (2.0.3) + mustermann (~> 1.0) + rack (~> 2.0) + rack-protection (= 2.0.3) + tilt (~> 2.0) + tilt (2.0.8) + +PLATFORMS + ruby + +DEPENDENCIES + puma + simple_oauth (~> 0.3.1) + sinatra (~> 2.0) + +BUNDLED WITH + 1.16.3 diff --git a/Procfile b/Procfile new file mode 100644 index 0000000..ad78fdb --- /dev/null +++ b/Procfile @@ -0,0 +1 @@ +web: bundle exec puma -e production -p $PORT diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..a649016 --- /dev/null +++ b/README.txt @@ -0,0 +1,151 @@ +twitter-event-stream +==================== + +*WORK IN PROGRESS: My application for a developer account has not yet been +approved and the code is not tested at all.* + +Description +----------- + +twitter-event-stream provides an HTTP long polling endpoint that works in a +similar way to the deprecated User Streams API[1]. It uses the REST API and +the Account Activity API. + +It is no easy work to update a Twitter client application built on top of the +User Streams API. Even worse, the Account Activity API which pretends to be +the replacement cannot be used directly by a mobile Twitter client. +twitter-event-stream allows such applications to continue to work with the +minimal amount of changes. + +[1] https://twittercommunity.com/t/details-and-what-to-expect-from-the-api-deprecations-this-week-on-august-16-2018/110746 + +Setup +----- + +Configuration +~~~~~~~~~~~~~ + + - You have to gain access to (the premium version of) the Account Activity + API. A whitelisted consumer key and the consumer secret are specified by + environment variables. + + TWITTER_EVENT_STREAM_CONSUMER_KEY=<consumer key> + TWITTER_EVENT_STREAM_CONSUMER_SECRET=<consumer secret> + + - Credentials used for fetching home_timeline are stored in environment + variables named `TWITTER_EVENT_STREAM_USER_<tag>`. `<tag>` may be any + text. + + TWITTER_EVENT_STREAM_USER_ABC=<value> + + `<value>` is a JSON encoding of the following object: + + { + "user_id": <user's numerical id>, + "requests_per_window": 15, + "token": <access token>, + "token_secret": <access token secret> + } + # Increase requests_per_window if your application is granted the + # permission to make more requests per 15 minutes window. + + If you need to use a different consumer key pair for the REST API requests, + add the following to the JSON object. The token may be read-only. + + { + "rest_consumer_key": <consumer key>, + "rest_consumer_secret": <consumer secret>, + "rest_token": <access token>, + "rest_token_secret": <access token secret>, + } + + NOTE: `setup-oauth.rb` included in this distribution might be useful to + do 3-legged OAuth and make the JSON object. + +Deployment +~~~~~~~~~~ + + - Ruby and Bundler are the prerequisites. + + - Install dependencies by `bundle install`, and then run + `bundle exec puma -e production -p $PORT`. + + * The quickest way to deploy twitter-event-stream would be to use Heroku. + Click the link and fill forms: https://heroku.com/deploy + + - Run `setup-webhook.rb` to register a webhook and perform the Challenge + Response Check required to activate it. The web application must be + already started. Note that `setup-webhook.rb` also requires the + environment variables to be set correctly. + + * twitter-event-stream will receive webhooks at /webhook. + +Usage +----- + +twitter-event-stream opens two endpoints for a client: + + - /1.1/user.json + + The message format is almost identical to the User streams' message format. + However, due to the limitation of the Account Activity API, direct messages + and some of the event types are not supported. + + - /stream + + Sends events and home_timeline tweets in the server-sent events format + (text/event-stream). Events have the structure: + + event: <event>\r\n + data: <payload>\r\n\r\n + + `<event>` will be one of the event types received by the webhook: + + * `favorite_events` (for example; see Twitter's documentation[2]) + + event: favorite_events\r\n + data: [{"id":"...","favorited_status":{...}}]\r\n\r\n + + Or, one of the following event types defined by twitter-event-stream: + + * `twitter_event_stream_home_timeline` + + New items in the home timeline. `<payload>` is an array of Tweet object. + + event: twitter_event_stream_home_timeline\r\n + data: [{"id":...,"text":"..."},...]\r\n\r\n + + * `twitter_event_stream_message` + + A message from twitter-event-stream, such as error reporting. `<payload>` + is a String. + + event: twitter_event_stream_message\r\n + data: "Message"\r\n\r\n + + Note that comment events are also sent every 30 seconds to keep the HTTP + connection open: + + :\r\n\r\n + + +twitter-event-stream uses "OAuth Echo"[3] to authenticate a client, meaning +an application must provide the following HTTP headers: + + - `x-auth-service-provider` + + Must be set to + "https://api.twitter.com/1.1/account/verify_credentials.json". + + - `x-verify-credentials-authorization` + + The content of the Authorization HTTP header that the client would + normally send when calling the account/verify_credentials API. + +[2] https://developer.twitter.com/en/docs/basics/authentication/overview/oauth-echo.html +[3] https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/guides/account-activity-data-objects + +License +------- + +twitter-event-stream is licensed under the MIT license. See COPYING. diff --git a/app.json b/app.json new file mode 100644 index 0000000..0e399c0 --- /dev/null +++ b/app.json @@ -0,0 +1,19 @@ +{ + "name": "twitter-event-stream", + "description": "rhenium/twitter-event-stream", + "website": "https://github.com/rhenium/twitter-event-stream", + "repository": "https://github.com/rhenium/twitter-event-stream", + "env": { + "RACK_ENV": "production", + "TWITTER_EVENT_STREAM_CONSUMER_KEY": { + "description": "A consumer key whitelisted for the Account Activity API." + }, + "TWITTER_EVENT_STREAM_CONSUMER_SECRET": { + "description": "A consumer secret." + }, + "TWITTER_EVENT_STREAM_USER_1": { + "description": "A JSON object containing credentials for an user '1'.", + "required": false + } + } +} @@ -0,0 +1,149 @@ +require "sinatra/base" +require "json" +require_relative "service" + +class App < Sinatra::Base + enable :logging + set :consumer_key, ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"] + set :consumer_secret, ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"] + + helpers do + def get_service + asp = request.env["HTTP_X_AUTH_SERVICE_PROVIDER"] + vca = request.env["HTTP_X_VERIFY_CREDENTIALS_AUTHORIZATION"] + Service.oauth_echo(asp, vca) + rescue ServiceError => e + halt 403, "authentication failed" + end + end + + get "/stream" do + content_type "text/event-stream" + service = get_service + logger.debug("/stream (#{service.user_id}): CONNECT!") + + # Heroku will kill the connection after 55 seconds of inactivity. + # https://devcenter.heroku.com/articles/request-timeout#long-polling-and-streaming-responses + queue = Thread::Queue.new + th = Thread.start { sleep 15; loop { queue << ":\r\n\r\n"; sleep 30 } } + tag = service.subscribe(params["count"].to_i) { |event, data| + queue << "event: #{event}\r\ndata: #{JSON.generate(data)}\r\n\r\n" + } + + stream(true) do |out| + out.callback { + logger.debug("/stream (#{service.user_id}): CLEANUP!") + queue.close + service.unsubscribe(tag) + th.kill; th.join + } + loop { out << queue.pop } + end + end + + get "/1.1/user.json" do + content_type :json + service = get_service + logger.debug("/1.1/user.json (#{service.user_id}): CONNECT!") + + friend_ids = service.twitter_get("/1.1/friends/ids.json", + { "user_id" => service.user_id }) + + queue = Thread::Queue.new + queue << "#{JSON.generate({ "friends" => friend_ids["ids"] })}\r\n" + + th = Thread.start { sleep 15; loop { queue << "\r\n"; sleep 30 } } + tag = service.subscribe(params["count"].to_i) { |event, data| + case event + when "twitter_event_stream_home_timeline" + queue << data.map { |object| JSON.generate(object) }.join("\r\n") + when "twitter_event_stream_message" + when "tweet_create_events" + queue << data.map { |object| JSON.generate(object) }.join("\r\n") + when "favorites_events" + queue << data.map { |object| + JSON.generate({ + "event" => "favorite", + "created_at" => object["created_at"], + "source" => object["user"], + "target" => object["favorited_status"]["user"], + "target_object" => object["favorited_status"], + }) + }.join("\r\n") + when "follow_events", "block_events" + queue << data.map { |object| + JSON.generate({ + "event" => object["type"], + "created_at" => Time.utc(Integer(object["created_timestamp"])) + .strftime("%a %b %d %T %z %Y"), + "source" => object["user"], + "target" => object["favorited_status"]["user"], + "target_object" => object["favorited_status"], + }) + }.join("\r\n") + when "mute_events" + # Not supported + when "direct_message_events", "direct_message_indicate_typing_events", + "direct_message_mark_read_events" + # Not supported + when "tweet_delete_events" + queue << data.map { |object| + JSON.generate({ + "delete" => object + }) + }.join("\r\n") + else + logger.info("/1.1/user.json (#{service.user_id}): " \ + "unknown event: #{event}") + end + } + + stream(true) do |out| + out.callback { + logger.debug("/1.1/user.json (#{service.user_id}): CLEANUP!") + queue.close + service.unsubscribe(tag) + th.kill; th.join + } + loop { out << queue.pop } + end + end + + get "/webhook" do + content_type :json + crc_token = params["crc_token"] or + halt 400, "crc_token missing" + mac = OpenSSL::HMAC.digest("sha256", settings.consumer_secret, crc_token) + response_token = "sha256=#{[mac].pack("m0")}" + JSON.generate({ "response_token" => response_token }) + end + + post "/webhook" do + content_type :json + body = request.body.read + mac = OpenSSL::HMAC.digest("sha256", settings.consumer_secret, body) + sig = "sha256=#{[mac].pack("m0")}" + if request.env["HTTP_X_TWITTER_WEBHOOKS_SIGNATURE"] == sig + Service.feed_webhook(body) + else + logger.info "x-twitter-webhooks-signature invalid" + end + JSON.generate({ "looks" => "ok" }) + end + + get "/" do + <<~'EOF' + <!DOCTYPE html> + <meta charset=UTF-8> + <meta name=viewport content="width=device-width,initial-scale=1"> + <title>twitter-event-stream</title> + <style> + div { max-width: 1200px; margin: 0 auto; } + </style> + <div> + <h1>twitter-event-stream</h1> + <a href="https://github.com/rhenium/twitter-event-stream">Source Code</a> + </div> + EOF + end +end diff --git a/config.ru b/config.ru new file mode 100644 index 0000000..766432f --- /dev/null +++ b/config.ru @@ -0,0 +1,7 @@ +# Start home_timeline polling +require_relative "service" +Service.setup + +# Start web app +require_relative "app" +run App diff --git a/service.rb b/service.rb new file mode 100644 index 0000000..be028d1 --- /dev/null +++ b/service.rb @@ -0,0 +1,180 @@ +require "json" +require "net/http" +require "simple_oauth" + +class ServiceError < StandardError; end + +class Service + class << self + private :new + + def setup + aa_consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"] + aa_consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"] + + @users = {} + ENV.each { |k, v| + next unless k.start_with?("TWITTER_EVENT_STREAM_USER_") + obj = JSON.parse(v, symbolize_names: true) + @users[obj.fetch(:user_id)] = new( + consumer_key: aa_consumer_key, + consumer_secret: aa_consumer_secret, + **obj, + ) + + # TODO: Add to the webhook if needed + } + end + + def oauth_echo(asp, vca) + if asp != "https://api.twitter.com/1.1/account/verify_credentials.json" + raise ServiceError, "invalid OAuth Echo parameters" + end + + uri = URI.parse(asp) + Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| + res = http.get(uri.path, { "Authorization" => vca }) + raise ServiceError, "OAuth Echo failed" if res.code != "200" + content = JSON.parse(res.body) + get(content["id"]) + } + end + + def feed_webhook(json) + hash = JSON.parse(json) + if user_id = hash["for_user_id"] + service = get(user_id) + service.feed_webhook(hash) + else + warn "FIXME\n#{hash}" + end + end + + private + + def get(user_id) + @users[user_id] or + raise ServiceError, "unauthenticated user: #{user_id}" + end + end + + attr_reader :user_id + + def initialize(user_id:, + requests_per_window:, + consumer_key:, + consumer_secret:, + token:, + token_secret:, + rest_consumer_key: consumer_key, + rest_consumer_secret: consumer_secret, + rest_token: token, + rest_token_secret: token_secret) + @user_id = user_id + @requests_per_window = Integer(requests_per_window) + @aa_oauth = { + consumer_key: consumer_key, + consumer_secret: consumer_secret, + token: token, + token_secret: token_secret, + } + @rest_oauth = { + consumer_key: rest_consumer_key, + consumer_secret: rest_consumer_secret, + token: rest_token, + token_secret: rest_token_secret, + } + @listeners = {} + @backfill = [] + start_polling + end + + def subscribe(count, &block) + @listeners[block] = block + emit_backfill(count) + block + end + + def unsubscribe(tag) + @listeners.delete(tag) + end + + def feed_webhook(hash) + hash.each do |key, value| + next if key == "for_user_id" + emit(key, value) + end + end + + def twitter_get(path, params) + path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty? + uri = URI.parse("https://api.twitter.com#{path}") + auth = SimpleOAuth::Header.new(:get, uri.to_s, {}, @rest_oauth).to_s + + Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| + res = http.get(path, { "Authorization" => auth }) + if res.code != "200" + # pp res.each_header.to_h + raise ServiceError, "API request failed: path=#{path} body=#{res.body}" + end + JSON.parse(res.body) + } + end + + private + + def emit(event, object) + # TODO: backfill + @backfill.shift if @backfill.size == 100 + @backfill << [event, object] + @listeners.each { |_, block| block.call(event, object) } + end + + def emit_system(message) + emit("twitter_event_stream_message", message) + end + + def emit_backfill(count) + @backfill.last(count).each { |event, object| emit(event, object) } + end + + def start_polling + @polling_thread = Thread.start { + request_interval = 15.0 * 60 / @requests_per_window + + begin + last_max = nil + while true + t = Time.now + opts = { "count" => 200, "since_id" => last_max ? last_max - 1 : 1 } + ret = twitter_get("/1.1/statuses/home_timeline.json", opts) + + unless ret.empty? + if last_max + if last_max != ret.last["id"] + emit_system("possible stalled tweets " \ + "#{last_max}+1...#{ret.last["id"]}") + else + ret.pop + end + end + + unless ret.empty? + emit("twitter_event_stream_home_timeline", ret) + last_max = ret.first["id"] + end + end + + sleep -(Time.now - t) % request_interval + end + rescue => e + warn "polling_thread (#{user_id}) uncaught exception: " \ + "#{e.class} (#{e.message})" + warn e.backtrace + warn "polling_thread (#{user_id}) restarting in #{request_interval}s" + sleep request_interval + retry + end + } + end +end diff --git a/setup-oauth.rb b/setup-oauth.rb new file mode 100644 index 0000000..4de360d --- /dev/null +++ b/setup-oauth.rb @@ -0,0 +1,61 @@ +require "json" +require "net/http" +require "simple_oauth" + +if ARGV.size != 2 + STDERR.puts "Usage: ruby setup-oauth.rb <consumer key> <consumer secret>" + exit 1 +end + +def oauth_post(path, params, oauth) + body = params.map { |k, v| "#{k}=#{v}" }.join("&") + + uri = URI.parse("https://api.twitter.com#{path}") + auth = SimpleOAuth::Header.new(:post, uri.to_s, params, oauth).to_s + + Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| + res = http.post(path, body, { "Authorization" => auth }) + if res.code != "200" + raise "request failed: path=#{path} code=#{res.code} body=#{res.body}" + end + res.body + } +end + + +oauth_opts = { consumer_key: ARGV[0], consumer_secret: ARGV[1] } + +puts "#POST /oauth/request_token" +authorize_params = oauth_post("/oauth/request_token", + { "oauth_callback" => "oob" }, oauth_opts) +extracted = authorize_params.split("&").map { |v| v.split("=") }.to_h +oauth_opts[:token] = extracted["oauth_token"] +oauth_opts[:token_secret] = extracted["oauth_token_secret"] +puts "#=> #{authorize_params}" +puts + +puts "Visit https://api.twitter.com/oauth/authorize?oauth_token=" \ + "#{extracted["oauth_token"]}" +print "Input PIN code: " +pin = STDIN.gets.chomp +puts + +puts "#POST /oauth/access_token" +oauth_params = oauth_post("/oauth/access_token", + { "oauth_verifier" => pin }, oauth_opts) +puts "#=> #{oauth_params}" +puts + +extracted = oauth_params.split("&").map { |v| v.split("=") }.to_h +user_id = extracted["oauth_token"].split("-")[0].to_i +obj = { + user_id: user_id, + requests_per_window: 15, + token: extracted["oauth_token"], + token_secret: extracted["oauth_token_secret"], + rest_consumer_key: oauth_opts[:consumer_key], + rest_consumer_secret: oauth_opts[:consumer_secret], + rest_token: extracted["oauth_token"], + rest_token_secret: extracted["oauth_token_secret"], +} +puts "TWITTER_EVENT_STREAM_USER_#{user_id}='#{JSON.generate(obj)}'" diff --git a/setup-webhook.rb b/setup-webhook.rb new file mode 100644 index 0000000..3c7c2f6 --- /dev/null +++ b/setup-webhook.rb @@ -0,0 +1,3 @@ +require "json" +require "net/http" +require "simple_oauth" |