class AsyncStorage::Allocator

Constants

CTRL

Attributes

naming[R]

Public Class Methods

new(repo, *args) click to toggle source

@param repo [AsyncStorage::Repo] An instance of Repo @param args [Array] An array with arguments to be forwarded to resolver#call

# File lib/async_storage/allocator.rb, line 21
# Builds an allocator bound to a repo and the arguments given to the resolver.
#
# @param repo [AsyncStorage::Repo] supplies +resolver_class+ and +namespace+
# @param args [Array] arguments kept for the resolver call and used for key naming
def initialize(repo, *args)
  @repo, @args = repo, args
  @naming = AsyncStorage::Naming.new(repo.resolver_class, *args).tap do |naming|
    # The repo namespace is not the same thing as config.namespace.
    # Picture a directory tree: the global namespace is the root directory
    # and the repo-level namespace is a subdirectory beneath it.
    repo_namespace = repo.namespace
    naming.prefix = repo_namespace if repo_namespace
  end
end

Public Instance Methods

exist?() click to toggle source

Check whether the object exists, i.e. both its head and body keys are present in the storage.

@return [Boolean] True or False according to the object existence

# File lib/async_storage/allocator.rb, line 121
# Tells whether both the head key and the body key are present.
#
# @return [Boolean] false as soon as one of the keys is absent; the breaker
#   fallback also yields false
def exist?
  breaker.run(fallback: -> { false }) do
    connection do |redis|
      # The body is only looked up when the head is there.
      next false unless redis.exists?(naming.head)

      redis.exists?(naming.body)
    end
  end
end
fresh?() click to toggle source

Check if a fresh object exists in the storage

@return [Boolean] true/false according to the object existence and freshness

# File lib/async_storage/allocator.rb, line 139
# Tells whether a fresh object is stored.
#
# An object counts as fresh when its body key exists and its head key holds
# CTRL[:executed]. The head value is inspected instead of its TTL because
# #update! only sets an expiry when +expires_in+ is present: a head without
# expiry reports a TTL of -1 and would otherwise never be considered fresh,
# even though #get and #get! serve it as valid content.
#
# @return [Boolean] true when the body exists and the head is marked as
#   executed; false otherwise (also the breaker fallback)
def fresh?
  breaker.run(fallback: -> { false }) do
    connection do |redis|
      redis.exists?(naming.body) && redis.get(naming.head) == CTRL[:executed]
    end
  end
end
get() click to toggle source

Async get value with a given key

@return [Object, NilClass] Return either the stale or the fresh object. If it does not exist, asynchronously call the resolver and return nil

# File lib/async_storage/allocator.rb, line 34
# Reads the object, preferring whatever is already stored (even if stale).
#
# The head key drives the decision:
# * executed or enqueued: the stored body is returned;
# * missing: the value is resolved right away when #async? is false,
#   otherwise a refresh is requested, the head is flagged as enqueued and
#   the stored body (possibly nil) is returned;
# * anything else: the head is considered corrupted.
#
# @return [Object, NilClass] stored or freshly resolved object
# @raise [AsyncStorage::Error] when the head key holds an unexpected value
def get
  breaker.run(fallback: -> { fetch! }) do
    connection do |redis|
      head_state = redis.get(naming.head)
      case head_state
      when CTRL[:executed], CTRL[:enqueued]
        # Something is (or is about to be) stored: hand out what we have.
        read_body(redis)
      when CTRL[:missing]
        if async?
          # Ask for a background resolution and serve stale content meanwhile.
          perform_async(redis)
          redis.set(naming.head, CTRL[:enqueued])
          read_body(redis)
        else
          # Leaves the method straight from inside the blocks.
          return update!(redis)
        end
      else
        message = format(
          'the key %<k>s have an invalid value. Only "1" or "0" values are expected. And we got %<v>p',
          v: head_state,
          k: naming.head
        )
        raise AsyncStorage::Error, message
      end
    end
  end
end
get!() click to toggle source

Sync get value with a given key

@return [Object] Return the result from resolver

# File lib/async_storage/allocator.rb, line 57
# Synchronously reads the object, resolving it whenever no fresh copy is stored.
#
# * head executed: the stored body is returned; if the body key is gone the
#   value is resolved and stored again;
# * head missing or enqueued: the value is resolved and stored;
# * anything else: the head is considered corrupted.
#
# @return [Object] stored or freshly resolved object
# @raise [AsyncStorage::Error] when the head key holds an unexpected value
def get!
  breaker.run(fallback: -> { fetch! }) do
    connection do |redis|
      raw_head = redis.get(naming.head)
      case raw_head
      when CTRL[:executed]
        cached = read_body(redis)
        # A stored payload may legitimately decode to a falsy value (false or
        # nil). Hand it back untouched as long as the body key exists, so a
        # cached +false+ is not turned into +nil+; only resolve again when
        # the body key is really absent.
        if cached || redis.exists?(naming.body)
          cached
        else
          update!(redis)
        end
      when CTRL[:missing], CTRL[:enqueued]
        update!(redis)
      else
        raise AsyncStorage::Error, format('the key %<k>s have an invalid value. Only "1" or "0" values are expected. And we got %<v>p', v: raw_head, k: naming.head)
      end
    end
  end
end
invalidate() click to toggle source

Expire the object with a given key. The stale object will not be removed

@return [Boolean] True or False according to the object existence

