class Fluent::Plugin::ProtobufHttpInput
Implementation of HTTP input plugin for Protobuf
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_protobuf_http.rb, line 53
# Runs the superclass initializer, then resets the plugin's bookkeeping
# containers to their empty state.
def initialize
  super

  # Lookup Hash: { msgtype => msgclass }
  @msgclass_lookup = {}
  # list of compiled protos i.e. *_pb.rb files
  @compiled_protos = []
  # list of *.proto files
  @protos = []
end
Public Instance Methods
compile_protos()
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 61
# Compiles every *.proto file found directly inside @proto_dir with `protoc`
# and appends the resulting *_pb.rb paths to @compiled_protos.
#
# Raises Fluent::ConfigError when the directory does not exist, contains no
# .proto files, or when protoc cannot be executed / exits with a failure.
def compile_protos
  log.debug("Checking proto_dir [#{@proto_dir}]...")

  path = File.expand_path(@proto_dir)
  raise Fluent::ConfigError, "proto_dir does not exist! [#{path}]" unless Dir.exist?(path)

  @protos = Dir["#{path}/*.proto"]
  raise Fluent::ConfigError, "Empty proto_dir! [#{path}]" unless @protos.any?

  log.info("Compiling .proto files [#{@protos.length}]...")

  # Argument-vector form: no shell is involved, so spaces or shell
  # metacharacters in the directory name cannot break or hijack the command.
  # system returns nil when protoc cannot be executed and false when it exits
  # with a non-zero status; both are treated as a compilation failure.
  compiled = system('protoc', "--ruby_out=#{path}", "--proto_path=#{path}", *@protos)
  raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless compiled

  log.info("Compiled successfully:\n- #{@protos.join("\n- ")}")

  @compiled_protos.concat(@protos.map { |proto| get_compiled_proto(proto) })

  log.info("Compiled .proto files:\n- #{@compiled_protos.join("\n- ")}")
end
deserialize_msg(msgtype, serialized_msg)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 272
# Decodes +serialized_msg+ into an instance of the message class registered
# for +msgtype+, using the decoder selected by @in_mode (:binary or :json).
#
# Returns the decoded message, or nil (after logging an error) when the
# message type is not registered, @in_mode has no decoder, or decoding fails.
def deserialize_msg(msgtype, serialized_msg)
  msgclass = @msgclass_lookup[msgtype]

  log.debug("Deserializing {#{@in_mode}} message of type [#{msgclass}]...")

  # Report an unknown type explicitly instead of letting it surface as a
  # NoMethodError on nil further down.
  if msgclass.nil?
    log.error("Deserialization failed! Error: message type [#{msgtype}] is not registered")
    return nil
  end

  begin
    case @in_mode
    when :binary
      msgclass.decode(serialized_msg)
    when :json
      msgclass.decode_json(serialized_msg)
    else
      # No decoder exists for this mode; say so rather than returning nil silently.
      log.error("Deserialization failed! Error: unsupported in_mode [#{@in_mode}]")
      nil
    end
  rescue Google::Protobuf::ParseError => e
    log.error("Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes] #{e}")
    nil
  rescue StandardError => e
    log.error("Deserialization failed! Error: #{e}")
    nil
  end
end
get_compiled_proto(proto)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 84
# Maps a .proto path to the path of its compiled counterpart by swapping a
# trailing ".proto" for "_pb.rb".
#
# Returns the compiled path; raises Fluent::ConfigError when no regular file
# exists at that path.
def get_compiled_proto(proto)
  candidate = "#{proto.chomp('.proto')}_pb.rb"
  return candidate if File.file?(candidate)

  raise Fluent::ConfigError, "Compiled proto not found! [#{candidate}]"
end
get_msg_class(msg_type)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 134
# Looks +msg_type+ up in the generated Protobuf descriptor pool and returns
# the message class of the descriptor found.
#
# Raises Fluent::ConfigError when the pool has no entry for +msg_type+.
def get_msg_class(msg_type)
  msg = Google::Protobuf::DescriptorPool.generated_pool.lookup(msg_type)
  # The message previously ended with a stray, unbalanced apostrophe.
  raise Fluent::ConfigError, "Message type ['#{msg_type}'] not registered!" if msg.nil?

  msg.msgclass
