class LogStash::Outputs::MonascaLogApi
This Logstash Output plugin, sends events to monasca-api. It authenticates against keystone and gets a token. The token is used to authenticate against the monasca-api and send log events.
Constants
- JSON_DIMS
- JSON_LOGS
Attributes
logs[RW]
start_time[RW]
time_thread[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 86 def close stop_time_check end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 79 def multi_receive(events) @logger.debug("Retrieving #{events.size} events") events.each do |event| encode(event) end end
register()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 64 def register check_config @mutex = Mutex.new @logger.info('Registering keystone user', :username => username, :project_name => project_name) @monasca_log_api_client = LogStash::Outputs::Monasca::MonascaLogApiClient .new monasca_log_api_url, monasca_log_api_insecure @logs = Hash.new @start_time = nil @cross_tenant = nil init_token initialize_logs_object start_time_check end
Private Instance Methods
add_log(log, cross_tenant)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 265 def add_log(log, cross_tenant) @logs[JSON_LOGS].push(log) if @logs[JSON_LOGS].size == 1 @start_time = Time.now end @cross_tenant = cross_tenant end
bytesize_of(entry)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 220 def bytesize_of(entry) entry.to_json.bytesize / 1024.0 end
check_config()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 273 def check_config bad_val = [] bad_val << 'num_of_logs' if num_of_logs <= 0 bad_val << 'elapsed_time_sec' if elapsed_time_sec <= 0 bad_val << 'delay' if delay <= 0 bad_val << 'max_data_size_kb' if max_data_size_kb <= 0 unless bad_val.empty? err = "Value of #{bad_val.join(', ')} need to be bigger than 0" raise LogStash::ConfigurationError, err end end
check_token()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 183 def check_token token = LogStash::Outputs::Keystone::Token.instance token.request_new_token( username, user_domain_name, password, project_name, project_domain_name) unless token.valid? end
encode(event)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 98 def encode(event) log, cross_tenant = generate_log_from_event(event) log_bytesize = bytesize_of(log) logs_bytesize = bytesize_of(@logs) if @logs[JSON_LOGS] # if new cross_tenant appears request has to be sent if @cross_tenant != cross_tenant @logger.debug('new cross_tenant. Sending logs.') @mutex.synchronize do send_logs add_log log, cross_tenant end return # if new log would exceed the bytesize then send logs without the new log elsif (logs_bytesize + log_bytesize) > max_data_size_kb @logger.debug('bytesize reached. Sending logs') @mutex.synchronize do send_logs add_log log, cross_tenant end return # if the new log would reach the maximum bytesize or the maximum allowed # number of sendable logs is reached elsif (@logs[JSON_LOGS].size + 1 >= num_of_logs) @logger.debug('bytesize or maximum number of logs reached. Sending logs') @mutex.synchronize do add_log log, cross_tenant send_logs end return end end # still free space to collect logs @mutex.synchronize do add_log log, cross_tenant end return end
generate_log_from_event(event)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 141 def generate_log_from_event(event) message = event.to_hash['message'] path = event.to_hash['path'] if event.to_hash['path'] stream = event.to_hash['stream'] if event.to_hash['stream'] local_dims = JSON.parse(event.to_hash['dimensions'].to_s) if event.to_hash['dimensions'] type = event.to_hash['type'] if event.to_hash['type'] cross_tenant = event.to_hash['cross_tenant'] if event.to_hash['cross_tenant'] log = { 'message' => message, 'dimensions' => {} } path && log['dimensions']['path'] = path stream && log['dimensions']['stream'] = stream log[JSON_DIMS]['type'] = type if type if local_dims begin JSON.parse(local_dims[0]) local_dims.each { |dim| parsed_dim = JSON.parse(dim) log[JSON_DIMS][parsed_dim[0].strip] = parsed_dim[1].strip } rescue log[JSON_DIMS][local_dims[0].strip] = local_dims[1].strip end end return log, cross_tenant end
get_sleep_time(attempt)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 285 def get_sleep_time(attempt) sleep_for = attempt**2 sleep_for <= 60 ? sleep_for : 60 end
init_token()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 92 def init_token token = LogStash::Outputs::Keystone::Token.instance token.set_keystone_client(keystone_api_url, keystone_api_insecure) check_token end
initialize_logs_object()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 170 def initialize_logs_object if @logs.empty? global_dims = {} dimensions.each { |dim| global_dims[dim.split(':')[0].strip] = dim.split(':')[1].strip } if dimensions @logs = {} @logs[JSON_DIMS] = global_dims unless global_dims.empty? @logs[JSON_LOGS] = [] @cross_tenant = nil end end
send_logs()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 224 def send_logs if @logs[JSON_LOGS] && !@logs[JSON_LOGS].empty? check_token token = LogStash::Outputs::Keystone::Token.instance @logger.debug("Sending #{@logs[JSON_LOGS].size} logs") retry_tries = 5 attempt = 0 begin tries ||= retry_tries @monasca_log_api_client.send_logs(@logs, token.id, @cross_tenant) rescue LogStash::Outputs::Monasca::MonascaLogApiClient::InvalidTokenError => e tries -= 1 if tries > 0 @logger.info("Unauthorized: #{e}. Requesting new token.") token.request_new_token( username, user_domain_name, password, project_name, project_domain_name ) retry else @logger.error("Unauthorized: #{e}. Requesting new token failed "\ "after #{retry_tries} retries.") sleep_for_retry(60) retry end rescue => e attempt += 1 sleep_for = get_sleep_time(attempt) @logger.error("Sending event to monasca-log-api threw exception, "\ "will sleep for #{sleep_for} seconds.", :exceptionew => e) sleep_for_retry(sleep_for) retry end @logs.clear initialize_logs_object end end
sleep_for_retry(duration)
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 290 def sleep_for_retry(duration) Stud.stoppable_sleep(duration) { @stopping } end
start_time_check()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 190 def start_time_check @time_thread = Thread.new do loop do unless @start_time.nil? if @logs[JSON_LOGS] and (@logs[JSON_LOGS].size > 0) and ((Time.now - @start_time) >= elapsed_time_sec) @logger.debug('Time elapsed. Sending logs') @mutex.synchronize do send_logs end end end sleep delay end end end
stop_time_check()
click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 210 def stop_time_check #ensure that entries buffered in queue will be handled before stop @stopping = true @mutex.synchronize do send_logs end @time_thread.kill() if @time_thread @logger.info('Stopped time_check thread') end