class LogStash::Inputs::Azure::Processor

Public Class Methods

new(queue, codec, checkpoint_interval, decorator, meta_data)
# File lib/logstash/inputs/processor.rb, line 10
def initialize(queue, codec, checkpoint_interval, decorator, meta_data)
  @queue = queue
  @codec = codec
  @checkpoint_interval = checkpoint_interval
  @decorator = decorator
  @meta_data = meta_data
  @logger = self.logger

end
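
The constructor only stores its collaborators; nothing connects to Azure here. Below is a minimal construction sketch. The argument values (queue, codec, interval, decorator proc) are illustrative assumptions rather than the plugin's own wiring, and it presumes a JRuby/Logstash runtime where the plugin and a codec are loadable.

require "logstash/codecs/json"

queue               = Queue.new                   # decoded events are pushed here for the pipeline
codec               = LogStash::Codecs::JSON.new  # any codec responding to #decode and #flush
checkpoint_interval = 5                           # seconds between interval checkpoints (0 disables them)
decorator           = ->(event) { }               # callable applied to every decoded event
meta_data           = true                        # populate [@metadata][azure_event_hubs][...] fields

processor = LogStash::Inputs::Azure::Processor.new(queue, codec, checkpoint_interval, decorator, meta_data)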

Public Instance Methods

onClose(context, reason)
# File lib/logstash/inputs/processor.rb, line 24
def onClose(context, reason)
  @logger.info("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is closing. (reason=#{reason.to_s})")
end

onError(context, error)
# File lib/logstash/inputs/processor.rb, line 72
def onError(context, error)
  @logger.error("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} experienced an error #{error.to_s})")
end

onEvents(context, batch)
# File lib/logstash/inputs/processor.rb, line 28
def onEvents(context, batch)
  @logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is processing a batch of size #{batch.size}.") if @logger.debug?
  last_payload = nil
  batch_size = 0
  batch.each do |payload|
    last_checkpoint = Time.now.to_i
    bytes = payload.getBytes
    batch_size += bytes.size
    @logger.trace("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s}, Offset: #{payload.getSystemProperties.getOffset.to_s},"+
                      " Sequence: #{payload.getSystemProperties.getSequenceNumber.to_s}, Size: #{bytes.size}") if @logger.trace?

    @codec.decode(bytes.to_a.pack('C*')) do |event|

      @decorator.call(event)
      if @meta_data
        event.set("[@metadata][azure_event_hubs][name]", context.getEventHubPath)
        event.set("[@metadata][azure_event_hubs][consumer_group]", context.getConsumerGroupName)
        event.set("[@metadata][azure_event_hubs][processor_host]", context.getOwner)
        event.set("[@metadata][azure_event_hubs][partition]", context.getPartitionId)
        event.set("[@metadata][azure_event_hubs][offset]", payload.getSystemProperties.getOffset)
        event.set("[@metadata][azure_event_hubs][sequence]", payload.getSystemProperties.getSequenceNumber)
        event.set("[@metadata][azure_event_hubs][timestamp]",payload.getSystemProperties.getEnqueuedTime.getEpochSecond)
        event.set("[@metadata][azure_event_hubs][event_size]", bytes.size)
        event.set("[@metadata][azure_event_hubs][user_properties]", payload.getProperties)
      end
      @queue << event
      if @checkpoint_interval > 0
        now = Time.now.to_i
        since_last_check_point = now - last_checkpoint
        if since_last_check_point >= @checkpoint_interval
          context.checkpoint(payload).get
          last_checkpoint = now
        end
      end
    end
    last_payload = payload
  end

  @codec.flush
  #always create checkpoint at end of onEvents in case of sparse events
  context.checkpoint(last_payload).get if last_payload
  @logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} finished processing a batch of #{batch_size} bytes.") if @logger.debug?
end
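
When meta_data is true, every event queued by onEvents carries the fields set above. The snippet below is purely illustrative of the consumer side, using the field names from onEvents and the standard LogStash::Event#get API; the queue variable is the same object passed to the constructor.

event = queue.pop
event.get("[@metadata][azure_event_hubs][name]")             # Event Hub name
event.get("[@metadata][azure_event_hubs][consumer_group]")   # consumer group this host reads with
event.get("[@metadata][azure_event_hubs][processor_host]")   # owner of the partition lease
event.get("[@metadata][azure_event_hubs][partition]")        # partition the payload came from
event.get("[@metadata][azure_event_hubs][offset]")           # offset within the partition
event.get("[@metadata][azure_event_hubs][sequence]")         # sequence number within the partition
event.get("[@metadata][azure_event_hubs][timestamp]")        # enqueued time, epoch seconds
event.get("[@metadata][azure_event_hubs][event_size]")       # raw payload size in bytes
event.get("[@metadata][azure_event_hubs][user_properties]")  # application-defined message properties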

onOpen(context)
# File lib/logstash/inputs/processor.rb, line 20
def onOpen(context)
  @logger.info("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is opening.")
end
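
onOpen, onEvents, onError and onClose are lifecycle callbacks invoked by the Azure Event Processor Host, which asks a factory for one processor per partition it owns. The plugin ships its own factory class; the sketch below is only an assumed illustration of that wiring (the class name and the java_import are hypothetical, not the plugin's code) to show where the constructor arguments come from.

java_import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory

# Hypothetical factory: hands one Processor to the host per owned partition.
class ExampleProcessorFactory
  include IEventProcessorFactory

  def initialize(queue, codec, checkpoint_interval, decorator, meta_data)
    @queue = queue
    @codec = codec
    @checkpoint_interval = checkpoint_interval
    @decorator = decorator
    @meta_data = meta_data
  end

  # Called by the Event Processor Host when it takes ownership of a partition.
  def createEventProcessor(context)
    # Cloning the codec per partition is an assumption here, to avoid shared decoder state.
    LogStash::Inputs::Azure::Processor.new(@queue, @codec.clone, @checkpoint_interval, @decorator, @meta_data)
  end
end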