# File lib/async_storage/allocator.rb, line 78
# Drops only the head key, leaving the body in place as stale content.
#
# @return [Boolean] true when exactly one key was deleted; false otherwise
#   (also the breaker fallback)
def invalidate
  breaker.run(fallback: -> { false }) do
    connection do |redis|
      removed = redis.del(naming.head)
      removed == 1
    end
  end
end
invalidate!() click to toggle source

Delete object with a given key.

@return [Boolean] True or False according to the object existence

# File lib/async_storage/allocator.rb, line 89
# Deletes both the body and the head keys within a single MULTI block.
#
# @return [Boolean] true when at least one of the deletions reports 1;
#   false otherwise (also the breaker fallback)
def invalidate!
  breaker.run(fallback: -> { false }) do
    connection do |redis|
      replies = redis.multi do |tx|
        tx.del(naming.body)
        tx.del(naming.head)
      end
      replies.include?(1)
    end
  end
end
refresh() click to toggle source

Invalidate object with the given key and update content according to the strategy

@return [Object, NilClass] Stale object or nil when it does not exist

# File lib/async_storage/allocator.rb, line 103
# Reads the current object through #get and then invalidates it.
#
# @return [Object, NilClass] whatever #get returned before the invalidation
def refresh
  breaker.run(fallback: -> { fetch! }) do
    current = get
    invalidate
    current
  end
end
refresh!() click to toggle source

Fetch data from resolver and store it into redis

@return [Object] Return the result from resolver

# File lib/async_storage/allocator.rb, line 112
# Resolves the value and stores it, regardless of what is currently stored.
#
# @return [Object] the result of #update!, or of #fetch! when the breaker
#   falls back
def refresh!
  resolve_directly = -> { fetch! }
  breaker.run(fallback: resolve_directly) do
    connection do |redis|
      update!(redis)
    end
  end
end
stale?() click to toggle source

Check if object with a given key is stale

@return [Boolean] Return false if the object does not exist, otherwise true/false according to the object freshness state

# File lib/async_storage/allocator.rb, line 130
# Tells whether a stored object is stale.
#
# An object counts as stale when its body key exists but its head key does
# not hold CTRL[:executed] (head absent, expired, or flagged otherwise). The
# head value is inspected instead of its TTL because #update! only sets an
# expiry when +expires_in+ is present: a head without expiry reports a TTL
# of -1 and would otherwise always look stale.
#
# @return [Boolean] true when a body exists without an executed head; false
#   when there is no body or the object is fresh (also the breaker fallback)
def stale?
  breaker.run(fallback: -> { false }) do
    connection do |redis|
      redis.exists?(naming.body) && redis.get(naming.head) != CTRL[:executed]
    end
  end
end

Private Instance Methods

async?() click to toggle source
# File lib/async_storage/allocator.rb, line 147
# Tells #get whether a missing object should be resolved in the background.
# Always false here, so #get resolves missing objects synchronously.
#
# @return [Boolean] always false
def async?
  false
end
breaker() click to toggle source
# File lib/async_storage/allocator.rb, line 186
# Builds a new circuit breaker for this allocator on every call.
#
# @return [CircuitBreaker] breaker configured with Redis connection errors
def breaker
  handled_errors = [Redis::BaseConnectionError]
  CircuitBreaker.new(self, exceptions: handled_errors)
end
connection() { |redis| ... } click to toggle source
# File lib/async_storage/allocator.rb, line 180
# Checks a client out of AsyncStorage.redis_pool and yields it.
#
# @yieldparam client the object handed over by the pool
# @return [Object, NilClass] the value produced by the pool's +with+ call, or
#   nil when no block is given
def connection
  AsyncStorage.redis_pool.with { |client| yield(client) } if block_given?
end
fetch!() click to toggle source
# File lib/async_storage/allocator.rb, line 169
# Instantiates the resolver and calls it with the allocator arguments.
#
# @return [Object] whatever the resolver's +call+ returns
def fetch!
  resolver = resolver_class.new
  resolver.call(*@args)
end
perform_async(redis) click to toggle source
# File lib/async_storage/allocator.rb, line 151
# Placeholder for the background refresh: for now it simply runs #refresh!
# inline, so the work is done synchronously.
#
# @param redis the client supplied by the caller; not used by the current body
# @return [Object] the result of #refresh!
def perform_async(redis)
  # @TODO Enqueue a real background job here. It's only working on sync mode
  # redis.set(naming.head, CTRL[:enqueued])
  refresh!
end
read_body(redis) click to toggle source
# File lib/async_storage/allocator.rb, line 173
# Reads the body key and decodes it.
#
# @param redis client used to read the body key
# @return [Object, NilClass] decoded payload, or nil when nothing is stored
def read_body(redis)
  serialized = redis.get(naming.body)
  serialized ? AsyncStorage::JSON.load(serialized) : nil
end
update!(redis) click to toggle source
# File lib/async_storage/allocator.rb, line 157
# Resolves the value and writes it to the storage within a single MULTI block.
#
# The body key receives the serialized payload and the head key is flagged
# as executed; the head only gets an expiry when +expires_in+ is present.
#
# @param redis client used for the write
# @return [Object] the payload decoded back from its serialized form
def update!(redis)
  serialized = AsyncStorage::JSON.dump(fetch!, mode: :compat)

  redis.multi do |tx|
    tx.set(naming.body, serialized)
    tx.set(naming.head, CTRL[:executed])
    ttl = expires_in
    tx.expire(naming.head, ttl) if ttl
  end

  # Decode the stored form so the caller gets the same value later reads produce.
  AsyncStorage::JSON.load(serialized)
end