class LogStash::Outputs::Dis
Write events to a DIS stream, using DIS Kafka Adapter.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/dis.rb, line 201 def close @producer.close end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/dis.rb, line 140 def multi_receive(events) t = Thread.current if !@thread_batch_map.include?(t) @thread_batch_map[t] = java.util.ArrayList.new(events.size) end events.each do |event| break if event == LogStash::SHUTDOWN @codec.encode(event) end batch = @thread_batch_map[t] if batch.any? retrying_send(batch) batch.clear end end
prepare(record)
click to toggle source
def register
# File lib/logstash/outputs/dis.rb, line 135 def prepare(record) # This output is threadsafe, so we need to keep a batch per thread. @thread_batch_map[Thread.current].add(record) end
register()
click to toggle source
# File lib/logstash/outputs/dis.rb, line 107 def register @thread_batch_map = Concurrent::Hash.new if !@retries.nil? if @retries < 0 raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0" end @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries) end @producer = create_producer if value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer' @codec.on_event do |event, data| write_to_dis(event, data) end elsif value_serializer == 'com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer' @codec.on_event do |event, data| write_to_dis(event, data.to_java_bytes) end else raise ConfigurationError, "'value_serializer' only supports com.huaweicloud.dis.adapter.kafka.common.serialization.ByteArraySerializer and com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer" end end
retrying_send(batch)
click to toggle source
# File lib/logstash/outputs/dis.rb, line 158 def retrying_send(batch) remaining = @retries; while batch.any? if !remaining.nil? if remaining < 0 # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault, # DLQing would make things worse (you dlq data that would be successful # after the fault is repaired) logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.", :max_retries => @retries, :drop_count => batch.count) break end remaining -= 1 end failures = [] futures = batch.collect do |record| begin # send() can throw an exception even before the future is created. @producer.send(record) rescue org.apache.kafka.common.errors.TimeoutException => e failures << record nil rescue org.apache.kafka.common.errors.InterruptException => e failures << record nil rescue com.huaweicloud.dis.adapter.kafka.common.errors.SerializationException => e # TODO(sissel): Retrying will fail because the data itself has a problem serializing. # TODO(sissel): Let's add DLQ here. failures << record nil end end.compact break end end
Private Instance Methods
create_producer()
click to toggle source
# File lib/logstash/outputs/dis.rb, line 240 def create_producer begin props = java.util.Properties.new kafka = com.huaweicloud.dis.adapter.kafka.clients.producer.ProducerConfig props.put("IS_DEFAULT_TRUSTED_JKS_ENABLED", default_trusted_jks_enabled.to_s) props.put("security.token", security_token) unless security_token.nil? props.put("exception.retries", exception_retries.to_s) props.put("records.retries", records_retries.to_s) props.put("PROXY_HOST", proxy_host) unless proxy_host.nil? props.put("PROXY_PORT", proxy_port.to_s) props.put("PROXY_PROTOCOL", proxy_protocol) props.put("PROXY_USERNAME", proxy_username) unless proxy_username.nil? props.put("PROXY_PASSWORD", proxy_password) unless proxy_password.nil? props.put("PROXY_WORKSTATION", proxy_workstation) unless proxy_workstation.nil? props.put("PROXY_DOMAIN", proxy_domain) unless proxy_domain.nil? props.put("NON_PROXY_HOSTS", proxy_non_proxy_hosts) unless proxy_non_proxy_hosts.nil? props.put("batch.size", batch_size.to_s) props.put("batch.count", batch_count.to_s) props.put("buffer.memory", buffer_memory.to_s) props.put("buffer.count", buffer_count.to_s) props.put("linger.ms", linger_ms.to_s) props.put("block.on.buffer.full", block_on_buffer_full.to_s) props.put("max.block.ms", max_block_ms.to_s) props.put("backoff.max.interval.ms", backoff_max_interval_ms.to_s) props.put("max.in.flight.requests.per.connection", max_in_flight_requests_per_connection.to_s) props.put("records.retriable.error.code", records_retriable_error_code) unless records_retriable_error_code.nil? props.put("body.compress.enabled", body_compress_enabled.to_s) unless body_compress_enabled.nil? props.put("body.compress.type", body_compress_type.to_s) unless body_compress_type.nil? props.put("order.by.partition", order_by_partition.to_s) props.put("body.serialize.type", body_serialize_type.to_s) props.put("metadata.timeout.ms", metadata_timeout_ms.to_s) # props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil? # props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s) props.put("key.deserializer", "com.huaweicloud.dis.adapter.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "com.huaweicloud.dis.adapter.kafka.common.serialization.StringDeserializer") # endpoint, project_id, region, ak, sk props.put("endpoint", endpoint) props.put("projectId", project_id) props.put("region", region) props.put("ak", ak) if is_sk_encrypted decrypted_sk = decrypt(@sk) props.put("sk", decrypted_sk) else props.put("sk", sk) end com.huaweicloud.dis.adapter.kafka.clients.producer.DISKafkaProducer.new(props) rescue => e logger.error("Unable to create DIS Kafka producer from given configuration", :kafka_error_message => e, :cause => e.respond_to?(:getCause) ? e.getCause() : nil) raise e end end
decrypt(encrypted_sk)
click to toggle source
# File lib/logstash/outputs/dis.rb, line 301 def decrypt(encrypted_sk) com.huaweicloud.dis.util.encrypt.EncryptUtils.dec([@encrypt_key].to_java(java.lang.String), encrypted_sk) rescue => e logger.error("Unable to decrypt sk from given configuration", :decrypt_error_message => e, :cause => e.respond_to?(:getCause) ? e.getCause() : nil) end
write_to_dis(event, serialized_data)
click to toggle source
# File lib/logstash/outputs/dis.rb, line 207 def write_to_dis(event, serialized_data) stream = event.get("stream"); if stream.nil? stream = @stream; end message_key = event.get("partition_key"); if message_key.nil? message_key = @message_key; end partition_id = event.get("partition_id"); if message_key.nil? && partition_id.nil? # record = ProducerRecord.new(event.sprintf(@stream), serialized_data) record = ProducerRecord.new(stream, serialized_data) elsif partition_id.nil? # record = ProducerRecord.new(event.sprintf(@stream), event.sprintf(@message_key), serialized_data) # record = ProducerRecord.new(stream, event.sprintf(@message_key), serialized_data) record = ProducerRecord.new(stream, message_key, serialized_data) else record = ProducerRecord.new(stream, partition_id.to_i, message_key, serialized_data) end prepare(record) rescue LogStash::ShutdownSignal @logger.debug('DIS Kafka producer got shutdown signal') # sleep for 5 second to guaranteed data transmission is completed sleep(5) rescue => e @logger.warn('DIS kafka producer threw exception, restarting', :exception => e) end