class Handlers::ReceiverHandler

Receiver events handler for receiver client

Attributes

browse[RW]

Browse

count[RW]

Count of expected messages to be received

prefetch[RW]

Credit for messages to be pre-fetched

process_reply_to[RW]

Process reply to

recv_listen[RW]

Receiver listen

recv_listen_port[RW]

Receiver listen port

selector[RW]

Selector

Public Class Methods

new( broker, log_msgs, msg_content_hashed, count, prefetch, process_reply_to, browse, selector, sasl_mechs, idle_timeout, max_frame_size, sasl_enabled, log_lib, recv_listen, recv_listen_port, auto_settle_off, exit_timer, duration, duration_mode ) click to toggle source

Initialization of receiver events handler

Receiver events handler arguments

broker

URI of broker

log_msgs

format of message(s) log

count

number of messages to receive

process-reply-to

send message to reply-to address if enabled and message got reply-to address

browse

browse messages instead of reading

sasl_mechs

allowed SASL mechanisms

Calls superclass method Handlers::SRCommonHandler::new
# File lib/handlers/receiver_handler.rb, line 49
def initialize(
  broker,
  log_msgs,
  msg_content_hashed,
  count,
  prefetch,
  process_reply_to,
  browse,
  selector,
  sasl_mechs,
  idle_timeout,
  max_frame_size,
  sasl_enabled,
  log_lib,
  recv_listen,
  recv_listen_port,
  auto_settle_off,
  exit_timer,
  duration,
  duration_mode
)
  super(
    broker,
    log_msgs,
    msg_content_hashed,
    sasl_mechs,
    idle_timeout,
    max_frame_size,
    sasl_enabled,
    log_lib,
    auto_settle_off,
    exit_timer
  )
  # Save count of expected messages to be received
  @count = count
  # Save credit for messages to be pre-fetched
  @prefetch = prefetch
  # Save process reply to
  @process_reply_to = process_reply_to
  # Save browse
  @browse = browse
  # Save selector
  @selector = selector
  # Save recv-listen value
  @recv_listen = recv_listen
  # Save recv-listen port value
  @recv_listen_port = recv_listen_port
  # Number of received messages
  @received = 0
  # Flag indicating that all expected messages were received
  @all_received = false
  # Hash with senders for replying
  @senders = {}
  # Counter of sent messages when processing reply-to
  @sent = 0
  # Counter of accepted messages
  @accepted = 0
  # Duration
  @duration = Duration.new(duration, count, duration_mode)
end

Public Instance Methods

do_process_reply_to(message) click to toggle source

Processing reply to reply-to address of message

# File lib/handlers/receiver_handler.rb, line 184
def do_process_reply_to(message)
  # If sender for actual reply-to address does not exist
  unless @senders.include?(message.reply_to)
    # Create new sender for reply-to address
    @senders[message.reply_to] = @receiver.connection.open_sender({
      # Set target address
      :target => message.reply_to,
      # Set auto settle
      :auto_settle => @auto_settle_off ? false : true,
    })
  end
  # Set target address of message to be send to reply-to address
  message.address = message.reply_to
  # Increase number of sent messages
  @sent = @sent + 1
  # Send message to reply-to address
  @senders[message.reply_to].send(message)
end
on_container_start(container) click to toggle source

Called when the event loop starts, connects receiver client to SRCommonHandler#broker and creates receiver

# File lib/handlers/receiver_handler.rb, line 113
def on_container_start(container)
  if @recv_listen # P2P
    @listener = container.listen("0.0.0.0:#{@recv_listen_port}")
  else # Broker
    # Prepare source options
    source = {}
    source[:address] = @broker.amqp_address
    source[:filter] = { :selector => make_apache_selector(@selector)} if @selector
    # Connecting to broker and creating receiver
    @receiver = container.connect(
      # Set broker URI
      @broker,
      # Enabled SASL authentication
      sasl_enabled: @sasl_enabled,
      # Enabled insecure SASL mechanisms
      sasl_allow_insecure_mechs: true,
      # Set allowed SASL mechanisms
      sasl_allowed_mechs: @sasl_mechs,
      # Set idle timeout
      idle_timeout: @idle_timeout,
      # Set max frame size
      max_frame_size: @max_frame_size,
    ).open_receiver(
      # Set source options
      :source => source,
      # Set prefetch
      :credit_window => @prefetch,
    )
    # If browse messages instead of reading
    if browse
      # Set browsing mode
      @receiver.source.distribution_mode = \
        Qpid::Proton::Terminus::DIST_MODE_COPY
    end
  end
end
on_message(delivery, message) click to toggle source

Called when a message is received, receiving ReceiverHandler#count messages

# File lib/handlers/receiver_handler.rb, line 152
def on_message(delivery, message)
  @duration.delay("before-receive") { |d| sleep d }
  exit_timer.reset if exit_timer
  # Print received message
  print_message(message)
  # If process reply to
  if @process_reply_to and !message.reply_to.nil?
    self.do_process_reply_to(message)
  end
  # Increase number of received messages
  @received = @received + 1
  # If expected count of messages to be received is not zero
  # and all expected messages are received
  if @count > 0 and @received == @count
    # Set flag indicating that all expected messages were received to true
    @all_received = true
    # Close listener when listening
    if recv_listen
      # Close listener if not processing reply-to
      @listener.close unless process_reply_to
    # Close receiver when not listening, but receiving
    else
      # Close receiver
      delivery.receiver.close 
      # Close connection if not processing reply-to
      delivery.receiver.connection.close unless process_reply_to
    end
  end # if
  @duration.delay("after-receive") { |d| sleep d }
end
on_tracker_accept(_tracker) click to toggle source

Called when the remote peer accepts an outgoing message, accepting ReceiverHandler#sent messages

# File lib/handlers/receiver_handler.rb, line 205
def on_tracker_accept(_tracker)
  # Increase number of accepted messages
  @accepted = @accepted + 1
  # If all expected messages were received
  # and all sent messages were accepted
  if @all_received and @accepted == @sent
    # Close all senders and their connections
    @senders.each do |_, i_sender|
      # Close sender
      i_sender.close
      # Close connection of sender
      i_sender.connection.close
    end
  end # if
end

Private Instance Methods

make_apache_selector(selector) click to toggle source
# File lib/handlers/receiver_handler.rb, line 222
def make_apache_selector(selector)
  Qpid::Proton::Types::Described.new(:"apache.org:selector-filter:string", selector)
end