class PulsarSdk::Client::ConnectionPool
Public Class Methods
new(opts)
click to toggle source
# File lib/pulsar_sdk/client/connection_pool.rb, line 6
#
# Builds an empty connection pool configured from the given options object.
#
# @param opts [PulsarSdk::Options::Connection] connection settings; the
#   keepalive, connection timeout, auth provider and TLS options are read
#   from it, and the object itself is retained as @options
# @raise [RuntimeError] if opts is not a PulsarSdk::Options::Connection
def initialize(opts)
  unless opts.is_a?(PulsarSdk::Options::Connection)
    raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}"
  end

  @mutex = Mutex.new
  @pool = ::PulsarSdk::Tweaks::WaitMap.new

  @options = opts
  @keepalive = opts.keepalive
  @connection_timeout = opts.connection_timeout
  @authentication = opts.auth_provider
  @tls_options = opts.tls_options

  # Any instance variable that ended up nil is removed again, so it is left
  # undefined on the object instead of being stored as nil.
  instance_variables
    .select { |ivar| instance_variable_get(ivar).nil? }
    .each { |ivar| remove_instance_variable(ivar) }
end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/client/connection_pool.rb, line 72
#
# Clears the pool, handing @pool.clear a block that calls #close on the
# second value it yields (presumably the pooled connection; the first
# value is ignored).
#
# @return whatever @pool.clear returns
def close
  @pool.clear { |_id, connection| connection.close }
end
fetch(logical_addr, physical_addr)
click to toggle source
# File lib/pulsar_sdk/client/connection_pool.rb, line 24
#
# Returns the pooled connection for an address, establishing a new one when
# none is cached or the cached one reports itself closed.
#
# The pool key is the string form of logical_addr, falling back to
# physical_addr when logical_addr is nil or false.
#
# @param logical_addr [#to_s, nil] preferred address, used for the pool key
# @param physical_addr [#to_s, nil] fallback address for the pool key
# @return the cached connection, or the value returned by @pool.add for a
#   newly established one
# @raise [RuntimeError] if the resulting pool key is an empty string
def fetch(logical_addr, physical_addr)
  id = (logical_addr || physical_addr).to_s
  raise 'logical_addr and physical_addr both empty!' if id.empty?

  # Lookup and replacement happen under a single lock; the value of the
  # block is the value of #synchronize and therefore of this method.
  @mutex.synchronize do
    cached = @pool.find(id)
    next cached if !cached.nil? && !cached.closed?

    # Remove the closed connection from the pool before replacing it.
    # NOTE(review): 0.01 is presumably a timeout in seconds -- confirm
    # against WaitMap#delete.
    @pool.delete(id, 0.01) unless cached.nil?

    conn_opts = @options.dup
    conn_opts.assign_attributes(
      logical_addr: logical_addr,
      physical_addr: physical_addr
    )

    @pool.add(id, ::PulsarSdk::Client::Connection.establish(conn_opts))
  end
end
run_checker()
click to toggle source
# File lib/pulsar_sdk/client/connection_pool.rb, line 49
#
# Starts a background thread that sweeps the pool in an endless loop,
# sleeping one second between sweeps.
#
# For each pooled connection, the gap between the two values returned by
# its #active_status (last ping, last receive) is compared against
# @keepalive:
# * gap >= 2 * @keepalive -> the connection is closed
# * gap >  @keepalive     -> the connection is pinged
#
# A StandardError raised during a sweep is logged through PulsarSdk.logger
# and ends that sweep; the loop then carries on with the next one.
#
# @return [Thread] the checker thread
def run_checker
  Thread.new do
    loop do
      begin
        @pool.each do |_id, connection|
          last_ping_at, last_received_at = connection.active_status
          gap = last_ping_at - last_received_at

          if gap >= @keepalive * 2
            connection.close
          elsif gap > @keepalive
            connection.ping
          end
        end
      rescue => error
        PulsarSdk.logger.error(error)
      end

      sleep(1)
    end
  end
end