class Fluent::Plugin::GrasslandOutput
Attributes
access_key_id[RW]
id[RW]
kinesis[RW]
partitionKeys[RW]
random[RW]
region[RW]
secret_access_key[RW]
sessionToken[RW]
stream_name[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 12 def initialize super # require 'aws-sdk-v1' require 'aws-sdk' require 'base64' require 'json' require 'logger' require 'net/http' require 'uri' @random = Random.new log = Syslog::Logger.new 'grasslandplugin' log.info 'grassland initialize' # puts "grassland initialize" end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 44 def configure(conf) super [:key].each do |name| unless self.instance_variable_get("@#{name}") raise ConfigError, "'#{name}' is required" end end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 119 def format(tag, time, record) # print(record) ['dt', 'd'].each do |key| unless record.has_key?(key) log.info "input data error: '#{key}' is required" return "" end end unless record.has_key?('pt') record['pt'] = time end unless record.has_key?('cid') record['cid'] = @id end unless record.has_key?('uid') record['uid'] = '0' end record['pk'] = record['cid'] + record['dt'] return "#{record.to_json}," end
get_json(location, limit = 3)
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 92 def get_json(location, limit = 3) raise ArgumentError, 'too many HTTP redirects' if limit == 0 uri = URI.parse(location) begin response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http| http.open_timeout = 5 http.read_timeout = 10 http.get(uri.request_uri) end case response when Net::HTTPSuccess json = response.body JSON.parse(json) when Net::HTTPRedirection location = response['location'] warn "redirected to #{location}" get_json(location, limit - 1) else log.info [uri.to_s, response.value].join(" : ") # handle error end rescue => e log.info [uri.to_s, e.class, e].join(" : ") # handle error end end
resetAwsCredential()
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 66 def resetAwsCredential() begin setCredential configure_aws @kinesis.put_record({ :stream_name => @stream_name, :data => "test", :partition_key => "#{random.rand(999)}" }) log.info "grassland: reset credential" rescue => e log.info [e.class, e].join(" : initialize error.") end end
setCredential()
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 81 def setCredential() credential = get_json("#{@apiuri}?key=#{@key}") @id = credential['id'] @stream_name = credential['streamName'] @access_key_id = credential['accessKeyId'] @secret_access_key = credential['secretAccessKey'] @region = credential['region'] @sessionToken = credential['SessionToken'] @partitionKeys = credential['partitionKeyList'] end
set_interval(delay) { || ... }
click to toggle source
config_param :resetCredentialTimer, :integer, :default => 20
# File lib/fluent/plugin/out_grassland.rb, line 35 def set_interval(delay) Thread.new do loop do sleep delay yield # call passed block end end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 62 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_grassland.rb, line 54 def start super set_interval(@resetCredentialTimer){ resetAwsCredential } resetAwsCredential end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 141 def write(chunk) buf = chunk.read dataList = JSON.parse("[#{buf.chop}]") putBuf = "" bufList = {} begin dataList.each do |data| # debug log # log.info data.to_json if bufList[":#{data['pk']}"] == nil then bufList[":#{data['pk']}"] = "#{data.to_json}," else bufList[":#{data['pk']}"] += "#{data.to_json}," end if bufList[":#{data['pk']}"].bytesize >= 30720 then @kinesis.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", :partition_key => partitionKeys[random.rand(partitionKeys.length)] # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end dataList.each do |data| if bufList[":#{data['pk']}"] != nil then @kinesis.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", :partition_key => partitionKeys[random.rand(partitionKeys.length)] # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end rescue log.info "error: put_record to grassland. maybe too many requests. few data dropped." end end
Private Instance Methods
configure_aws()
click to toggle source
# File lib/fluent/plugin/out_grassland.rb, line 184 def configure_aws options = { :access_key_id => @access_key_id, :secret_access_key => @secret_access_key, :region => @region, :session_token => @sessionToken } if @debug options.update( :logger => Logger.new($log.out), :log_level => :debug, #http_wire_trace => true ) end @kinesis = Aws::Kinesis::Client.new(options) end