class LogStash::Inputs::Pravega

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/pravega.rb, line 27
# Prepares the plugin before it starts running: sets up the reader group and
# then loads tuning values from config.yml under $root_dir.
#
# Sets @inputs (the parsed YAML document), @reader_threads (its
# 'readers_thread' entry) and @min_read_timeout_ms (its 'min_read_timeout_ms'
# entry).
def register
  create_readerGroup()
  # The block form of File.open closes the handle as soon as parsing finishes
  # (even if YAML.load raises) instead of leaving it open until GC.
  @inputs = File.open($root_dir + "/config.yml") { |config_file| YAML.load(config_file) }
  @reader_threads = @inputs['readers_thread']
  @min_read_timeout_ms = @inputs['min_read_timeout_ms']
end
run(logstash_queue) click to toggle source
# File lib/logstash/inputs/pravega.rb, line 34
# Starts the reader threads and blocks until all of them have finished.
#
# logstash_queue - queue handed to every reader thread; decoded events are
#                  pushed onto it.
#
# Returns the array of joined threads.
def run(logstash_queue)
  # The Pravega server will set the stream read timeout with min(read_time_ms, 1000ms).
  # If the read timeout is less than zero it will throw an error and Logstash won't work,
  # so never go below the configured minimum.
  @read_timeout_ms = @min_read_timeout_ms if @read_timeout_ms < @min_read_timeout_ms
  logger.debug("The prechecked arguments: ", :reader_threads => @reader_threads, :read_timeout_ms => @read_timeout_ms)

  # To let the newly created readers read data from the segments in time, the old readers
  # must be taken offline. getOnlineReaders yields reader ids (strings), not reader
  # objects, so they are released through the group rather than by calling close on them.
  reader_group = @readerGroupManager.getReaderGroup(@groupName)
  reader_group.getOnlineReaders().each { |reader_id| reader_group.readerOffline(reader_id, nil) }

  @runner_consumers = @reader_threads.times.map { create_consumer() }
  @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
  @runner_threads.each(&:join)
end
stop() click to toggle source
# File lib/logstash/inputs/pravega.rb, line 47
# Closes every reader created by run.
#
# Safe to call before run has populated @runner_consumers; in that case there
# is nothing to close.
def stop
  # @runner_consumers is an Array, which has no #times; iterate it directly.
  (@runner_consumers || []).each { |consumer| consumer.close() }
end

Private Instance Methods

create_consumer() click to toggle source
# File lib/logstash/inputs/pravega.rb, line 72
# Builds one event reader inside the reader group named by @groupName.
#
# The reader gets a random UUID as its id, a JavaSerializer and a default
# ReaderConfig; the client factory is obtained for `scope` at @uri.
#
# Returns whatever ClientFactory#createReader returns.
def create_consumer
  java_import("io.pravega.client.ClientFactory")
  java_import("io.pravega.client.stream.ReaderConfig")
  java_import("io.pravega.client.stream.impl.JavaSerializer")

  factory = ClientFactory.withScope(scope, @uri)
  reader_id = SecureRandom.uuid
  serializer = JavaSerializer.new
  reader_config = ReaderConfig.builder.build
  factory.createReader(reader_id, @groupName, serializer, reader_config)
end
create_readerGroup() click to toggle source
# File lib/logstash/inputs/pravega.rb, line 87
# Connects to the Pravega controller at `pravega_endpoint`, issues the
# createScope / createStream calls for `scope` and `stream_name`, and registers
# a new reader group over that stream.
#
# Sets @uri (controller URI), @readerGroupManager and @groupName (a random
# UUID with the dashes removed).
#
# Returns whatever ReaderGroupManager#createReaderGroup returns.
def create_readerGroup()
  java_import("io.pravega.client.ClientConfig")
  java_import("io.pravega.client.stream.impl.DefaultCredentials")
  java_import("io.pravega.client.admin.StreamManager")
  java_import("io.pravega.client.stream.StreamConfiguration")
  java_import("io.pravega.client.stream.ReaderGroupConfig")
  java_import("io.pravega.client.admin.ReaderGroupManager")
  java_import("io.pravega.client.stream.Stream")

  @uri = java.net.URI.new(pravega_endpoint)
  clientConfig = ClientConfig.builder()
                             .controllerURI(@uri)
                             .credentials(DefaultCredentials.new(password, username))
                             .validateHostName(false)
                             .build()

  streamManager = StreamManager.create(clientConfig)
  begin
    streamManager.createScope(scope)
    streamManager.createStream(scope, stream_name, StreamConfiguration.builder().build())
  ensure
    # The stream manager is only needed for the two calls above; release it
    # (and its controller connection) even when one of them fails.
    streamManager.close()
  end

  readGroupConfig = ReaderGroupConfig.builder().stream(Stream.of(scope, stream_name)).build()
  @readerGroupManager = ReaderGroupManager.withScope(scope, @uri)
  @groupName = SecureRandom.uuid.gsub('-', '')
  @readerGroupManager.createReaderGroup(@groupName, readGroupConfig)
end
thread_runner(logstash_queue, consumer) click to toggle source
# File lib/logstash/inputs/pravega.rb, line 52
# Spawns a thread that keeps reading events from `consumer` and feeds them to
# Logstash.
#
# Each read uses @read_timeout_ms. Payloads whose string form is empty are
# skipped; every other payload is decoded with @codec, decorated, tagged with
# a "streamName" field and pushed onto `logstash_queue`.
#
# The loop has no exit condition of its own, so the thread ends only when
# something inside it raises.
#
# Returns the Thread.
def thread_runner(logstash_queue, consumer)
  Thread.new do
    while true
      event_read = consumer.readNextEvent(@read_timeout_ms)
      payload = event_read.getEvent()
      logger.debug("Receive event ", :streamName => @stream_name, :data => payload)
      next if payload.to_s.empty?

      @codec.decode(payload) do |decoded|
        decorate(decoded)
        decoded.set("streamName", @stream_name)
        logstash_queue << decoded
      end
    end
  end
end