class LogStash::Inputs::Twitter

Ingest events from the Twitter Streaming API.

Attributes

event_generation_error_count[R]
filter_options[R]

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/twitter.rb, line 122
# Validates the filter configuration and builds the Twitter clients.
#
# Unless sample mode (@use_samples) is enabled, at least one of @keywords,
# @follows or @locations must hold a non-empty value. Empty values are
# rejected here because build_options drops them, which would otherwise
# start the filter stream with no criteria at all.
#
# Sets @rest_client, @stream_client and @twitter_options.
#
# @raise [LogStash::ConfigurationError] when no usable filter parameter is set
def register
  has_filter = [@keywords, @follows, @locations].any? { |value| value && !value.empty? }

  unless @use_samples || has_filter
    raise LogStash::ConfigurationError, "At least one parameter (follows, locations or keywords) must be specified."
  end

  # Monkey patch the twitter gem to ignore JSON parsing errors and, at the
  # same time, use our own JSON parser. This has been tested with a specific
  # gem version; the patch raises if the version is not the same.
  LogStash::Inputs::TwitterPatches.patch

  @rest_client     = Twitter::REST::Client.new      { |c| configure(c) }
  @stream_client   = Twitter::Streaming::Client.new { |c| configure(c) }
  # Built last: build_options may look up user ids via find_user_id, which
  # needs @rest_client to exist.
  @twitter_options = build_options
end
run(queue) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 138
# Streams tweets into +queue+ via do_run, reconnecting after errors until
# the plugin is asked to stop.
#
# Rate-limit errors sleep for the period reported by the error, falling back
# to @rate_limit_reset_in, before reconnecting. Any other StandardError is
# logged and the stream is restarted immediately. In both cases the retry is
# skipped once stop? is true; otherwise a stopped plugin (whose
# @stream_client has been cleared by #stop) would rescue and retry forever.
#
# @param queue [#<<] destination passed through to do_run
def run(queue)
  # Pass a clone: per the original note, the logger call would otherwise
  # modify the options hash.
  @logger.info("Starting twitter tracking", twitter_options.clone)

  # Keep track of the number of non-specific errors rescued and logged - used
  # in testing to verify no errors occurred, because as yet we can't have
  # rspec expectations on the logger instance.
  @event_generation_error_count = 0

  begin
    do_run(queue)
  rescue Twitter::Error::TooManyRequests => e
    sleep_for = e.rate_limit.reset_in || @rate_limit_reset_in # config default, 5 minutes per the original note
    @logger.warn("Twitter too many requests error, sleeping for #{sleep_for}s")
    Stud.stoppable_sleep(sleep_for) { stop? }
    # The sleep may have been cut short by a stop request; do not reconnect then.
    retry unless stop?
  rescue => e
    # If a lot of these errors begin to occur, the repeated retry will result
    # in TooManyRequests errors trapped above.
    @logger.warn("Twitter client error", :message => e.message, :exception => e.class, :backtrace => e.backtrace, :options => @filter_options)
    retry unless stop?
  end
end
set_stream_client(client) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 182
# Replaces the streaming client held in @stream_client.
# NOTE(review): presumably a hook for injecting a stub client in tests - confirm.
#
# @param client the object to use as streaming client from now on
def set_stream_client(client)
  @stream_client = client
end
stop() click to toggle source
# File lib/logstash/inputs/twitter.rb, line 174
# Drops the reference to the streaming client by setting @stream_client to nil.
def stop
  @stream_client = nil
end
twitter_options() click to toggle source
# File lib/logstash/inputs/twitter.rb, line 178
# Returns the filter options stored in @twitter_options (assigned in register
# from build_options).
def twitter_options
  @twitter_options
end

Private Instance Methods

build_options() click to toggle source
# File lib/logstash/inputs/twitter.rb, line 252
# Assembles the options hash handed to the streaming filter call.
#
# Only non-empty settings are included:
#   :track     - @keywords joined with commas
#   :locations - @locations as-is
#   :language  - @languages joined with commas
#   :follow    - @follows joined with commas; entries that are not numeric
#                are resolved to a user id through find_user_id
#
# @return [Hash] the assembled options (empty when nothing is configured)
def build_options
  given = ->(value) { value && !value.empty? }

  options = {}
  options[:track] = @keywords.join(",") if given.call(@keywords)
  options[:locations] = @locations if given.call(@locations)
  options[:language] = @languages.join(",") if given.call(@languages)

  if @follows && @follows.length > 0
    ids = @follows.map { |username| is_number?(username) ? username : find_user_id(username).to_s }
    options[:follow] = ids.join(",")
  end

  options
