module ActivePublisher::Connection

Constants

CONNECTION_MUTEX

Public Class Methods

connected?() click to toggle source
# File lib/active_publisher/connection.rb, line 7
# Reports whether the shared connection is currently connected.
#
# Goes through +connection+, so a connection is created on demand if none
# has been memoized yet.
#
# @return [Object, nil] whatever the connection answers to +connected?+,
#   as returned by +try+
def self.connected?
  shared_connection = connection
  shared_connection.try(:connected?)
end
connection() click to toggle source
# File lib/active_publisher/connection.rb, line 11
# Returns the shared connection, creating it on first use.
#
# The lookup and the creation both happen while holding CONNECTION_MUTEX, so
# +create_connection+ is only invoked when no connection is memoized.
#
# @return [Object] the memoized result of +create_connection+
def self.connection
  CONNECTION_MUTEX.synchronize { @connection ||= create_connection }
end
disconnect!() click to toggle source
# File lib/active_publisher/connection.rb, line 18
# Closes the shared connection (if it is connected) and forgets it.
#
# The memoized connection is always cleared, even when closing times out.
# Otherwise a connection whose close timed out would stay memoized and be
# handed out again by +connection+.
#
# @return [nil]
def self.disconnect!
  CONNECTION_MUTEX.synchronize do
    begin
      @connection.close if @connection && @connection.connected?
    rescue Timeout::Error
      # No-op ... this happens sometimes on MRI disconnect
    end

    # Drop the reference regardless of how the close attempt went.
    @connection = nil
  end
end

Private Class Methods

connection_options() click to toggle source
# File lib/active_publisher/connection.rb, line 55
# Builds the options hash that +create_connection+ passes to
# ::MarchHare.connect or ::Bunny.new.
#
# Recovery flags are hard-coded to true; every other value is read from
# ::ActivePublisher.configuration.
#
# @return [Hash] symbol-keyed connection options
def self.connection_options
  config = ::ActivePublisher.configuration
  # The configured timeout is in seconds; continuation_timeout is given in ms.
  continuation_timeout_ms = config.timeout * 1_000.0

  {
    :automatically_recover         => true,
    :continuation_timeout          => continuation_timeout_ms,
    :heartbeat                     => config.heartbeat,
    :hosts                         => config.hosts,
    :network_recovery_interval     => config.network_recovery_interval,
    :pass                          => config.password,
    :port                          => config.port,
    :recover_from_connection_close => true,
    :tls                           => config.tls,
    :tls_ca_certificates           => config.tls_ca_certificates,
    :tls_cert                      => config.tls_cert,
    :tls_key                       => config.tls_key,
    :user                          => config.username,
    :verify_peer                   => config.verify_peer,
  }
end
create_connection() click to toggle source

Private API

# File lib/active_publisher/connection.rb, line 31
# Opens a new connection using +connection_options+ and wires up the
# blocked/unblocked callbacks.
#
# On the "java" platform the connection comes from ::MarchHare.connect;
# otherwise a ::Bunny connection is built and started explicitly. The
# callbacks forward to +on_blocked+ and +on_unblocked+.
#
# @return [Object] the newly created connection
def self.create_connection
  on_java = ::RUBY_PLATFORM == "java"

  client =
    if on_java
      ::MarchHare.connect(connection_options)
    else
      bunny = ::Bunny.new(connection_options)
      bunny.start
      bunny
    end

  # MarchHare yields the reason itself; Bunny yields an object exposing #reason.
  client.on_blocked do |blocked_info|
    on_blocked(on_java ? blocked_info : blocked_info.reason)
  end
  client.on_unblocked { on_unblocked }

  client
end
on_blocked(reason) click to toggle source
# File lib/active_publisher/connection.rb, line 75
# Publishes a "connection_blocked.active_publisher" notification.
#
# @param reason [Object] the reason handed over by the connection's
#   on_blocked callback; sent as the +:reason+ entry of the payload
def self.on_blocked(reason)
  payload = { :reason => reason }
  ::ActiveSupport::Notifications.instrument("connection_blocked.active_publisher", payload)
end
on_unblocked() click to toggle source
# File lib/active_publisher/connection.rb, line 80
# Publishes a "connection_unblocked.active_publisher" notification with no
# explicit payload.
def self.on_unblocked
  event_name = "connection_unblocked.active_publisher"
  ::ActiveSupport::Notifications.instrument(event_name)
end