class Fleck::Client
Attributes
local_ip[R]
remote_ip[R]
Public Class Methods
new(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1)
click to toggle source
# File lib/fleck/client.rb, line 8 def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) @connection = connection @queue_name = queue_name @multiple_responses = multiple_responses @default_timeout = multiple_responses ? 60 : nil @concurrency = [concurrency.to_i, 1].max @requests = ThreadSafe::Hash.new @subscriptions = ThreadSafe::Array.new @terminated = false @mutex = Mutex.new @local_ip = @connection.transport.socket.local_address.ip_address @remote_ip = @connection.transport.socket.remote_address.ip_address @channel = @connection.create_channel @exchange = Bunny::Exchange.new(@channel, :direct, 'fleck') @publisher = Bunny::Exchange.new(@connection.create_channel, exchange_type, exchange_name) @reply_queue = @channel.queue("", exclusive: true, auto_delete: true) @reply_queue.bind(@exchange, routing_key: @reply_queue.name) handle_returned_messages! @concurrency.times { handle_responses! } logger.debug("Client initialized!") at_exit do terminate end end
Public Instance Methods
publish(data, options)
click to toggle source
# File lib/fleck/client.rb, line 62 def publish(data, options) return if @terminated @mutex.synchronize { @publisher.publish(data, options) } end
remove_request(request_id)
click to toggle source
# File lib/fleck/client.rb, line 68 def remove_request(request_id) @requests.delete request_id end
request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block)
click to toggle source
# File lib/fleck/client.rb, line 37 def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) if @terminated return Fleck::Client::Response.new(Oj.dump({status: 503, errors: ['Service Unavailable'], body: nil} , mode: :compat)) end request = Fleck::Client::Request.new( self, queue, @reply_queue.name, action: action, version: version, headers: headers, params: params, timeout: timeout, multiple_responses: @multiple_responses, rmq_options: rmq_options, &block ) @requests[request.id] = request request.send!(async) return request.response end
terminate()
click to toggle source
# File lib/fleck/client.rb, line 73 def terminate @terminated = true logger.info "Unsubscribing from #{@reply_queue.name}" @subscriptions.map(&:cancel) # stop receiving new messages logger.info "Canceling pending requests" # cancel pending requests while item = @requests.shift do begin item[1].cancel! rescue => e logger.error e.inspect + "\n" + e.backtrace.join("\n") end end end
Protected Instance Methods
handle_responses!()
click to toggle source
# File lib/fleck/client.rb, line 105 def handle_responses! @subscriptions << @reply_queue.subscribe do |delivery_info, metadata, payload| begin logger.debug "Response received: #{payload}" request = @requests[metadata[:correlation_id]] if request request.response = Fleck::Client::Response.new(payload) else logger.warn "Request #{metadata[:correlation_id]} not found!" end rescue => e logger.error e.inspect + "\n" + e.backtrace.join("\n") end end end
handle_returned_messages!()
click to toggle source
# File lib/fleck/client.rb, line 91 def handle_returned_messages! @exchange.on_return do |return_info, metadata, content| begin logger.warn "Request #{metadata[:correlation_id]} returned" request = @requests[metadata[:correlation_id]] if request request.cancel! end rescue => e logger.error e.inspect + "\n" + e.backtrace.join("\n") end end end