class Akane::Recorder

Public Class Methods

new(storages, timeout: 20, logger: Logger.new(nil)) click to toggle source
# File lib/akane/recorder.rb, line 8
def initialize(storages, timeout: 20, logger: Logger.new(nil))
  @storages = storages
  @logger = logger
  @queue = Queue.new
  @recently_performed = RoundrobinFlags.new(1000)
  @timeout = timeout
  @stop = false
end

Public Instance Methods

dequeue(raise_errors = false) click to toggle source
# File lib/akane/recorder.rb, line 45
def dequeue(raise_errors = false)
  perform(*@queue.pop, raise_errors: raise_errors)
end
mark_as_deleted(account, user_id, tweet_id) click to toggle source
# File lib/akane/recorder.rb, line 27
def mark_as_deleted(account, user_id, tweet_id)
  return self if @stop
  @queue << [:mark_as_deleted, account, user_id, tweet_id]
  self
end
perform(action, account, *payload, raise_errors: false) click to toggle source
# File lib/akane/recorder.rb, line 49
def perform(action, account, *payload, raise_errors: false)
  if action == :record_tweet
    tweet = payload.last
    return if @recently_performed[tweet.id]
    @recently_performed.flag!(tweet.id)

    if tweet.retweet?
      perform(:record_tweet, account, tweet.retweeted_status, raise_errors: raise_errors)
    end
  end

  @storages.each do |storage|
    begin
      Timeout.timeout(@timeout) do
        storage.__send__(action, account, *payload)
      end

    rescue Timeout::Error => e
      raise e if raise_errors
      @logger.warn "#{storage.name} (#{action}) timed out"

    rescue Interrupt, SignalException, SystemExit => e
      raise e

    rescue Exception => e
      raise e if raise_errors
      @logger.error "Error while recorder performing to #{storage.inspect}:  #{e.inspect}"
      @logger.error e.backtrace
    end
  end
end
queue_length() click to toggle source
# File lib/akane/recorder.rb, line 17
def queue_length
  @queue.size
end
record_event(account, event) click to toggle source
# File lib/akane/recorder.rb, line 39
def record_event(account, event)
  return self if @stop
  @queue << [:record_event, account, event]
  self
end
record_message(account, message) click to toggle source
# File lib/akane/recorder.rb, line 33
def record_message(account, message)
  return self if @stop
  @queue << [:record_message, account, message]
  self
end
record_tweet(account, tweet) click to toggle source
# File lib/akane/recorder.rb, line 21
def record_tweet(account, tweet)
  return self if @stop
  @queue << [:record_tweet, account, tweet]
  self
end
run(raise_errors = false) click to toggle source
# File lib/akane/recorder.rb, line 81
def run(raise_errors = false)
  @running_thread = Thread.new do
    loop do
      begin
        begin
          self.dequeue(raise_errors)
        rescue Interrupt, SignalException, Stop
        end

        if @stop
          break if self.queue_length.zero?
          @logger.info "processing queue: #{self.queue_length} remaining."
        end
      rescue Exception => e
        raise e if raise_errors
        @logger.error "Error while recorder dequing: #{e.inspect}"
        @logger.error e.backtrace
      end
    end

    @logger.info "Recorder stopped."
    @stop = false
  end

  @running_thread.join
  nil
end
stop!() click to toggle source
# File lib/akane/recorder.rb, line 109
def stop!
  @stop = true
  @running_thread.raise Stop
end