aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2018-08-30 16:24:47 +0900
committerKazuki Yamaguchi <k@rhe.jp>2018-08-30 17:11:01 +0900
commitb0378cb2455072f8981143287ba373be458e2d63 (patch)
tree75e96c57b4b8390ec7f044f5243fba7fe1cd738a
parentf996806ed747ee4c04ea7549b71ca9655e4a9968 (diff)
downloadtwitter-event-stream-b0378cb2455072f8981143287ba373be458e2d63.tar.gz
support Account Activity API
My Twitter developer account application has finally been approved. It looks working fine now.
-rw-r--r--README.txt28
-rw-r--r--app.json14
-rw-r--r--app.rb2
-rw-r--r--config.ru15
-rw-r--r--oauth.rb72
-rw-r--r--service.rb143
-rw-r--r--setup-oauth.rb27
-rw-r--r--setup-webhook.rb3
8 files changed, 210 insertions, 94 deletions
diff --git a/README.txt b/README.txt
index a649016..9daac40 100644
--- a/README.txt
+++ b/README.txt
@@ -1,9 +1,6 @@
twitter-event-stream
====================
-*WORK IN PROGRESS: My application for a developer account has not yet been
-approved and the code is not tested at all.*
-
Description
-----------
@@ -26,12 +23,26 @@ Configuration
~~~~~~~~~~~~~
- You have to gain access to (the premium version of) the Account Activity
- API. A whitelisted consumer key and the consumer secret are specified by
- environment variables.
+ API, and create a "dev environment". The "dev environment name", the base
+ url where twitter-event-stream is deployed, the whitelisted consumer key,
+ and the consumer secret are specified by environment variables.
+ TWITTER_EVENT_STREAM_BASE_URL=<base url>
+ TWITTER_EVENT_STREAM_ENV_NAME=<dev environment name>
TWITTER_EVENT_STREAM_CONSUMER_KEY=<consumer key>
TWITTER_EVENT_STREAM_CONSUMER_SECRET=<consumer secret>
+ WARNING: twitter-event-stream assumes your dev environment allows only one
+ webhook URL (which is the case for sandbox (free) plan) and removes all the
+ existing webhook URL(s) on startup.
+
+ NOTE: Subscription are limited to a maximum of 15 users per application in
+ the sandbox plan. Because there is no way to clear subscriptions without
+ having the access token of every subscribing user, it is not possible for
+ twitter-event-stream to do that. It may be necessary to re-create the dev
+ environment manually on developer.twitter.com after removing and adding
+ another user to twitter-event-stream.
+
- Credentials used for fetching home_timeline are stored in environment
variables named `TWITTER_EVENT_STREAM_USER_<tag>`. `<tag>` may be any
text.
@@ -73,13 +84,6 @@ Deployment
* The quickest way to deploy twitter-event-stream would be to use Heroku.
Click the link and fill forms: https://heroku.com/deploy
- - Run `setup-webhook.rb` to register a webhook and perform the Challenge
- Response Check required to activate it. The web application must be
- already started. Note that `setup-webhook.rb` also requires the
- environment variables to be set correctly.
-
- * twitter-event-stream will receive webhooks at /webhook.
-
Usage
-----
diff --git a/app.json b/app.json
index 0e399c0..7df2303 100644
--- a/app.json
+++ b/app.json
@@ -5,11 +5,21 @@
"repository": "https://github.com/rhenium/twitter-event-stream",
"env": {
"RACK_ENV": "production",
+ "TWITTER_EVENT_STREAM_BASE_URL": {
+ "description": "The URL where twitter-event-stream is deployed.",
+ "value": "https://<app name>.herokuapp.com/"
+ },
+ "TWITTER_EVENT_STREAM_ENV_NAME": {
+ "description": "The \"dev environment\" for the Account Activity API.",
+ "value": "<env name>"
+ },
"TWITTER_EVENT_STREAM_CONSUMER_KEY": {
- "description": "A consumer key whitelisted for the Account Activity API."
+ "description": "A consumer key whitelisted for the Account Activity API.",
+ "value": "<dummy>"
},
"TWITTER_EVENT_STREAM_CONSUMER_SECRET": {
- "description": "A consumer secret."
+ "description": "A consumer secret.",
+ "value": "<dummy>"
},
"TWITTER_EVENT_STREAM_USER_1": {
"description": "A JSON object containing credentials for an user '1'.",
diff --git a/app.rb b/app.rb
index 286a4d9..ec69974 100644
--- a/app.rb
+++ b/app.rb
@@ -60,7 +60,7 @@ class App < Sinatra::Base
when "twitter_event_stream_message"
when "tweet_create_events"
queue << data.map { |object| JSON.generate(object) }.join("\r\n")
- when "favorites_events"
+ when "favorite_events"
queue << data.map { |object|
JSON.generate({
"event" => "favorite",
diff --git a/config.ru b/config.ru
index 238f43f..e3112a6 100644
--- a/config.ru
+++ b/config.ru
@@ -1,9 +1,18 @@
# Start home_timeline polling
require_relative "service"
-Service.setup
-
-# Start web app
require_relative "app"
+# HACK: The web app must be already started and accept "GET /webhook" when
+# Service.setup is called
+Thread.start {
+ sleep 1
+ begin
+ Net::HTTP.get_response(URI(ENV["TWITTER_EVENT_STREAM_BASE_URL"]))
+ rescue
+ end
+ Service.setup
+}
+
+# Start web app
use Rack::Deflater
run App
diff --git a/oauth.rb b/oauth.rb
new file mode 100644
index 0000000..1817cd1
--- /dev/null
+++ b/oauth.rb
@@ -0,0 +1,72 @@
+require "json"
+require "net/http"
+require "simple_oauth"
+
+module OAuthHelpers
+ class HTTPRequestError < StandardError
+ attr_reader :res
+
+ def initialize(uri, res)
+ super("HTTP request failed: path=#{uri.request_uri} code=#{res.code} " \
+ "body=#{res.body}")
+ @res = res
+ end
+ end
+
+ module_function
+
+ private def http_req_connect(uri_string)
+ uri = URI.parse(uri_string)
+ Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
+ res = yield(http, uri.request_uri)
+ raise HTTPRequestError.new(uri, res) if res.code !~ /\A2\d\d\z/
+ res.body
+ }
+ end
+
+ def http_get(auth, uri_string, method: :get)
+ http_req_connect(uri_string) { |http, path|
+ http.send(method, path, { "Authorization" => auth })
+ }
+ end
+
+ def http_post(auth, uri_string, body)
+ http_req_connect(uri_string) { |http, path|
+ http.post(path, body, { "Authorization" => auth })
+ }
+ end
+
+ def bearer_request_token(oauth)
+ ck, cs = oauth[:consumer_key], oauth[:consumer_secret]
+ body = http_post("Basic #{["#{ck}:#{cs}"].pack("m0")}",
+ "https://api.twitter.com/oauth2/token",
+ "grant_type=client_credentials")
+ hash = JSON.parse(body, symbolize_names: true)
+ hash[:access_token]
+ end
+
+ def bearer_get(token, path)
+ http_get("Bearer #{token}", "https://api.twitter.com#{path}")
+ end
+
+ def user_get(oauth, path, params = {})
+ path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty?
+ uri_string = "https://api.twitter.com#{path}"
+ auth = SimpleOAuth::Header.new(:get, uri_string, {}, oauth).to_s
+ http_get(auth, uri_string)
+ end
+
+ def user_delete(oauth, path, params = {})
+ path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty?
+ uri_string = "https://api.twitter.com#{path}"
+ auth = SimpleOAuth::Header.new(:delete, uri_string, {}, oauth).to_s
+ http_get(auth, uri_string, method: :delete)
+ end
+
+ def user_post(oauth, path, params = {})
+ body = params.map { |k, v| "#{k}=#{v}" }.join("&")
+ uri_string = "https://api.twitter.com#{path}"
+ auth = SimpleOAuth::Header.new(:post, uri_string, params, oauth).to_s
+ http_post(auth, uri_string, body)
+ end
+end
diff --git a/service.rb b/service.rb
index be028d1..c997409 100644
--- a/service.rb
+++ b/service.rb
@@ -1,6 +1,5 @@
require "json"
-require "net/http"
-require "simple_oauth"
+require_relative "oauth"
class ServiceError < StandardError; end
@@ -9,21 +8,89 @@ class Service
private :new
def setup
- aa_consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"]
- aa_consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"]
+ consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"]
+ consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"]
- @users = {}
+ user_objs = []
ENV.each { |k, v|
next unless k.start_with?("TWITTER_EVENT_STREAM_USER_")
- obj = JSON.parse(v, symbolize_names: true)
+ user_objs << JSON.parse(v, symbolize_names: true)
+ }
+
+ # We assume the webapp is already started at this point: the CRC requires
+ # GET /webhook to respond
+ app_url = ENV["TWITTER_EVENT_STREAM_BASE_URL"]
+ aa_env_name = ENV["TWITTER_EVENT_STREAM_ENV_NAME"]
+ setup_webhook(app_url, aa_env_name, consumer_key, consumer_secret,
+ user_objs)
+
+ @users = {}
+ user_objs.each { |obj|
@users[obj.fetch(:user_id)] = new(
- consumer_key: aa_consumer_key,
- consumer_secret: aa_consumer_secret,
- **obj,
+ user_id: obj.fetch(:user_id),
+ requests_per_window: obj.fetch(:requests_per_window),
+ rest_oauth: {
+ consumer_key: obj.fetch(:rest_consumer_key) {
+ consumer_key },
+ consumer_secret: obj.fetch(:rest_consumer_secret) {
+ consumer_secret },
+ token: obj.fetch(:rest_token) {
+ obj.fetch(:token) },
+ token_secret: obj.fetch(:rest_token_secret) {
+ obj.fetch(:token_secret) }
+ },
)
+ }
+ end
+
+ private def setup_webhook(app_url, env_name, consumer_key, consumer_secret,
+ user_objs)
+ oauth = proc { |n|
+ {
+ consumer_key: consumer_key,
+ consumer_secret: consumer_secret,
+ token: user_objs.dig(n, :token),
+ token_secret: user_objs.dig(n, :token_secret),
+ }
+ }
+
+ if user_objs.empty?
+ warn "setup_webhook: no users configured. cannot setup webhook"
+ return
+ end
+
+ warn "setup_webhook: get existing webhook URL(s)"
+ app_token = OAuthHelpers.bearer_request_token(oauth[0])
+ body = OAuthHelpers.bearer_get(app_token,
+ "/1.1/account_activity/all/webhooks.json")
+ obj = JSON.parse(body, symbolize_names: true)
+ env = obj.dig(:environments).find { |v| v[:environment_name] == env_name }
+
+ warn "setup_webhook: clear existing webhook URL(s)"
+ env[:webhooks].each do |webhook|
+ warn "setup_webhook: delete id=#{webhook[:id]}: #{webhook[:url]}"
+ path = "/1.1/account_activity/all/#{env_name}/webhooks/" \
+ "#{webhook[:id]}.json"
+ OAuthHelpers.user_delete(oauth[0], path)
+ end
- # TODO: Add to the webhook if needed
+ warn "setup_webhook: register a webhook URL"
+ webhook_url = app_url + (app_url.end_with?("/") ? "" : "/") + "webhook"
+ path = "/1.1/account_activity/all/#{env_name}/webhooks.json?url=" +
+ CGI.escape(webhook_url)
+ webhook = OAuthHelpers.user_post(oauth[0], path)
+ warn "setup_webhook: => #{webhook}"
+
+ warn "setup_webhook: add subscriptions"
+ user_objs.each_with_index { |_, n|
+ warn "setup_webhook: add subscription for " \
+ "user_id=#{user_objs.dig(n, :user_id)}"
+ path = "/1.1/account_activity/all/#{env_name}/subscriptions.json"
+ OAuthHelpers.user_post(oauth[n], path)
}
+ rescue => e
+ warn "setup_webhook: uncaught exception: #{e.class} (#{e.message})"
+ warn e.backtrace
end
def oauth_echo(asp, vca)
@@ -31,19 +98,19 @@ class Service
raise ServiceError, "invalid OAuth Echo parameters"
end
- uri = URI.parse(asp)
- Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
- res = http.get(uri.path, { "Authorization" => vca })
- raise ServiceError, "OAuth Echo failed" if res.code != "200"
- content = JSON.parse(res.body)
- get(content["id"])
- }
+ begin
+ body = OAuthHelpers.http_get(vca, asp)
+ content = JSON.parse(body, symbolize_names: true)
+ get(content[:id])
+ rescue OAuthHelpers::HTTPRequestError
+ raise ServiceError, "OAuth Echo failed"
+ end
end
def feed_webhook(json)
hash = JSON.parse(json)
if user_id = hash["for_user_id"]
- service = get(user_id)
+ service = get(Integer(user_id))
service.feed_webhook(hash)
else
warn "FIXME\n#{hash}"
@@ -53,7 +120,7 @@ class Service
private
def get(user_id)
- @users[user_id] or
+ defined?(@users) and @users[user_id] or
raise ServiceError, "unauthenticated user: #{user_id}"
end
end
@@ -62,28 +129,10 @@ class Service
def initialize(user_id:,
requests_per_window:,
- consumer_key:,
- consumer_secret:,
- token:,
- token_secret:,
- rest_consumer_key: consumer_key,
- rest_consumer_secret: consumer_secret,
- rest_token: token,
- rest_token_secret: token_secret)
+ rest_oauth:)
@user_id = user_id
@requests_per_window = Integer(requests_per_window)
- @aa_oauth = {
- consumer_key: consumer_key,
- consumer_secret: consumer_secret,
- token: token,
- token_secret: token_secret,
- }
- @rest_oauth = {
- consumer_key: rest_consumer_key,
- consumer_secret: rest_consumer_secret,
- token: rest_token,
- token_secret: rest_token_secret,
- }
+ @rest_oauth = rest_oauth
@listeners = {}
@backfill = []
start_polling
@@ -107,18 +156,10 @@ class Service
end
def twitter_get(path, params)
- path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty?
- uri = URI.parse("https://api.twitter.com#{path}")
- auth = SimpleOAuth::Header.new(:get, uri.to_s, {}, @rest_oauth).to_s
-
- Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
- res = http.get(path, { "Authorization" => auth })
- if res.code != "200"
- # pp res.each_header.to_h
- raise ServiceError, "API request failed: path=#{path} body=#{res.body}"
- end
- JSON.parse(res.body)
- }
+ JSON.parse(OAuthHelpers.user_get(@rest_oauth, path, params))
+ rescue OAuthHelpers::HTTPRequestError => e
+ # pp e.res.each_header.to_h
+ raise ServiceError, "API request failed: path=#{path} body=#{e.res.body}"
end
private
diff --git a/setup-oauth.rb b/setup-oauth.rb
index 4de360d..dbca17b 100644
--- a/setup-oauth.rb
+++ b/setup-oauth.rb
@@ -1,33 +1,16 @@
require "json"
-require "net/http"
-require "simple_oauth"
+require_relative "oauth"
if ARGV.size != 2
STDERR.puts "Usage: ruby setup-oauth.rb <consumer key> <consumer secret>"
exit 1
end
-def oauth_post(path, params, oauth)
- body = params.map { |k, v| "#{k}=#{v}" }.join("&")
-
- uri = URI.parse("https://api.twitter.com#{path}")
- auth = SimpleOAuth::Header.new(:post, uri.to_s, params, oauth).to_s
-
- Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http|
- res = http.post(path, body, { "Authorization" => auth })
- if res.code != "200"
- raise "request failed: path=#{path} code=#{res.code} body=#{res.body}"
- end
- res.body
- }
-end
-
-
oauth_opts = { consumer_key: ARGV[0], consumer_secret: ARGV[1] }
puts "#POST /oauth/request_token"
-authorize_params = oauth_post("/oauth/request_token",
- { "oauth_callback" => "oob" }, oauth_opts)
+authorize_params = OAuthHelpers.user_post(oauth_opts, "/oauth/request_token",
+ { "oauth_callback" => "oob" })
extracted = authorize_params.split("&").map { |v| v.split("=") }.to_h
oauth_opts[:token] = extracted["oauth_token"]
oauth_opts[:token_secret] = extracted["oauth_token_secret"]
@@ -41,8 +24,8 @@ pin = STDIN.gets.chomp
puts
puts "#POST /oauth/access_token"
-oauth_params = oauth_post("/oauth/access_token",
- { "oauth_verifier" => pin }, oauth_opts)
+oauth_params = OAuthHelpers.user_post(oauth_opts, "/oauth/access_token",
+ { "oauth_verifier" => pin })
puts "#=> #{oauth_params}"
puts
diff --git a/setup-webhook.rb b/setup-webhook.rb
deleted file mode 100644
index 3c7c2f6..0000000
--- a/setup-webhook.rb
+++ /dev/null
@@ -1,3 +0,0 @@
-require "json"
-require "net/http"
-require "simple_oauth"