class Fluent::Plugin::S3Input

Constants

DEFAULT_PARSE_TYPE
EXTRACTOR_REGISTRY

Attributes

bucket[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_s3.rb, line 18
# Sets up a fresh plugin instance. The extractor is intentionally left unset
# here; #configure creates it once the store_as setting is known.
def initialize
  super
  @extractor = nil
end

Private Class Methods

register_extractor(name, extractor) click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 415
# Adds an extractor to EXTRACTOR_REGISTRY under +name+, so that a matching
# store_as value can be resolved to it in #configure.
#
# @param name registry key (compared against store_as on lookup)
# @param extractor extractor class; #configure instantiates it with +log:+
# @return whatever EXTRACTOR_REGISTRY#register returns
def self.register_extractor(name, extractor)
  EXTRACTOR_REGISTRY.register(name, extractor)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_s3.rb, line 120
# Validates and applies the plugin configuration.
#
# Explicit s3_endpoint / sqs endpoint values that end in "amazonaws.com" are
# rejected unless they contain "fips" or "gov" (presumably FIPS / GovCloud
# hosts); a regular AWS region must be chosen through s3_region instead.
# sqs/queue_name is mandatory. Afterwards this calls Aws.use_bundled_cert!
# when use_bundled_cert is set, instantiates the extractor registered for
# store_as, and builds the parser from the first <parse> section.
#
# @param conf plugin configuration element (must respond to #elements)
# @raise [Fluent::ConfigError] on an unsupported endpoint or a missing queue name
def configure(conf)
  super

  # Shared predicate for both endpoint checks below.
  public_aws_endpoint = lambda do |endpoint|
    endpoint.end_with?('amazonaws.com') && ['fips', 'gov'].none? { |marker| endpoint.include?(marker) }
  end

  if @s3_endpoint && public_aws_endpoint.call(@s3_endpoint)
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  sqs_endpoint = @sqs.endpoint
  if sqs_endpoint && public_aws_endpoint.call(sqs_endpoint)
    raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services"
  end

  parse_section = conf.elements("parse").first
  raise Fluent::ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  Aws.use_bundled_cert! if @use_bundled_cert

  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  @parser = parser_create(conf: parse_section, default_type: DEFAULT_PARSE_TYPE)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_s3.rb, line 169
# Stops the polling loop and then delegates to the superclass shutdown.
#
# The flag is cleared *before* calling super: the before_request hook that
# #run installs on the poller throws :stop_polling as soon as it sees
# @running == false.
def shutdown
  @running = false
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_s3.rb, line 144
# Connects to S3 and SQS and launches the polling thread.
#
# Steps: build the S3 resource and bucket handle and raise
# "<bucket> is not found." if the bucket does not exist. Then optionally
# verify the API keys (check_apikey_on_start), resolve the SQS queue URL and
# compile sqs/include_file_regex. Finally create the QueuePoller, set
# @running and spawn the :in_s3 thread that executes #run.
def start
  super

  s3_api = create_s3_client
  log.debug("Succeeded to create S3 client")
  @s3 = Aws::S3::Resource.new(client: s3_api)
  @bucket = @s3.bucket(@s3_bucket)
  raise "#{@bucket.name} is not found." if !@bucket.exists?

  check_apikeys if @check_apikey_on_start

  sqs_api = create_sqs_client
  log.debug("Succeeded to create SQS client")
  queue_url = sqs_api.get_queue_url(
    queue_name: @sqs.queue_name,
    queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id
  ).queue_url
  log.debug("Succeeded to get SQS queue URL")
  @include_file_regex = Regexp.new(@sqs.include_file_regex)

  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_api)

  @running = true
  thread_create(:in_s3, &method(:run))
end

Private Instance Methods

check_apikeys() click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 284
# Verifies that the configured credentials can reach the bucket by
# requesting its first object. Any StandardError raised by the request or by
# the debug log call is converted into a RuntimeError that carries the
# original error's #inspect output.
def check_apikeys
  @bucket.objects.first
  log.debug("Succeeded to verify API keys")
rescue StandardError => e
  raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end
create_s3_client() click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 257
# Builds the Aws::S3::Client used by this input.
#
# Starts from the credential options of #setup_credentials and adds region,
# endpoint and http_proxy only when they are set. force_path_style is always
# forwarded, even when nil. When the plugin logger is at trace level, SDK wire
# tracing is enabled and routed to that logger.
def create_s3_client
  client_opts = setup_credentials
  client_opts.merge!({ region: @s3_region, endpoint: @s3_endpoint }.select { |_, value| value })
  client_opts[:force_path_style] = @force_path_style
  client_opts[:http_proxy] = @proxy_uri if @proxy_uri
  log.on_trace do
    client_opts.merge!({ http_wire_trace: true, logger: log })
  end

  Aws::S3::Client.new(client_opts)
end
create_sqs_client() click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 271
# Builds the Aws::SQS::Client used for polling.
#
# Reuses the credential options of #setup_credentials and the plugin's
# s3_region. It adds sqs/endpoint and http_proxy only when they are set. When
# the plugin logger is at trace level, SDK wire tracing is enabled and routed
# to that logger.
def create_sqs_client
  client_opts = setup_credentials
  client_opts.merge!({ region: @s3_region, endpoint: @sqs.endpoint, http_proxy: @proxy_uri }.select { |_, value| value })
  log.on_trace do
    client_opts.merge!({ http_wire_trace: true, logger: log })
  end

  Aws::SQS::Client.new(client_opts)
