class Fluent::Plugin::GrasslandOutput

Attributes

access_key_id[RW]
id[RW]
kinesis[RW]
partitionKeys[RW]
random[RW]
region[RW]
secret_access_key[RW]
sessionToken[RW]
stream_name[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 12
# Builds the plugin instance: runs the superclass initializer, loads the
# libraries the plugin relies on, seeds the random generator exposed through
# the `random` attribute, and writes a start-up line to syslog.
def initialize
  super
  # require 'aws-sdk-v1'
  # Libraries are loaded here, in this order, rather than at file load time.
  %w[aws-sdk base64 json logger net/http uri].each { |library| require library }
  @random = Random.new

  syslog = Syslog::Logger.new 'grasslandplugin'
  syslog.info 'grassland initialize'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 44
# Applies the configuration through the superclass, then verifies that every
# mandatory setting ended up in its instance variable.
#
# conf - configuration object, forwarded untouched to the superclass.
#
# Raises ConfigError when a mandatory setting (currently only `key`) is unset.
def configure(conf)
  super

  [:key].each do |name|
    next if instance_variable_get("@#{name}")

    raise ConfigError, "'#{name}' is required"
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 119
# Serialises one event for the buffer as a JSON object followed by a comma.
#
# tag    - event tag (not used here).
# time   - event time; stored under 'pt' when the record has no 'pt'.
# record - Hash of event data; it is modified in place.
#
# Records lacking 'dt' or 'd' are rejected: the first missing key is logged
# and an empty string is returned. Otherwise 'pt', 'cid' (from @id) and 'uid'
# ('0') are filled in when absent, and 'pk' is set to cid + dt.
def format(tag, time, record)
  missing = %w[dt d].find { |key| !record.key?(key) }
  if missing
    log.info "input data error: '#{missing}' is required"
    return ""
  end

  record['pt'] = time unless record.key?('pt')
  record['cid'] = @id unless record.key?('cid')
  record['uid'] = '0' unless record.key?('uid')

  record['pk'] = record['cid'] + record['dt']
  "#{record.to_json},"
end
get_json(location, limit = 3) click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 92
# Fetches `location` over HTTP(S) and returns the parsed JSON body.
#
# location - URL string to GET.
# limit    - number of requests still allowed while following redirects.
#
# Returns the parsed JSON for a 2xx response, the result of the followed
# request for a redirect, and nil when the request fails or the server
# answers with any other status (the failure is logged).
#
# Raises ArgumentError when called with limit == 0. When that happens while
# following a redirect, the enclosing call rescues it, logs it and returns nil.
def get_json(location, limit = 3)
  raise ArgumentError, 'too many HTTP redirects' if limit == 0
  uri = URI.parse(location)
  begin
    # The timeouts are passed to Net::HTTP.start itself: the block only runs
    # once the connection is open, so assigning open_timeout in there is too
    # late to have any effect.
    response = Net::HTTP.start(uri.host, uri.port,
                               use_ssl: uri.scheme == 'https',
                               open_timeout: 5,
                               read_timeout: 10) do |http|
      http.get(uri.request_uri)
    end
    case response
    when Net::HTTPSuccess
      JSON.parse(response.body)
    when Net::HTTPRedirection
      location = response['location']
      warn "redirected to #{location}"
      get_json(location, limit - 1)
    else
      # Log the status directly; Net::HTTPResponse#value raises for non-2xx
      # responses, so it cannot be used to build the message.
      log.info [uri.to_s, response.code, response.message].join(" : ")
      nil
    end
  rescue => e
    log.info [uri.to_s, e.class, e].join(" : ")
    nil
  end
end
resetAwsCredential() click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 66
# Refreshes the credentials, rebuilds the Kinesis client and then puts a
# "test" record on the stream with the new client. Any StandardError raised
# along the way is logged rather than propagated.
def resetAwsCredential()
  setCredential
  configure_aws
  probe = {
    stream_name: @stream_name,
    data: "test",
    partition_key: random.rand(999).to_s
  }
  @kinesis.put_record(probe)
  log.info "grassland: reset credential"
rescue => error
  log.info "#{error.class} : initialize error.#{error}"
end
setCredential() click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 81
# Downloads the credential document from "#{@apiuri}?key=#{@key}" via
# get_json and copies its fields into the matching instance variables.
# Note that the session token is read from the 'SessionToken' key, which is
# capitalised unlike the other keys.
def setCredential()
  credential = get_json("#{@apiuri}?key=#{@key}")
  @id, @stream_name, @access_key_id, @secret_access_key, @region, @sessionToken, @partitionKeys =
    credential.values_at('id', 'streamName', 'accessKeyId', 'secretAccessKey',
                         'region', 'SessionToken', 'partitionKeyList')
  @partitionKeys
end
set_interval(delay) { || ... } click to toggle source

# Delay that start hands to set_interval, i.e. how long the background thread
# sleeps (Kernel#sleep, so seconds) between credential refreshes. Defaults to 20.
# NOTE(review): presumably config_param exposes this as @resetCredentialTimer,
# which is what start reads — confirm.
config_param :resetCredentialTimer, :integer, :default => 20

# File lib/fluent/plugin/out_grassland.rb, line 35
# Starts a thread that repeatedly sleeps for `delay` and then runs the given
# block, forever.
#
# delay - value passed to Kernel#sleep before each run of the block.
#
# Returns the Thread that was started.
def set_interval(delay)
  Thread.new(delay) do |pause|
    loop do
      sleep(pause)
      yield # run the caller's block once per tick
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 62
# Shuts the plugin down by delegating to the superclass implementation.
# NOTE(review): the timer thread that start creates through set_interval is
# not stopped here — confirm whether it should be killed on shutdown.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 54
# Starts the plugin: runs the superclass start, schedules a recurring
# credential refresh every @resetCredentialTimer (the delay given to
# set_interval), and then performs one refresh immediately.
def start
  super
  set_interval(@resetCredentialTimer) { resetAwsCredential }
  resetAwsCredential
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 141
# Sends a buffered chunk to Kinesis, grouping records by their 'pk' value.
#
# chunk - object whose #read returns the concatenated output of format, i.e.
#         JSON objects each followed by a comma.
#
# Records sharing a 'pk' are accumulated into one JSON array. A group is sent
# as soon as its accumulated text reaches 30720 bytes; whatever is still
# pending once every record has been seen is sent afterwards. Each put uses a
# partition key picked at random from partitionKeys.
#
# Delivery is best effort: a failure while sending is logged, together with
# the exception that caused it, and the groups not yet sent are dropped.
# A chunk that is not valid JSON still raises from JSON.parse.
def write(chunk)
  records = JSON.parse("[#{chunk.read.chop}]")
  pending = {}

  # Sends the group buffered under `key` as a JSON array and forgets it.
  flush = lambda do |key|
    @kinesis.put_record({
      :stream_name   => @stream_name,
      :data          => "[#{pending[key].chop}]",
      :partition_key => partitionKeys[random.rand(partitionKeys.length)]
    })
    pending.delete(key)
  end

  begin
    records.each do |data|
      key = data['pk'].to_s
      # Serialise each record once and append in place instead of rebuilding
      # the buffer string on every record.
      (pending[key] ||= +'') << data.to_json << ','
      flush.call(key) if pending[key].bytesize >= 30720
    end

    # Walk the records again so leftovers go out in order of first appearance.
    records.each do |data|
      key = data['pk'].to_s
      flush.call(key) if pending.key?(key)
    end
  rescue => e
    log.info "error: put_record to grassland. maybe too many requests. few data dropped. (#{e.class}: #{e.message})"
  end
end

Private Instance Methods

configure_aws() click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 184
# Builds @kinesis, an Aws::Kinesis::Client, from the stored credentials
# (access key id, secret access key, region and session token). When @debug
# is set, a Logger writing to $log.out and a :debug log level are added to
# the client options.
def configure_aws
  client_options = {
    access_key_id: @access_key_id,
    secret_access_key: @secret_access_key,
    region: @region,
    session_token: @sessionToken
  }
  # http_wire_trace is deliberately left out of the debug options.
  client_options.merge!(logger: Logger.new($log.out), log_level: :debug) if @debug

  @kinesis = Aws::Kinesis::Client.new(client_options)
end