class LogStash::Inputs::Twitter
Ingests events from the Twitter Streaming API.
Attributes
event_generation_error_count[R]
filter_options[R]
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 122
# Validates the configuration and builds the REST and streaming clients.
#
# Unless sampling is enabled, at least one of follows, locations or keywords
# must be set to a non-empty value. An empty list/string is treated the same as
# an unset one, because build_options drops empty settings and would otherwise
# hand the stream an empty filter hash.
#
# @raise [LogStash::ConfigurationError] when no usable filter parameter is given
def register
  unless @use_samples
    configured = [@keywords, @follows, @locations].reject do |option|
      option.nil? || (option.respond_to?(:empty?) && option.empty?)
    end
    if configured.empty?
      raise LogStash::ConfigurationError, "At least one parameter (follows, locations or keywords) must be specified."
    end
  end

  # monkey patch twitter gem to ignore json parsing error.
  # at the same time, use our own json parser
  # this has been tested with a specific gem version, raise if not the same
  LogStash::Inputs::TwitterPatches.patch

  @rest_client = Twitter::REST::Client.new { |c| configure(c) }
  @stream_client = Twitter::Streaming::Client.new { |c| configure(c) }
  @twitter_options = build_options
end
run(queue)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 138
# Streams tweets into +queue+ until the plugin is asked to stop.
#
# A rate-limit error sleeps (interruptibly) for the reset interval and then
# reconnects. Any other error is logged and the stream is reopened. Neither
# path retries once stop? is true: stop clears @stream_client, so a retry at
# that point would call do_run against a nil client. The resulting
# NoMethodError would be rescued and retried forever.
#
# @param queue [#<<] destination for generated events
def run(queue)
  # need to pass a clone as it modify this var otherwise
  @logger.info("Starting twitter tracking", twitter_options.clone)
  # keep track of the amount of non-specific errors rescued and logged - use in testing to verify no errors.
  # this is because, as yet, we can't have rspec expectations on the logger instance.
  @event_generation_error_count = 0
  begin
    do_run(queue)
  rescue Twitter::Error::TooManyRequests => e
    sleep_for = e.rate_limit.reset_in || @rate_limit_reset_in # 5 minutes default value from config
    @logger.warn("Twitter too many requests error, sleeping for #{sleep_for}s")
    Stud.stoppable_sleep(sleep_for) { stop? }
    # stoppable_sleep returns early when stopping; do not reconnect in that case.
    retry unless stop?
  rescue => e
    # if a lot of these errors begin to occur, the repeated retry will result in TooManyRequests errors trapped above.
    # NOTE(review): @filter_options is never assigned in the visible methods
    # (build_options populates @twitter_options); confirm which was intended.
    @logger.warn("Twitter client error", :message => e.message, :exception => e.class, :backtrace => e.backtrace, :options => @filter_options)
    retry unless stop?
  end
end
set_stream_client(client)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 182
# Replaces the streaming client used by do_run (for example with a test double).
# Returns the client that was installed.
def set_stream_client(client)
  client.tap { |replacement| @stream_client = replacement }
end
stop()
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 174
# Drops the reference to the streaming client; returns nil.
def stop
  @stream_client = nil
end
twitter_options()
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 178
# The filter options computed by build_options during register
# (nil until register has run).
def twitter_options
  @twitter_options
end
Private Instance Methods
build_options()
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 252
# Translates plugin settings into the option hash for the streaming filter
# call. A setting is included only when it is set and non-empty:
#   keywords  -> :track     (comma-joined)
#   locations -> :locations (passed through)
#   languages -> :language  (comma-joined)
#   follows   -> :follow    (comma-joined; all-digit entries are kept as-is,
#                            anything else is resolved with find_user_id)
def build_options
  present = ->(setting) { setting && !setting.empty? }

  options = {}
  options[:track] = @keywords.join(",") if present.call(@keywords)
  options[:locations] = @locations if present.call(@locations)
  options[:language] = @languages.join(",") if present.call(@languages)
  if present.call(@follows)
    ids = @follows.map { |name| is_number?(name) ? name : find_user_id(name).to_s }
    options[:follow] = ids.join(",")
  end
  options
end
configure(client)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 238
# Client-construction callback: copies the OAuth credentials onto +client+
# and, when @use_proxy is set, its proxy settings. The REST client receives
# host/port keys, while any other client receives proxy_address/proxy_port keys.
def configure(client)
  client.consumer_key = @consumer_key
  client.consumer_secret = @consumer_secret.value
  client.access_token = @oauth_token
  client.access_token_secret = @oauth_token_secret.value
  return unless @use_proxy

  client.proxy =
    if client.is_a?(Twitter::REST::Client)
      { host: @proxy_address, port: @proxy_port }
    else
      { proxy_address: @proxy_address, proxy_port: @proxy_port }
    end
