class LogStash::Outputs::Pravega
Public Instance Methods
multi_receive_encoded(encoded)
click to toggle source
# File lib/logstash/outputs/pravega.rb, line 34 def multi_receive_encoded(encoded) pre_check(encoded) @producer = create_producer encoded.each do |event,data| begin @producer.writeEvent(routing_key,data) logger.debug("write in stream succssfully", :stream_name => @stream_name, :event => data) rescue LogStash::ShutdownSignal logger.debug("Pravega producer got shutdown signal") rescue => e logger.warn("Pravega producer threw exception, restarting", :exception => e) end end @producer.close() end
register()
click to toggle source
# File lib/logstash/outputs/pravega.rb, line 30 def register end
Private Instance Methods
create_producer()
click to toggle source
# File lib/logstash/outputs/pravega.rb, line 68 def create_producer begin java_import("io.pravega.client.admin.StreamManager") java_import("io.pravega.client.stream.ScalingPolicy") java_import("io.pravega.client.stream.StreamConfiguration") java_import("io.pravega.client.ClientConfig") java_import("io.pravega.client.stream.impl.DefaultCredentials") java_import("io.pravega.client.ClientFactory") java_import("io.pravega.client.stream.impl.JavaSerializer") java_import("io.pravega.client.stream.EventWriterConfig") uri = java.net.URI.new(pravega_endpoint) clientConfig = ClientConfig.builder() .controllerURI(uri) .credentials(DefaultCredentials.new(password, username)) .validateHostName(false) .build() streamManager = StreamManager.create(clientConfig) streamManager.createScope(scope) policy = ScalingPolicy.fixed(@num_of_segments) streamConfig = StreamConfiguration.builder().scalingPolicy(policy).build() streamManager.createStream(scope, stream_name, streamConfig) logger.debug("created stream successfully", :stream => @stream_name) clientFactory = ClientFactory.withScope(scope, clientConfig) writer = clientFactory.createEventWriter(stream_name, JavaSerializer.new(), EventWriterConfig.builder().build()) end end
pre_check(encoded)
click to toggle source
# File lib/logstash/outputs/pravega.rb, line 51 def pre_check(encoded) # If the num_of_segment <= 0, the IllegalArgumentException will be thrown and logstash will stop. @num_of_segments = 1 if @num_of_segments <= 0 # If the input stream name is same as output, it will filter the formatted json data again and again without endless. # So, the new output streamName needs to be created encoded.each do |event, data| if @stream_name == JSON.parse(data)['streamName'] @stream_name = "newOutputStream-".concat(SecureRandom.uuid) logger.info("The input stream_name shouldn't be equal to output, the new output_stream is created", :stream_name => @stream_name) break end end logger.debug("After arugument preCheck ", :num_of_segments => @num_of_segments, :stream_name => @stream_name) end