module Legion::Transport::Connection

Public Class Methods

channel() click to toggle source
# File lib/legion/transport/connection.rb, line 79
# Returns the channel stored in @channel_thread, creating a new one when no
# channel is stored yet or the stored one no longer reports open?.
#
# A newly created channel is stored back into @channel_thread and has its
# prefetch limit set from settings[:prefetch]; the method used for that
# depends on Legion::Transport::TYPE.
#
# @return [Object] the open (or freshly created) channel
def channel
  cached = @channel_thread.value
  return cached if cached&.open?

  fresh = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
  @channel_thread.value = fresh

  # march_hare channels take the limit via basic_qos, everything else via prefetch.
  qos_method = Legion::Transport::TYPE == 'march_hare' ? :basic_qos : :prefetch
  fresh.public_send(qos_method, settings[:prefetch])
  fresh
end
channel_open?() click to toggle source
# File lib/legion/transport/connection.rb, line 100
# Reports whether a usable channel is available.
#
# Any StandardError raised while obtaining the channel or querying it is
# swallowed and reported as false.
#
# @return [Boolean, Object] the result of open? on the channel, or false on error
def channel_open?
  current = channel
  current.open?
rescue StandardError
  false
end
channel_thread() click to toggle source
# File lib/legion/transport/connection.rb, line 96
# Returns the same object as #channel.
#
# NOTE: despite its name this does not return the @channel_thread holder
# itself; it simply delegates to #channel.
#
# @return [Object] whatever #channel returns
def channel_thread
  channel
end
connector() click to toggle source
# File lib/legion/transport/connection.rb, line 15
# Returns the Legion::Transport::CONNECTOR constant. setup calls `new` on it
# to build the session when Legion::Transport::TYPE is not 'march_hare'.
#
# @return [Object] the value of Legion::Transport::CONNECTOR
def connector
  Legion::Transport::CONNECTOR
end
new() click to toggle source
# File lib/legion/transport/connection.rb, line 11
# Returns a clone of the receiver rather than instantiating anything.
#
# @return [Object] the result of calling clone on self
def new
  clone
end
reconnect(connection_name: 'Legion', **) click to toggle source
# File lib/legion/transport/connection.rb, line 19
# Discards the stored session reference and channel holder, then runs setup
# again with the given connection name.
#
# The previous session is only dereferenced here; this method does not call
# close on it. Extra keyword arguments are accepted and ignored.
#
# @param connection_name [String] name forwarded to setup
# @return [true] the value returned by setup
def reconnect(connection_name: 'Legion', **)
  # With @session cleared, setup skips its "existing session" branches and
  # builds a new session.
  @session = nil
  # Start from a fresh holder initialised to nil so no old channel is reused.
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  setup(connection_name: connection_name)
end
session() click to toggle source
# File lib/legion/transport/connection.rb, line 91
# Returns the current transport session, or nil when none has been stored.
#
# @session holds a wrapper object whose #value is the underlying session
# (setup stores a Concurrent::AtomicReference there; shutdown and reconnect
# reset it to nil).
#
# @return [Object, nil] the wrapped session, or nil if @session is unset
def session
  # The guard must actually return: without `return` the nil check was a
  # no-op and a nil @session fell through to `nil.value` (NoMethodError).
  return if @session.nil?

  @session.value
end
session_open?() click to toggle source
# File lib/legion/transport/connection.rb, line 106
# Reports whether the current session is open.
#
# Any StandardError raised while fetching the session or querying it
# (including there being no session at all) is reported as false.
#
# @return [Boolean, Object] the result of open? on the session, or false on error
def session_open?
  active = session
  active.open?
rescue StandardError
  false
end
settings() click to toggle source
# File lib/legion/transport/connection.rb, line 7
# Returns the :transport section of Legion::Settings. Other methods in this
# module read keys such as :connection, :channel and :prefetch from it.
#
# @return [Object] the value of Legion::Settings[:transport]
def settings
  Legion::Settings[:transport]
end
setup(connection_name: 'Legion', **) click to toggle source
# File lib/legion/transport/connection.rb, line 25
# Establishes (or re-validates) the transport session and registers logging
# callbacks on it.
#
# Behaviour by current state:
# * an existing wrapped session that reports closed? -> only the channel
#   holder is replaced;
# * an existing wrapped session that reports open? -> nothing is changed;
# * otherwise a session is built (via MarchHare.connect when
#   Legion::Transport::TYPE is 'march_hare', else via connector.new) unless
#   @session is already set, started, given an initial channel with the
#   configured prefetch, and Legion::Settings[:transport][:connected] is set
#   to true.
#
# Extra keyword arguments are accepted and ignored.
#
# @param connection_name [String] passed to connector.new as connection_name
# @return [true]
def setup(connection_name: 'Legion', **) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  Legion::Transport.logger.info("Using transport connector: #{Legion::Transport::CONNECTOR}")

  # True only when @session already wraps something that looks like a session.
  wrapped = @session.respond_to?(:value) && session.respond_to?(:closed?)

  if wrapped && session.closed?
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  elsif !(wrapped && session.open?)
    march_hare = Legion::Transport::TYPE == 'march_hare'

    # `||=` keeps an already-assigned @session; the connection is only built
    # when nothing is stored yet.
    @session ||= Concurrent::AtomicReference.new(
      if march_hare
        MarchHare.connect(host: settings[:connection][:host],
                          vhost: settings[:connection][:vhost],
                          user: settings[:connection][:user],
                          password: settings[:connection][:password],
                          port: settings[:connection][:port])
      else
        connector.new(
          Legion::Settings[:transport][:connection],
          connection_name: connection_name,
          logger: Legion::Transport.logger,
          log_level: :info
        )
      end
    )
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
    session.start

    if march_hare
      session.create_channel.basic_qos(settings[:prefetch])
    else
      session.create_channel(nil, settings[:channel][:session_worker_pool_size])
             .basic_qos(settings[:prefetch], true)
    end
    Legion::Settings[:transport][:connected] = true
  end

  # Callbacks are optional: only hook the ones this session type supports.
  if session.respond_to?(:on_blocked)
    session.on_blocked { Legion::Transport.logger.warn('Legion::Transport is being blocked by RabbitMQ!') }
  end

  if session.respond_to?(:on_unblocked)
    session.on_unblocked { Legion::Transport.logger.info('Legion::Transport is no longer being blocked by RabbitMQ') }
  end

  if session.respond_to?(:after_recovery_completed)
    session.after_recovery_completed { Legion::Transport.logger.info('Legion::Transport has completed recovery') }
  end

  true
end
shutdown() click to toggle source
# File lib/legion/transport/connection.rb, line 112
# Closes the current session and forgets it.
#
# Safe to call when no session has been set up, or more than once: with no
# stored session this is a no-op instead of raising NoMethodError.
#
# @return [nil]
def shutdown
  # Nothing to close before setup has run or after a previous shutdown.
  return if @session.nil?

  session.close
  @session = nil
end