class LogStash::Codecs::Zylog

This codec converts protobuf-encoded messages into Logstash events and vice versa.

Requires the protobuf definitions as Ruby files. You can generate them with the [ruby-protoc compiler](https://github.com/codekitchen/ruby-protocol-buffers).

The following shows a usage example for decoding events from a Kafka stream:

source,ruby

kafka {

zk_connect => "127.0.0.1"
topic_id => "your_topic_goes_here"
codec => protobuf 
{
  class_name => "Animal::Unicorn"
  include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
}

}

Public Instance Methods

decode(data) { |event| ... } click to toggle source
# File lib/logstash/codecs/zylog.rb, line 77
# Decodes one framed protobuf packet from +data+ and yields it as a
# LogStash::Event.
#
# Frame layout as read here: byte 0 is only logged, bytes 1..4 hold the total
# frame length as a big-endian unsigned 32-bit integer, and the protobuf
# payload starts at byte 5. The payload length is total_packet_length - 6, i.e.
# the final byte of the frame is excluded.
# NOTE(review): presumably a trailer byte — confirm against the producer.
#
# All lengths and offsets are measured in bytes (bytesize/byteslice) because
# the header counts bytes. String#length/#slice count characters and would
# mis-frame payloads whose string encoding is multi-byte (e.g. UTF-8).
#
# Any error (short packet, parse failure, ...) is logged at warn level and
# swallowed; nothing is yielded in that case.
#
# @param data [String] raw packet
# @yieldparam event [LogStash::Event] the decoded message
def decode(data)
  begin
    @logger.debug("data class type: #{data.class}.")
    @logger.debug("receive data length: #{data.bytesize}.")
    if data.bytesize < 5
      raise(StandardError, "packet to small, length: #{data.bytesize}.")
    end

    # Only the 5-byte header is needed; avoid materialising every byte.
    header = data.byteslice(0, 5).bytes
    @logger.debug("receive data byte 1th-5th: #{header[0]} #{header[1]} #{header[2]} #{header[3]} #{header[4]}")
    total_packet_length = (header[1] << 24) | (header[2] << 16) | (header[3] << 8) | header[4]
    @logger.debug("total packet length: #{total_packet_length}.")
    if data.bytesize < total_packet_length
      raise(StandardError, "packet to small, length: #{data.bytesize}.")
    end

    # Skip the 5-byte header and drop the frame's last byte.
    data = data.byteslice(5, total_packet_length - 5 - 1)
    @logger.debug("after extract receive data length: #{data.bytesize}.")

    decoded = @obj.parse(data.to_s)
    yield LogStash::Event.new(decoded.to_hash) if block_given?
  rescue => e
    @logger.warn("Couldn't decode protobuf: #{e.inspect}.")
    # raise e
  end
end
encode(event) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 105
# Serializes +event+ with generate_protobuf and passes the event, together
# with whatever generate_protobuf returned, to the @on_event callback.
# Returns the callback's result.
def encode(event)
  serialized = generate_protobuf(event)
  @on_event.call(event, serialized)
end
register() click to toggle source
# File lib/logstash/codecs/zylog.rb, line 69
# Plugin setup. It starts with an empty @pb_metainfo, then loads every
# configured definition path through require_pb_path. Finally it resolves the
# configured top-level message class (class_name) into @obj.
def register
  @pb_metainfo = {}
  include_path.each do |definition_path|
    require_pb_path(definition_path)
  end
  @obj = create_object_from_name(class_name)
  @logger.debug("Zylog files successfully loaded.")
end

Private Instance Methods

_encode(datahash, class_name) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 125
# Turns +datahash+ into the field hash for a protobuf message of type
# +class_name+. Every member registered as message-typed in the metadata
# (get_complex_types) is replaced by an instance of its protobuf class, built
# recursively from the nested hash. Array values of such members (repeated
# fields) become arrays of protobuf instances.
#
# @param datahash [Hash, #to_hash] data to encode (see prepare_for_encoding)
# @param class_name [String] protobuf class name whose metadata applies
# @return [Hash] prepared fields; non-hash input is returned unchanged
def _encode(datahash, class_name)
  fields = prepare_for_encoding(datahash)
  # Nothing to walk for non-hash input; hand it back unchanged.
  return datahash unless fields.is_a?(::Hash)

  # member name => protobuf class name. A class without recorded metadata
  # simply has no nested members.
  complex_types = get_complex_types(class_name) || {}
  complex_types.each do |member, typeinfo|
    next unless fields.key?(member)
    proto_class = create_object_from_name(typeinfo)
    value = fields[member]
    fields[member] =
      if value.is_a?(::Array)
        # Repeated nested message: encode and wrap every element.
        value.map { |item| proto_class.new(_encode(item, typeinfo)) }
      else
        # Recurse with the nested type's own metadata, not the parent's.
        proto_class.new(_encode(value, typeinfo))
      end
  end
  fields
end
convert_to_string?(v) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 158
# Decides whether a value must be stringified before protobuf encoding.
# Integers, hashes, arrays and booleans are kept as-is; everything else
# (timestamps, symbols, nil, floats, ...) is converted with to_s.
#
# Integer is used instead of Fixnum: Fixnum was deprecated in Ruby 2.4 and
# removed in 3.2. Integer also keeps large (former Bignum) values numeric.
#
# @param v [Object]
# @return [Boolean] true when the value should be converted with to_s
def convert_to_string?(v)
  case v
  when ::Integer, ::Hash, ::Array, true, false then false
  else true
  end
end
create_object_from_name(name) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 168
# Resolves a "::"-separated constant path (e.g. "Animal::Unicorn") starting
# from Object and returns the constant (typically a class). It does not
# instantiate anything.
#
# A leading "::" ("::Animal::Unicorn") yields an empty first segment; empty
# segments are skipped instead of failing on const_get("").
#
# @param name [String] constant path
# @return [Object] the resolved constant
# @raise [NameError] when a segment cannot be resolved
def create_object_from_name(name)
  @logger.debug("Creating instance of " + name)
  name.split('::').reject(&:empty?).inject(Object) { |scope, const| scope.const_get(const) }
end
generate_protobuf(event) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 112
# Serializes +event+ into a protobuf byte string using the configured message
# class (@obj / @class_name).
#
# The event is converted with to_hash first, because the field preparation in
# _encode only walks Hash input.
#
# Failures are logged at warn level, matching decode's handling of decode
# failures, and the method returns nil. Previously it returned the logger's
# return value, which encode would forward as if it were the payload.
#
# @param event [#to_hash] LogStash event (or a plain Hash)
# @return [String, nil] serialized message, or nil on failure
def generate_protobuf(event)
  begin
    data = _encode(event.to_hash, @class_name)
    msg = @obj.new(data)
    msg.serialize_to_string
  rescue NoMethodError
    # Guard the diagnostic itself so it cannot raise a second NoMethodError.
    keys = event.respond_to?(:to_hash) ? event.to_hash.keys.join(", ") : ""
    @logger.warn("error 2: NoMethodError. Maybe mismatching protobuf definition. Required fields are: " + keys)
    nil
  rescue => e
    @logger.warn("Couldn't generate protobuf: #{e}")
    nil
  end
end
get_complex_types(class_name) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 175
# Returns the nested-type metadata recorded in @pb_metainfo for +class_name+.
# The metadata is a Hash of member name => protobuf class name, or nil when
# nothing was recorded for that class.
def get_complex_types(class_name)
  known_types = @pb_metainfo
  known_types[class_name]
end
prepare_for_encoding(datahash) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 149
# Prepares event data for protobuf encoding:
# 1) "@" characters are removed from keys (keys are stringified first),
# 2) values for which convert_to_string? is true (timestamps and other
#    objects) are converted with to_s.
#
# Objects that are not a Hash but respond to to_hash (such as a LogStash
# event) are converted first. Anything else yields nil. The original used
# `next` here, which is a SyntaxError outside a block.
#
# @param datahash [Hash, #to_hash, Object]
# @return [Hash, nil] prepared copy, or nil for non-hash input
def prepare_for_encoding(datahash)
  datahash = datahash.to_hash if !datahash.is_a?(::Hash) && datahash.respond_to?(:to_hash)
  return nil unless datahash.is_a?(::Hash)

  datahash.each_with_object({}) do |(key, value), prepared|
    prepared[remove_atchar(key.to_s)] = convert_to_string?(value) ? value.to_s : value
  end
end
remove_atchar(key) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 163
# Returns a copy of +key+ with every "@" character removed. Field names such
# as "@timestamp" need this because the Zylog definition does not handle "@"
# in field names well.
def remove_atchar(key)
  key.delete('@')
end
require_pb_path(dir_or_file) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 219
# Loads protobuf definitions from +dir_or_file+ through
# require_with_metadata_analysis.
#
# A path ending in ".rb" is loaded directly. Anything else is treated as a
# directory, and its top-level *.rb files are loaded in sorted order, so load
# order does not depend on the platform's directory listing.
#
# @param dir_or_file [String] definition file or directory
def require_pb_path(dir_or_file)
  if dir_or_file.end_with?('.rb')
    @logger.debug("Including protobuf file: " + dir_or_file)
    require_with_metadata_analysis dir_or_file
  else
    Dir[File.join(dir_or_file, '*.rb')].sort.each do |file|
      # Dir[] results already contain the directory prefix; log them as-is.
      @logger.debug("Including protobuf path: " + file)
      require_with_metadata_analysis file
    end
  end
end
require_with_metadata_analysis(filename) click to toggle source
# File lib/logstash/codecs/zylog.rb, line 179
# Requires the generated protobuf definition file +filename+. It then scans
# the file's source text for nested-message metadata, which _encode uses to
# build nested protobuf objects.
#
# Scanning rules (line based, not a Ruby parser):
# * `module X` lines seen before the first `class` line are joined into a
#   namespace prefix ("X::Y::").
# * The first `class Name` line completes the class name and creates
#   @pb_metainfo[class_name]. Later class lines are ignored, because a class
#   may be declared twice in the file.
# * Each `optional`/`repeated` field whose type contains "::" is recorded as
#   @pb_metainfo[class_name][field_name] = type, with the leading ":" stripped.
#   NOTE(review): fields of every class in the file are attributed to that
#   first class.
#
# Read/scan errors are logged at warn level ("error 3"). A file without any
# class line is reported as "error 4".
#
# @param filename [String] path to a generated protobuf *.rb file
def require_with_metadata_analysis(filename)
  require filename
  # Anchored so that text such as "Subclass" inside other lines is not taken
  # for a declaration. Names stop at the first non-identifier character, so
  # "class Foo; end" yields "Foo".
  regex_class_name = /\A\s*class\s+(?<name>[\w:]+)/
  regex_module_name = /\A\s*module\s+(?<name>[\w:]+)/
  regex_pbdefs = /\A\s*(?:optional|repeated)\s*:(?<type>.+),\s*:(?<name>\w+),\s*(?<position>\d+)/
  class_name = ""
  type = ""
  field_name = ""
  classname_found = false
  begin
    File.readlines(filename).each do |line|
      unless classname_found # the class might be declared twice in the file
        if (mod = regex_module_name.match(line))
          class_name << mod[:name] << "::"
        end
        if (klass = regex_class_name.match(line))
          class_name << klass[:name]
          @pb_metainfo[class_name] = {}
          classname_found = true
        end
      end

      field = regex_pbdefs.match(line)
      # Field lines before any class line cannot be attributed; skip them.
      next unless field && classname_found
      type = field[:type]
      field_name = field[:name]
      # Only message-typed fields ("::Pkg::Msg") need nested construction;
      # scalar types such as :string are skipped. `sub` (unlike `gsub!`)
      # never returns nil when there is no leading colon to strip.
      @pb_metainfo[class_name][field_name] = type.sub(/\A:/, "") if type.include?("::")
    end
  rescue => e
    @logger.warn("error 3: unable to read pb definition from file  " + filename+ ". Reason: #{e.inspect}. Last settings were: class #{class_name} field #{field_name} type #{type}. Backtrace: " + e.backtrace.inspect.to_s)
  end
  unless classname_found
    @logger.warn("error 4: class name not found in file  " + filename)
  end
end