class Fluent::HekkRedshiftOutput
Constants
- IGNORE_REDSHIFT_ERROR_REGEXP
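PG errors whose message matches this pattern (load-table errors caused by an invalid data format) are logged but not re-raised when the COPY command is inserted.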
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hekk_redshift.rb, line 9
# Runs the superclass initializer, then loads every library this plugin
# uses: AWS SDK, gzip, time, temp files, PostgreSQL client, JSON and CSV.
# They are required in the listed order.
# NOTE(review): requiring here instead of at file top presumably defers the
# load cost until the plugin is instantiated — confirm.
def initialize
  super
  %w[aws-sdk zlib time tempfile pg json csv].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hekk_redshift.rb, line 47
# Post-processes the configuration after the superclass has loaded it:
# - normalizes @path to have a trailing slash and no leading slash;
# - enables UTC timestamps when 'utc' is set;
# - builds the PostgreSQL connection hash (@db_conf);
# - picks a delimiter from file_type when none was configured;
# - prepares the COPY and INSERT SQL templates.
#
# Both templates are later filled with String#% (see #write). Configuration
# text interpolated into them therefore has every literal '%' doubled, so
# that String#% reproduces it unchanged instead of misreading it as a
# format directive.
def configure(conf)
  super
  @path = "#{@path}/" unless @path.end_with?('/') # append last slash
  @path = @path[1..-1] if @path.start_with?('/') # remove head slash
  @utc = true if conf['utc']
  @db_conf = {
    host: @redshift_host,
    port: @redshift_port,
    dbname: @redshift_dbname,
    user: @redshift_user,
    password: @redshift_password
  }
  @delimiter = determine_delimiter(@file_type) if @delimiter.nil? || @delimiter.empty?
  $log.debug format_log("redshift file_type:#{@file_type} delimiter:'#{@delimiter}'")

  # Protect literal '%' in config text from the later String#% call.
  escape_pct = ->(value) { value.to_s.gsub('%', '%%') }
  @copy_sql_template = "copy #{escape_pct[table_name_with_schema]} from '%s' CREDENTIALS 'aws_access_key_id=#{escape_pct[@aws_key_id]};aws_secret_access_key=%s' delimiter '#{escape_pct[@delimiter]}' GZIP ESCAPE #{escape_pct[@redshift_copy_base_options]} #{escape_pct[@redshift_copy_options]};"
  @insert_sql_template = "insert into #{escape_pct[@redshift_copy_command_tablename]}(#{escape_pct[@redshift_copy_command_columnname]}) values('%s');"
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 77
# Serializes one event for the buffer. The result depends on file_type:
# - json:    the whole record, msgpack-encoded. Its @record_log_tag field is
#            parsed as JSON text later, at write time.
# - msgpack: the record wrapped as { @record_log_tag => record }, msgpack-encoded.
# - other:   the text stored at record[@record_log_tag], followed by a newline.
def format(tag, time, record)
  return record.to_msgpack if json?
  return { @record_log_tag => record }.to_msgpack if msgpack?

  "#{record[@record_log_tag]}\n"
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hekk_redshift.rb, line 65
# Starts the plugin. After the superclass start, creates the S3 client
# (AWS::S3, i.e. the aws-sdk v1-style API) from the configured credentials,
# plus the optional custom endpoint. It then resolves @bucket to the
# configured bucket.
def start
  super
  # init s3 conf
  s3_options = { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key }
  s3_options.merge!(s3_endpoint: @s3_endpoint) if @s3_endpoint
  @s3 = AWS::S3.new(s3_options)
  @bucket = @s3.buckets[@s3_bucket]
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 87
# Flushes one buffer chunk in three steps:
# 1. gzips it into a temp file;
# 2. uploads that file to S3 under a fresh timestamped key;
# 3. inserts the matching Redshift COPY command into
#    @redshift_copy_command_tablename via #insert_sql_to_redshift.
#
# Returns false when the chunk produced no loadable data, true otherwise
# (both marked "for debug"). The temp file is deleted on every path,
# including when gz creation or the upload raises.
def write(chunk)
  $log.debug format_log('start creating gz.')
  # Keep our own handle on the temp file. The gz helpers may return nil,
  # and the file must still be unlinked in that case.
  tmp = Tempfile.new('s3-')
  begin
    gz_file = if json? || msgpack?
                create_gz_file_from_structured_data(tmp, chunk, @delimiter)
              else
                create_gz_file_from_flat_data(tmp, chunk)
              end
    # no data -> skip
    unless gz_file
      $log.debug format_log('received no valid data. ')
      return false # for debug
    end
    # create a file path with time format
    s3path = create_s3path(@bucket, @path)
    # upload gz to s3
    @bucket.objects[s3path].write(Pathname.new(tmp.path), :acl => :bucket_owner_full_control)
  ensure
    tmp.close! # close (if still open) and delete the temp file
  end
  # copy gz on s3 to redshift
  s3_uri = "s3://#{@s3_bucket}/#{s3path}"
  copy_sql = @copy_sql_template % [s3_uri, @aws_sec_key]
  sql = @insert_sql_template % [copy_sql.gsub(/'/, "\\\\'")]
  insert_sql_to_redshift(sql)
  true # for debug
end
Protected Instance Methods
format_log(message)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 124
# Appends the configured @log_suffix to +message+, separated by a space.
# Returns +message+ unchanged when no non-empty suffix is set.
def format_log(message)
  return message unless @log_suffix && !@log_suffix.empty?

  "#{message} #{@log_suffix}"
end
Private Instance Methods
create_gz_file_from_flat_data(dst_file, chunk)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 137
# Gzip-compresses the raw contents of +chunk+ into +dst_file+.
#
# Closing the GzipWriter flushes the remaining compressed data and writes
# the gzip trailer. It also closes +dst_file+. A failure at that point
# leaves a truncated archive, so the error is raised rather than swallowed.
# Best-effort cleanup happens only when an earlier error is already
# propagating, so that error is never masked.
#
# @param dst_file [IO] destination stream
# @param chunk buffer chunk that writes its bytes via #write_to(io)
# @return dst_file
def create_gz_file_from_flat_data(dst_file, chunk)
  gzw = Zlib::GzipWriter.new(dst_file)
  closed_cleanly = false
  begin
    chunk.write_to(gzw)
    gzw.close # must not be silenced: a half-written gzip would be uploaded
    closed_cleanly = true
  ensure
    unless closed_cleanly || gzw.closed?
      begin
        gzw.close
      rescue StandardError
        nil # already failing; keep the original exception
      end
    end
  end
  dst_file
end
create_gz_file_from_structured_data(dst_file, chunk, delimiter)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 148
# Converts a chunk of structured records into delimited rows whose fields
# follow the Redshift table's column order. The rows are gzip-compressed
# into +dst_file+.
#
# Buffered records are read with msgpack_each. The value under
# @record_log_tag is parsed as JSON text when file_type is json, and used
# as-is when it is msgpack. Records that fail to convert are logged and
# skipped.
#
# Returns +dst_file+, or nil when the table has no columns or no record
# produced a row. Raises RuntimeError when the column list cannot be
# fetched. An error while finalizing the gzip stream (which writes the
# trailer and closes +dst_file+) is raised, not swallowed, so a truncated
# archive is never returned.
def create_gz_file_from_structured_data(dst_file, chunk, delimiter)
  # fetch the table definition from redshift
  redshift_table_columns = fetch_table_columns
  if redshift_table_columns.nil?
    raise 'failed to fetch the redshift table definition.'
  elsif redshift_table_columns.empty?
    $log.warn format_log("no table on redshift. table_name=#{table_name_with_schema}")
    return nil
  end

  # convert json to tsv format text
  gzw = Zlib::GzipWriter.new(dst_file)
  closed_cleanly = false
  begin
    chunk.msgpack_each do |record|
      begin
        hash = json? ? json_to_hash(record[@record_log_tag]) : record[@record_log_tag]
        tsv_text = hash_to_table_text(redshift_table_columns, hash, delimiter)
        gzw.write(tsv_text) if tsv_text && !tsv_text.empty?
      rescue StandardError => e
        if json?
          $log.error format_log("failed to create table text from json. text=(#{record[@record_log_tag]})"), :error=>e.to_s
        else
          $log.error format_log("failed to create table text from msgpack. text=(#{record[@record_log_tag]})"), :error=>e.to_s
        end
        $log.error_backtrace
      end
    end
    has_data = gzw.pos > 0 # uncompressed bytes written so far
    gzw.close # must not be silenced: a half-written gzip would be uploaded
    closed_cleanly = true
  ensure
    unless closed_cleanly || gzw.closed?
      begin
        gzw.close
      rescue StandardError
        nil # already failing; keep the original exception
      end
    end
  end
  has_data ? dst_file : nil
end
create_s3path(bucket, path)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 258
# Returns the first S3 key of the form "<path><timestamp>_<NN>.gz" that does
# not yet exist in +bucket+. The timestamp is the current time (UTC when
# @utc is set) formatted with @timestamp_key_format. NN is a counter that
# starts at 0 and is zero-padded to at least two digits.
def create_s3path(bucket, path)
  now = @utc ? Time.now.utc : Time.now
  timestamp_key = now.strftime(@timestamp_key_format)
  index = 0
  loop do
    candidate = "#{path}#{timestamp_key}_#{'%02d' % index}.gz"
    return candidate unless bucket.objects[candidate].exists?

    index += 1
  end
end
determine_delimiter(file_type)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 184
# Maps a file_type to its default field delimiter:
# - "csv" uses a comma;
# - "json", "msgpack" and "tsv" use a tab.
# Any other value raises Fluent::ConfigError.
def determine_delimiter(file_type)
  return ',' if file_type == 'csv'
  return "\t" if %w[json msgpack tsv].include?(file_type)

  raise Fluent::ConfigError, "Invalid file_type:#{file_type}."
end
fetch_columns_sql_with_schema()
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 208
# Builds the query that lists the target table's column names in
# ordinal_position order. The query is filtered by schema as well when
# @redshift_schemaname is set. The result is memoized in
# @fetch_columns_sql.
def fetch_columns_sql_with_schema
  return @fetch_columns_sql if @fetch_columns_sql

  @fetch_columns_sql =
    if @redshift_schemaname
      "select column_name from INFORMATION_SCHEMA.COLUMNS where table_schema = '#{@redshift_schemaname}' and table_name = '#{@redshift_tablename}' order by ordinal_position;"
    else
      "select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '#{@redshift_tablename}' order by ordinal_position;"
    end
end
fetch_table_columns()
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 195
# Opens a PostgreSQL connection with @db_conf and runs
# #fetch_columns_sql_with_schema. Returns the column_name values in query
# order; the list is empty when nothing matches.
# The connection is always closed; an error while closing it is ignored.
def fetch_table_columns
  connection = PG.connect(@db_conf)
  begin
    column_names = nil
    connection.exec(fetch_columns_sql_with_schema) do |rows|
      column_names = rows.map { |row| row['column_name'] }
    end
    column_names
  ensure
    begin
      connection.close
    rescue StandardError
      nil
    end
  end
end
generate_line_with_delimiter(val_list, delimiter)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 247
# Joins column values into one newline-terminated line for a
# "COPY ... ESCAPE" load.
#
# nil or empty values become empty fields. Within each value, the following
# are prefixed with a backslash so that COPY's ESCAPE option loads them
# literally:
# - backslash, tab and newline;
# - the actual +delimiter+, which may be user-configured (e.g. '|').
# Escaping the real delimiter keeps such values from shifting columns.
#
# @param val_list [Array<String, nil>] field values in table-column order
# @param delimiter [String] field separator
# @return [String]
def generate_line_with_delimiter(val_list, delimiter)
  # Tab, newline and backslash are always escaped below; only escape the
  # delimiter separately when it is some other, non-empty string.
  escape_delimiter = delimiter && !delimiter.empty? && !["\t", "\n", '\\'].include?(delimiter)
  fields = val_list.map do |val|
    next '' if val.nil? || val.empty?

    # Backslash first, so the backslashes added afterwards are not doubled.
    # Block replacements avoid gsub's backreference parsing.
    text = val.gsub('\\') { '\\\\' }
    text = text.gsub("\t") { "\\\t" }.gsub("\n") { "\\\n" }
    text = text.gsub(delimiter) { "\\#{delimiter}" } if escape_delimiter
    text
  end
  fields.join(delimiter) + "\n"
end
hash_to_table_text(redshift_table_columns, hash, delimiter)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 224
# Builds one delimited line from +hash+, ordered by +redshift_table_columns+.
# Each column value is taken from hash[column_name]:
# - Hash and Array values are serialized with JSON.generate;
# - other values use #to_s;
# - empty results become nil (an empty field).
# Returns '' when +hash+ is nil/false, or when no column has a value; the
# latter case is also logged as a warning.
def hash_to_table_text(redshift_table_columns, hash, delimiter)
  return '' unless hash

  # extract values from hash
  values = redshift_table_columns.map do |column|
    raw = hash[column]
    raw = JSON.generate(raw) if raw.is_a?(Hash) || raw.is_a?(Array)
    text = raw.to_s
    text.empty? ? nil : text
  end

  if values.none? { |v| v && !v.empty? }
    $log.warn format_log("no data match for table columns on redshift. data=#{hash} table_columns=#{redshift_table_columns}")
    return ''
  end
  generate_line_with_delimiter(values, delimiter)
end
insert_sql_to_redshift(sql)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 277
# Connects with @db_conf and executes +sql+ (the INSERT that records a COPY
# command).
#
# On PG::Error the failure is logged. The error is re-raised unless its
# message matches IGNORE_REDSHIFT_ERROR_REGEXP, in which case false is
# returned. The embedded COPY command carries the AWS secret key, so that
# value is redacted from the logged SQL. The connection is always closed;
# an error while closing it is ignored.
def insert_sql_to_redshift(sql)
  $log.debug format_log('start inserting copy command.')
  conn = nil
  begin
    conn = PG.connect(@db_conf)
    conn.exec(sql)
    $log.info format_log('completed inserting to redshift.')
  rescue PG::Error => e
    # Never write credentials to the log.
    loggable_sql = sql.gsub(/(?<=aws_secret_access_key=)[^;'\\]+/, '****')
    $log.error format_log("failed to insert copy command into redshift. sql=#{loggable_sql}"), :error=>e.to_s
    raise e unless e.to_s =~ IGNORE_REDSHIFT_ERROR_REGEXP
    return false # for debug
  ensure
    conn.close rescue nil if conn
  end
end
json?()
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 129
# True when the configured file_type is "json".
def json?
  @file_type.eql?('json')
end
json_to_hash(json_text)
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 216
# Parses +json_text+ as JSON.
#
# Returns the parsed value, which callers treat as a Hash. Returns nil when
# the text is nil/empty, or when parsing raises; parse failures are logged
# as warnings first. The explicit nil keeps the logger's return value from
# leaking to callers.
def json_to_hash(json_text)
  return nil if json_text.to_s.empty?

  JSON.parse(json_text)
rescue StandardError => e
  $log.warn format_log('failed to parse json. '), :error => e.to_s
  nil
end
msgpack?()
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 133
# True when the configured file_type is "msgpack".
def msgpack?
  @file_type.eql?('msgpack')
end
table_name_with_schema()
click to toggle source
# File lib/fluent/plugin/out_hekk_redshift.rb, line 269
# Returns the target table name, qualified as "schema.table" when
# @redshift_schemaname is set. The result is memoized in
# @table_name_with_schema.
def table_name_with_schema
  @table_name_with_schema ||=
    @redshift_schemaname ? "#{@redshift_schemaname}.#{@redshift_tablename}" : @redshift_tablename
end