end
get_msg_types(compiled_proto)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 115
# Scans a compiled proto file line by line and collects the first
# double-quoted string on every line that (ignoring leading whitespace)
# starts with "add_message".
#
# Returns the collected message type names (possibly empty); logs the result.
def get_msg_types(compiled_proto)
  log.debug("Extracting message types [#{compiled_proto}]...")

  msg_types = File.foreach(compiled_proto).each_with_object([]) do |line, found|
    next unless line.lstrip.start_with?('add_message')

    # regex: <add_message> 'msg_type' <do>
    quoted_name = line[/"([^"]*)"/, 1]
    found << quoted_name unless quoted_name.nil?
  end

  if msg_types.empty?
    log.warn("No message types found! [#{compiled_proto}]")
  else
    log.info("Total [#{msg_types.length}] message types in [#{compiled_proto}]:\n- #{msg_types.join("\n- ")}")
  end

  msg_types
end
get_query_params(query_string)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 256
# Extracts the 'msgtype' and 'batch' parameters from an HTTP query string.
#
# Returns nil (after a warning) when +query_string+ is nil; otherwise returns
# [msgtype, batch], where either element is nil if the parameter is absent.
# A warning is logged for each absent parameter.
def get_query_params(query_string)
  if query_string.nil?
    log.warn("Empty query string! 'msgtype' is required!")
    return nil
  end

  params = WEBrick::HTTPUtils.parse_query(query_string)

  %w[msgtype batch].map do |key|
    value = params[key]
    log.warn("'#{key}' not found in 'query_string' [#{query_string}]") if value.nil?
    value
  end
end
populate_msgclass_lookup()
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 94
# Fills @msgclass_lookup ({ msgtype => msgclass }) from the compiled protos:
# each file that declares at least one message type is required, and every
# declared type is resolved through get_msg_class.
#
# Raises Fluent::ConfigError when requiring a compiled proto fails with a
# LoadError, or when no message type at all ends up registered.
def populate_msgclass_lookup
  @compiled_protos.each do |pb_file|
    declared_types = get_msg_types(pb_file)
    next if declared_types.none?

    begin
      require pb_file
    rescue LoadError => load_failure
      raise Fluent::ConfigError, "Possible 'import' issue! Use a single self-contianed .proto file! #{load_failure}"
    end

    declared_types.each { |declared| @msgclass_lookup[declared] = get_msg_class(declared) }
  end

  if @msgclass_lookup.empty?
    raise Fluent::ConfigError, "No message types found! Check proto_dir [#{@proto_dir}]!"
  end

  registered = @msgclass_lookup.keys
  log.info("Registered messages [#{registered.length}]:\n- #{registered.join("\n- ")}")
