class Superbolt::Queue

Attributes

config[R]
name[R]

Public Class Methods

new(name, config=nil) click to toggle source
# File lib/superbolt/queue.rb, line 5
# Sets up the queue wrapper.
#
# @param name [Object] queue name, stored in @name unchanged
# @param config [Object, nil] configuration to use; when nil (or false)
#   the value of Superbolt.config is stored instead
def initialize(name, config = nil)
  @name = name
  @config = config
  # Fall back to the library-wide configuration when none was supplied.
  @config ||= Superbolt.config
end

Public Instance Methods

all() click to toggle source
# File lib/superbolt/queue.rb, line 49
# Returns an array holding the result of calling +parse+ on every
# message returned by #read, in the same order.
def all
  read.map { |message| message.parse }
end
clear() click to toggle source
# File lib/superbolt/queue.rb, line 30
# Purges the queue by calling q.purge inside a +closing+ block.
# Returns whatever +closing+ returns (+closing+ and +q+ are defined
# outside this excerpt).
def clear
  closing { q.purge }
end
connection() click to toggle source
# File lib/superbolt/queue.rb, line 10
# Returns the memoized Connection::Queue for this queue, building it
# from +name+ and +config+ the first time it is requested (or whenever
# @connection is nil/false).
def connection
  return @connection if @connection

  @connection = Connection::Queue.new(name, config)
end
delete() { |parse| ... } click to toggle source
# File lib/superbolt/queue.rb, line 71
# Acks the messages for which the given block returns a truthy value
# and returns their parsed form.
#
# Every message delivered by q.subscribe is wrapped in an
# IncomingMessage and parsed exactly once; the parsed value is both
# yielded to the block and, when selected, collected for the result.
#
# @yield [parsed] the value returned by IncomingMessage#parse
# @yieldreturn [Boolean] truthy to ack the message and include it
# @return [Array] parsed values of the messages that were acked
def delete
  removed = []
  closing do
    q.subscribe(:ack => true) do |delivery_info, _metadata, payload|
      message = IncomingMessage.new(delivery_info, payload, channel)
      parsed = message.parse
      # Messages the caller does not select are left un-acked.
      next unless yield(parsed)

      removed << parsed
      message.ack
    end

    # channel is closed by block before message ack can complete
    # therefore we must sleep :(
    sleep 0.02
  end
  removed
end
peek() click to toggle source
# File lib/superbolt/queue.rb, line 53
# Returns the next message by popping it and immediately pushing it
# back onto the queue.
#
# NOTE(review): the popped value is re-published through #push
# unconditionally, so whatever #pop returns for an empty queue is
# pushed as well, and the message presumably re-enters at the back of
# the queue rather than its original position — confirm this is intended.
def peek
  pop.tap { |message| push(message) }
end
pop() click to toggle source
# File lib/superbolt/queue.rb, line 59
# Pops one message off the queue and returns its parsed form.
#
# The payload yielded by q.pop is wrapped in an IncomingMessage and
# parsed. When no payload is yielded the method returns nil without
# building or parsing a message.
#
# @return [Object, nil] the result of IncomingMessage#parse, or nil
#   when q.pop yields a nil payload
def pop
  closing do
    q.pop do |delivery_info, _metadata, payload|
      # Guard on the raw payload: a freshly built IncomingMessage is
      # always truthy, so checking it afterwards can never detect a
      # missing message.
      # NOTE(review): presumably q.pop yields a nil payload when the
      # queue is empty — confirm against the client library.
      next unless payload

      IncomingMessage.new(delivery_info, payload, channel).parse
    end
  end
end
push(message) click to toggle source
# File lib/superbolt/queue.rb, line 18
# Publishes +message+, serialized with +to_json+, through +writer+
# using this queue's +name+ as the routing key. The publish call runs
# inside a +closing+ block, whose result is returned.
#
# @param message [#to_json] the message to enqueue
def push(message)
  closing do
    publisher = writer
    publisher.publish(message.to_json, :routing_key => name)
  end
end
read() click to toggle source

TODO: consolidate the duplicated subscribe logic shared by read and delete

# File lib/superbolt/queue.rb, line 38
# Collects every message delivered by q.subscribe, each wrapped in an
# IncomingMessage, and returns them as an array.
#
# The subscription is opened with :ack => true, but this method never
# acks the messages it collects.
#
# @return [Array<IncomingMessage>] the wrapped messages, in delivery order
def read
  [].tap do |collected|
    closing do
      q.subscribe(ack: true) do |delivery_info, _metadata, payload|
        collected << IncomingMessage.new(delivery_info, payload, channel)
      end
    end
  end
end
size() click to toggle source
# File lib/superbolt/queue.rb, line 24
# Returns q.message_count, evaluated inside a +closing+ block
# (+closing+ and +q+ are defined outside this excerpt).
def size
  closing { q.message_count }
end