class FluentPluginKinesis::InputFilter

Constants

USER_AGENT_NAME

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kinesis.rb, line 52
# Applies the plugin configuration and initialises per-instance state.
#
# Delegates to the superclass first, warns when no state directory has been
# configured, builds the record parser named by conf['format'], and resets
# the thread bookkeeping containers.
#
# @param conf the configuration object; it is indexed with 'format' and
#   handed to the parser's own #configure
# @return [Array] the freshly created (empty) @dead_thread list
def configure(conf)
  super

  # A missing state directory is only warned about, never treated as fatal.
  unless @state_dir_path
    [
      "'state_dir_path PATH' parameter is not set to a 'kinesis' source.",
      "this parameter is highly recommended to save the last rows to resume tailing."
    ].each { |message| $log.warn message }
  end

  @parser = Fluent::Plugin.new_parser(conf['format'])
  @parser.configure(conf)

  # Thread bookkeeping, all starting out empty.
  @map = {}             # thread object management
  @thread_stop_map = {} # thread stop flag management
  @dead_thread = []     # dead thread management
end
load_client() click to toggle source
# File lib/fluent/plugin/in_kinesis.rb, line 80
# Builds the Kinesis client from the plugin's settings and stores it in
# @client.
#
# Option selection:
# * :region is passed only when @region is set.
# * When both @aws_key_id and @aws_sec_key are set they are passed as static
#   keys; this branch wins over @profile.
# * Otherwise, when @profile is set, an Aws::SharedCredentials object is
#   passed as :credentials (with :path only if @credentials_path is set).
# * With none of the above, no credential options are passed at all
#   (NOTE(review): presumably the SDK then resolves credentials itself --
#   confirm against the AWS SDK documentation).
#
# @return [Aws::Kinesis::Client] the client that was assigned to @client
def load_client
  options = {}
  options[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { profile_name: @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  @client = Aws::Kinesis::Client.new(options)
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_kinesis.rb, line 76
# Requests shutdown by raising the @stop_flag instance variable.
#
# NOTE(review): this only sets the flag; it neither calls super nor waits for
# any thread. Presumably the worker threads poll @stop_flag -- confirm against
# the thread code, which is not visible here.
#
# @return [true]
def shutdown
  @stop_flag = true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kinesis.rb, line 67
# Starts the input plugin.
#
# Everything happens inside the detach_multi_process block: the superclass
# start is invoked, the stop flag is cleared, the Kinesis client is created
# via load_client, and a new thread is spawned that runs supervisor_thread.
# The thread object is the last expression of the block and is not stored.
def start
  detach_multi_process do
    super

    @stop_flag = false
    load_client

    supervisor = method(:supervisor_thread)
    Thread.new(&supervisor)
  end
end