end
serialize_msg(msgtype, deserialized_msg)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 291
# Encodes +deserialized_msg+ with the message class registered for +msgtype+,
# using the encoder selected by @out_mode (:binary or :json).
#
# Returns the encoded message. Returns nil when @out_mode is neither of the
# two modes, or (after logging an error) when encoding raises.
def serialize_msg(msgtype, deserialized_msg)
  msgclass = @msgclass_lookup[msgtype]

  log.debug("Serializing [#{@in_mode} > #{@out_mode}]...")

  begin
    if @out_mode == :binary
      msgclass.encode(deserialized_msg)
    elsif @out_mode == :json
      msgclass.encode_json(deserialized_msg)
    end
  rescue StandardError => failure
    log.error("Serialization failed! [msgtype: #{msgtype}, msg: #{deserialized_msg}] Error: #{failure}")
    nil
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_protobuf_http.rb, line 307
# Deletes every compiled proto file listed in @compiled_protos, then runs the
# superclass shutdown.
#
# Cleanup is best-effort: a file that cannot be deleted (for example because
# it is already gone) is logged as a warning and skipped, so the remaining
# files are still removed and the superclass shutdown always runs.
def shutdown
  @compiled_protos.each do |compiled_proto|
    begin
      File.delete(compiled_proto)
    rescue SystemCallError => e
      log.warn("Could not delete compiled proto [#{compiled_proto}]: #{e}")
    end
  end

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_protobuf_http.rb, line 141
# Starts the plugin: runs the superclass start, compiles the .proto files,
# builds the message-class lookup and registers a POST handler on "/<tag>"
# with the HTTP server (TLS when @transport_config selects :tls).
#
# The handler validates the Content-Type header and the 'msgtype' query
# parameter, deserializes the request body and then either
# * emits one event (default), or
# * emits one event per element of the message's 'batch' field when the query
#   string carries batch=true.
# Every rejection is answered with a 400 and a JSON status body; a message that
# cannot be serialized for output is answered with a 500.
def start
  super

  compile_protos
  populate_msgclass_lookup

  # TLS check
  proto = :tcp
  tls_opts = nil

  if @transport_config && @transport_config.protocol == :tls
    proto = :tls
    tls_opts = @transport_config.to_h
  end

  log.info("Starting protobuf #{proto == :tcp ? 'HTTP' : 'HTTPS'} server [#{@bind}:#{@port}]...")
  log.debug("TLS configuration:\n#{tls_opts}") if tls_opts

  # Logs a rejected request and builds the JSON error response for it.
  reject = lambda do |peeraddr, status, code = 400|
    log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
    [code, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
  end

  http_server_create_http_server(:protobuf_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opts) do |server|
    server.post("/#{tag}") do |req|
      peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}" # ip:port
      serialized_msg = req.body

      log.info("[R] {#{@in_mode}} [#{peeraddr}, size: #{serialized_msg.length} bytes]")
      log.debug("Dumping serialized message [#{serialized_msg.length} bytes]:\n#{serialized_msg}")

      content_type = req.header['content-type'][0]

      unless valid_content_type?(content_type)
        next reject.call(peeraddr, "Invalid 'Content-Type' header! [#{content_type}]")
      end

      log.debug("[>] Content-Type: #{content_type}")

      msgtype, batch = get_query_params(req.query_string)

      unless @msgclass_lookup.key?(msgtype)
        next reject.call(peeraddr, "Invalid 'msgtype' in 'query_string'! [#{msgtype}]")
      end

      # These two debug lines used to print the content type under "msgtype".
      log.debug("[>] Query parameters: [msgtype: #{msgtype}, batch: #{batch}]")

      deserialized_msg = deserialize_msg(msgtype, serialized_msg)

      if deserialized_msg.nil?
        status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]"
        next reject.call(peeraddr, status)
      end

      is_batch = batch == 'true'
      log.debug("[>] Message validated! [msgtype: #{msgtype}, is_batch: #{is_batch}]")

      # Log single message
      unless is_batch
        log.info("[S] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")

        time = Fluent::Engine.now
        event_msg = serialize_msg(msgtype, deserialized_msg)

        # serialize_msg returns nil on failure; do not emit a nil record or
        # call #length on nil.
        if event_msg.nil?
          status = "Serialization failed! [msgtype: #{msgtype}, out_mode: #{@out_mode}]"
          next reject.call(peeraddr, status, 500)
        end

        record = { 'message' => event_msg }
        router.emit(@tag, time, record)
        log.info("[S] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{event_msg.length} bytes]")

        next [200, { 'Content-Type' => 'text/plain' }, nil]
      end

      # Log batch messages
      log.info("[B] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")

      if deserialized_msg.type.nil? || deserialized_msg.batch.nil? || deserialized_msg.batch.empty?
        status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]"
        next reject.call(peeraddr, status)
      end

      batch_type = deserialized_msg.type
      batch_msgs = deserialized_msg.batch
      batch_size = batch_msgs.length

      log.info("[B] Emitting message stream/batch [batch_size: #{batch_size} messages]...")

      stream = MultiEventStream.new
      batch_msgs.each do |batch_msg|
        time = Fluent::Engine.now
        record = { 'message' => serialize_msg(batch_type, batch_msg) }
        stream.add(time, record)
      end

      router.emit_stream(@tag, stream)

      status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]"
      log.info("[B] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}] #{status}")

      [200, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
    end
  end
end
valid_content_type?(content_type)
click to toggle source
# File lib/fluent/plugin/in_protobuf_http.rb, line 242
# Tells whether the request's Content-Type header is acceptable for the
# configured @in_mode:
#   :binary          -> application/octet-stream
#   :json            -> application/json
#   :binary_and_json -> either of the two
#
# Only the media type is compared: parameters after ';' (e.g. "charset=utf-8")
# and letter case are ignored. Returns false for a nil header or any other mode.
def valid_content_type?(content_type)
  hdr_binary = 'application/octet-stream'
  hdr_json = 'application/json'

  # "application/json; charset=utf-8" -> "application/json"
  media_type = content_type.to_s.split(';').first.to_s.strip.downcase

  case @in_mode
  when :binary
    media_type == hdr_binary
  when :json
    media_type == hdr_json
  when :binary_and_json
    [hdr_binary, hdr_json].include?(media_type)
  else
    false
  end
end