class Sensu::Transport::SNSSQS

Constants

KEEPALIVES_STR
NUMBER_STR
PIPE_ARR
PIPE_STR
STRING_STR
TYPE_STR

Attributes

logger[RW]

Public Class Methods

new() click to toggle source
# File lib/sensu/transport/snssqs.rb, line 19
def initialize
  @connected = false
  @subscribing = false
  @history = {}
  @metrics_buffer = ''
  @metrics_last_flush = 0

  # as of sensu 0.23.0 we need to call succeed when we have
  # successfully connected to SQS.
  #
  # we already have our own logic to maintain the connection to
  # SQS, so we can always say we're connected.
  #
  # See:
  # https://github.com/sensu/sensu/blob/cdc25b29169ef2dcd2e056416eab0e83dbe000bb/CHANGELOG.md#0230---2016-04-04
  succeed
end

Public Instance Methods

acknowledge(info) { |info| ... } click to toggle source

acknowledge will delete the given message from the SQS queue.

# File lib/sensu/transport/snssqs.rb, line 138
def acknowledge(info, &callback)
  EM.defer do
    @sqs.delete_message(
      queue_url: @settings[:consuming_sqs_queue_url],
      receipt_handle: info.receipt_handle
    )
    statsd_incr("sqs.#{@settings[:consuming_sqs_queue_url]}.message.deleted")
    yield(info) if callback
  end
end
connect(settings) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 41
def connect(settings)
  @settings = settings
  @connected = true
  @results_callback = proc {}
  @keepalives_callback = proc {}
  # Sensu Windows install does not include a valid cert bundle for AWS
  Aws.use_bundled_cert! if Gem.win_platform?
  aws_client_settings = { region: @settings[:region] }
  unless @settings[:access_key_id].nil?
    aws_client_settings[:access_key_id] = @settings[:access_key_id]
    aws_client_settings[:secret_access_key] = @settings[:secret_access_key]
  end
  @sqs = Aws::SQS::Client.new aws_client_settings
  @sns = Aws::SNS::Client.new aws_client_settings

  # connect to statsd, if necessary
  @statsd = nil
  if !@settings[:statsd_addr].nil? && @settings[:statsd_addr] != ''
    pieces = @settings[:statsd_addr].split(':')
    @statsd = Statsd.new(pieces[0], pieces[1].to_i).tap do |sd|
      sd.namespace = @settings[:statsd_namespace]
    end
    @statsd_sample_rate = @settings[:statsd_sample_rate].to_f
  end

  # setup custom buffer
  @settings[:buffer_messages] = @settings.fetch(:buffer_messages, true)
  @settings[:check_min_ok] = @settings.fetch(:check_min_ok, 10)
  @settings[:check_max_delay] = @settings.fetch(:check_max_delay, 1800)
  @settings[:metrics_max_size] = @settings.fetch(:metrics_max_size, 102_400)
  @settings[:metrics_max_delay] = @settings.fetch(:metrics_max_delay, 900)
end
connected?() click to toggle source
# File lib/sensu/transport/snssqs.rb, line 37
def connected?
  @connected
