class LogStash::Outputs::GoogleBigQuery

Summary: plugin to upload log events to Google BigQuery (BQ), rolling files based on the date pattern provided as a configuration setting. Events are written to files locally and, once a file is closed, this plugin uploads it to the configured BigQuery dataset.

VERY IMPORTANT:

. To make good use of BigQuery, your log events should be parsed and structured. Consider using grok to parse your events into fields that can be uploaded to BQ.

. You must configure the plugin so that it receives events with a consistent structure matching the configured BigQuery schema. If you need to upload log events with different structures, use multiple configuration blocks and route the different event types with Logstash conditionals (see the sketch after this list). More details on Logstash conditionals can be found here: logstash.net/docs/1.2.1/configuration#conditionals
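
A minimal sketch of that approach, assuming a hypothetical type field set by earlier filters and purely illustrative dataset names and schemas:

output {
  if [type] == "apache" {
    google_bigquery {
      # ... project, key, and service account settings as in USAGE ...
      dataset => "apache_logs"
      csv_schema => "path:STRING,status:INTEGER,bytes:INTEGER"
    }
  } else if [type] == "app" {
    google_bigquery {
      # ... project, key, and service account settings as in USAGE ...
      dataset => "app_logs"
      csv_schema => "level:STRING,message:STRING"
    }
  }
}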

For more info on Google BigQuery, please go to: developers.google.com/bigquery/

In order to use this plugin, a Google service account must be used. For more information, please refer to: developers.google.com/storage/docs/authentication#service_accounts

Recommendations:

. Experiment with the settings depending on how much log data you generate, how quickly you need to see "fresh" data, and how much data you could afford to lose in the event of a crash. For instance, if you want to see recent data in BQ quickly, you could configure the plugin to upload data every minute or so (provided you have enough log events to justify that). Note also that if uploads are too frequent, there is no guarantee that they will be imported in the same order, so later data may be available before earlier data.

. BigQuery charges for storage and for queries, based on how much data it reads to perform a query. Keep this in mind when choosing the date pattern that will be used to create new tables and when composing your queries (see the configuration sketch after this list). For more info on BigQuery pricing, please see: developers.google.com/bigquery/pricing
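
As a rough illustration of both recommendations, a configuration sketch that favors relatively fresh data while keeping one table per day; the values are assumptions to be tuned to your own volume:

google_bigquery {
  # ... other required settings as in USAGE ...
  date_pattern => "%Y-%m-%d"       # one table per day; "%Y-%m-%dT%H:00" (as in USAGE) gives one table per hour
  flush_interval_secs => 2         # fsync buffered events to the local file roughly every 2 seconds
  uploader_interval_secs => 60     # close and upload files roughly every minute
}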

USAGE: This is an example of logstash config:


output {
  google_bigquery {
    project_id => "folkloric-guru-278"                        (required)
    dataset => "logs"                                         (required)
    csv_schema => "path:STRING,status:INTEGER,score:FLOAT"    (required) <1>
    key_path => "/path/to/privatekey.p12"                     (required)
    key_password => "notasecret"                              (optional)
    service_account => "1234@developer.gserviceaccount.com"   (required)
    temp_directory => "/tmp/logstash-bq"                      (optional)
    temp_file_prefix => "logstash_bq"                         (optional)
    date_pattern => "%Y-%m-%dT%H:00"                          (optional)
    flush_interval_secs => 2                                  (optional)
    uploader_interval_secs => 60                              (optional)
    deleter_interval_secs => 60                               (optional)
  }
}


<1> Specify either a csv_schema or a json_schema.
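
As a sketch of the json_schema alternative, based on the structure that register builds internally from csv_schema (treat the exact option syntax as an assumption and verify it against your plugin version):

json_schema => {
  "fields" => [
    { "name" => "path",   "type" => "STRING"  },
    { "name" => "status", "type" => "INTEGER" },
    { "name" => "score",  "type" => "FLOAT"   }
  ]
}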

Improvements TODO list:

  • Refactor common code between Google BQ and GCS plugins.

  • Turn Google API code into a Plugin Mixin (like AwsConfig).

Public Instance Methods

close()
# File lib/logstash/outputs/google_bigquery.rb, line 263
def close
  @logger.debug("BQ: close method called")

  @temp_file.flush()
  @temp_file.close()
end
receive(event)

Method called for each log event. It writes the event to the current output file, flushing depending on flush interval configuration.

# File lib/logstash/outputs/google_bigquery.rb, line 227
def receive(event)
  @logger.debug("BQ: receive method called", :event => event)

  # TODO validate the schema if @ignore_unknown_values is off and alert the user now.
  # consider creating a bad-data table to store invalid records

  # Message must be written as json
  message = LogStash::Json.dump(event.to_hash)
  # Remove "@" from property names
  message = message.gsub(/\"@(\w+)\"/, '"\1"')

  new_base_path = get_base_path()

  # Time to roll file based on the date pattern? Or are we due to upload it to BQ?
  if (@current_base_path != new_base_path || Time.now - @last_file_time >= @uploader_interval_secs)
    @logger.debug("BQ: log file will be closed and uploaded",
                  :filename => File.basename(@temp_file.to_path),
                  :size => @temp_file.size.to_s,
                  :uploader_interval_secs => @uploader_interval_secs.to_s)
    # Close alone does not guarantee that data is physically written to disk,
    # so flushing it before.
    @temp_file.fsync()
    @temp_file.close()
    initialize_next_log()
  end

  @temp_file.write(message)
  @temp_file.write("\n")

  sync_log_file()

  @logger.debug("BQ: event appended to log file",
                :filename => File.basename(@temp_file.to_path))
end
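
As an illustration of the property-name rewrite above, a minimal Ruby sketch of what the gsub does to a serialized event (the field values are made up):

message = '{"@timestamp":"2015-01-07T14:00:00.000Z","@version":"1","path":"/index.html","status":200}'
message.gsub(/\"@(\w+)\"/, '"\1"')
# => '{"timestamp":"2015-01-07T14:00:00.000Z","version":"1","path":"/index.html","status":200}'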
register()
# File lib/logstash/outputs/google_bigquery.rb, line 181
def register
  require 'csv'
  require "fileutils"
  require "thread"

  @logger.debug("BQ: register plugin")

  if !@csv_schema.nil?
    @fields = Array.new

    CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field|
      temp = field.strip.split(":")

      # Check that the field in the schema follows the format (<name>:<value>)
      if temp.length != 2
        raise "BigQuery schema must follow the format <field-name>:<field-value>"
      end

      @fields << { "name" => temp[0], "type" => temp[1] }
    end

    # Check that we have at least one field in the schema
    if @fields.length == 0
      raise "BigQuery schema must contain at least one field"
    end

    @json_schema = { "fields" => @fields }
  end
  if @json_schema.nil?
    raise "Configuration must provide either json_schema or csv_schema."
  end

  @upload_queue = Queue.new
  @delete_queue = Queue.new
  @last_flush_cycle = Time.now
  initialize_temp_directory()
  recover_temp_directories()
  initialize_current_log()
  initialize_google_client()
  initialize_uploader()
  initialize_deleter()
end
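
As a worked sketch of the csv_schema handling above, the schema from the USAGE example would be parsed into the following structures (derived from the code, not a live run):

# csv_schema = "path:STRING,status:INTEGER,score:FLOAT"
# @fields      == [ { "name" => "path",   "type" => "STRING"  },
#                   { "name" => "status", "type" => "INTEGER" },
#                   { "name" => "score",  "type" => "FLOAT"   } ]
# @json_schema == { "fields" => @fields }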

Private Instance Methods

get_base_path()

Returns the base path to the log file: the undated path plus the current time formatted with date_pattern. It does not include the part number or the .log extension.

# File lib/logstash/outputs/google_bigquery.rb, line 470
def get_base_path
  return get_undated_path() + "_" + Time.now.strftime(@date_pattern)
end
get_date_pattern(filename)

Returns date from a temporary log file name.

# File lib/logstash/outputs/google_bigquery.rb, line 483
def get_date_pattern(filename)
  match = /^#{get_undated_path()}_(?<date>.*)\.part(\d+)\.log$/.match(filename)
  return match[:date]
end
get_full_path()

Returns the full path to the log file, based on instance state (like current_base_path) and the current part number (used when files are rolled by max file size).

# File lib/logstash/outputs/google_bigquery.rb, line 477
def get_full_path
  return @current_base_path + ".part" + ("%03d" % @size_counter) + ".log"
end
get_job_status(job_id)

Get the job status for a given job ID

# File lib/logstash/outputs/google_bigquery.rb, line 578
def get_job_status(job_id)
  begin
    @logger.debug("BQ: check job status.",
                  :job_id => job_id)
    get_result = @client.execute(:api_method => @bq.jobs.get,
                                 :parameters => {
                                   'jobId' => job_id,
                                   'projectId' => @project_id
                                 })
    response = LogStash::Json.load(get_result.response.body)
    @logger.debug("BQ: successfully invoked API.",
                  :response => response)

    # Successful invocation
    return response
  rescue => e
    @logger.error("BQ: failed to check status", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end
get_latest_part_number(base_path)

Returns latest part number for a base path. This method checks all existing log files in order to find the highest part number, so this file can be used for appending log events.

Only applicable if max file size is enabled.

# File lib/logstash/outputs/google_bigquery.rb, line 494
def get_latest_part_number(base_path)
  part_numbers = Dir.glob(base_path + ".part*.log").map do |item|
    match = /^.*\.part(?<part_num>\d+).log$/.match(item)
    next if match.nil?
    match[:part_num].to_i
  end

  return part_numbers.max + 1 if part_numbers.any?
  0
end
get_undated_path()

Returns undated path used to construct base path and final full path. This path only includes directory, prefix, and hostname info.

# File lib/logstash/outputs/google_bigquery.rb, line 462
def get_undated_path
  return @temp_directory + File::SEPARATOR + @temp_file_prefix + "_" +
    Socket.gethostname()
end
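
Putting the three path helpers together, a sketch of the names they produce for the USAGE settings above (hostname, timestamp, and part number are made up):

# get_undated_path => "/tmp/logstash-bq/logstash_bq_myhost"
# get_base_path    => "/tmp/logstash-bq/logstash_bq_myhost_2015-01-07T14:00"
# get_full_path    => "/tmp/logstash-bq/logstash_bq_myhost_2015-01-07T14:00.part000.log"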
initialize_current_log()

Opens the log file on plugin initialization, trying to resume from an existing file. If max file size is enabled, it finds the highest part number and resumes from it.

# File lib/logstash/outputs/google_bigquery.rb, line 524
def initialize_current_log
  @current_base_path = get_base_path
  @last_file_time = Time.now
  @size_counter = get_latest_part_number(@current_base_path)
  @logger.debug("BQ: resuming from latest part.",
                :part => @size_counter)
  open_current_file()
end
initialize_deleter()

Starts thread to delete uploaded log files once their jobs are done.

Deletion runs in a separate thread so it does not block the receive method above.

# File lib/logstash/outputs/google_bigquery.rb, line 364
def initialize_deleter
  @deleter = Thread.new do
    @logger.debug("BQ: starting deleter")
    while true
      delete_item = @delete_queue.pop
      job_id = delete_item["job_id"]
      filename = delete_item["filename"]
      job_status = get_job_status(job_id)
      case job_status["status"]["state"]
      when "DONE"
        if job_status["status"].has_key?("errorResult")
          @logger.error("BQ: job failed, please enable debug and check full "\
                        "response (the issue is probably an incompatible "\
                        "schema). NOT deleting local file.",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
        else
          @logger.debug("BQ: job is done, deleting local temporary file ",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
          File.delete(filename) if File.exist?(filename)
          File.delete(filename + ".bqjob") if File.exist?(filename + ".bqjob")
        end
      when "PENDING", "RUNNING"
        @logger.debug("BQ: job is not done, NOT deleting local file yet.",
                      :job_id => job_id,
                      :filename => filename,
                      :job_status => job_status)
        @delete_queue << delete_item
      else
        @logger.error("BQ: unknown job status, NOT deleting local file yet.",
                      :job_id => job_id,
                      :filename => filename,
                      :job_status => job_status)
        File.open(filename + ".err", 'w') do |file|
          file.write(LogStash::Json.dump(job_status))
        end
      end

      sleep @deleter_interval_secs
    end
  end
end
initialize_google_client()

Initializes the Google API client, instantiating it and authorizing access.

# File lib/logstash/outputs/google_bigquery.rb, line 551
def initialize_google_client
  require "google/api_client"
  require "openssl"

  @client = Google::APIClient.new(:application_name =>
                                  'Logstash Google BigQuery output plugin',
                                  :application_version => '0.1')
  @bq = @client.discovered_api('bigquery', 'v2')


  key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
  # Authorization scope reference:
  # https://developers.google.com/bigquery/docs/authorization
  service_account = Google::APIClient::JWTAsserter.new(@service_account,
                                                       'https://www.googleapis.com/auth/bigquery',
                                                       key)
  @client.authorization = service_account.authorize
end
initialize_next_log()

Generates a new log file name based on configuration options and opens the log file. If max file size is enabled, the part number is incremented when the base log file name is the same (e.g. the log file was not rolled given the date pattern).

# File lib/logstash/outputs/google_bigquery.rb, line 538
def initialize_next_log
  new_base_path = get_base_path
  @size_counter = @current_base_path == new_base_path ? @size_counter + 1 : 0
  @logger.debug("BQ: opening next log file.",
                :filename => @current_base_path,
                :part => @size_counter)
  @current_base_path = new_base_path
  @last_file_time = Time.now
  open_current_file()
end
initialize_temp_directory()

Creates the temporary directory if it does not exist.

If no temp_directory is configured, a directory with a random suffix (logstash-bq-*) is generated under the system temp directory.

# File lib/logstash/outputs/google_bigquery.rb, line 295
def initialize_temp_directory
  if @temp_directory.empty?
    require "stud/temporary"
    @temp_directory = Stud::Temporary.directory("logstash-bq")
    @logger.info("BQ: temporary directory generated",
                 :directory => @temp_directory)
  end

  if !(File.directory? @temp_directory)
    @logger.debug("BQ: directory doesn't exist. Creating it.",
                  :directory => @temp_directory)
    FileUtils.mkdir_p(@temp_directory)
  end
end
initialize_uploader()

Starts thread to upload log files.

Uploading runs in a separate thread so it does not block the receive method above.

# File lib/logstash/outputs/google_bigquery.rb, line 414
def initialize_uploader
  @uploader = Thread.new do
    @logger.debug("BQ: starting uploader")
    while true
      filename = @upload_queue.pop

      # Reenqueue if it is still the current file.
      if filename == @temp_file.to_path
        if @current_base_path == get_base_path()
          if Time.now - @last_file_time < @uploader_interval_secs
            @logger.debug("BQ: reenqueue as log file is being currently appended to.",
                          :filename => filename)
            @upload_queue << filename

            # If we got here, it means that older files were uploaded, so let's
            # wait before checking on this file again.
            #
            # Use the min so we don't wait too long if the logs start rotating too quickly
            # due to an increased number of events.
            sleep [60, @uploader_interval_secs].min
            next
          else
            @logger.debug("BQ: flush and close file to be uploaded.",
                          :filename => filename)
            @temp_file.flush()
            @temp_file.close()
            initialize_next_log()
          end
        end
      end

      if File.size(filename) > 0
        job_id = upload_object(filename)
        @delete_queue << { "filename" => filename, "job_id" => job_id }
        File.open(filename + ".bqjob", 'w') { |file| file.write(job_id) }
      else
        @logger.debug("BQ: skipping empty file.")
        @logger.debug("BQ: delete local temporary file ",
                      :filename => filename)
        File.delete(filename)
      end
    end
  end
end
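
Taken together, the uploader and deleter leave the following bookkeeping files next to each temporary log file (names follow the path sketch earlier; the contents are inferred from the uploader and deleter code above):

# <base>.part000.log         newline-delimited JSON events waiting to be uploaded
# <base>.part000.log.bqjob   the BigQuery job id, written right after a successful insert
# <base>.part000.log.err     the job status dump, written only if the job ends in an unknown state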
open_current_file()

Opens the current log file and updates @temp_file with an instance of IOWriter. This method also adds the file to the upload queue.

# File lib/logstash/outputs/google_bigquery.rb, line 508
def open_current_file()
  path = get_full_path()
  stat = File.stat(path) rescue nil
  if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java"
    fd = java.io.FileWriter.new(java.io.File.new(path))
  else
    fd = File.new(path, "a")
  end
  @temp_file = IOWriter.new(fd)
  @upload_queue << @temp_file.to_path
end
raise_if_error(response)
# File lib/logstash/outputs/google_bigquery.rb, line 570
def raise_if_error(response)
  if response["status"].has_key?("errorResult")
    raise response["status"]["errorResult"]["message"]
  end
end
recover_temp_directories()

If Logstash fails for any reason, this method will attempt to recover any files that were in flight during the crash. The recovery works in two phases: all files with .bqjob entries are put into the delete_queue, and all files without .bqjob files are put into the upload_queue.

If a temp_directory configuration is defined, this method will look in the defined directory. If temp_directory is not defined, this method will look in all /tmp/logstash-bq-* directories.

Note that there is still a small race condition where a file might have been uploaded, but the .bqjob file wasn't saved yet. In such a case, the file will be loaded into BigQuery twice.

# File lib/logstash/outputs/google_bigquery.rb, line 324
def recover_temp_directories
  if @temp_directory.empty?
    Dir.entries('/tmp').select{
      |entry| File.directory? File.join('/tmp', entry) and entry.start_with?('logstash-bq-')
    }.each {
      |entry| recover_temp_directory(File.join('/tmp',entry))
    }
  else
    recover_temp_directory(@temp_directory)
  end
end
recover_temp_directory(directory)

List all files in a directory, and add them to either the upload_queue or delete_queue.

# File lib/logstash/outputs/google_bigquery.rb, line 339
def recover_temp_directory(directory)
  # Recover files that have been uploaded, but not deleted.
  Dir.glob(File.join(directory, "*.bqjob")).each do |bqjob_filename|
    job_id = File.open(bqjob_filename, 'r') {|f| f.read}.strip
    filename = File.join(directory, File.basename(bqjob_filename, ".bqjob"))
    @logger.debug("BQ: resuming job.",
                  :job_id => job_id,
                  :filename => filename)
    @delete_queue << { "filename" => filename, "job_id" => job_id }
  end

  # Recover files that have (probably) not yet been uploaded.
  Dir.glob(File.join(directory, "*.log")).each do |filename|
    if !File.exists?(filename + '.bqjob')
      @logger.debug("BQ: recovering file.",
                  :filename => filename)
      @upload_queue << filename
    end
  end
end
sync_log_file()

Flushes temporary log file every flush_interval_secs seconds or so. This is triggered by events, but if there are no events there's no point flushing files anyway.

Inspired by lib/logstash/outputs/file.rb (flush(fd), flush_pending_files)

# File lib/logstash/outputs/google_bigquery.rb, line 277
def sync_log_file
  if @flush_interval_secs <= 0
    @temp_file.fsync
    return
  end

  return unless Time.now - @last_flush_cycle >= @flush_interval_secs
  @temp_file.fsync
  @logger.debug("BQ: flushing file",
                :path => @temp_file.to_path,
                :fd => @temp_file)
  @last_flush_cycle = Time.now
end
upload_object(filename)

Uploads a local file to the configured BigQuery dataset, creating a load job into a date-based table.

# File lib/logstash/outputs/google_bigquery.rb, line 603
def upload_object(filename)
  begin
    table_id = @table_prefix + @table_separator + get_date_pattern(filename)
    # BQ does not accept anything other than alphanumeric and _
    # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
    table_id.tr!(':-','_')

    @logger.debug("BQ: upload object.",
                  :filename => filename,
                  :table_id => table_id)
    media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
    body = {
      "configuration" => {
        "load" => {
          "sourceFormat" => "NEWLINE_DELIMITED_JSON",
          "schema" => @json_schema,
          "destinationTable"  =>  {
            "projectId" => @project_id,
            "datasetId" => @dataset,
            "tableId" => table_id
          },
          'createDisposition' => 'CREATE_IF_NEEDED',
          'writeDisposition' => 'WRITE_APPEND',
          'ignoreUnknownValues' => @ignore_unknown_values
        }
      }
    }
    insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                    :body_object => body,
                                    :parameters => {
                                      'uploadType' => 'multipart',
                                      'projectId' => @project_id
                                    },
                                    :media => media)

    media.close()
    response_body = LogStash::Json.load(insert_result.response.body)

    raise_if_error(response_body)

    job_id = response_body["jobReference"]["jobId"]
    @logger.debug("BQ: multipart insert",
                  :job_id => job_id)
    return job_id
  rescue => e
    @logger.error("BQ: failed to upload file. retrying.", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end
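
For example, assuming the defaults table_prefix "logstash" and table_separator "_" (an assumption; check your configuration), the hourly date_pattern from USAGE would yield a table id like this:

date     = "2015-01-07T14:00"          # recovered from the file name by get_date_pattern
table_id = "logstash" + "_" + date     # => "logstash_2015-01-07T14:00"
table_id.tr!(':-', '_')                # only [A-Za-z0-9_] is accepted in table names
table_id                               # => "logstash_2015_01_07T14_00"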