class Zold::AsyncEntrance

The entrance

Public Class Methods

new(entrance, dir, log: Log::NULL, threads: [Concurrent.processor_count, 8].max, queue_limit: 8) click to toggle source
# File lib/zold/node/async_entrance.rb, line 41
def initialize(entrance, dir, log: Log::NULL,
  threads: [Concurrent.processor_count, 8].max, queue_limit: 8)
  @entrance = entrance
  @dir = File.expand_path(dir)
  @log = log
  @threads = threads
  @pool = ThreadPool.new('async-entrance', log: log)
  @queue = Queue.new
  @queue_limit = queue_limit
end

Public Instance Methods

push(id, body) click to toggle source

Always returns an array with a single ID of the pushed wallet

# File lib/zold/node/async_entrance.rb, line 90
  def push(id, body)
    if @queue.size > @queue_limit
      raise(
        SoftError,
        "Queue is too long (#{@queue.size} wallets), can't add #{id}/#{Size.new(body.length)}, try again later"
      )
    end
    start = Time.now
    unless exists?(id, body)
      loop do
        uuid = SecureRandom.uuid
        file = File.join(@dir, "#{id}-#{uuid}#{Wallet::EXT}")
        next if File.exist?(file)
        IO.write(file, body)
        @queue << { id: id, file: file }
        @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{@queue.size} \
in #{Age.new(start, limit: 0.05)}")
        break
      end
    end
    [id]
  end
start() { |self| ... } click to toggle source
# File lib/zold/node/async_entrance.rb, line 60
def start
  raise 'Block must be given to start()' unless block_given?
  FileUtils.mkdir_p(@dir)
  DirItems.new(@dir).fetch.each do |f|
    file = File.join(@dir, f)
    if /^[0-9a-f]{16}-/.match?(f)
      id = f.split('-')[0]
      @queue << { id: Id.new(id), file: file }
    else
      File.delete(file)
    end
  end
  @log.info("#{@queue.size} wallets pre-loaded into async_entrace from #{@dir}") unless @queue.size.zero?
  @entrance.start do
    (0..@threads).map do |i|
      @pool.add do
        Endless.new("async-e##{i}", log: @log).run do
          take
        end
      end
    end
    begin
      yield(self)
    ensure
      @pool.kill
    end
  end
end
to_json() click to toggle source
# File lib/zold/node/async_entrance.rb, line 52
def to_json
  @entrance.to_json.merge(
    'queue': @queue.size,
    'threads': @pool.count,
    'queue_limit': @queue_limit
  )
end

Private Instance Methods

exists?(id, body) click to toggle source

Returns TRUE if a file for this wallet is already in the queue.

# File lib/zold/node/async_entrance.rb, line 116
def exists?(id, body)
  DirItems.new(@dir).fetch.each do |f|
    next unless f.start_with?("#{id}-")
    return true if safe_read(File.join(@dir, f)) == body
  end
  false
end
safe_read(file) click to toggle source
# File lib/zold/node/async_entrance.rb, line 124
def safe_read(file)
  IO.read(file)
rescue Errno::ENOENT
  ''
end
take() click to toggle source
# File lib/zold/node/async_entrance.rb, line 130
    def take
      start = Time.now
      item = @queue.pop
      Thread.current.thread_variable_set(:wallet, item[:id].to_s)
      body = IO.read(item[:file])
      FileUtils.rm_f(item[:file])
      @entrance.push(item[:id], body)
      @log.debug("Pushed #{item[:id]}/#{Size.new(body.length)} to #{@entrance.class.name} \
in #{Age.new(start, limit: 0.1)}#{@queue.size.zero? ? '' : "(#{@queue.size} still in the queue)"}")
    end