end
handleBuffer(raw_message) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 149
def handleBuffer(raw_message)
  json_message = ::JSON.parse raw_message
  drop = false

  if @settings[:buffer_messages] && json_message.key?('check') && json_message.key?('client')
    if json_message['check']['type'] != 'metric'
      return handleBufferCheckMessage(raw_message, json_message)
    elsif json_message['check']['type'] == 'metric'
      return handleBufferMetricMessage(raw_message, json_message)
    end
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end
handleBufferCheckMessage(raw_message, json_message) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 168
def handleBufferCheckMessage(raw_message, json_message)
  drop = false
  json_message['check']['type'] = 'standard' unless json_message['check'].key?('type')

  # create initial client history
  unless @history.key? json_message['client']
    logger.debug("[transport-snssqs] creating event history for client #{json_message['client']}")
    @history[json_message['client']] = {}
  end
  # create initial check history
  unless @history[json_message['client']].key? json_message['check']['name']
    logger.debug("[transport-snssqs] creating event history for check #{json_message['check']['name']}")
    @history[json_message['client']][json_message['check']['name']] = { 'ok_count' => 0, 'last_event' => 0 }
  end

  # handle ok events
  if json_message['check']['status'] == 0 && json_message['check'].key?('aggregate') == false && json_message['check'].key?('ttl') == false && json_message['check'].key?('force_resolve') == false
    @history[json_message['client']][json_message['check']['name']]['ok_count'] += 1

    if @history[json_message['client']][json_message['check']['name']]['ok_count'] < @settings[:check_min_ok]
      # history ok_count is too low
      logger.debug("[transport-snssqs] sending event because history ok_count #{@history[json_message['client']][json_message['check']['name']]['ok_count']} is too low for #{json_message['check']['name']}")
      @history[json_message['client']][json_message['check']['name']]['last_event'] = Time.now.to_i
    else
      max_delay = @settings[:check_max_delay]
      if json_message['check']['name'] == 'keepalive' && json_message['check'].key?('thresholds')
        max_delay = json_message['check']['thresholds']['warning'] if json_message['check']['thresholds'].key?('warning')
      end
      if @history[json_message['client']][json_message['check']['name']]['last_event'] < (Time.now.to_i - max_delay)
        # history last_event is too old
        logger.debug("[transport-snssqs] sending event because last_event history #{Time.now.to_i - @history[json_message['client']][json_message['check']['name']]['last_event']} is too old for #{json_message['check']['name']}")
        @history[json_message['client']][json_message['check']['name']]['last_event'] = Time.now.to_i
      else
        # history last_event is recent
        logger.debug("[transport-snssqs] skipping event because last_event history #{Time.now.to_i - @history[json_message['client']][json_message['check']['name']]['last_event']} is recent for #{json_message['check']['name']}")
        # ignore whole message
        drop = true
      end
    end
  # handle error events
  else
    # reset history
    logger.debug("[transport-snssqs] reseting event history for #{json_message['check']['name']}")
    @history[json_message['client']][json_message['check']['name']]['ok_count'] = 0
    @history[json_message['client']][json_message['check']['name']]['last_event'] = 0
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end
handleBufferMetricMessage(raw_message, json_message) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 222
def handleBufferMetricMessage(raw_message, json_message)
  drop = false

  if json_message['check']['status'] == 0 && json_message['check'].key?('force_resolve') == false

    @metrics_buffer += json_message['check']['output']
    if @metrics_buffer.length > 102_400 || @metrics_last_flush < ((Time.now.to_i - @settings[:metrics_max_delay]))
      json_message['check']['name'] = 'combined_metrics'
      json_message['check']['command'] = 'combined metrics by snssqs'
      json_message['check']['interval'] = @settings[:metrics_max_delay]
      json_message['check']['output'] = @metrics_buffer

      raw_message = json_message.to_json
      logger.info("[transport-snssqs] flushing metrics buffer #{@metrics_buffer.length}")
      @metrics_buffer = ''
      @metrics_last_flush = Time.now.to_i
    else
      # ignore whole message
      logger.debug("[transport-snssqs] storing output in metrics buffer #{@metrics_buffer.length}")
      drop = true
    end
  end
  
  if json_message['check']['status'] != 0
    drop = true
  end

  {
    'raw_message' => raw_message,
    'json_message' => json_message,
    'drop' => drop
  }
end
publish(type, pipe, message, options = {}, &callback) click to toggle source

publish publishes a message to the SNS topic.

The type, pipe, and options are transformed into SNS message attributes and included with the message.

# File lib/sensu/transport/snssqs.rb, line 260
def publish(type, pipe, message, options = {}, &callback)
  result = handleBuffer(message)
  if result['drop']
    return
  else
    message = result['raw_message']
    json_message = result['json_message']
  end

  attributes = {
    TYPE_STR => str_attr(type),
    PIPE_STR => str_attr(pipe)
  }

  attributes['client'] = str_attr(json_message['client']) if json_message.key?('client')
  if json_message.key?('check')
    attributes['check_name'] = str_attr(json_message['check']['name']) if json_message['check'].key?('name')
    attributes['check_type'] = str_attr(json_message['check']['type']) if json_message['check'].key?('type')
    attributes['check_status'] = int_attr(json_message['check']['status']) if json_message['check'].key?('status')
    attributes['check_force_resolve'] = int_attr(json_message['check']['status']) if json_message['check'].key?('check_force_resolve')
  end

  options.each do |k, v|
    attributes[k.to_s] = str_attr(v.to_s)
  end
  EM.defer { send_message(message, attributes, &callback) }
end
statsd_incr(stat) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 74
def statsd_incr(stat)
  @statsd.increment(stat, @statsd_sample_rate) unless @statsd.nil?
end
statsd_time(stat) { || ... } click to toggle source
# File lib/sensu/transport/snssqs.rb, line 78
def statsd_time(stat)
  # always measure + run the block, but only if @statsd is set
  # do we actually report it.
  start = Time.now
  result = yield
  unless @statsd.nil?
    @statsd.timing(stat, ((Time.now - start) * 1000).round(5), @statsd_sample_rate)
  end
  result
end
subscribe(type, pipe, funnel = nil, _options = {}, &callback) click to toggle source

subscribe will begin “subscribing” to the consuming sqs queue.

