class Ably::Realtime::Channel
Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {Ably::Realtime::Channel} object of a channel.
Channels
will always be in one of the following states:
initialized: 0 attaching: 1 attached: 2 detaching: 3 detached: 4 failed: 5
Note that the states are available as Enum-like constants:
Channel::STATE.Initialized The channel has been initialized but no attach has yet been attempted. Channel::STATE.Attaching An attach has been initiated by sending a request to Ably. This is a transient state, followed either by a transition to ATTACHED, SUSPENDED, or FAILED. Channel::STATE.Attached The attach has succeeded. In the ATTACHED state a client may publish and subscribe to messages, or be present on the channel. Channel::STATE.Detaching A detach has been initiated on an ATTACHED channel by sending a request to Ably. This is a transient state, followed either by a transition to DETACHED or FAILED. Channel::STATE.Detached The channel, having previously been ATTACHED, has been detached by the user. Channel::STATE.Suspended The channel, having previously been ATTACHED, has lost continuity, usually due to the client being disconnected from Ably for longer than two minutes. It will automatically attempt to reattach as soon as connectivity is restored. Channel::STATE.Failed An indefinite failure condition. This state is entered if a channel error has been received from the Ably service, such as an attempt to attach without the necessary access rights.
Constants
- EVENT
Describes the events emitted by a {Ably::Rest::Channel} or {Ably::Realtime::Channel} object. An event is either an UPDATE or a {Ably::Rest::Channel::STATE}.
The permitted channel events that are emitted for this channel
@spec RTL2g
- MAX_PROTOCOL_MESSAGE_BATCH_SIZE
Max number of messages to bundle in a single ProtocolMessage
- STATE
The current {Abbly::Realtime::Channel::STATE} of the channel.
@spec RTL2b
The permitted states for this channel
Attributes
Flag that specifies whether channel is resuming attachment(reattach) or is doing a ‘clean attach’ RTL4j1 @return [Boolean] @api private
{Ably::Realtime::Client} associated with this channel
@return [Ably::Realtime::Client]
@api private
An {Ably::Models::ErrorInfo} object describing the last error which occurred on the channel, if any. @spec RTL4e @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
The Channel
manager responsible for attaching, detaching and handling failures for this channel @return [Ably::Realtime::Channel::ChannelManager] @api private
The channel name. @return [String]
Channel
options configured for this channel, see {#initialize} for channel_options @return [Hash]
A {Ably::Realtime::Channel::ChannelProperties} object.
@spec CP1, RTL15
@return [{Ably::Realtime::Channel::ChannelProperties}]
A {Ably::Realtime::Channel::PushChannel} object.
@return [Ably::Realtime::Channel::PushChannel]
Public Class Methods
Initialize a new Channel
object
@param client [Ably::Rest::Client] @param name [String] The name of the channel @param channel_options [Hash, Ably::Models::ChannelOptions
] A hash of options or a {Ably::Models::ChannelOptions}
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 127 def initialize(client, name, channel_options = {}) name = ensure_utf_8(:name, name) @options = Ably::Models::ChannelOptions(channel_options) @client = client @name = name @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) @push = PushChannel.new(self) @properties = ChannelProperties.new(self) @attach_resume = false setup_event_handlers setup_presence end
Public Instance Methods
@return [Ably::Util::PubSub] Client
library internal channel incoming message bus
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 352 def __incoming_msgbus__ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end
Attach to this channel ensuring the channel is created in the Ably
system and all messages published on the channel are received by any channel listeners registered using {Ably::Realtime::Channel#subscribe}. Any resulting channel state change will be emitted to any listeners registered using the {Ably::Modules::EventEmitter#on} or {Ably::Modules::EventEmitter#once} methods. A callback may optionally be passed in to this call to be notified of success or failure of the operation. As a convenience, attach() is called implicitly if {Ably::Realtime::Channel#subscribe} for the channel is called, or {Ably::Realtime::Presence#enter} or {Ably::Realtime::Presence#subscribe} are called on the {Ably::Realtime::Presence} object for this channel.
@spec RTL4d
@yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 257 def attach(&success_block) if connection.closing? || connection.closed? || connection.suspended? || connection.failed? error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end if !attached? if detaching? # Let the pending operation complete (#RTL4h) once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) } else transition_state_machine :attaching if can_transition_to?(:attaching) end end deferrable_for_state_change_to(STATE.Attached, &success_block) end
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 389 def attach_resume! @attach_resume = true end
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 378 def clear_error_reason @error_reason = nil end
Detach from this channel. Any resulting channel state change is emitted to any listeners registered using the {Ably::Modules::EventEmitter#on} or {Ably::Modules::EventEmitter#once} methods. A callback may optionally be passed in to this call to be notified of success or failure of the operation. Once all clients globally have detached from the channel, the channel will be released in the Ably
service within two minutes.
@spec RTL5e
@yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 285 def detach(&success_block) if initialized? success_block.call if block_given? return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger) end if failed? || connection.closing? || connection.failed? return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching)) end if !detached? if attaching? # Let the pending operation complete (#RTL5i) once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) } elsif can_transition_to?(:detaching) transition_state_machine :detaching else transition_state_machine! :detached end end deferrable_for_state_change_to(STATE.Detached, &success_block) end
Retrieves a {Ably::Models::PaginatedResult} object, containing an array of historical {Ably::Models::Message} objects for the channel. If the channel is configured to persist messages, then messages can be retrieved from history for up to 72 hours in the past. If not, messages can only be retrieved from history for up to two minutes in the past.
@spec RSL2a
@param (see {Ably::Rest::Channel#history}) @option options (see {Ably::Rest::Channel#history}) @option options [Boolean] :until_attach When true, the history request will be limited only to messages published before this channel was attached. Channel
must be attached
@yield [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items items}.
@return [Ably::Util::SafeDeferrable]
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 334 def history(options = {}, &callback) # RTL10b if options.delete(:until_attach) unless attached? error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' ) return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end options[:from_serial] = properties.attach_serial end async_wrap(callback) do rest_channel.history(options.merge(async_blocking_operations: true)) end end
Used by {Ably::Modules::StateEmitter} to debug state changes @api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 384 def logger client.logger end
As we are using a state machine, do not allow change_state to be used transition_state_machine
must be used instead
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 402 def need_reattach? !!(attaching? || attached?) && !!(options.modes || options.params) end
A {Ably::Realtime::Presence} object.
@spec RTL9
@return {Ably::Realtime::Presence}
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 315 def presence @presence end
Publish a message to the channel. A callback may optionally be passed in to this call to be notified of success or failure of the operation. When publish is called with this client library, it won’t attempt to implicitly attach to the channel.
@spec RTL6i
@param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with :name
and :data
pairs @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
@yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
@example
# Publish a single message form channel.publish 'click', { x: 1, y: 2 } # Publish a single message with single Hash form message = { name: 'click', data: { x: 1, y: 2 } } channel.publish message # Publish an array of message Hashes form messages = [ { name: 'click', data: { x: 1, y: 2 } }, { name: 'click', data: { x: 2, y: 3 } } ] channel.publish messages # Publish an array of Ably::Models::Message objects form messages = [ Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }) Ably::Models::Message(name: 'click', data: { x: 2, y: 3 }) ] channel.publish messages # Publish an array of Ably::Models::Message objects form message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }) channel.publish message channel.publish('click', 'body') do |message| puts "#{message.name} event received with #{message.data}" end channel.publish('click', 'body').errback do |error, message| puts "#{message.name} was not received, error #{error.message}" end
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 193 def publish(name, data = nil, attributes = {}, &success_block) if suspended? || failed? error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end if !connection.can_publish_messages? error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end messages = build_messages(name, data, attributes) # (RSL1a, RSL1b) if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end enqueue_messages_on_connection(client, messages, channel_name, options).tap do |deferrable| deferrable.callback(&success_block) if block_given? end end
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 394 def reset_attach_resume! @attach_resume = false end
@api private
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 373 def set_channel_error_reason(error) @error_reason = error end
Sets the {Ably::Models::ChannelOptions} for the channel. An optional callback may be provided to notify of the success or failure of the operation.
@spec RTL16
@param channel_options [Hash, Ably::Models::ChannelOptions
] A hash of options or a {Ably::Models::ChannelOptions} @return [Ably::Models::ChannelOptions]
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 365 def set_options(channel_options) @options = Ably::Models::ChannelOptions(channel_options) # RTL4i manager.request_reattach if (need_reattach? and connection.state?(:connected)) end
Registers a listener for messages on this channel. The caller supplies a listener function, which is called each time one or more messages arrives on the channel. A callback may optionally be passed in to this call to be notified of success or failure of the channel {Ably::Realtime::Channel#attach} operation.
@spec RTL7a
@param names [String] The event name of the message to subscribe to if provided. Defaults to all events. @yield [Ably::Models::Message] For each message received, the block is called
@return [void]
Ably::Modules::MessageEmitter#subscribe
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 227 def subscribe(*names, &callback) attach unless attached? || attaching? super end
Deregisters the given listener for the specified event name(s). This removes an earlier event-specific subscription.
@spec RTL8a
@param names [String] The event name of the message to subscribe to if provided. Defaults to all events.
@return [void]
Ably::Modules::MessageEmitter#unsubscribe
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 240 def unsubscribe(*names, &callback) super end
Private Instance Methods
Alias useful for methods with a name argument
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 430 def channel_name name end
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 421 def connection client.connection end
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 417 def rest_channel client.rest_client.channel(name) end
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 408 def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| message.decode(client.encoders, options) do |encode_error, error_message| client.logger.error error_message end emit_message message.name, message end end
# File lib/submodules/ably-ruby/lib/ably/realtime/channel.rb, line 425 def setup_presence @presence ||= Presence.new(self) end