class Ably::Realtime::Client::OutgoingMessageDispatcher

OutgoingMessageDispatcher is a (private) class that delivers outgoing {Ably::Models::ProtocolMessage}s over the {Ably::Realtime::Connection} whenever the connection state is capable of delivering messages.

Constants

ACTION

Public Class Methods

new(client, connection) click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 11
# Builds a dispatcher bound to the given client and connection.
#
# Both arguments are stored as instance variables, after which the
# dispatcher subscribes to the outgoing protocol message queue and
# installs its connection event handlers.
#
# @param client     the client stored in @client
# @param connection the connection stored in @connection
def initialize(client, connection)
  @client, @connection = client, connection

  # Wire-up happens once, at construction time.
  subscribe_to_outgoing_protocol_message_queue
  setup_event_handlers
end

Private Instance Methods

can_send_messages?() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 28
# Reports whether the connection is in a state in which messages may be
# sent: either connected or closing.
#
# @return the result of +connected?+ when truthy, otherwise the result of +closing?+
def can_send_messages?
  link = connection
  link.connected? || link.closing?
end
client() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 20
# Reader for the client this dispatcher was constructed with.
#
# @return the value of the @client instance variable
def client
  @client
end
connection() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 24
# Reader for the connection this dispatcher was constructed with.
#
# @return the value of the @connection instance variable
def connection
  @connection
end
current_transport_outgoing_message_bus() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 44
# Looks up the outgoing protocol message bus of the connection's current
# transport.
#
# Raises NoMethodError when the connection has no transport (nil), so
# callers are expected to check for a transport first.
#
# @return the transport's +__outgoing_protocol_msgbus__+
def current_transport_outgoing_message_bus
  active_transport = connection.transport
  active_transport.__outgoing_protocol_msgbus__
end
deliver_queued_protocol_messages() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 48
# Drains the connection's outgoing message queue for as long as messages
# can be sent and the queue is not empty.
#
# For every message taken off the front of the queue:
# * if the connection has no transport, the message is failed with a
#   TransportClosed error and the loop moves on to the next message;
# * otherwise it is published on the current transport's outgoing bus and
#   then either parked on the pending-ACK queue (when an ACK is required)
#   or marked as succeeded immediately.
#
# Iteration is delegated to +non_blocking_loop_while+, which is defined
# outside this section.
def deliver_queued_protocol_messages
  deliverable = lambda { can_send_messages? && messages_in_outgoing_queue? }

  non_blocking_loop_while(deliverable) do
    message = outgoing_queue.shift

    # The transport can disappear while messages are still queued; those
    # messages cannot be published, so fail them instead.
    unless connection.transport
      transport_lost = Ably::Exceptions::TransportClosed.new(
        'Transport disconnected unexpectedly',
        nil,
        Ably::Exceptions::Codes::DISCONNECTED
      )
      message.fail transport_lost
      next
    end

    current_transport_outgoing_message_bus.publish :protocol_message, message

    if message.ack_required?
      # Stays pending until acknowledged elsewhere.
      pending_ack_queue << message
    else
      message.succeed message
    end
  end
end
messages_in_outgoing_queue?() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 32
# Reports whether at least one message is waiting in the outgoing queue.
#
# @return [Boolean] false when the outgoing queue is empty, true otherwise
def messages_in_outgoing_queue?
  waiting = outgoing_queue
  !waiting.empty?
end
outgoing_queue() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 36
# Shortcut to the queue of messages waiting to be sent.
#
# @return the connection's +__outgoing_message_queue__+
def outgoing_queue
  connection.__outgoing_message_queue__
end
pending_ack_queue() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 40
# Shortcut to the queue that holds published messages which still require
# an ACK (see deliver_queued_protocol_messages).
#
# @return the connection's +__pending_message_ack_queue__+
def pending_ack_queue
  connection.__pending_message_ack_queue__
end
setup_event_handlers() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 75
# Registers a :connected handler on the connection that flushes the queued
# protocol messages on the next EventMachine tick.
def setup_event_handlers
  connection.unsafe_on(:connected) do
    # Delivery is deferred by one tick on purpose: the connection manager
    # needs a chance to prevent message delivery if necessary. For example,
    # if reconnecting and connection and channel state is lost, then the
    # queued messages must be NACK'd.
    EventMachine.next_tick { deliver_queued_protocol_messages }
  end
end
subscribe_to_outgoing_protocol_message_queue() click to toggle source
# File lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb, line 69
# Subscribes to :protocol_message events on the connection's outgoing
# protocol message bus; every event triggers a delivery attempt of the
# queued messages. The event arguments themselves are not used.
def subscribe_to_outgoing_protocol_message_queue
  message_bus = connection.__outgoing_protocol_msgbus__
  message_bus.subscribe(:protocol_message) { |*_ignored| deliver_queued_protocol_messages }
end