diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2018-08-30 16:24:47 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2018-08-30 17:11:01 +0900 |
commit | b0378cb2455072f8981143287ba373be458e2d63 (patch) | |
tree | 75e96c57b4b8390ec7f044f5243fba7fe1cd738a | |
parent | f996806ed747ee4c04ea7549b71ca9655e4a9968 (diff) | |
download | twitter-event-stream-b0378cb2455072f8981143287ba373be458e2d63.tar.gz |
support Account Activity API
My Twitter developer account application has finally been approved. It
looks working fine now.
-rw-r--r-- | README.txt | 28 | ||||
-rw-r--r-- | app.json | 14 | ||||
-rw-r--r-- | app.rb | 2 | ||||
-rw-r--r-- | config.ru | 15 | ||||
-rw-r--r-- | oauth.rb | 72 | ||||
-rw-r--r-- | service.rb | 143 | ||||
-rw-r--r-- | setup-oauth.rb | 27 | ||||
-rw-r--r-- | setup-webhook.rb | 3 |
8 files changed, 210 insertions, 94 deletions
@@ -1,9 +1,6 @@ twitter-event-stream ==================== -*WORK IN PROGRESS: My application for a developer account has not yet been -approved and the code is not tested at all.* - Description ----------- @@ -26,12 +23,26 @@ Configuration ~~~~~~~~~~~~~ - You have to gain access to (the premium version of) the Account Activity - API. A whitelisted consumer key and the consumer secret are specified by - environment variables. + API, and create a "dev environment". The "dev environment name", the base + url where twitter-event-stream is deployed, the whitelisted consumer key, + and the consumer secret are specified by environment variables. + TWITTER_EVENT_STREAM_BASE_URL=<base url> + TWITTER_EVENT_STREAM_ENV_NAME=<dev environment name> TWITTER_EVENT_STREAM_CONSUMER_KEY=<consumer key> TWITTER_EVENT_STREAM_CONSUMER_SECRET=<consumer secret> + WARNING: twitter-event-stream assumes your dev environment allows only one + webhook URL (which is the case for sandbox (free) plan) and removes all the + existing webhook URL(s) on startup. + + NOTE: Subscription are limited to a maximum of 15 users per application in + the sandbox plan. Because there is no way to clear subscriptions without + having the access token of every subscribing user, it is not possible for + twitter-event-stream to do that. It may be necessary to re-create the dev + environment manually on developer.twitter.com after removing and adding + another user to twitter-event-stream. + - Credentials used for fetching home_timeline are stored in environment variables named `TWITTER_EVENT_STREAM_USER_<tag>`. `<tag>` may be any text. @@ -73,13 +84,6 @@ Deployment * The quickest way to deploy twitter-event-stream would be to use Heroku. Click the link and fill forms: https://heroku.com/deploy - - Run `setup-webhook.rb` to register a webhook and perform the Challenge - Response Check required to activate it. The web application must be - already started. Note that `setup-webhook.rb` also requires the - environment variables to be set correctly. - - * twitter-event-stream will receive webhooks at /webhook. - Usage ----- @@ -5,11 +5,21 @@ "repository": "https://github.com/rhenium/twitter-event-stream", "env": { "RACK_ENV": "production", + "TWITTER_EVENT_STREAM_BASE_URL": { + "description": "The URL where twitter-event-stream is deployed.", + "value": "https://<app name>.herokuapp.com/" + }, + "TWITTER_EVENT_STREAM_ENV_NAME": { + "description": "The \"dev environment\" for the Account Activity API.", + "value": "<env name>" + }, "TWITTER_EVENT_STREAM_CONSUMER_KEY": { - "description": "A consumer key whitelisted for the Account Activity API." + "description": "A consumer key whitelisted for the Account Activity API.", + "value": "<dummy>" }, "TWITTER_EVENT_STREAM_CONSUMER_SECRET": { - "description": "A consumer secret." + "description": "A consumer secret.", + "value": "<dummy>" }, "TWITTER_EVENT_STREAM_USER_1": { "description": "A JSON object containing credentials for an user '1'.", @@ -60,7 +60,7 @@ class App < Sinatra::Base when "twitter_event_stream_message" when "tweet_create_events" queue << data.map { |object| JSON.generate(object) }.join("\r\n") - when "favorites_events" + when "favorite_events" queue << data.map { |object| JSON.generate({ "event" => "favorite", @@ -1,9 +1,18 @@ # Start home_timeline polling require_relative "service" -Service.setup - -# Start web app require_relative "app" +# HACK: The web app must be already started and accept "GET /webhook" when +# Service.setup is called +Thread.start { + sleep 1 + begin + Net::HTTP.get_response(URI(ENV["TWITTER_EVENT_STREAM_BASE_URL"])) + rescue + end + Service.setup +} + +# Start web app use Rack::Deflater run App diff --git a/oauth.rb b/oauth.rb new file mode 100644 index 0000000..1817cd1 --- /dev/null +++ b/oauth.rb @@ -0,0 +1,72 @@ +require "json" +require "net/http" +require "simple_oauth" + +module OAuthHelpers + class HTTPRequestError < StandardError + attr_reader :res + + def initialize(uri, res) + super("HTTP request failed: path=#{uri.request_uri} code=#{res.code} " \ + "body=#{res.body}") + @res = res + end + end + + module_function + + private def http_req_connect(uri_string) + uri = URI.parse(uri_string) + Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| + res = yield(http, uri.request_uri) + raise HTTPRequestError.new(uri, res) if res.code !~ /\A2\d\d\z/ + res.body + } + end + + def http_get(auth, uri_string, method: :get) + http_req_connect(uri_string) { |http, path| + http.send(method, path, { "Authorization" => auth }) + } + end + + def http_post(auth, uri_string, body) + http_req_connect(uri_string) { |http, path| + http.post(path, body, { "Authorization" => auth }) + } + end + + def bearer_request_token(oauth) + ck, cs = oauth[:consumer_key], oauth[:consumer_secret] + body = http_post("Basic #{["#{ck}:#{cs}"].pack("m0")}", + "https://api.twitter.com/oauth2/token", + "grant_type=client_credentials") + hash = JSON.parse(body, symbolize_names: true) + hash[:access_token] + end + + def bearer_get(token, path) + http_get("Bearer #{token}", "https://api.twitter.com#{path}") + end + + def user_get(oauth, path, params = {}) + path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty? + uri_string = "https://api.twitter.com#{path}" + auth = SimpleOAuth::Header.new(:get, uri_string, {}, oauth).to_s + http_get(auth, uri_string) + end + + def user_delete(oauth, path, params = {}) + path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty? + uri_string = "https://api.twitter.com#{path}" + auth = SimpleOAuth::Header.new(:delete, uri_string, {}, oauth).to_s + http_get(auth, uri_string, method: :delete) + end + + def user_post(oauth, path, params = {}) + body = params.map { |k, v| "#{k}=#{v}" }.join("&") + uri_string = "https://api.twitter.com#{path}" + auth = SimpleOAuth::Header.new(:post, uri_string, params, oauth).to_s + http_post(auth, uri_string, body) + end +end @@ -1,6 +1,5 @@ require "json" -require "net/http" -require "simple_oauth" +require_relative "oauth" class ServiceError < StandardError; end @@ -9,21 +8,89 @@ class Service private :new def setup - aa_consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"] - aa_consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"] + consumer_key = ENV["TWITTER_EVENT_STREAM_CONSUMER_KEY"] + consumer_secret = ENV["TWITTER_EVENT_STREAM_CONSUMER_SECRET"] - @users = {} + user_objs = [] ENV.each { |k, v| next unless k.start_with?("TWITTER_EVENT_STREAM_USER_") - obj = JSON.parse(v, symbolize_names: true) + user_objs << JSON.parse(v, symbolize_names: true) + } + + # We assume the webapp is already started at this point: the CRC requires + # GET /webhook to respond + app_url = ENV["TWITTER_EVENT_STREAM_BASE_URL"] + aa_env_name = ENV["TWITTER_EVENT_STREAM_ENV_NAME"] + setup_webhook(app_url, aa_env_name, consumer_key, consumer_secret, + user_objs) + + @users = {} + user_objs.each { |obj| @users[obj.fetch(:user_id)] = new( - consumer_key: aa_consumer_key, - consumer_secret: aa_consumer_secret, - **obj, + user_id: obj.fetch(:user_id), + requests_per_window: obj.fetch(:requests_per_window), + rest_oauth: { + consumer_key: obj.fetch(:rest_consumer_key) { + consumer_key }, + consumer_secret: obj.fetch(:rest_consumer_secret) { + consumer_secret }, + token: obj.fetch(:rest_token) { + obj.fetch(:token) }, + token_secret: obj.fetch(:rest_token_secret) { + obj.fetch(:token_secret) } + }, ) + } + end + + private def setup_webhook(app_url, env_name, consumer_key, consumer_secret, + user_objs) + oauth = proc { |n| + { + consumer_key: consumer_key, + consumer_secret: consumer_secret, + token: user_objs.dig(n, :token), + token_secret: user_objs.dig(n, :token_secret), + } + } + + if user_objs.empty? + warn "setup_webhook: no users configured. cannot setup webhook" + return + end + + warn "setup_webhook: get existing webhook URL(s)" + app_token = OAuthHelpers.bearer_request_token(oauth[0]) + body = OAuthHelpers.bearer_get(app_token, + "/1.1/account_activity/all/webhooks.json") + obj = JSON.parse(body, symbolize_names: true) + env = obj.dig(:environments).find { |v| v[:environment_name] == env_name } + + warn "setup_webhook: clear existing webhook URL(s)" + env[:webhooks].each do |webhook| + warn "setup_webhook: delete id=#{webhook[:id]}: #{webhook[:url]}" + path = "/1.1/account_activity/all/#{env_name}/webhooks/" \ + "#{webhook[:id]}.json" + OAuthHelpers.user_delete(oauth[0], path) + end - # TODO: Add to the webhook if needed + warn "setup_webhook: register a webhook URL" + webhook_url = app_url + (app_url.end_with?("/") ? "" : "/") + "webhook" + path = "/1.1/account_activity/all/#{env_name}/webhooks.json?url=" + + CGI.escape(webhook_url) + webhook = OAuthHelpers.user_post(oauth[0], path) + warn "setup_webhook: => #{webhook}" + + warn "setup_webhook: add subscriptions" + user_objs.each_with_index { |_, n| + warn "setup_webhook: add subscription for " \ + "user_id=#{user_objs.dig(n, :user_id)}" + path = "/1.1/account_activity/all/#{env_name}/subscriptions.json" + OAuthHelpers.user_post(oauth[n], path) } + rescue => e + warn "setup_webhook: uncaught exception: #{e.class} (#{e.message})" + warn e.backtrace end def oauth_echo(asp, vca) @@ -31,19 +98,19 @@ class Service raise ServiceError, "invalid OAuth Echo parameters" end - uri = URI.parse(asp) - Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| - res = http.get(uri.path, { "Authorization" => vca }) - raise ServiceError, "OAuth Echo failed" if res.code != "200" - content = JSON.parse(res.body) - get(content["id"]) - } + begin + body = OAuthHelpers.http_get(vca, asp) + content = JSON.parse(body, symbolize_names: true) + get(content[:id]) + rescue OAuthHelpers::HTTPRequestError + raise ServiceError, "OAuth Echo failed" + end end def feed_webhook(json) hash = JSON.parse(json) if user_id = hash["for_user_id"] - service = get(user_id) + service = get(Integer(user_id)) service.feed_webhook(hash) else warn "FIXME\n#{hash}" @@ -53,7 +120,7 @@ class Service private def get(user_id) - @users[user_id] or + defined?(@users) and @users[user_id] or raise ServiceError, "unauthenticated user: #{user_id}" end end @@ -62,28 +129,10 @@ class Service def initialize(user_id:, requests_per_window:, - consumer_key:, - consumer_secret:, - token:, - token_secret:, - rest_consumer_key: consumer_key, - rest_consumer_secret: consumer_secret, - rest_token: token, - rest_token_secret: token_secret) + rest_oauth:) @user_id = user_id @requests_per_window = Integer(requests_per_window) - @aa_oauth = { - consumer_key: consumer_key, - consumer_secret: consumer_secret, - token: token, - token_secret: token_secret, - } - @rest_oauth = { - consumer_key: rest_consumer_key, - consumer_secret: rest_consumer_secret, - token: rest_token, - token_secret: rest_token_secret, - } + @rest_oauth = rest_oauth @listeners = {} @backfill = [] start_polling @@ -107,18 +156,10 @@ class Service end def twitter_get(path, params) - path += "?" + params.map { |k, v| "#{k}=#{v}" }.join("&") if !params.empty? - uri = URI.parse("https://api.twitter.com#{path}") - auth = SimpleOAuth::Header.new(:get, uri.to_s, {}, @rest_oauth).to_s - - Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| - res = http.get(path, { "Authorization" => auth }) - if res.code != "200" - # pp res.each_header.to_h - raise ServiceError, "API request failed: path=#{path} body=#{res.body}" - end - JSON.parse(res.body) - } + JSON.parse(OAuthHelpers.user_get(@rest_oauth, path, params)) + rescue OAuthHelpers::HTTPRequestError => e + # pp e.res.each_header.to_h + raise ServiceError, "API request failed: path=#{path} body=#{e.res.body}" end private diff --git a/setup-oauth.rb b/setup-oauth.rb index 4de360d..dbca17b 100644 --- a/setup-oauth.rb +++ b/setup-oauth.rb @@ -1,33 +1,16 @@ require "json" -require "net/http" -require "simple_oauth" +require_relative "oauth" if ARGV.size != 2 STDERR.puts "Usage: ruby setup-oauth.rb <consumer key> <consumer secret>" exit 1 end -def oauth_post(path, params, oauth) - body = params.map { |k, v| "#{k}=#{v}" }.join("&") - - uri = URI.parse("https://api.twitter.com#{path}") - auth = SimpleOAuth::Header.new(:post, uri.to_s, params, oauth).to_s - - Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| - res = http.post(path, body, { "Authorization" => auth }) - if res.code != "200" - raise "request failed: path=#{path} code=#{res.code} body=#{res.body}" - end - res.body - } -end - - oauth_opts = { consumer_key: ARGV[0], consumer_secret: ARGV[1] } puts "#POST /oauth/request_token" -authorize_params = oauth_post("/oauth/request_token", - { "oauth_callback" => "oob" }, oauth_opts) +authorize_params = OAuthHelpers.user_post(oauth_opts, "/oauth/request_token", + { "oauth_callback" => "oob" }) extracted = authorize_params.split("&").map { |v| v.split("=") }.to_h oauth_opts[:token] = extracted["oauth_token"] oauth_opts[:token_secret] = extracted["oauth_token_secret"] @@ -41,8 +24,8 @@ pin = STDIN.gets.chomp puts puts "#POST /oauth/access_token" -oauth_params = oauth_post("/oauth/access_token", - { "oauth_verifier" => pin }, oauth_opts) +oauth_params = OAuthHelpers.user_post(oauth_opts, "/oauth/access_token", + { "oauth_verifier" => pin }) puts "#=> #{oauth_params}" puts diff --git a/setup-webhook.rb b/setup-webhook.rb deleted file mode 100644 index 3c7c2f6..0000000 --- a/setup-webhook.rb +++ /dev/null @@ -1,3 +0,0 @@ -require "json" -require "net/http" -require "simple_oauth" |