class AzureEventHubsSplunkHttpSender

Public Class Methods

new(plugin, connection_string, hub_name, expiry=3600, proxy_addr='', proxy_port=3128, open_timeout=60, read_timeout=60) click to toggle source
# File lib/fluent/plugin/azureeventhubsplunk/http.rb, line 3
def initialize(plugin, connection_string, hub_name, expiry=3600, proxy_addr='', proxy_port=3128, open_timeout=60, read_timeout=60)
  require 'openssl'
  require 'base64'
  require 'net/http'
  require 'json'
  require 'cgi'
  require 'time'
  @plugin = plugin
  @connection_string = connection_string
  @hub_name = hub_name
  @expiry_interval = expiry
  @proxy_addr = proxy_addr
  @proxy_port = proxy_port
  @open_timeout = open_timeout
  @read_timeout = read_timeout

  if @connection_string.count(';') != 2
    raise "Connection String format is not correct"
  end

  @connection_string.split(';').each do |part|
    if ( part.index('Endpoint') == 0 )
      @endpoint = 'https' + part[11..-1]
    elsif ( part.index('SharedAccessKeyName') == 0 )
      @sas_key_name = part[20..-1]
    elsif ( part.index('SharedAccessKey') == 0 )
      @sas_key_value = part[16..-1]
    end
  end
  @uri = URI.parse("#{@endpoint}#{@hub_name}/messages")
  @plugin.log.info "Initialized AzureEventHubsSplunkHttpSender. uri: #{@uri}"
end

Public Instance Methods

send(payload) click to toggle source
# File lib/fluent/plugin/azureeventhubsplunk/http.rb, line 47
def send(payload)
  send_w_properties(payload, nil)
end
send_w_properties(payload, properties) click to toggle source
# File lib/fluent/plugin/azureeventhubsplunk/http.rb, line 51
def send_w_properties(payload, properties)
  token = generate_sas_token(@uri.to_s)
  headers = {
    'Content-Type' => 'application/json;type=entry;charset=utf-8',
    'Authorization' => token
  }
  if not properties.nil?
    headers = headers.merge(properties)
  end
  if (@proxy_addr.to_s.empty?)
      https = Net::HTTP.new(@uri.host, @uri.port)
      https.open_timeout = @open_timeout
      https.read_timeout = @read_timeout
  else
      https = Net::HTTP.new(@uri.host, @uri.port,@proxy_addr,@proxy_port)
      https.open_timeout = @open_timeout
      https.read_timeout = @read_timeout
  end
  https.use_ssl = true
  req = Net::HTTP::Post.new(@uri.request_uri, headers)
  req.body = payload.to_json
  @plugin.log.info "Sending payload to EventHub: #{req.body}"
  res = https.request(req)
  @plugin.log.info "Response code from EventHub: #{res.code}"
  @plugin.log.info "Response body from EventHub: #{res.body}"
rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::ETIMEDOUT, Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
end

Private Instance Methods

generate_sas_token(uri) click to toggle source
# File lib/fluent/plugin/azureeventhubsplunk/http.rb, line 36
def generate_sas_token(uri)
  target_uri = CGI.escape(uri.downcase).downcase
  expiry = Time.now.to_i + @expiry_interval
  to_sign = "#{target_uri}\n#{expiry}";
  signature = CGI.escape(Base64.encode64(OpenSSL::HMAC.digest(OpenSSL::Digest.new('sha256'), @sas_key_value, to_sign)).strip())
  token = "SharedAccessSignature sr=#{target_uri}&sig=#{signature}&se=#{expiry}&skn=#{@sas_key_name}"
  return token
end