class Fluent::AzuretablesOutput
Constants
- BATCHWRITE_ENTITY_LIMIT
- BATCHWRITE_SIZE_LIMIT
- ENTITY_SIZE_LIMIT
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
Calls superclass method
# File lib/fluent/plugin/out_azuretables.rb, line 27
# Called before starting. Delegates to the superclass, then turns the
# comma-separated @partition_keys / @row_keys settings into arrays and resets
# the row-key counter used by #format.
#
# conf - configuration object; forwarded unchanged to the superclass by `super`.
def configure(conf)
  super

  # Strip every name so that "host, app" selects the same fields as "host,app";
  # an unstripped " app" could never match a record field name.
  @partition_key_array = @partition_keys.split(',').map(&:strip) unless @partition_keys.nil?
  @row_key_array = @row_keys.split(',').map(&:strip) unless @row_keys.nil?

  @row_key_cnt = 0
end
format(tag, time, record)
click to toggle source
Creates an entity (partition key, row key and remaining values) from an event record.
# File lib/fluent/plugin/out_azuretables.rb, line 76
# Builds the entity for one event record and returns it serialized with
# to_msgpack.
#
# Fields named in @partition_key_array feed the partition key and fields named
# in @row_key_array feed the row key (a field listed in both is used for the
# partition key only). All other fields become the entity values. The
# partition key additionally gets the current local date (YYYYMMDD); the row
# key gets the current epoch seconds and a per-instance counter.
#
# tag    - event tag (unused here).
# time   - event time (unused here; the current clock is used instead).
# record - Hash of field name => value. It is NOT modified: the original code
#          deleted key fields from it while iterating over it, which both
#          mutated the caller's record and changed a hash during iteration.
def format(tag, time, record)
  partition_keys = []
  row_keys = []
  entity_values = {}

  record.each_pair do |name, val|
    if @partition_key_array && @partition_key_array.include?(name)
      partition_keys << val
    elsif @row_key_array && @row_key_array.include?(name)
      row_keys << val
    else
      entity_values[name] = val
    end
  end

  # Single clock reading so the date and the epoch suffix describe the same instant.
  now = Time.now
  partition_keys << now.strftime("%Y%m%d")
  row_keys << now.getutc.to_i
  row_keys << @row_key_cnt
  @row_key_cnt += 1

  entity = {
    'partition_key' => partition_keys.join(@key_delimiter),
    'row_key' => row_keys.join(@key_delimiter),
    'entity_values' => entity_values
  }
  entity.to_msgpack
end
format_key(record, keys, key_delimiter)
click to toggle source
# File lib/fluent/plugin/out_azuretables.rb, line 100
# Joins the values of the record fields whose names appear in `keys`.
#
# record        - object responding to each_pair (field name, value).
# keys          - collection of field names (anything responding to include?),
#                 or nil when no key fields are configured.
# key_delimiter - separator passed to Array#join.
#
# Returns the selected values, in record iteration order, joined into a String.
# Returns "" when nothing is selected, including when `keys` is nil (the
# original raised NoMethodError in that case).
def format_key(record, keys, key_delimiter)
  return '' if keys.nil?

  selected = []
  record.each_pair { |name, val| selected << val if keys.include?(name) }
  selected.join(key_delimiter)
end
insert_entities(partition_key, entities)
click to toggle source
# File lib/fluent/plugin/out_azuretables.rb, line 129
# Builds one Azure::Table::Batch for @table / partition_key containing an
# insert per entity and executes it through @azure_table_service.
#
# partition_key - partition key shared by every entity in the batch.
# entities      - Array of Hashes with 'row_key' and 'entity_values'.
#
# Returns the result of execute_batch. On a StandardError the failure is
# logged (fatal message, then the partition key and entities at debug level)
# and not re-raised. Non-StandardError exceptions such as Interrupt or
# SystemExit are no longer swallowed; the original rescued Exception.
def insert_entities(partition_key, entities)
  batch = Azure::Table::Batch.new(@table, partition_key) do
    entities.each do |entity|
      insert entity['row_key'], entity['entity_values']
    end
  end
  @azure_table_service.execute_batch(batch)
