class Thrift::AMQPClientTransport
Public Class Methods
from_channel(channel, exchange_name, routing_key)
click to toggle source
# File lib/thrift/amqp/client.rb, line 11
#
# Builds a transport that works on a channel supplied by the caller.
# No AMQP URI is handed to the constructor (+nil+ is passed instead);
# the channel travels in the options hash under the +:channel+ key.
#
# @param channel        the channel the transport should use
# @param exchange_name  name of the exchange requests are published to
# @param routing_key    routing key attached to every published request
# @return the object returned by +new+
def from_channel(channel, exchange_name, routing_key)
  opts = { channel: channel }
  new(nil, exchange_name, routing_key, opts)
end
new(amqp_uri, exchange_name, routing_key, opts = {})
click to toggle source
# File lib/thrift/amqp/client.rb, line 16
#
# Sets up the transport state without opening anything yet.
#
# Outgoing bytes are collected in +@outbuf+; incoming bytes are passed
# through a binary-mode pipe (+@inbuf_w+ is the writing end, +@inbuf_r+
# the reading end).
#
# @param amqp_uri       URI given to +Bunny.new+; only used when no
#                       channel is supplied in +opts+
# @param exchange_name  name of the exchange requests are published to
# @param routing_key    routing key attached to every published request
# @param opts [Hash]    optional settings:
#   +:channel+ - an existing channel to use instead of creating a connection
#   +:oneway+  - defaults to +false+; when true no reply queue is used
def initialize(amqp_uri, exchange_name, routing_key, opts = {})
  @outbuf = Bytes.empty_byte_buffer

  reader, writer = IO.pipe(binmode: true)
  @inbuf_r = reader
  @inbuf_w = writer
  @inbuf_w.set_encoding('binary')

  supplied_channel = opts[:channel]
  if supplied_channel
    @channel = supplied_channel
  else
    @conn = Bunny.new(amqp_uri)
  end

  @opened = false
  # The transport only manages the channel lifecycle itself when the
  # caller did not hand one in.
  @handle_conn_lifecycle = supplied_channel.nil?

  @exchange_name = exchange_name
  @routing_key = routing_key
  @oneway = opts.fetch(:oneway, false)
end
Public Instance Methods
close()
click to toggle source
# File lib/thrift/amqp/client.rb, line 64
#
# Shuts the transport down. Does nothing when the transport is not open.
#
# The reply queue is deleted unless the transport is one-way (one-way
# transports never create one). When the transport manages its own
# connection lifecycle, the channel is closed and so is the connection
# the transport created for itself, so it is not left open after the
# transport is closed. A channel supplied by the caller is left untouched.
#
# @return [false, nil] +false+ after closing, +nil+ when already closed
def close
  return unless open?

  @reply_queue.delete unless @oneway

  if @handle_conn_lifecycle
    @channel.close
    # @conn is only set when the transport built the connection itself.
    @conn.close if @conn
  end

  @opened = false
end
flush()
click to toggle source
# File lib/thrift/amqp/client.rb, line 84
#
# Publishes everything buffered by +write+ as a single message and
# starts a fresh output buffer. The transport is opened first if needed.
#
# The message carries the configured routing key, a newly generated
# correlation id and a +reply_to+ header: the reply queue name, or an
# empty string for one-way transports.
#
# The buffer is reset even when opening or publishing raises, so bytes
# from a failed request are never sent together with the next one.
#
# @return the new, empty output buffer
def flush
  begin
    open unless open?

    @service_exchange.publish(
      @outbuf,
      routing_key: @routing_key,
      correlation_id: generate_uuid,
      reply_to: @oneway ? '' : @reply_queue.name
    )
  ensure
    @outbuf = Bytes.empty_byte_buffer
  end

  @outbuf
end
open()
click to toggle source
# File lib/thrift/amqp/client.rb, line 34
#
# Opens the transport. Does nothing when it is already open.
#
# If there is no usable channel, the transport's own connection is
# started and a channel is created on it. Without such a connection
# (the channel was supplied by the caller and is no longer open) a
# TransportException of type NOT_OPEN is raised.
#
# Unless the transport is one-way, an exclusive, auto-deleted reply
# queue is declared and subscribed to without blocking. Each delivered
# payload is written to the inbound pipe and then acknowledged.
#
# @raise [TransportException] when no channel is available and the
#   transport has no connection of its own
# @return [true, nil] +true+ once opened, +nil+ when already open
def open
  return if open?

  if @channel.nil? || !@channel.open?
    unless @conn
      raise TransportException.new(
        TransportException::NOT_OPEN, 'channel closed'
      )
    end

    @conn.start
    @channel = @conn.create_channel
  end

  @service_exchange = @channel.exchange(@exchange_name)

  unless @oneway
    @reply_queue = @channel.queue('', auto_delete: true, exclusive: true)

    @reply_queue.subscribe(
      block: false, manual_ack: true
    ) do |delivery_info, _properties, payload|
      # Hand the reply to the reader side before acknowledging it.
      @inbuf_w << Bytes.force_binary_encoding(payload)
      @channel.acknowledge(delivery_info.delivery_tag, false)
    end
  end

  @opened = true
end
open?()
click to toggle source
# File lib/thrift/amqp/client.rb, line 72
#
# Tells whether the transport is usable: it must have been opened, a
# channel must be present, and that channel must report itself open.
#
# @return a truthy value only when all three conditions hold
def open?
  # Falsy when the transport was never opened or has no channel.
  channel = @opened && @channel
  channel && channel.open?
end
read(sz)
click to toggle source
# File lib/thrift/amqp/client.rb, line 76
#
# Reads reply bytes from the reading end of the inbound pipe.
#
# @param sz  number of bytes to request, passed straight to +IO#read+
# @return whatever +IO#read+ returns for that request
def read(sz)
  inbound = @inbuf_r
  inbound.read(sz)
end
write(buf)
click to toggle source
# File lib/thrift/amqp/client.rb, line 80
#
# Appends data to the output buffer. Nothing is sent here; the buffered
# bytes are published by +flush+.
#
# @param buf  data to append, converted with +Bytes.force_binary_encoding+
# @return the output buffer
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end
Protected Instance Methods
generate_uuid()
click to toggle source
# File lib/thrift/amqp/client.rb, line 99
#
# Produces a random identifier, used by +flush+ as the correlation id
# of a published message.
#
# @return [String] hexadecimal string built from 13 random bytes
def generate_uuid
  random_byte_count = 13
  SecureRandom.hex(random_byte_count)
end