end
configure(client) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 238
# Copies the credentials and, when @use_proxy is set, the proxy settings onto
# a Twitter client.
#
# The proxy hash uses different keys depending on the client class:
# host/port for Twitter::REST::Client, proxy_address/proxy_port for anything else.
#
# @param client the client being configured
def configure(client)
  client.consumer_key        = @consumer_key
  client.consumer_secret     = @consumer_secret.value
  client.access_token        = @oauth_token
  client.access_token_secret = @oauth_token_secret.value
  return unless @use_proxy

  client.proxy =
    if client.is_a?(Twitter::REST::Client)
      { host: @proxy_address, port: @proxy_port }
    else
      { proxy_address: @proxy_address, proxy_port: @proxy_port }
    end
end
do_run(queue) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 159
# Consumes the stream and hands every received object to tweet_processor.
#
# Uses the sample stream when @use_samples is set, otherwise the filter
# stream with twitter_options. The method returns as soon as stop? is true
# when an object arrives.
#
# @param queue destination passed through to tweet_processor
def do_run(queue)
  # A proc (not a lambda), so `return` leaves do_run itself, exactly like a
  # literal block would.
  on_tweet = proc do |tweet|
    return if stop?
    tweet_processor(queue, tweet)
  end

  return @stream_client.sample(&on_tweet) if @use_samples

  @stream_client.filter(twitter_options, &on_tweet)
end
find_user_id(username) click to toggle source

@return [Integer] user id
@raise [Twitter::Error::NotFound]

# File lib/logstash/inputs/twitter.rb, line 268
# Looks up a user by screen name through the REST client and returns the id
# of the returned user object.
#
# @param username [String] screen name to resolve
# @return the id reported by the REST client for that user
def find_user_id(username)
  if @logger.debug?
    @logger.debug("Looking up twitter user identifier for", :user => username)
  end

  account = @rest_client.user(:screen_name => username)
  account.id # Twitter::User#id
end
from_tweet(tweet) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 205
# Converts a tweet into an event.
#
# With @full_tweet set, the event carries the tweet's whole hash with keys
# stringified. Otherwise it carries a reduced set of fields: message, user,
# client, retweeted, source (status URL), hashtags, symbols, user_mentions,
# plus "in-reply-to" and "urls" when the tweet has them.
# The event timestamp is taken from the tweet's created_at.
#
# @param tweet the tweet to convert
# @return the event created by targeted_event_factory
def from_tweet(tweet)
  if @logger.debug?
    @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
  end

  attributes =
    if @full_tweet
      LogStash::Util.stringify_symbols(tweet.to_hash)
    else
      summary = {
        "message" => tweet.full_text,
        "user" => tweet.user.screen_name,
        "client" => tweet.source,
        "retweeted" => tweet.retweeted?,
        "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}",
        "hashtags" => tweet.hashtags.map { |tag| tag.attrs },
        "symbols" => tweet.symbols.map { |symbol| symbol.attrs },
        "user_mentions" => tweet.user_mentions.map { |mention| mention.attrs },
      }

      # Optional fields are only added when the tweet actually carries them.
      summary["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply? && !tweet.in_reply_to_status_id.nil?
      summary["urls"] = tweet.urls.map { |url| url.expanded_url.to_s } unless tweet.urls.empty?
      summary
    end

  # Work around bugs in JrJackson. The standard serializer won't work till we upgrade
  # event.set("in-reply-to", nil) if event.get("in-reply-to").is_a?(Twitter::NullObject)

  targeted_event_factory.new_event(attributes).tap do |event|
    event.timestamp = LogStash::Timestamp.new(tweet.created_at)
  end
end
ignore?(tweet) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 201
# Decides whether a tweet should be skipped: only when @ignore_retweets is
# set and the tweet reports itself as a retweet.
#
# @param tweet object responding to retweet?
# @return a falsy @ignore_retweets value as-is, otherwise the result of tweet.retweet?
def ignore?(tweet)
  return @ignore_retweets unless @ignore_retweets

  tweet.retweet?
end
is_number?(string) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 273
# Tells whether +string+ consists solely of one or more decimal digits.
#
# @param string [String, nil] candidate value (nil yields false)
# @return [Boolean] true only when the entire string is digits
def is_number?(string)
  # \A and \z anchor the whole string. ^ and $ anchor individual lines, so
  # values such as "123\nbob" or "123\n" would wrongly count as numeric.
  /\A\d+\z/.match(string) ? true : false
end
tweet_processor(queue, tweet) click to toggle source
# File lib/logstash/inputs/twitter.rb, line 188
# Turns one streamed object into an event and pushes it onto +queue+.
#
# Objects that are not a Twitter::Tweet, and tweets rejected by ignore?, are
# skipped. A failure while building, decorating or enqueueing the event is
# logged and counted in @event_generation_error_count instead of propagating.
#
# @param queue [#<<] destination for the generated event
# @param tweet the object received from the stream
def tweet_processor(queue, tweet)
  return unless tweet.is_a?(Twitter::Tweet) && !ignore?(tweet)

  begin
    queue << (from_tweet(tweet).tap { |event| decorate(event) })
  rescue => e
    @event_generation_error_count = @event_generation_error_count.succ
    @logger.error("Event generation error", :message => e.message, :exception => e.class, :backtrace => e.backtrace)
  end
end