end
process(body) click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 291
# Handles one decoded SQS message body that carries an S3 event.
#
# Only the first entry of body["Records"] is used. Its object key is
# URL-decoded with CGI.unescape and matched against @include_file_regex;
# non-matching keys are logged and skipped. Otherwise the object is fetched
# from @bucket, decoded by @extractor and parsed line by line. All parsed
# records are emitted to @tag as one MultiEventStream. With
# add_object_metadata, each record also gets 's3_bucket' and 's3_key'; note
# that 's3_key' holds the key *as received* (still URL-encoded).
#
# @param body [Hash] parsed SQS message body
def process(body)
  notification = body["Records"].first["s3"]
  encoded_key = notification["object"]["key"]
  object_key = CGI.unescape(encoded_key)

  if object_key !~ @include_file_regex
    log.info("#{object_key} doesn't match regex. skipping")
    return
  end

  payload = @extractor.extract(@bucket.object(object_key).get.body)
  stream = Fluent::MultiEventStream.new
  metadata = { 's3_bucket' => @s3_bucket, 's3_key' => encoded_key } if @add_object_metadata
  payload.each_line do |line|
    @parser.parse(line) do |time, record|
      record.merge!(metadata) if metadata
      stream.add(time, record)
    end
  end
  router.emit_stream(@tag, stream)
end
run() click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 176
# Polling loop executed on the :in_s3 thread created in #start.
#
# Polls SQS through @poller and hands every message whose body has a
# "Records" entry to #process; other messages (e.g. S3 test events) are
# skipped. If processing a message raises, the error is logged and
# :skip_delete is thrown, so that message is not deleted from the queue.
# If the poller itself raises, the loop waits @sqs.retry_error_interval
# seconds and polls again. During shutdown (@running false) it returns
# immediately instead: backing off there would only delay thread
# termination, because the next poll would stop straight away.
def run
  options = {
    wait_time_seconds: @sqs.wait_time_seconds,
    skip_delete: @sqs.skip_delete
  }
  # #shutdown clears @running; stop before issuing the next receive request.
  @poller.before_request do |_stats|
    throw :stop_polling unless @running
  end
  begin
    @poller.poll(options) do |message|
      begin
        body = Yajl.load(message.body)
        log.debug(body)
        next unless body["Records"] # skip test queue

        process(body)
      rescue => e
        log.warn(error: e)
        log.warn_backtrace(e.backtrace)
        # Leave the message on the queue instead of deleting it.
        throw :skip_delete
      end
    end
  rescue => e
    # Shutting down: do not sleep/retry, just let the thread finish.
    return unless @running

    log.warn("SQS Polling Failed. Retry in #{@sqs.retry_error_interval} seconds", error: e)
    sleep(@sqs.retry_error_interval)
    retry
  end
end
setup_credentials() click to toggle source
# File lib/fluent/plugin/in_s3.rb, line 204
# Builds the credential-related options shared by the S3 and SQS clients.
#
# Exactly one source is used, checked in this order:
#   1. aws_key_id + aws_sec_key             -> static access/secret keys
#   2. <assume_role_credentials>            -> Aws::AssumeRoleCredentials
#   3. <web_identity_credentials>           -> Aws::AssumeRoleWebIdentityCredentials
#   4. <instance_profile_credentials>       -> Aws::ECSCredentials when
#      AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is set, else
#      Aws::InstanceProfileCredentials
#   5. <shared_credentials>                 -> Aws::SharedCredentials
# Optional settings are forwarded only when they are truthy. For the two
# assume-role variants, an STS client pinned to s3_region is supplied when
# s3_region is set. With nothing configured an empty hash is returned, leaving
# credential resolution to the SDK defaults.
#
# @return [Hash] options to pass into an AWS client constructor
def setup_credentials
  if @aws_key_id && @aws_sec_key
    return { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key }
  end

  if (role = @assume_role_credentials)
    role_opts = { role_arn: role.role_arn, role_session_name: role.role_session_name }
    optional = {
      policy: role.policy,
      duration_seconds: role.duration_seconds,
      external_id: role.external_id
    }.select { |_, value| value }
    role_opts.merge!(optional)
    role_opts[:client] = Aws::STS::Client.new(:region => @s3_region) if @s3_region
    return { credentials: Aws::AssumeRoleCredentials.new(role_opts) }
  end

  if (web = @web_identity_credentials)
    web_opts = {
      role_arn: web.role_arn,
      role_session_name: web.role_session_name,
      web_identity_token_file: web.web_identity_token_file
    }
    web_opts.merge!({ policy: web.policy, duration_seconds: web.duration_seconds }.select { |_, value| value })
    web_opts[:client] = Aws::STS::Client.new(:region => @s3_region) if @s3_region
    return { credentials: Aws::AssumeRoleWebIdentityCredentials.new(web_opts) }
  end

  if (profile = @instance_profile_credentials)
    profile_opts = {
      retries: profile.retries,
      ip_address: profile.ip_address,
      port: profile.port,
      http_open_timeout: profile.http_open_timeout,
      http_read_timeout: profile.http_read_timeout
    }.select { |_, value| value }
    provider = ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"] ? Aws::ECSCredentials : Aws::InstanceProfileCredentials
    return { credentials: provider.new(profile_opts) }
  end

  if (shared = @shared_credentials)
    shared_opts = { path: shared.path, profile_name: shared.profile_name }.select { |_, value| value }
    return { credentials: Aws::SharedCredentials.new(shared_opts) }
  end

  # Nothing configured: use the SDK's default credentials.
  # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
  {}
end