module Legion::Transport::Connection
Public Class Methods
channel()
click to toggle source
# File lib/legion/transport/connection.rb, line 79 def channel # rubocop:disable Metrics/AbcSize return @channel_thread.value if !@channel_thread.value.nil? && @channel_thread.value.open? @channel_thread.value = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10) if Legion::Transport::TYPE == 'march_hare' @channel_thread.value.basic_qos(settings[:prefetch]) else @channel_thread.value.prefetch(settings[:prefetch]) end @channel_thread.value end
channel_open?()
click to toggle source
# File lib/legion/transport/connection.rb, line 100 def channel_open? channel.open? rescue StandardError false end
channel_thread()
click to toggle source
# File lib/legion/transport/connection.rb, line 96 def channel_thread channel end
connector()
click to toggle source
# File lib/legion/transport/connection.rb, line 15 def connector Legion::Transport::CONNECTOR end
new()
click to toggle source
# File lib/legion/transport/connection.rb, line 11 def new clone end
reconnect(connection_name: 'Legion', **)
click to toggle source
# File lib/legion/transport/connection.rb, line 19 def reconnect(connection_name: 'Legion', **) @session = nil @channel_thread = Concurrent::ThreadLocalVar.new(nil) setup(connection_name: connection_name) end
session()
click to toggle source
# File lib/legion/transport/connection.rb, line 91 def session nil if @session.nil? @session.value end
session_open?()
click to toggle source
# File lib/legion/transport/connection.rb, line 106 def session_open? session.open? rescue StandardError false end
settings()
click to toggle source
# File lib/legion/transport/connection.rb, line 7 def settings Legion::Settings[:transport] end
setup(connection_name: 'Legion', **)
click to toggle source
# File lib/legion/transport/connection.rb, line 25 def setup(connection_name: 'Legion', **) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength Legion::Transport.logger.info("Using transport connector: #{Legion::Transport::CONNECTOR}") if @session.respond_to?(:value) && session.respond_to?(:closed?) && session.closed? @channel_thread = Concurrent::ThreadLocalVar.new(nil) elsif @session.respond_to?(:value) && session.respond_to?(:closed?) && session.open? nil elsif Legion::Transport::TYPE == 'march_hare' @session ||= Concurrent::AtomicReference.new( MarchHare.connect(host: settings[:connection][:host], vhost: settings[:connection][:vhost], user: settings[:connection][:user], password: settings[:connection][:password], port: settings[:connection][:port]) ) @channel_thread = Concurrent::ThreadLocalVar.new(nil) session.start session.create_channel.basic_qos(settings[:prefetch]) Legion::Settings[:transport][:connected] = true else @session ||= Concurrent::AtomicReference.new( connector.new( Legion::Settings[:transport][:connection], connection_name: connection_name, logger: Legion::Transport.logger, log_level: :info ) ) @channel_thread = Concurrent::ThreadLocalVar.new(nil) session.start session.create_channel(nil, settings[:channel][:session_worker_pool_size]) .basic_qos(settings[:prefetch], true) Legion::Settings[:transport][:connected] = true end if session.respond_to? :on_blocked session.on_blocked { Legion::Transport.logger.warn('Legion::Transport is being blocked by RabbitMQ!') } end if session.respond_to? :on_unblocked session.on_unblocked do Legion::Transport.logger.info('Legion::Transport is no longer being blocked by RabbitMQ') end end if session.respond_to? :after_recovery_completed session.after_recovery_completed do Legion::Transport.logger.info('Legion::Transport has completed recovery') end end true end
shutdown()
click to toggle source
# File lib/legion/transport/connection.rb, line 112 def shutdown session.close @session = nil end