class Akane::Receivers::Stream

Attributes

last[RW]
thread[R]

Public Class Methods

new(*) click to toggle source
Calls superclass method Akane::Receivers::AbstractReceiver::new
# File lib/akane/receivers/stream.rb, line 10
def initialize(*)
  super
  @thread = nil

  if @config["method"]
    @stream_method = @config["method"].to_sym
  else
    @stream_method = :user
  end

  if @config["options"]
    @stream_options = Hash[@config["options"].map do |k,v|
      [k.to_sym, v]
    end]
  else
    @stream_options = {}
  end
end

Public Instance Methods

name() click to toggle source
# File lib/akane/receivers/stream.rb, line 29
def name
  # For backward compatibility, user stream returns only account name if
  # config.name not specified.
  @name ||= @config['name'] || @account[:name]
end
running?() click to toggle source
# File lib/akane/receivers/stream.rb, line 35
def running?() !!(@thread && @thread.alive?) end
start() click to toggle source
# File lib/akane/receivers/stream.rb, line 50
def start
  @logger.info "Stream : Starting"

  @last = Time.now
  @retry_count = 0
  @thread = Thread.new do
    begin
      stream.send(@stream_method, @stream_options) do |obj|
        @retry_count = 0

        case obj
        when Twitter::Tweet
          invoke(:tweet, obj)
        when Twitter::DirectMessage
          invoke(:message, obj)
        when Twitter::Streaming::DeletedTweet
          invoke(:delete, obj.user_id, obj.id)
        when Twitter::Streaming::Event
          invoke(:event,
                 'event' => obj.name, 'source' => obj.source,
                 'target' => obj.target, 'target_object' => obj.target_object)
        else
          next
        end
      end
    rescue Exception => e
      raise e if defined?(Twitter::Streaming::MockClient)
      @logger.error 'Error on stream'
      @logger.error e.inspect
      @logger.error e.backtrace

      @retry_count += 1

      # reconnecting https://dev.twitter.com/streaming/overview/connecting
      case e
      when Twitter::Error::EnhanceYourCalm # 420
        interval = 60 * (2 ** (@retry_count - 1))
      when Twitter::Error
        interval = [320, 5 * (2 ** (@retry_count - 1))].min
      else
        interval = [16, 0.25 * @retry_count].min
      end

      @logger.info "stream will reconnect after #{interval} sec (retry_count=#{@retry_count})"
      sleep interval
      @logger.info 'stream reconnecting'
      retry
    end
  end

  @watchdog = Thread.new do
    th = @thread
    begin
      loop do
        break unless @thread
        # @logger.debug "watchdog last #{@last} #{Time.now - @last}"
        if (Time.now - @last) > 90
          @last = Time.now
          @logger.error 'watchdog timeout'
          th.raise(TimeoutError)
        end
        sleep 1
      end
      @logger.info 'watchdog stop'
    rescue Exception => e
      @logger.error 'Error on watchdog'
      @logger.error e.inspect
      @logger.error e.backtrace

      sleep 5
      @logger.info 'watchdog restarting'
      retry
    end
  end

  @thread.abort_on_exception = true

  self
end
stop() click to toggle source
# File lib/akane/receivers/stream.rb, line 130
def stop
  @thread.tap(&:kill).join
  @thread = nil
  self
end
stream() click to toggle source
# File lib/akane/receivers/stream.rb, line 37
def stream
  @stream ||= Twitter::Streaming::Client.new(
    consumer_key: @consumer[:token],
    consumer_secret: @consumer[:secret],
    access_token: @account[:token],
    access_token_secret: @account[:secret],
    ssl_socket_class: CustomSSLSocketFactory.new(self),
  )
end