aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2018-08-19 22:41:22 +0900
committerKazuki Yamaguchi <k@rhe.jp>2018-08-20 18:30:37 +0900
commit5cf82c8beb85b8ccd1bc3f45fac8a80605db4e6c (patch)
treeb12432ba1d260e0a1c25b4066e20ebed77255320
downloadtwitter-event-stream-5cf82c8beb85b8ccd1bc3f45fac8a80605db4e6c.tar.gz
twitter-event-stream
This is still work-in-progress: my application to a Twitter developer account has not yet been approved.
-rw-r--r--.gitignore1
-rw-r--r--COPYING19
-rw-r--r--Gemfile5
-rw-r--r--Gemfile.lock26
-rw-r--r--Procfile1
-rw-r--r--README.txt151
-rw-r--r--app.json19
-rw-r--r--app.rb149
-rw-r--r--config.ru7
-rw-r--r--service.rb180
-rw-r--r--setup-oauth.rb61
-rw-r--r--setup-webhook.rb3
12 files changed, 622 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..24fe853
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/.bundle
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..d9d2f45
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,19 @@
+Copyright (c) 2018 Kazuki Yamaguchi <k@rhe.jp>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/Gemfile b/Gemfile
new file mode 100644
index 0000000..4307cdb
--- /dev/null
+++ b/Gemfile
@@ -0,0 +1,5 @@
+source "https://rubygems.org"
+
+gem "simple_oauth", "~> 0.3.1"
+gem "sinatra", "~> 2.0"
+gem "puma"
diff --git a/Gemfile.lock b/Gemfile.lock
new file mode 100644
index 0000000..cfdb202
--- /dev/null
+++ b/Gemfile.lock
@@ -0,0 +1,26 @@
+GEM
+ remote: https://rubygems.org/
+ specs:
+ mustermann (1.0.3)
+ puma (3.12.0)
+ rack (2.0.5)
+ rack-protection (2.0.3)
+ rack
+ simple_oauth (0.3.1)
+ sinatra (2.0.3)
+ mustermann (~> 1.0)
+ rack (~> 2.0)
+ rack-protection (= 2.0.3)
+ tilt (~> 2.0)
+ tilt (2.0.8)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ puma
+ simple_oauth (~> 0.3.1)
+ sinatra (~> 2.0)
+
+BUNDLED WITH
+ 1.16.3
diff --git a/Procfile b/Procfile
new file mode 100644
index 0000000..ad78fdb
--- /dev/null
+++ b/Procfile
@@ -0,0 +1 @@
+web: bundle exec puma -e production -p $PORT
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..a649016
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,151 @@
+twitter-event-stream
+====================
+
+*WORK IN PROGRESS: My application for a developer account has not yet been
+approved and the code is not tested at all.*
+
+Description
+-----------
+
+twitter-event-stream provides an HTTP long polling endpoint that works in a
+similar way to the deprecated User Streams API[1]. It uses the REST API and
+the Account Activity API.
+
+It is no easy work to update a Twitter client application built on top of the
+User Streams API. Even worse, the Account Activity API which pretends to be
+the replacement cannot be used directly by a mobile Twitter client.
+twitter-event-stream allows such applications to continue to work with the
+minimal amount of changes.
+
+[1] https://twittercommunity.com/t/details-and-what-to-expect-from-the-api-deprecations-this-week-on-august-16-2018/110746
+
+Setup
+-----
+
+Configuration
+~~~~~~~~~~~~~
+
+ - You have to gain access to (the premium version of) the Account Activity
+ API. A whitelisted consumer key and the consumer secret are specified by
+ environment variables.
+
+ TWITTER_EVENT_STREAM_CONSUMER_KEY=<consumer key>
+ TWITTER_EVENT_STREAM_CONSUMER_SECRET=<consumer secret>
+
+ - Credentials used for fetching home_timeline are stored in environment
+ variables named `TWITTER_EVENT_STREAM_USER_<tag>`. `<tag>` may be any
+ text.
+
+ TWITTER_EVENT_STREAM_USER_ABC=<value>
+
+ `<value>` is a JSON encoding of the following object:
+
+ {
+ "user_id": <user's numerical id>,
+ "requests_per_window": 15,
+ "token": <access token>,
+ "token_secret": <access token secret>
+ }
+ # Increase requests_per_window if your application is granted the
+ # permission to make more requests per 15 minutes window.
+
+ If you need to use a different consumer key pair for the REST API requests,
+ add the following to the JSON object. The token may be read-only.
+
+ {
+ "rest_consumer_key": <consumer key>,
+ "rest_consumer_secret": <consumer secret>,
+ "rest_token": <access token>,
+ "rest_token_secret": <access token secret>,
+ }
+
+ NOTE: `setup-oauth.rb` included in this distribution might be useful to
+ do 3-legged OAuth and make the JSON object.
+
+Deployment
+~~~~~~~~~~
+
+ - Ruby and Bundler are the prerequisites.
+
+ - Install dependencies by `bundle install`, and then run
+ `bundle exec puma -e production -p $PORT`.
+
+ * The quickest way to deploy twitter-event-stream would be to use Heroku.
+ Click the link and fill forms: https://heroku.com/deploy
+
+ - Run `setup-webhook.rb` to register a webhook and perform the Challenge
+ Response Check required to activate it. The web application must be
+ already started. Note that `setup-webhook.rb` also requires the
+ environment variables to be set correctly.
+
+ * twitter-event-stream will receive webhooks at /webhook.
+
+Usage
+-----
+
+twitter-event-stream opens two endpoints for a client:
+
+ - /1.1/user.json
+
+ The message format is almost identical to the User streams' message format.
+ However, due to the limitation of the Account Activity API, direct messages
+ and some of the event types are not supported.
+
+ - /stream
+
+ Sends events and home_timeline tweets in the server-sent events format
+ (text/event-stream). Events have the structure:
+
+ event: <event>\r\n
+ data: <payload>\r\n\r\n
+
+ `<event>` will be one of the event types received by the webhook:
+
+ * `favorite_events` (for example; see Twitter's documentation[2])
+
+ event: favorite_events\r\n
+ data: [{"id":"...","favorited_status":{...}}]\r\n\r\n
+
+ Or, one of the following event types defined by twitter-event-stream:
+
+ * `twitter_event_stream_home_timeline`
+
+ New items in the home timeline. `<payload>` is an array of Tweet object.
+
+ event: twitter_event_stream_home_timeline\r\n
+ data: [{"id":...,"text":"..."},...]\r\n\r\n
+
+ * `twitter_event_stream_message`
+
+ A message from twitter-event-stream, such as error reporting. `<payload>`
+ is a String.
+
+ event: twitter_event_stream_message\r\n
+ data: "Message"\r\n\r\n
+
+ Note that comment events are also sent every 30 seconds to keep the HTTP
+ connection open:
+
+ :\r\n\r\n
+
+
+twitter-event-stream uses "OAuth Echo"[3] to authenticate a client, meaning
+an application must provide the following HTTP headers:
+
+ - `x-auth-service-provider`
+
+ Must be set to
+ "https://api.twitter.com/1.1/account/verify_credentials.json".
+
+ - `x-verify-credentials-authorization`
+
+ The content of the Authorization HTTP header that the client would
+ normally send when calling the account/verify_credentials API.
+
+[2] https://developer.twitter.com/en/docs/basics/authentication/overview/oauth-echo.html
+[3] https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/guides/account-activity-data-objects
+
+License
+-------
+
+twitter-event-stream is licensed under the MIT license. See COPYING.
diff --git a/app.json b/app.json
new file mode 100644
index 0000000..0e399c0
--- /dev/null
+++ b/app.json
@@ -0,0 +1,19 @@
+{
+ "name": "twitter-event-stream",
+ "description": "rhenium/twitter-event-stream",
+ "website": "https://github.com/rhenium/twitter-event-stream",
+ "repository": "https://github.com/rhenium/twitter-event-stream",
+ "env": {
+ "RACK_ENV": "production",
+ "TWITTER_EVENT_STREAM_CONSUMER_KEY": {
+ "description": "A consumer key whitelisted for the Account Activity API."
+ },
+ "TWITTER_EVENT_STREAM_CONSUMER_SECRET": {
+ "description": "A consumer secret."
+ },
+ "TWITTER_EVENT_STREAM_USER_1": {
+ "description": "A JSON object containing credentials for an user '1'.",
+ "required": false
+ }
+ }
+}
diff --git a/app.rb b/app.rb
new file mode 100644
index 0000000..286a4d9
--- /dev/null
+++ b/app.rb
@@ -0,0 +1,149 @@
+require "sinatra/base"
+require "json"
+require_relative "service"
+
+class App < Sinatra::Base
+ enable :logging
+ set :consumer_key, ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"]
+ set :consumer_secret, ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"]
+
+ helpers do
+ def get_service
+ asp = request.env["HTTP_X_AUTH_SERVICE_PROVIDER"]
+ vca = request.env["HTTP_X_VERIFY_CREDENTIALS_AUTHORIZATION"]
+ Service.oauth_echo(asp, vca)
+ rescue ServiceError => e
+ halt 403, "authentication failed"
+ end
+ end
+
+ get "/stream" do
+ content_type "text/event-stream"
+ service = get_service
+ logger.debug("/stream (#{service.user_id}): CONNECT!")
+
+ # Heroku will kill the connection after 55 seconds of inactivity.
+ # https://devcenter.heroku.com/articles/request-timeout#long-polling-and-streaming-responses
+ queue = Thread::Queue.new
+ th = Thread.start { sleep 15; loop { queue << ":\r\n\r\n"; sleep 30 } }
+ tag = service.subscribe(params["count"].to_i) { |event, data|
+ queue << "event: #{event}\r\ndata: #{JSON.generate(data)}\r\n\r\n"
+ }
+
+ stream(true) do |out|
+ out.callback {
+ logger.debug("/stream (#{service.user_id}): CLEANUP!")
+ queue.close
+ service.unsubscribe(tag)
+ th.kill; th.join
+ }
+ loop { out << queue.pop }
+ end
+ end
+
+ get "/1.1/user.json" do
+ content_type :json
+ service = get_service
+ logger.debug("/1.1/user.json (#{service.user_id}): CONNECT!")
+
+ friend_ids = service.twitter_get("/1.1/friends/ids.json",
+ { "user_id" => service.user_id })
+
+ queue = Thread::Queue.new
+ queue << "#{JSON.generate({ "friends" => friend_ids["ids"] })}\r\n"
+
+ th = Thread.start { sleep 15; loop { queue << "\r\n"; sleep 30 } }
+ tag = service.subscribe(params["count"].to_i) { |event, data|
+ case event
+ when "twitter_event_stream_home_timeline"
+ queue << data.map { |object| JSON.generate(object) }.join("\r\n")
+ when "twitter_event_stream_message"
+ when "tweet_create_events"
+ queue << data.map { |object| JSON.generate(object) }.join("\r\n")
+ when "favorites_events"
+ queue << data.map { |object|
+ JSON.generate({
+ "event" => "favorite",
+ "created_at" => object["created_at"],
+ "source" => object["user"],
+ "target" => object["favorited_status"]["user"],
+ "target_object" => object["favorited_status"],
+ })
+ }.join("\r\n")
+ when "follow_events", "block_events"
+ queue << data.map { |object|
+ JSON.generate({
+ "event" => object["type"],
+ "created_at" => Time.utc(Integer(object["created_timestamp"]))
+ .strftime("%a %b %d %T %z %Y"),
+ "source" => object["user"],
+ "target" => object["favorited_status"]["user"],
+ "target_object" => object["favorited_status"],
+ })
+ }.join("\r\n")
+ when "mute_events"
+ # Not supported
+ when "direct_message_events", "direct_message_indicate_typing_events",
+ "direct_message_mark_read_events"
+ # Not supported
+ when "tweet_delete_events"
+ queue << data.map { |object|
+ JSON.generate({
+ "delete" => object
+ })
+ }.join("\r\n")
+ else
+ logger.info("/1.1/user.json (#{service.user_id}): " \
+ "unknown event: #{event}")
+ end
+ }
+
+ stream(true) do |out|
+ out.callback {
+ logger.debug("/1.1/user.json (#{service.user_id}): CLEANUP!")
+ queue.close
+ service.unsubscribe(tag)
+ th.kill; th.join
+ }
+ loop { out << queue.pop }
+ end
+ end
+
+ get "/webhook" do
+ content_type :json
+ crc_token = params["crc_token"] or
+ halt 400, "crc_token missing"
+ mac = OpenSSL::HMAC.digest("sha256", settings.consumer_secret, crc_token)
+ response_token = "sha256=#{[mac].pack("m0")}"
+ JSON.generate({ "response_token" => response_token })
+ end
+
+ post "/webhook" do
+ content_type :json
+ body = request.body.read
+ mac = OpenSSL::HMAC.digest("sha256", settings.consumer_secret, body)
+ sig = "sha256=#{[mac].pack("m0")}"
+ if request.env["HTTP_X_TWITTER_WEBHOOKS_SIGNATURE"] == sig
+ Service.feed_webhook(body)
+ else
+ logger.info "x-twitter-webhooks-signature invalid"
+ end
+ JSON.generate({ "looks" => "ok" })
+ end
+
+ get "/" do
+ <<~'EOF'
+ <!DOCTYPE html>
+ <meta charset=UTF-8>
+ <meta name=viewport content="width=device-width,initial-scale=1">
+ <title>twitter-event-stream</title>
+ <style>
+ div { max-width: 1200px; margin: 0 auto; }
+ </style>
+ <div>
+ <h1>twitter-event-stream</h1>
+ <a href="https://github.com/rhenium/twitter-event-stream">Source Code</a>
+ </div>
+ EOF
+ end
+end
diff --git a/config.ru b/config.ru
new file mode 100644
index 0000000..766432f
--- /dev/null
+++ b/config.ru
@@ -0,0 +1,7 @@
+# Start home_timeline polling
+require_relative "service"
+Service.setup
+
+# Start web app
+require_relative "app"
+run App
diff --git a/service.rb b/service.rb
new file mode 100644
index 0000000..be028d1
--- /dev/null
+++ b/service.rb
@@ -0,0 +1,180 @@
+require "json"
+require "net/http"
+require "simple_oauth"
+
+class ServiceError < StandardError; end
+
+class Service
+ class << self
+ private :new
+
+ def setup
+ aa_consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"]
+ aa_consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"]
+
+ @users = {}
+ ENV.each { |k, v|
+ next unless k.start_with?("TWITTER_EVENT_STREAM_USER_")
+ obj = JSON.parse(v, symbolize_names: true)
+ @users[obj.fetch(:user_id)] = new(
+ consumer_key: aa_consumer_key,
+ consumer_secret: aa_consumer_secret,
+ **obj,
+ )
+
+ # TODO: Add to the webhook if needed
+ }
+ end
+
+ def oauth_echo(asp, vca)
+ if asp != "https://api.twitter.com/1.1/account/verify_credentials.json"
+ raise ServiceError, "invalid OAuth Echo parameters"
+ end
+
+ uri = URI.parse(asp)
+ Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
+ res = http.get(uri.path, { "Authorization" => vca })
+ raise ServiceError, "OAuth Echo failed" if res.code != "200"
+ content = JSON.parse(res.body)
+ get(content["id"])
+ }
+ end
+
+ def feed_webhook(json)
+ hash = JSON.parse(json)
+ if user_id = hash["for_user_id"]
+ service = get(user_id)
+ service.feed_webhook(hash)
+ else
+ warn "FIXME\n#{hash}"
+ end
+ end
+
+ private
+
+ def get(user_id)
+ @users[user_id] or
+ raise ServiceError, "unauthenticated user: #{user_id}"
+ end
+ end
+
+ attr_reader :user_id
+
+ def initialize(user_id:,
+ requests_per_window:,
+ consumer_key:,
+ consumer_secret:,
+ token:,
+ token_secret:,
+ rest_consumer_key: consumer_key,
+ rest_consumer_secret: consumer_secret,
+ rest_token: token,
+ rest_token_secret: token_secret)
+ @user_id = user_id
+ @requests_per_window = Integer(requests_per_window)
+ @aa_oauth = {
+ consumer_key: consumer_key,
+ consumer_secret: consumer_secret,
+ token: token,
+ token_secret: token_secret,
+ }
+ @rest_oauth = {
+ consumer_key: rest_consumer_key,
+ consumer_secret: rest_consumer_secret,
+ token: rest_token,
+ token_secret: rest_token_secret,
+ }
+ @listeners = {}
+ @backfill = []
+ start_polling
+ end
+
+ def subscribe(count, &block)
+ @listeners[block] = block
+ emit_backfill(count)
+ block
+ end
+
+ def unsubscribe(tag)
+ @listeners.delete(tag)
+ end
+
+ def feed_webhook(hash)
+ hash.each do |key, value|
+ next if key == "for_user_id"
+ emit(key, value)
+ end
+ end
+
+ def twitter_get(path, params)
+ path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty?
+ uri = URI.parse("https://api.twitter.com#{path}")
+ auth = SimpleOAuth::Header.new(:get, uri.to_s, {}, @rest_oauth).to_s
+
+ Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
+ res = http.get(path, { "Authorization" => auth })
+ if res.code != "200"
+ # pp res.each_header.to_h
+ raise ServiceError, "API request failed: path=#{path} body=#{res.body}"
+ end
+ JSON.parse(res.body)
+ }
+ end
+
+ private
+
+ def emit(event, object)
+ # TODO: backfill
+ @backfill.shift if @backfill.size == 100
+ @backfill << [event, object]
+ @listeners.each { |_, block| block.call(event, object) }
+ end
+
+ def emit_system(message)
+ emit("twitter_event_stream_message", message)
+ end
+
+ def emit_backfill(count)
+ @backfill.last(count).each { |event, object| emit(event, object) }
+ end
+
+ def start_polling
+ @polling_thread = Thread.start {
+ request_interval = 15.0 * 60 / @requests_per_window
+
+ begin
+ last_max = nil
+ while true
+ t = Time.now
+ opts = { "count" => 200, "since_id" => last_max ? last_max - 1 : 1 }
+ ret = twitter_get("/1.1/statuses/home_timeline.json", opts)
+
+ unless ret.empty?
+ if last_max
+ if last_max != ret.last["id"]
+ emit_system("possible stalled tweets " \
+ "#{last_max}+1...#{ret.last["id"]}")
+ else
+ ret.pop
+ end
+ end
+
+ unless ret.empty?
+ emit("twitter_event_stream_home_timeline", ret)
+ last_max = ret.first["id"]
+ end
+ end
+
+ sleep -(Time.now - t) % request_interval
+ end
+ rescue => e
+ warn "polling_thread (#{user_id}) uncaught exception: " \
+ "#{e.class} (#{e.message})"
+ warn e.backtrace
+ warn "polling_thread (#{user_id}) restarting in #{request_interval}s"
+ sleep request_interval
+ retry
+ end
+ }
+ end
+end
diff --git a/setup-oauth.rb b/setup-oauth.rb
new file mode 100644
index 0000000..4de360d
--- /dev/null
+++ b/setup-oauth.rb
@@ -0,0 +1,61 @@
+require "json"
+require "net/http"
+require "simple_oauth"
+
+if ARGV.size != 2
+ STDERR.puts "Usage: ruby setup-oauth.rb <consumer key> <consumer secret>"
+ exit 1
+end
+
+def oauth_post(path, params, oauth)
+ body = params.map { |k, v| "#{k}=#{v}" }.join("&")
+
+ uri = URI.parse("https://api.twitter.com#{path}")
+ auth = SimpleOAuth::Header.new(:post, uri.to_s, params, oauth).to_s
+
+ Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
+ res = http.post(path, body, { "Authorization" => auth })
+ if res.code != "200"
+ raise "request failed: path=#{path} code=#{res.code} body=#{res.body}"
+ end
+ res.body
+ }
+end
+
+
+oauth_opts = { consumer_key: ARGV[0], consumer_secret: ARGV[1] }
+
+puts "#POST /oauth/request_token"
+authorize_params = oauth_post("/oauth/request_token",
+ { "oauth_callback" => "oob" }, oauth_opts)
+extracted = authorize_params.split("&").map { |v| v.split("=") }.to_h
+oauth_opts[:token] = extracted["oauth_token"]
+oauth_opts[:token_secret] = extracted["oauth_token_secret"]
+puts "#=> #{authorize_params}"
+puts
+
+puts "Visit https://api.twitter.com/oauth/authorize?oauth_token=" \
+ "#{extracted["oauth_token"]}"
+print "Input PIN code: "
+pin = STDIN.gets.chomp
+puts
+
+puts "#POST /oauth/access_token"
+oauth_params = oauth_post("/oauth/access_token",
+ { "oauth_verifier" => pin }, oauth_opts)
+puts "#=> #{oauth_params}"
+puts
+
+extracted = oauth_params.split("&").map { |v| v.split("=") }.to_h
+user_id = extracted["oauth_token"].split("-")[0].to_i
+obj = {
+ user_id: user_id,
+ requests_per_window: 15,
+ token: extracted["oauth_token"],
+ token_secret: extracted["oauth_token_secret"],
+ rest_consumer_key: oauth_opts[:consumer_key],
+ rest_consumer_secret: oauth_opts[:consumer_secret],
+ rest_token: extracted["oauth_token"],
+ rest_token_secret: extracted["oauth_token_secret"],
+}
+puts "TWITTER_EVENT_STREAM_USER_#{user_id}='#{JSON.generate(obj)}'"
diff --git a/setup-webhook.rb b/setup-webhook.rb
new file mode 100644
index 0000000..3c7c2f6
--- /dev/null
+++ b/setup-webhook.rb
@@ -0,0 +1,3 @@
+require "json"
+require "net/http"
+require "simple_oauth"