module Lowkiq
Constants
- VERSION
Attributes
build_scheduler[RW]
build_splitter[RW]
client_pool_size[RW]
dump_payload[RW]
last_words[RW]
load_payload[RW]
on_server_init[RW]
poll_interval[RW]
pool_timeout[RW]
redis[RW]
server_middlewares[RW]
threads_per_node[RW]
workers[RW]
Public Class Methods
build_by_node_splitter(number_of_nodes, node_number)
click to toggle source
# File lib/lowkiq.rb, line 89
# Builds a Lowkiq::Splitters::ByNode from the given node count, this node's
# number, and the configured Lowkiq.threads_per_node.
def build_by_node_splitter(number_of_nodes, node_number)
  threads = Lowkiq.threads_per_node
  Lowkiq::Splitters::ByNode.new(number_of_nodes, node_number, threads)
end
build_default_splitter()
click to toggle source
# File lib/lowkiq.rb, line 85
# Builds a Lowkiq::Splitters::Default parameterized by Lowkiq.threads_per_node.
def build_default_splitter
  Lowkiq::Splitters::Default.new(Lowkiq.threads_per_node)
end
build_lag_scheduler()
click to toggle source
# File lib/lowkiq.rb, line 72
# Builds a Schedulers::Lag from two collaborators: a callable that sleeps for
# Lowkiq.poll_interval, and a Queue::ShardMetrics built on the server Redis pool.
def build_lag_scheduler
  wait = -> { sleep Lowkiq.poll_interval }
  metrics = Queue::ShardMetrics.new(server_redis_pool)
  Schedulers::Lag.new(wait, metrics)
end
build_seq_scheduler()
click to toggle source
# File lib/lowkiq.rb, line 79
# Builds a Schedulers::Seq, handing it a callable that sleeps for
# Lowkiq.poll_interval each time it is invoked.
def build_seq_scheduler
  pause = -> { sleep Lowkiq.poll_interval }
  Schedulers::Seq.new(pause)
end
client_redis_pool()
click to toggle source
# File lib/lowkiq.rb, line 51
# Returns the memoized client-side ConnectionPool. On first use the pool is
# created with size `client_pool_size` and timeout `pool_timeout`, and the
# `redis` attribute is passed as the pool's block.
def client_redis_pool
  return @client_redis_pool if @client_redis_pool

  @client_redis_pool = ConnectionPool.new(
    size: client_pool_size,
    timeout: pool_timeout,
    &redis
  )
end
server_redis_pool()
click to toggle source
# File lib/lowkiq.rb, line 47
# Returns the memoized server-side ConnectionPool. On first use the pool is
# created with size `threads_per_node` and timeout `pool_timeout`, and the
# `redis` attribute is passed as the pool's block.
def server_redis_pool
  return @server_redis_pool if @server_redis_pool

  @server_redis_pool = ConnectionPool.new(
    size: threads_per_node,
    timeout: pool_timeout,
    &redis
  )
end
server_wrapper()
click to toggle source
# File lib/lowkiq.rb, line 55
# Folds `server_middlewares` into a single callable taking
# (worker, batch, &block).
#
# The fold starts from a pass-through that just calls the block. Each
# middleware is then nested inside the wrapper built so far, so the first
# middleware in the list ends up outermost and the caller's block innermost.
def server_wrapper
  passthrough = ->(_worker, _batch, &inner) { inner.call }

  server_middlewares.reduce(passthrough) do |outer, middleware|
    lambda do |worker, batch, &inner|
      outer.call(worker, batch) { middleware.call(worker, batch, &inner) }
    end
  end
end
shard_handlers()
click to toggle source
# File lib/lowkiq.rb, line 66
# Returns one flat list of the handlers ShardHandler.build_many produces for
# every registered worker; each call is given a freshly built server_wrapper.
def shard_handlers
  workers.flat_map { |worker| ShardHandler.build_many(worker, server_wrapper) }
end
Public Instance Methods
client_actions()
click to toggle source
# File lib/lowkiq/worker.rb, line 33
# Builds a Queue::Actions from this worker's client queue and client queries.
def client_actions
  Queue::Actions.new(client_queue, client_queries)
end
client_queries()
click to toggle source
# File lib/lowkiq/worker.rb, line 29
# Builds a Queue::Queries bound to the client Redis pool and this worker's
# queue name.
def client_queries
  pool = Lowkiq.client_redis_pool
  Queue::Queries.new(pool, queue_name)
end
client_queue()
click to toggle source
# File lib/lowkiq/worker.rb, line 25
# Builds a Queue::Queue bound to the client Redis pool, this worker's queue
# name, and its shard count.
def client_queue
  pool = Lowkiq.client_redis_pool
  Queue::Queue.new(pool, queue_name, shards_count)
end
perform(payload)
click to toggle source
# File lib/lowkiq/worker.rb, line 21
# Placeholder implementation: always raises a RuntimeError with the message
# "not implemented", whatever payload is given.
def perform(payload)
  raise "not implemented"
end
perform_async(batch)
click to toggle source
# File lib/lowkiq/worker.rb, line 37
# Pushes the batch onto this worker's client queue and returns whatever the
# queue's `push` returns.
def perform_async(batch)
  client_queue.push(batch)
end
retry_in(retry_count)
click to toggle source
Returns the delay before the next retry: a base of 15, 16, 31, 96, 271, … seconds (for retry counts 0, 1, 2, 3, 4, …) plus a random amount of time.
# File lib/lowkiq/worker.rb, line 17
# Computes the retry delay for the given retry count.
#
# The result is a fixed part, retry_count**4 + 15, plus a random part,
# rand(30) * (retry_count + 1).
def retry_in(retry_count)
  base = (retry_count**4) + 15
  jitter = rand(30) * (retry_count + 1)
  base + jitter
end