class Message::PulsarClient
Public Class Methods
new()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 37 def initialize() @client_created_id = 1 @request_id = 1 @producer_id = 1 end
Public Instance Methods
ack(consumer_id, ledgerId, entryId)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 370 def ack(consumer_id, ledgerId, entryId) command_ack(consumer_id, ledgerId, entryId) end
close()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 344 def close() close_socket() end
close_socket()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 49 def close_socket() @sock.close end
command_ack(consumer_id, ledgerId, entryId)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 318 def command_ack(consumer_id, ledgerId, entryId) base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::ACK ) base_command.ack = Pulsar::Proto::CommandAck.new base_command.ack.consumer_id = consumer_id base_command.ack.ack_type = Pulsar::Proto::CommandAck::AckType::Individual base_command.ack.message_id = [Pulsar::Proto::MessageIdData.new( :ledgerId => ledgerId, :entryId => entryId )] byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) end
command_connect()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 53 def command_connect() base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::CONNECT ) base_command.connect = Pulsar::Proto::CommandConnect.new base_command.connect.client_version = "ruby-client-0.0.2" base_command.connect.protocol_version = 6 byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::CONNECTED then return true else return false end end
command_flow()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 252 def command_flow() base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::FLOW ) base_command.flow = Pulsar::Proto::CommandFlow.new base_command.flow.consumer_id = @client_created_id base_command.flow.messagePermits = 1000 byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) end
command_lookup(topic)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 169 def command_lookup(topic) base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::LOOKUP ) base_command.lookupTopic = Pulsar::Proto::CommandLookupTopic.new base_command.lookupTopic.topic = topic base_command.lookupTopic.request_id = @request_id byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::LOOKUP_RESPONSE then return true else print("type:" + recv_cmd.type.to_s + "\n") return false end end
command_message()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 270 def command_message() recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::MESSAGE then consumer_id = recv_cmd.message.consumer_id ledgerId = recv_cmd.message.message_id.ledgerId entryId = recv_cmd.message.message_id.entryId recv_meta_length = 0 recv_meta_byte = nil recv_magic = @sock.read(2) if recv_magic == [0x0e, 0x01].pack('C*') then recv_crc = @sock.read(4) recv_meta_length = @sock.read(4).unpack('N')[0] recv_meta_byte = @sock.read(recv_meta_length) else recv_remain = @sock.read(2) recv_meta_length = (recv_magic + recv_remain).unpack('N')[0] recv_meta_byte = @sock.read(recv_meta_length) end payload_length = recv_length - (4 + recv_cmd_length + 4 + recv_meta_length) message = @sock.read(payload_length) recv_message = PulsarMessage.new recv_message.client_created_id = consumer_id recv_message.message_ledger_id = ledgerId recv_message.message_entry_id = entryId recv_message.message = message return recv_message end end
command_producer(topic)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 86 def command_producer(topic) base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::PRODUCER ) base_command.producer = Pulsar::Proto::CommandProducer.new base_command.producer.topic = topic base_command.producer.producer_id = @producer_id base_command.producer.request_id = @request_id byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::PRODUCER_SUCCESS then return true else return false end end
command_send(producer_name, num_messages, message)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 120 def command_send(producer_name, num_messages, message) base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::SEND ) base_command.send = Pulsar::Proto::CommandSend.new base_command.send.producer_id = @producer_id base_command.send.sequence_id = 0 base_command.send.num_messages = num_messages byte_cmd = base_command.serialize_to_string() magic_number= [0x0e, 0x01].pack('C*') metadata = Pulsar::Proto::MessageMetadata.new metadata.producer_name = producer_name metadata.sequence_id = 0 metadata.publish_time = Time.now.to_i * 1000 byte_meta = metadata.serialize_to_string() meta_payload = [byte_meta.length].pack('N') + byte_meta + message.bytes.pack('C*') crc = Digest::CRC32c.new crc << meta_payload checksum = crc.checksum total_length = 4 + byte_cmd.length + 6 + meta_payload.length total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd + magic_number + [checksum].pack('N') + meta_payload @sock.write(total_frame) recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::PRODUCER_SUCCESS then return true else return false end end
command_subscribe(topic, subscription, sub_type)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 204 def command_subscribe(topic, subscription, sub_type) base_command = Pulsar::Proto::BaseCommand.new( :type => Pulsar::Proto::BaseCommand::Type::SUBSCRIBE ) base_command.subscribe = Pulsar::Proto::CommandSubscribe.new base_command.subscribe.topic = topic base_command.subscribe.subscription = subscription if sub_type == 1 then base_command.subscribe.subType = Pulsar::Proto::CommandSubscribe::SubType::Exclusive elsif sub_type == 2 then base_command.subscribe.subType = Pulsar::Proto::CommandSubscribe::SubType::Shared elsif sub_type == 3 then base_command.subscribe.subType = Pulsar::Proto::CommandSubscribe::SubType::Failover end base_command.subscribe.consumer_id = @client_created_id base_command.subscribe.request_id = @request_id byte_cmd = base_command.serialize_to_string() total_length = byte_cmd.length + 4 total_frame = [total_length].pack('N') + [byte_cmd.length].pack('N') + byte_cmd @sock.write(total_frame) recv_length = @sock.read(4).unpack('N')[0] recv_cmd_length = @sock.read(4).unpack('N')[0] recv_cmd_byte = @sock.read(recv_cmd_length) recv_cmd = Pulsar::Proto::BaseCommand.new recv_cmd.parse_from_string(recv_cmd_byte) if recv_cmd.type == Pulsar::Proto::BaseCommand::Type::CONNECTED then return true elsif recv_cmd.type == Pulsar::Proto::BaseCommand::Type::SUCCESS then return true else print("type:" + recv_cmd.type.to_s + "\n") return false end end
connect(host, port)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 339 def connect(host, port) connect_socket(host, port) command_connect() end
connect_socket(host, port)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 43 def connect_socket(host, port) @sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) sockaddr = Socket.sockaddr_in(port, host) @sock.connect(sockaddr) end
get_message()
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 365 def get_message() m = command_message() return m end
send(topic, producer_name, num_messages, message)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 348 def send(topic, producer_name, num_messages, message) s = command_producer(topic) # try again if s == false then command_lookup(topic) command_producer(topic) end command_send(producer_name, num_messages, message) end
subscribe(topic, subscription, sub_type)
click to toggle source
# File lib/fluent/plugin/pulsar_client/PulsarClient.rb, line 359 def subscribe(topic, subscription, sub_type) command_lookup(topic) command_subscribe(topic, subscription, sub_type) command_flow() end