class LogStash::Outputs::Dis

Writes events to a DIS stream using the DIS Kafka Adapter.
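A minimal pipeline sketch, assuming the option keys match the accessors used in register and create_producer below (stream, endpoint, project_id, region, ak, sk, value_serializer); check the plugin's config declarations for the exact names and defaults:

output {
  dis {
    stream => "my-dis-stream"
    endpoint => "https://dis.cn-north-1.myhuaweicloud.com"
    project_id => "my-project-id"
    region => "cn-north-1"
    ak => "MY_ACCESS_KEY"
    sk => "MY_SECRET_KEY"
    value_serializer => "com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer"
  }
}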

Public Instance Methods

close()
# File lib/logstash/outputs/dis.rb, line 201
def close
  @producer.close
end
multi_receive(events)
# File lib/logstash/outputs/dis.rb, line 140
def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    # Encoding triggers the codec's on_event hook (installed in register), which
    # calls write_to_dis and, via prepare, appends to this thread's batch.
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
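Because each worker thread owns its own ArrayList, multi_receive needs no locking around add or clear. A standalone sketch of the same thread-keyed batching, assuming only the concurrent-ruby gem (all names here are hypothetical):

require 'concurrent'

batches = Concurrent::Hash.new

workers = 4.times.map do |i|
  Thread.new do
    # Each thread creates and fills its own batch; no shared mutable state.
    batch = (batches[Thread.current] ||= [])
    10.times { |n| batch << "event-#{i}-#{n}" }
  end
end
workers.each(&:join)

p batches.values.map(&:size)   # => [10, 10, 10, 10]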
prepare(record)
# File lib/logstash/outputs/dis.rb, line 135
def prepare(record)
  # To keep this output threadsafe, each worker thread buffers into its own batch.
  @thread_batch_map[Thread.current].add(record)
end
register()
# File lib/logstash/outputs/dis.rb, line 107
def register
  @thread_batch_map = Concurrent::Hash.new

  unless @retries.nil?
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("DIS output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data when DIS is down, remove the retry setting.", :retries => @retries)
  end

  @producer = create_producer
  if value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_dis(event, data)
    end
  elsif value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_dis(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer and com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer" 
  end
end
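The two codec hooks differ only in payload type: with StringSerializer the encoded event is handed to the producer as a Ruby string, while ByteArraySerializer gets a Java byte[] via to_java_bytes. A small JRuby-only illustration of that conversion:

data = '{"msg":"hello"}'
bytes = data.to_java_bytes            # Java byte[] copy of the string
puts bytes.length                     # => 15
puts String.from_java_bytes(bytes)    # => {"msg":"hello"}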
retrying_send(batch)
# File lib/logstash/outputs/dis.rb, line 158
def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created;
        # keep the record paired with its future for failure tracking.
        [record, @producer.send(record)]
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue com.huaweicloud.dis.adapter.kafka.common.errors.SerializationException => e
        # Retrying cannot succeed here: the data itself fails to serialize.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    # Wait for each accepted record; a server-side failure surfaces as an
    # exception from Future#get and queues the record for retry.
    futures.each do |record, future|
      begin
        future.get
      rescue => e
        failures << record
      end
    end

    # Nothing failed? This batch is done.
    break if failures.empty?

    # Otherwise retry only the records that failed on this pass.
    batch = failures
  end
end
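Note the retry arithmetic: remaining starts at @retries and is decremented before each pass, and the loop gives up once it would go below zero, so retries => N allows N+1 total passes over a failing batch, while a nil setting retries without limit. A tiny standalone model of that counting (hypothetical helper):

# Models the attempt counting in retrying_send for a batch that always fails.
def passes_allowed(retries)
  remaining = retries
  passes = 0
  loop do
    break if !remaining.nil? && remaining < 0
    remaining -= 1 unless remaining.nil?
    passes += 1
    break if remaining.nil?   # a nil setting would retry without limit
  end
  passes
end

puts passes_allowed(0)   # => 1 (one attempt, no retries)
puts passes_allowed(2)   # => 3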

Private Instance Methods

create_producer()
# File lib/logstash/outputs/dis.rb, line 240
def create_producer
  begin
    props = java.util.Properties.new
    kafka = com.huaweicloud.dis.adapter.kafka.clients.producer.ProducerConfig

    props.put("IS_DEFAULT_TRUSTED_JKS_ENABLED", default_trusted_jks_enabled.to_s)
    props.put("security.token", security_token) unless security_token.nil?
    props.put("exception.retries", exception_retries.to_s)
    props.put("records.retries", records_retries.to_s)
    props.put("PROXY_HOST", proxy_host) unless proxy_host.nil?
    props.put("PROXY_PORT", proxy_port.to_s)
    props.put("PROXY_PROTOCOL", proxy_protocol)
    props.put("PROXY_USERNAME", proxy_username) unless proxy_username.nil?
    props.put("PROXY_PASSWORD", proxy_password) unless proxy_password.nil?
    props.put("PROXY_WORKSTATION", proxy_workstation) unless proxy_workstation.nil?
    props.put("PROXY_DOMAIN", proxy_domain) unless proxy_domain.nil?
    props.put("NON_PROXY_HOSTS", proxy_non_proxy_hosts) unless proxy_non_proxy_hosts.nil?
    
    props.put("batch.size", batch_size.to_s)
    props.put("batch.count", batch_count.to_s)
    props.put("buffer.memory", buffer_memory.to_s)
    props.put("buffer.count", buffer_count.to_s)
    props.put("linger.ms", linger_ms.to_s)
    props.put("block.on.buffer.full", block_on_buffer_full.to_s)
    props.put("max.block.ms", max_block_ms.to_s)
    props.put("backoff.max.interval.ms", backoff_max_interval_ms.to_s)
    props.put("max.in.flight.requests.per.connection", max_in_flight_requests_per_connection.to_s)
    props.put("records.retriable.error.code", records_retriable_error_code) unless records_retriable_error_code.nil?
    props.put("body.compress.enabled", body_compress_enabled.to_s) unless body_compress_enabled.nil?
    props.put("body.compress.type", body_compress_type.to_s) unless body_compress_type.nil?
    props.put("order.by.partition", order_by_partition.to_s)
    props.put("body.serialize.type", body_serialize_type.to_s)
    props.put("metadata.timeout.ms", metadata_timeout_ms.to_s)
    # props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil?
    # props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s)
    props.put("key.deserializer", "com.huaweicloud.dis.adapter.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "com.huaweicloud.dis.adapter.kafka.common.serialization.StringDeserializer")

    # endpoint, project_id, region, ak, sk
    props.put("endpoint", endpoint)
    props.put("projectId", project_id)
    props.put("region", region)
    props.put("ak", ak)
    if is_sk_encrypted
      decrypted_sk = decrypt(@sk)
      props.put("sk", decrypted_sk)
    else
      props.put("sk", sk)
    end


    com.huaweicloud.dis.adapter.kafka.clients.producer.DISKafkaProducer.new(props)
  rescue => e
    logger.error("Unable to create DIS Kafka producer from given configuration",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
    raise e
  end
end
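For ad-hoc testing outside Logstash, the same producer can be driven directly from JRuby. A sketch, assuming the DIS Kafka Adapter jars (and their dependencies) are on the classpath and using example credential values:

require 'java'

props = java.util.Properties.new
props.put('endpoint',  'https://dis.cn-north-1.myhuaweicloud.com')
props.put('projectId', 'my-project-id')
props.put('region',    'cn-north-1')
props.put('ak', 'MY_ACCESS_KEY')
props.put('sk', 'MY_SECRET_KEY')

producer = com.huaweicloud.dis.adapter.kafka.clients.producer.DISKafkaProducer.new(props)
record   = com.huaweicloud.dis.adapter.kafka.clients.producer.ProducerRecord.new('my-stream', 'hello dis')
producer.send(record).get    # block until the record is acknowledged
producer.close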
decrypt(encrypted_sk)
# File lib/logstash/outputs/dis.rb, line 301
def decrypt(encrypted_sk)
  com.huaweicloud.dis.util.encrypt.EncryptUtils.dec([@encrypt_key].to_java(java.lang.String), encrypted_sk)
rescue => e
  logger.error("Unable to decrypt sk from given configuration",
               :decrypt_error_message => e,
               :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  # Re-raise rather than return nil, which would otherwise be stored as the sk.
  raise e
end
write_to_dis(event, serialized_data)
# File lib/logstash/outputs/dis.rb, line 207
def write_to_dis(event, serialized_data)
  # Per-event fields may override the plugin-wide settings.
  stream = event.get("stream")
  stream = @stream if stream.nil?

  message_key = event.get("partition_key")
  message_key = @message_key if message_key.nil?

  partition_id = event.get("partition_id")

  if message_key.nil? && partition_id.nil?
    record = ProducerRecord.new(stream, serialized_data)
  elsif partition_id.nil?
    record = ProducerRecord.new(stream, message_key, serialized_data)
  else
    record = ProducerRecord.new(stream, partition_id.to_i, message_key, serialized_data)
  end
  prepare(record)
rescue LogStash::ShutdownSignal
  @logger.debug('DIS Kafka producer got shutdown signal')
  # Sleep for 5 seconds to give in-flight transmissions a chance to complete.
  sleep(5)
rescue => e
  @logger.warn('DIS Kafka producer threw exception, restarting',
               :exception => e)
end
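The branches above give events three ways to steer routing: a stream field overrides the configured stream, partition_key overrides the configured message key, and partition_id (with or without a key) pins the record to an explicit partition. A plain-Ruby sketch of that precedence, using a hypothetical helper and a hash in place of a Logstash event:

# Hypothetical helper mirroring the ProducerRecord argument shapes above.
def record_args(event, default_stream, default_key = nil)
  stream = event["stream"].nil? ? default_stream : event["stream"]
  key    = event["partition_key"].nil? ? default_key : event["partition_key"]
  part   = event["partition_id"]

  if key.nil? && part.nil?
    [stream, :data]                  # ProducerRecord.new(stream, data)
  elsif part.nil?
    [stream, key, :data]             # ProducerRecord.new(stream, key, data)
  else
    [stream, part.to_i, key, :data]  # explicit partition
  end
end

p record_args({}, "logs")                              # => ["logs", :data]
p record_args({"partition_key" => "user-42"}, "logs")  # => ["logs", "user-42", :data]
p record_args({"partition_id" => "3"}, "logs")         # => ["logs", 3, nil, :data]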