This method is intended for use by the Sensu server; fanout subscriptions initiated by the Sensu client process are treated as a no-op.

What this really means is that we will start polling for messages from the SQS queue, and, depending on the message type, it will call the appropriate callback.

This assumes that the SQS Queue is consuming “Raw” messages from SNS.

“subscribing” means that the “callback” parameter will be called when there is a message for you to consume.

“funnel” and “type” parameters are completely ignored.

# File lib/sensu/transport/snssqs.rb, line 106
def subscribe(type, pipe, funnel = nil, _options = {}, &callback)
  if type == :fanout
    logger.debug("skipping unsupported fanout subscription type=#{type}, pipe=#{pipe}, funnel=#{funnel}")
    return
  end

  logger.info("subscribing to type=#{type}, pipe=#{pipe}, funnel=#{funnel}")

  if pipe == KEEPALIVES_STR
    @keepalives_callback = callback
  else
    @results_callback = callback
  end

  unless @subscribing
    do_all_the_time do
      EM::Iterator.new(receive_messages, 10).each do |msg, iter|
        statsd_time("sqs.#{@settings[:consuming_sqs_queue_url]}.process_timing") do
          if msg.message_attributes[PIPE_STR].string_value == KEEPALIVES_STR
            @keepalives_callback.call(msg, msg.body)
          else
            @results_callback.call(msg, msg.body)
          end
        end
        iter.next
      end
    end
    @subscribing = true
  end
end

Private Instance Methods

do_all_the_time(&blk) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 298
def do_all_the_time(&blk)
  callback = proc {
    do_all_the_time(&blk)
  }
  EM.defer(blk, callback)
end
int_attr(nr) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 294
def int_attr(nr)
  { data_type: NUMBER_STR, string_value: nr.to_s }
end
receive_messages() click to toggle source

receive_messages returns an array of SQS messages for the consuming queue

# File lib/sensu/transport/snssqs.rb, line 322
def receive_messages
  resp = @sqs.receive_message(
    message_attribute_names: PIPE_ARR,
    queue_url: @settings[:consuming_sqs_queue_url],
    wait_time_seconds: @settings[:wait_time_seconds],
    max_number_of_messages: @settings[:max_number_of_messages]
  )
  resp.messages.select do |msg|
    # switching whether to transform the message based on the existance of message_attributes
    # if this is a raw SNS message, it exists in the root of the message and no conversion is needed
    # if it doesn't, it is an encapsulated messsage (meaning the SNS message is a stringified JSON in the body of the SQS message)
    begin
      logger.debug('[transport-snssqs] msg parse start')
      unless msg.key? 'message_attributes'
        # extracting original SNS message
        json_message = ::JSON.parse msg.body
        logger.debug('[transport-snssqs] msg parsed from JSON')
        # if there is no Message, this isn't a SNS message and something has gone terribly wrong
        unless json_message.key? 'Message'
          logger.info('[transport-snssqs] msg body without SNS Message received')
          next
        end
        # replacing the body with the SNS message (as it would be in a raw delivered SNS-SQS message)
        msg.body = json_message['Message']
        msg.message_attributes = {}
        # discarding messages without attributes, since this would lead to an exception in subscribe
        unless json_message.key? 'MessageAttributes'
          logger.info('[transport-snssqs] msg body without message attributes received')
          next
        end
        # parsing the message_attributes
        json_message['MessageAttributes'].each do |name, value|
          msg.message_attributes[name] = Aws::SQS::Types::MessageAttributeValue.new
          msg.message_attributes[name].string_value = value['Value']
          msg.message_attributes[name].data_type = 'String'
        end
      end
      logger.debug('[transport-snssqs] msg parsed successfully')
      msg
    rescue ::JSON::JSONError => e
      logger.info(e)
    end
  end
rescue Aws::SQS::Errors::ServiceError => e
  logger.info(e)
end
send_message(msg, attributes) { |{ response: resp }| ... } click to toggle source
# File lib/sensu/transport/snssqs.rb, line 305
def send_message(msg, attributes, &callback)
  resp = '' if false # need to set this before the retries block
  with_retries(max_tries: 2, base_sleep_seconds: 5.0, max_sleep_seconds: 15.0) do
    resp = @sns.publish(
      target_arn: @settings[:publishing_sns_topic_arn],
      message: msg,
      message_attributes: attributes
    )
  end
  statsd_incr("sns.#{@settings[:publishing_sns_topic_arn]}.message.published")
  yield({ response: resp }) if callback
end
str_attr(str) click to toggle source
# File lib/sensu/transport/snssqs.rb, line 290
def str_attr(str)
  { data_type: STRING_STR, string_value: str }
end