end
do_run(queue)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 159
# Consumes one stream session. Uses the sample endpoint when @use_samples is
# set, and otherwise the filter endpoint with twitter_options. Each tweet is
# handed to tweet_processor. When stop? is true as a tweet arrives, do_run
# returns nil at once; otherwise it returns whatever the stream call returns.
def do_run(queue)
  # A proc rather than a lambda, so that `return` exits do_run itself, exactly
  # like a literal block passed to sample/filter would.
  on_tweet = proc do |tweet|
    return if stop?
    tweet_processor(queue, tweet)
  end

  if @use_samples
    @stream_client.sample(&on_tweet)
  else
    @stream_client.filter(twitter_options, &on_tweet)
  end
end
find_user_id(username)
click to toggle source
Looks up the numeric Twitter user id for a screen name.
@return [Integer] the user id
@raise [Twitter::Error::NotFound] propagated from the REST user lookup
# File lib/logstash/inputs/twitter.rb, line 268
# Resolves +username+ (a screen name) to its numeric id via the REST client.
def find_user_id(username)
  @logger.debug("Looking up twitter user identifier for", :user => username) if @logger.debug?
  user = @rest_client.user(:screen_name => username) # Twitter::User
  user.id
end
from_tweet(tweet)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 205
# Builds a LogStash event from +tweet+, timestamped with tweet.created_at.
# When @full_tweet is set, the whole tweet.to_hash is kept (symbol keys
# stringified). Otherwise a curated set of fields is emitted. The curated set
# adds "in-reply-to" only for replies that carry a status id, and "urls" only
# when the tweet has any.
def from_tweet(tweet)
  if @logger.debug?
    @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
  end

  attributes =
    if @full_tweet
      LogStash::Util.stringify_symbols(tweet.to_hash)
    else
      curated = {
        "message" => tweet.full_text,
        "user" => tweet.user.screen_name,
        "client" => tweet.source,
        "retweeted" => tweet.retweeted?,
        "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}",
        "hashtags" => tweet.hashtags.map(&:attrs),
        "symbols" => tweet.symbols.map(&:attrs),
        "user_mentions" => tweet.user_mentions.map(&:attrs),
      }
      curated["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply? && !tweet.in_reply_to_status_id.nil?
      curated["urls"] = tweet.urls.map { |link| link.expanded_url.to_s } unless tweet.urls.empty?
      curated
    end

  # NOTE: an old JrJackson workaround is disabled here; it used to reset an
  # "in-reply-to" Twitter::NullObject to nil:
  #   event.set("in-reply-to", nil) if event.get("in-reply-to").is_a?(Twitter::NullObject)
  targeted_event_factory.new_event(attributes).tap do |event|
    event.timestamp = LogStash::Timestamp.new(tweet.created_at)
  end
end
ignore?(tweet)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 201
# Truthy when retweet filtering is enabled and +tweet+ is a retweet.
# When filtering is off, @ignore_retweets itself (false/nil) is returned.
def ignore?(tweet)
  if @ignore_retweets
    tweet.retweet?
  else
    @ignore_retweets
  end
end
is_number?(string)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 273
# True when +string+ consists entirely of ASCII digits, meaning it is already
# a numeric user id and needs no lookup. Anchored with \A/\z, not ^/$: ^/$
# match at line boundaries, so "bob\n123" or "123\n" would otherwise count as
# numeric and skip screen-name resolution. A nil argument yields false.
def is_number?(string)
  /\A\d+\z/.match(string) ? true : false
end
tweet_processor(queue, tweet)
click to toggle source
# File lib/logstash/inputs/twitter.rb, line 188
# Converts one stream message into an event, decorates it and appends it to
# +queue+. Messages that are not Twitter::Tweet instances, and tweets that
# ignore? rejects, are skipped (nil). A StandardError raised while building,
# decorating or enqueuing the event is logged and counted in
# @event_generation_error_count instead of propagating.
def tweet_processor(queue, tweet)
  return unless tweet.is_a?(Twitter::Tweet)
  return if ignore?(tweet)

  begin
    event = from_tweet(tweet)
    decorate(event)
    queue << event
  rescue => e
    @event_generation_error_count += 1
    @logger.error("Event generation error", :message => e.message, :exception => e.class, :backtrace => e.backtrace)
  end
end