rescue StandardError => e
  log.fatal "UnknownError: '#{e}'"
  log.debug partition_key
  log.debug entities.inspect
end
shutdown()
click to toggle source
This method is called when shutting down.
Calls superclass method
# File lib/fluent/plugin/out_azuretables.rb, line 71
# Shutdown hook. This method does no work of its own; it only delegates to the
# superclass implementation.
def shutdown
  super
end
start()
click to toggle source
Connects to the Azure Table storage service and, when configured to do so, creates the table if it does not exist yet.
Calls superclass method
# File lib/fluent/plugin/out_azuretables.rb, line 41
# Start hook: delegates to the superclass, applies the storage credentials,
# creates the table service client and, if requested, creates the table.
#
# Any StandardError raised while creating the client or the table is logged
# and the process is terminated with exit!. Non-StandardError exceptions
# (Interrupt, SystemExit, ...) are left to propagate instead of being logged
# as errors; the original rescued Exception.
def start
  super

  # Override the global Azure credentials only when both settings are given.
  if !@account_name.nil? && !@access_key.nil?
    Azure.config.storage_account_name = @account_name
    Azure.config.storage_access_key = @access_key
  end

  begin
    @azure_table_service = Azure::Table::TableService.new
    # create the table if it does not exist
    @azure_table_service.create_table(@table) if !table_exists?(@table) && @create_table_if_not_exists
  rescue StandardError => e
    log.error e
    exit!
  end
end
table_exists?(table_name)
click to toggle source
# File lib/fluent/plugin/out_azuretables.rb, line 58
# Reports whether the table can be fetched through @azure_table_service.
#
# table_name - name passed to get_table.
#
# Returns true when get_table succeeds and false when it raises
# Azure::Core::Http::HTTPError. Any other StandardError is logged as fatal and
# the process is terminated with exit!. Non-StandardError exceptions
# (Interrupt, SystemExit, ...) propagate untouched; the original rescued
# Exception.
#
# NOTE(review): every HTTPError is treated as "table missing", not just a
# not-found response -- confirm whether auth/server errors should be
# distinguished.
def table_exists?(table_name)
  @azure_table_service.get_table(table_name)
  true
rescue Azure::Core::Http::HTTPError
  false
rescue StandardError => e
  log.fatal "UnknownError: '#{e}'"
  exit!
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_azuretables.rb, line 108
# Groups the chunk's entities by partition key and submits them with
# insert_entities, flushing a partition's group as soon as it holds
# BATCHWRITE_ENTITY_LIMIT entities or its serialized size reaches
# BATCHWRITE_SIZE_LIMIT. Whatever is still pending at the end of the chunk is
# flushed afterwards.
#
# chunk - object whose msgpack_each yields entity Hashes containing
#         'partition_key'.
#
# Differences from the original implementation:
# * size is tracked per partition; a single counter used to be shared by all
#   partitions and reset whenever any one of them was flushed, so other
#   partitions' accumulated size was lost;
# * a flushed group is removed, so the final pass no longer submits empty
#   batches for partitions that were already written;
# * size is measured in bytes (bytesize) rather than characters.
def write(chunk)
  pending = {}
  pending_bytes = Hash.new(0)

  chunk.msgpack_each do |entity|
    partition_key = entity['partition_key']
    group = (pending[partition_key] ||= [])
    group << entity
    pending_bytes[partition_key] += entity.to_json.bytesize

    limit_reached = group.size >= BATCHWRITE_ENTITY_LIMIT ||
                    pending_bytes[partition_key] >= BATCHWRITE_SIZE_LIMIT
    next unless limit_reached

    insert_entities(partition_key, group)
    pending.delete(partition_key)
    pending_bytes.delete(partition_key)
  end

  pending.each_pair do |partition_key, entities|
    insert_entities(partition_key, entities)
  end
end