class LogStash::Outputs::MonascaLogApi

This Logstash Output plugin, sends events to monasca-api. It authenticates against keystone and gets a token. The token is used to authenticate against the monasca-api and send log events.

Constants

JSON_DIMS
JSON_LOGS

Attributes

logs[RW]
start_time[RW]
time_thread[RW]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 86
def close
  stop_time_check
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 79
def multi_receive(events)
  @logger.debug("Retrieving #{events.size} events")
  events.each do |event|
    encode(event)
  end
end
register() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 64
def register
  check_config
  @mutex = Mutex.new
  @logger.info('Registering keystone user',
    :username => username, :project_name => project_name)
  @monasca_log_api_client = LogStash::Outputs::Monasca::MonascaLogApiClient
    .new monasca_log_api_url, monasca_log_api_insecure
  @logs = Hash.new
  @start_time = nil
  @cross_tenant = nil
  init_token
  initialize_logs_object
  start_time_check
end

Private Instance Methods

add_log(log, cross_tenant) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 265
def add_log(log, cross_tenant)
  @logs[JSON_LOGS].push(log)
  if @logs[JSON_LOGS].size == 1
    @start_time = Time.now
  end
  @cross_tenant = cross_tenant
end
bytesize_of(entry) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 220
def bytesize_of(entry)
  entry.to_json.bytesize / 1024.0
end
check_config() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 273
def check_config
  bad_val = []
  bad_val << 'num_of_logs' if num_of_logs <= 0
  bad_val << 'elapsed_time_sec' if elapsed_time_sec <= 0
  bad_val << 'delay' if delay <= 0
  bad_val << 'max_data_size_kb' if max_data_size_kb <= 0
  unless bad_val.empty?
    err = "Value of #{bad_val.join(', ')} need to be bigger than 0"
    raise LogStash::ConfigurationError, err
  end
end
check_token() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 183
def check_token
  token = LogStash::Outputs::Keystone::Token.instance
  token.request_new_token(
    username, user_domain_name, password,
    project_name, project_domain_name) unless token.valid?
end
encode(event) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 98
def encode(event)
  log, cross_tenant = generate_log_from_event(event)
  log_bytesize = bytesize_of(log)
  logs_bytesize = bytesize_of(@logs)

  if @logs[JSON_LOGS]
    # if new cross_tenant appears request has to be sent
    if @cross_tenant != cross_tenant
      @logger.debug('new cross_tenant. Sending logs.')
      @mutex.synchronize do
        send_logs
        add_log log, cross_tenant
      end
      return

    # if new log would exceed the bytesize then send logs without the new log
    elsif (logs_bytesize + log_bytesize) > max_data_size_kb
      @logger.debug('bytesize reached. Sending logs')
      @mutex.synchronize do
        send_logs
        add_log log, cross_tenant
      end
      return

    # if the new log would reach the maximum bytesize or the maximum allowed
    # number of sendable logs is reached
    elsif (@logs[JSON_LOGS].size + 1 >= num_of_logs)
      @logger.debug('bytesize or maximum number of logs reached. Sending logs')
      @mutex.synchronize do
        add_log log, cross_tenant
        send_logs
      end
      return
    end
  end

  # still free space to collect logs
  @mutex.synchronize do
    add_log log, cross_tenant
  end
  return
end
generate_log_from_event(event) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 141
def generate_log_from_event(event)
  message = event.to_hash['message']
  path = event.to_hash['path'] if event.to_hash['path']
  stream = event.to_hash['stream'] if event.to_hash['stream']
  local_dims = JSON.parse(event.to_hash['dimensions'].to_s) if
    event.to_hash['dimensions']
  type = event.to_hash['type'] if event.to_hash['type']
  cross_tenant = event.to_hash['cross_tenant'] if
    event.to_hash['cross_tenant']

  log = { 'message' => message, 'dimensions' => {} }
  path && log['dimensions']['path'] = path
  stream && log['dimensions']['stream'] = stream

  log[JSON_DIMS]['type'] = type if type
  if local_dims
    begin
      JSON.parse(local_dims[0])
      local_dims.each { |dim|
        parsed_dim = JSON.parse(dim)
        log[JSON_DIMS][parsed_dim[0].strip] = parsed_dim[1].strip
      }
    rescue
      log[JSON_DIMS][local_dims[0].strip] = local_dims[1].strip
    end
  end
  return log, cross_tenant
end
get_sleep_time(attempt) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 285
def get_sleep_time(attempt)
  sleep_for = attempt**2
  sleep_for <= 60 ? sleep_for : 60
end
init_token() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 92
def init_token
  token = LogStash::Outputs::Keystone::Token.instance
  token.set_keystone_client(keystone_api_url, keystone_api_insecure)
  check_token
end
initialize_logs_object() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 170
def initialize_logs_object
  if @logs.empty?
    global_dims = {}
    dimensions.each { |dim|
      global_dims[dim.split(':')[0].strip] = dim.split(':')[1].strip
      } if dimensions
    @logs = {}
    @logs[JSON_DIMS] = global_dims unless global_dims.empty?
    @logs[JSON_LOGS] = []
    @cross_tenant = nil
  end
end
send_logs() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 224
def send_logs
  if @logs[JSON_LOGS] && !@logs[JSON_LOGS].empty?
    check_token
    token = LogStash::Outputs::Keystone::Token.instance
    @logger.debug("Sending #{@logs[JSON_LOGS].size} logs")
    retry_tries = 5
    attempt = 0
    begin
      tries ||= retry_tries
      @monasca_log_api_client.send_logs(@logs, token.id, @cross_tenant)
    rescue LogStash::Outputs::Monasca::MonascaLogApiClient::InvalidTokenError => e
      tries -= 1
      if tries > 0
        @logger.info("Unauthorized: #{e}. Requesting new token.")
        token.request_new_token(
          username, user_domain_name, password,
          project_name, project_domain_name
        )
        retry
      else
        @logger.error("Unauthorized: #{e}. Requesting new token failed "\
                      "after #{retry_tries} retries.")

        sleep_for_retry(60)
        retry
      end
    rescue => e
      attempt += 1
      sleep_for = get_sleep_time(attempt)
      @logger.error("Sending event to monasca-log-api threw exception, "\
                    "will sleep for #{sleep_for} seconds.",
                    :exceptionew => e)

      sleep_for_retry(sleep_for)
      retry
    end
    @logs.clear
    initialize_logs_object
  end
end
sleep_for_retry(duration) click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 290
def sleep_for_retry(duration)
  Stud.stoppable_sleep(duration) { @stopping }
end
start_time_check() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 190
def start_time_check
  @time_thread = Thread.new do

    loop do
      unless @start_time.nil?

        if @logs[JSON_LOGS] and (@logs[JSON_LOGS].size > 0) and
          ((Time.now - @start_time) >= elapsed_time_sec)
          @logger.debug('Time elapsed. Sending logs')
          @mutex.synchronize do
            send_logs
          end
        end

      end
      sleep delay
    end
  end
end
stop_time_check() click to toggle source
# File lib/logstash/outputs/monasca_log_api.rb, line 210
def stop_time_check
  #ensure that entries buffered in queue will be  handled before stop
  @stopping = true
  @mutex.synchronize do
    send_logs
  end
  @time_thread.kill() if @time_thread
  @logger.info('Stopped time_check thread')
end