class Superbolt::Queue
Attributes
config[R]
name[R]
Public Class Methods
new(name, config=nil)
click to toggle source
# File lib/superbolt/queue.rb, line 5
# Builds a queue handle for the given queue name.
#
# name   - stored as-is and exposed through the `name` reader.
# config - optional configuration object; when nil (or false) the value of
#          Superbolt.config is used instead.
def initialize(name, config=nil)
  @name = name
  @config = config
  # Fall back to the library-wide configuration only when none was supplied.
  @config ||= Superbolt.config
end
Public Instance Methods
all()
click to toggle source
# File lib/superbolt/queue.rb, line 49
# Returns an array holding the result of calling `parse` on every message
# returned by #read.
def all
  read.map { |message| message.parse }
end
clear()
click to toggle source
# File lib/superbolt/queue.rb, line 30
# Purges the queue by calling `q.purge` inside a `closing` block.
# `closing` and `q` are defined outside this view.
def clear
  closing { q.purge }
end
connection()
click to toggle source
# File lib/superbolt/queue.rb, line 10
# Returns the memoized Connection::Queue for this queue's name and config,
# building it on first use.
def connection
  return @connection if @connection

  @connection = Connection::Queue.new(name, config)
end
delete() { |parse| ... }
click to toggle source
# File lib/superbolt/queue.rb, line 71
# Removes the messages selected by the given block.
#
# Every delivery received from `q.subscribe` is wrapped in an IncomingMessage
# and its parsed form is yielded to the caller's block. When the block returns
# a truthy value the message is acked and its parsed form is collected.
#
# Yields the parsed message.
# Returns an array with the parsed form of every acked message.
def delete
  removed = []
  closing do
    q.subscribe(:ack => true) do |delivery_info, _metadata, payload|
      message = IncomingMessage.new(delivery_info, payload, channel)
      # Parse once so the predicate and the returned list see the same object
      # and the payload is not decoded a second time for every match.
      parsed = message.parse
      if yield(parsed)
        removed << parsed
        message.ack
      end
    end
    # channel is closed by block before message ack can complete
    # therefore we must sleep :(
    sleep 0.02
  end
  removed
end
peek()
click to toggle source
# File lib/superbolt/queue.rb, line 53
# Returns the value produced by #pop after handing that same value back to
# #push, so the message is published again rather than consumed.
#
# NOTE(review): whatever #pop returns is passed to #push unconditionally,
# including an empty result - confirm that is intended for an empty queue.
def peek
  pop.tap { |message| push(message) }
end
pop()
click to toggle source
# File lib/superbolt/queue.rb, line 59
# Takes one delivery from the queue and returns its parsed form.
#
# The block passed to `q.pop` yields nil/false straight back when no payload
# was delivered; otherwise the payload is wrapped in an IncomingMessage and
# parsed. The result travels back through `q.pop` and `closing`, both of which
# are defined outside this view.
#
# NOTE(review): presumably `q` behaves like a Bunny queue, which calls the
# block with three nils when the queue is empty - confirm.
def pop
  closing do
    q.pop do |delivery_info, _metadata, payload|
      # Guard on the raw payload. The previous code reused one name for both
      # the payload and the wrapper, so its guard tested the always-truthy
      # wrapper and an empty payload was still sent to `parse`.
      payload && IncomingMessage.new(delivery_info, payload, channel).parse
    end
  end
end
push(message)
click to toggle source
# File lib/superbolt/queue.rb, line 18
# Publishes `message`, serialized with `to_json`, through `writer` using this
# queue's name as the routing key. The publish runs inside a `closing` block.
def push(message)
  closing { writer.publish(message.to_json, :routing_key => name) }
end
read()
click to toggle source
TODO: roll up some of these subscribe methods
# File lib/superbolt/queue.rb, line 38
# Subscribes to the queue and gathers every delivery as an IncomingMessage.
# This method does not call `ack` on the messages it collects.
#
# Returns the array of IncomingMessage objects received inside the `closing`
# block.
def read
  [].tap do |received|
    closing do
      q.subscribe(ack: true) do |delivery_info, _metadata, payload|
        received << IncomingMessage.new(delivery_info, payload, channel)
      end
    end
  end
end
size()
click to toggle source
# File lib/superbolt/queue.rb, line 24
# Returns `q.message_count`, evaluated inside a `closing` block.
def size
  closing { q.message_count }
end