class Spacebunny::LiveStream::Amqp
Constants
- ACK_TYPES
- DEFAULT_EXCHANGE_OPTIONS
- DEFAULT_QUEUE_OPTIONS
Attributes
client[R]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/spacebunny/live_stream/amqp.rb, line 12 def initialize(*args) super(:amqp, *args) end
Public Instance Methods
connect()
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 16 def connect # 'Fix' attributes: start from common connection configs and adjust attributes to match what Bunny # wants as connection args connection_params = connection_configs.dup connection_params[:user] = connection_params.delete :client connection_params[:password] = connection_params.delete :secret # Default on a tls connection unless connection_params[:tls] == false connection_params[:port] = connection_params.delete(:tls_port) end connection_params[:log_level] = connection_params.delete(:log_level) || ::Logger::ERROR # Re-create client every time connect is called @client = Bunny.new(connection_params) @client.start logger.info 'Connected to SpaceBunny' end
disconnect()
click to toggle source
Calls superclass method
# File lib/spacebunny/live_stream/amqp.rb, line 34 def disconnect super client.stop if client logger.info 'Disconnected from SpaceBunny' end
message_from(name, options = {}, &block)
click to toggle source
Subscribe for messages coming from Live Stream with name 'name' Each subscriber will receive a copy of messages flowing through the Live Stream
# File lib/spacebunny/live_stream/amqp.rb, line 42 def message_from(name, options = {}, &block) receive_message_from name, options, &block end
message_from_cache(name, options = {}, &block)
click to toggle source
Subscribe for messages coming from cache of Live Stream with name 'name' The Live Stream will dispatch a message to the first ready subscriber in a round-robin fashion.
# File lib/spacebunny/live_stream/amqp.rb, line 48 def message_from_cache(name, options = {}, &block) options[:from_cache] = true receive_message_from name, options, &block end
Private Instance Methods
check_client()
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 55 def check_client unless client raise ClientNotSetup end unless client.connected? if raise_on_error raise ClientNotConnected else @logger.error 'Client not connected! Check internet connection' return false end end true end
client_connected?()
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 70 def client_connected? client && client.status.eql?(:open) end
live_stream_data_from_name(name)
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 74 def live_stream_data_from_name(name) # Find the live_stream from provided name unless live_stream_data = live_streams.find { |ls| ls[:name] == name } raise LiveStreamNotFound.new(name) end live_stream_data end
parse_ack(ack)
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 82 def parse_ack(ack) to_ack = false auto_ack = false if ack raise AckTypeError unless ACK_TYPES.include?(ack) to_ack = true case ack when :manual auto_ack = false when :auto auto_ack = true end end return to_ack, auto_ack end
receive_message_from(name, options) { |message| ... }
click to toggle source
# File lib/spacebunny/live_stream/amqp.rb, line 98 def receive_message_from(name, options) unless block_given? raise BlockRequired end name = name.to_s blocking = options.fetch :wait, false to_ack, auto_ack = parse_ack options.fetch(:ack, :manual) from_cache = options.fetch :from_cache, false if check_client ls_channel = client.create_channel live_stream_name = "#{name}.live_stream" if from_cache live_stream = ls_channel.queue live_stream_name, DEFAULT_QUEUE_OPTIONS else ls_exchange = ls_channel.fanout live_stream_name, DEFAULT_EXCHANGE_OPTIONS live_stream = ls_channel.queue("#{client}_#{Time.now.to_f}.live_stream.temp", auto_delete: true) .bind ls_exchange, routing_key: '#' end live_stream.subscribe(block: blocking, manual_ack: to_ack) do |delivery_info, metadata, payload| message = LiveStream::Message.new ls_channel, options, delivery_info, metadata, payload yield message # If ack is :auto then ack current message if to_ack && auto_ack message.ack end end return true else @logger.debug 'Not subscribed due to client not connected' false end end