class LogStash::Inputs::Bitbucket

DISCLAIMER: The methods of this plugin are public so that concise unit tests can be written against them.

Constants

Schedule_types

Public Instance Methods

handle_branch_response(queue, uri, parameters, response, execution_time) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 259
# Handles one page of the branch listing of a repository.
#
# Parses the JSON body, requests the next page when the body names one, and
# pushes one LogStash::Event per entry of body['values'] into the queue.
#
# @param queue [#<<] destination for the generated events
# @param uri [String] URI the response was fetched from (only used for logging)
# @param parameters [Hash] must provide :project and :repo
# @param response [#body] HTTP response whose body is a JSON document
# @param execution_time [Numeric] duration of the request (unused)
def handle_branch_response(queue, uri, parameters, response, execution_time)
  body = JSON.parse(response.body)
  project = parameters[:project]
  repo = parameters[:repo]

  # Follow pagination only when the body actually names a next page. A body
  # without paging fields would otherwise be re-requested with a nil 'start'.
  next_start = body['nextPageStart']
  if next_start && !body['isLastPage']
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/branches",
        {:project => project, :repo => repo},
        {:query => {'details' => 'true', 'state' => 'ALL', 'start' => next_start}},
        'handle_branch_response')

    # Send HTTP requests
    client.execute!
  end

  # A body without 'values' (presumably an error document) is reported
  # instead of raising NoMethodError on nil.
  branches = body['values']
  if branches.nil?
    @logger.error("Branch response has no values", :uri => uri, :body => body)
    return
  end

  # Push one Branch event per entry into the queue
  branches.each do |branch|
    event = LogStash::Event.new(branch)
    event.set("[@metadata][index]", "branch")
    event.set("[@metadata][id]", "#{project}-#{repo}-#{branch['id']}")
    queue << event
  end
end
handle_commits_response(queue, uri, parameters, response, execution_time) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 290
# Handles one page of the commit listing of a repository.
#
# Parses the JSON body, requests the next page when the body names one, and
# pushes one LogStash::Event per entry of body['values'] into the queue.
#
# @param queue [#<<] destination for the generated events
# @param uri [String] URI the response was fetched from (only used for logging)
# @param parameters [Hash] must provide :project and :repo
# @param response [#body] HTTP response whose body is a JSON document
# @param execution_time [Numeric] duration of the request (unused)
def handle_commits_response(queue, uri, parameters, response, execution_time)
  body = JSON.parse(response.body)
  project = parameters[:project]
  repo = parameters[:repo]
  @logger.info("Handle Commits Response", :uri => uri, :project => project, :repo => repo, :start => body['start'], :size => body['size'])

  # Follow pagination only when the body actually names a next page. A body
  # without paging fields would otherwise be re-requested with a nil 'start'.
  next_start = body['nextPageStart']
  if next_start && !body['isLastPage']
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/commits",
        {:project => project, :repo => repo},
        {:query => {'details' => 'true', 'state' => 'ALL', 'start' => next_start}},
        'handle_commits_response')

    # Send HTTP requests
    client.execute!
  end

  # A body without 'values' (presumably an error document) is reported
  # instead of raising NoMethodError on nil.
  commits = body['values']
  if commits.nil?
    @logger.error("Commits response has no values", :uri => uri, :body => body)
    return
  end

  # Push one Commit event per entry into the queue
  commits.each do |commit|
    @logger.info("Add Commit Request", :project => project, :repo => repo, :commit => commit['id'])

    event = LogStash::Event.new(commit)
    event.set("[@metadata][index]", "commit")
    event.set("[@metadata][id]", "#{project}-#{repo}-#{commit['id']}")
    queue << event
  end
end
handle_failure(queue, path, parameters, exception, execution_time) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 319
# Reports a failed HTTP request through the plugin logger.
#
# @param queue [Object] event queue (unused)
# @param path [String] request path/URI that failed
# @param parameters [Hash] parameters the request was built from
# @param exception [Exception] error reported for the request
# @param execution_time [Numeric] duration of the request (unused)
def handle_failure(queue, path, parameters, exception, execution_time)
  @logger.error(
    'HTTP Request failed',
    :path => path,
    :parameters => parameters,
    :exception => exception,
    :backtrace => exception.backtrace
  )
end
handle_projects_response(queue, uri, parameters, response, execution_time) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 107
# Handles one page of the project listing.
#
# Parses the JSON body, requests the next page when the body names one,
# schedules a repos request for every project and pushes one
# LogStash::Event per project into the queue.
#
# @param queue [#<<] destination for the generated events
# @param uri [String] URI the response was fetched from (only used for logging)
# @param parameters [Hash] parameters of the originating request (unused)
# @param response [#body] HTTP response whose body is a JSON document
# @param execution_time [Numeric] duration of the request (unused)
def handle_projects_response(queue, uri, parameters, response, execution_time)
  # Decode JSON
  body = JSON.parse(response.body)

  # Fetch additional project pages, but only when the body actually names a
  # next page. A body without paging fields would otherwise make this handler
  # request the same listing again with a nil 'start'.
  next_start = body['nextPageStart']
  if next_start && !body['isLastPage']
    request_async(
        queue,
        "rest/api/1.0/projects",
        {},
        {:query => {'start' => next_start}},
        'handle_projects_response'
    )

    client.execute!
  end

  # A body without 'values' (presumably an error document) is reported
  # instead of raising NoMethodError on nil.
  projects = body['values']
  if projects.nil?
    @logger.error("Projects response has no values", :uri => uri, :body => body)
    return
  end

  # Number of repos requests queued since the last client.execute!
  pending = 0

  projects.each do |project|
    # Send get repos request
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos",
        {:project => project['key']},
        {},
        'handle_repos_response')

    pending += 1

    # Flush the queued requests after every second project
    if pending > 1
      pending = 0
      client.execute!
    end

    # Push project event into queue
    event = LogStash::Event.new(project)
    event.set('[@metadata][index]', 'project')
    event.set('[@metadata][id]', project['id'])
    queue << event
  end

  # Send whatever is still queued
  client.execute! if pending > 0
end
handle_pull_requests_response(queue, uri, parameters, response, execution_time) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 228
# Handles one page of the pull request listing of a repository.
#
# Parses the JSON body, requests the next page when the body names one, and
# pushes one LogStash::Event per entry of body['values'] into the queue.
#
# @param queue [#<<] destination for the generated events
# @param uri [String] URI the response was fetched from (only used for logging)
# @param parameters [Hash] must provide :project and :repo
# @param response [#body] HTTP response whose body is a JSON document
# @param execution_time [Numeric] duration of the request (unused)
def handle_pull_requests_response(queue, uri, parameters, response, execution_time)
  # Decode JSON
  body = JSON.parse(response.body)
  project = parameters[:project]
  repo = parameters[:repo]

  # Fetch additional pull request pages, but only when the body actually
  # names a next page. A body without paging fields would otherwise be
  # re-requested with a nil 'start'.
  next_start = body['nextPageStart']
  if next_start && !body['isLastPage']
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/pull-requests",
        {:project => project, :repo => repo},
        {:query => {'state' => 'ALL', 'start' => next_start}},
        'handle_pull_requests_response')

    # Send HTTP requests
    client.execute!
  end

  # A body without 'values' (presumably an error document) is reported
  # instead of raising NoMethodError on nil.
  pull_requests = body['values']
  if pull_requests.nil?
    @logger.error("Pull Requests response has no values", :uri => uri, :body => body)
    return
  end

  # Push one pull request event per entry into the queue
  pull_requests.each do |pull_request|
    event = LogStash::Event.new(pull_request)
    event.set("[@metadata][index]", "pull_request")
    event.set("[@metadata][id]", "#{project}-#{repo}-#{pull_request['id']}")
    queue << event
  end
end
handle_repos_response(queue, uri, parameters, response, execution_time) click to toggle source

Processes the response from the "get repos" API request.

# File lib/logstash/inputs/bitbucket.rb, line 162
# Handles one page of the repository listing of a project.
#
# Parses the JSON body, requests the next page when the body names one,
# schedules pull request, branch and commit requests for every repository
# and pushes one LogStash::Event per repository into the queue.
#
# @param queue [#<<] destination for the generated events
# @param uri [String] URI the response was fetched from (only used for logging)
# @param parameters [Hash] must provide :project
# @param response [#body] HTTP response whose body is a JSON document
# @param execution_time [Numeric] duration of the request (unused)
def handle_repos_response(queue, uri, parameters, response, execution_time)
  # Decode JSON
  body = JSON.parse(response.body)
  project = parameters[:project]

  # Fetch additional repo pages, but only when the body actually names a
  # next page. A body without paging fields would otherwise be re-requested
  # with a nil 'start'.
  next_start = body['nextPageStart']
  if next_start && !body['isLastPage']
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos",
        {:project => project},
        {:query => {'start' => next_start}},
        'handle_repos_response'
    )

    client.execute!
  end

  # A body without 'values' (presumably an error document) is reported
  # instead of raising NoMethodError on nil.
  repos = body['values']
  if repos.nil?
    @logger.error("Repos response has no values", :uri => uri, :body => body)
    return
  end

  # Number of repositories whose requests were queued since the last
  # client.execute! (counted per repository, not per request)
  pending = 0

  repos.each do |repo|
    slug = repo['slug']

    # Send get pull requests request
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/pull-requests",
        {:project => project, :repo => slug},
        {:query => {'state' => 'ALL'}},
        'handle_pull_requests_response')

    # Send get branches request
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/branches",
        {:project => project, :repo => slug},
        {:query => {'state' => 'ALL'}},
        'handle_branch_response')

    # Send get commits request
    request_async(
        queue,
        "rest/api/1.0/projects/%{project}/repos/%{repo}/commits",
        {:project => project, :repo => slug},
        {:query => {'state' => 'ALL'}},
        'handle_commits_response')

    pending += 1

    # Flush the queued requests after every second repository
    if pending > 1
      pending = 0
      client.execute!
    end

    # Push repo event into queue
    event = LogStash::Event.new(repo)
    event.set("[@metadata][index]", "repo")
    event.set("[@metadata][id]", repo['id'])
    queue << event
  end

  # Send whatever is still queued
  client.execute! if pending > 0
end
register() click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 45
# Prepares the plugin before it runs: records the local host name (tagged as
# UTF-8), builds the Authorization header value from @token and logs the
# schedule/hostname/port settings.
def register
  local_name = Socket.gethostname
  @host = local_name.force_encoding(Encoding::UTF_8)
  @authorization = "Bearer #{@token}"
  @logger.info(
    'Register BitBucket Input',
    :schedule => @schedule,
    :hostname => @hostname,
    :port => @port
  )
end
request_async(queue, path, parameters, request_options, callback) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 89
# Queues an HTTP request on the client's parallel interface and wires its
# completion handlers. The request is not sent here; callers trigger sending
# with client.execute!.
#
# @param queue [Object] event queue, passed through to the callbacks
# @param path [String] path below the host; may contain %{name} references
#   that are filled in from parameters
# @param parameters [Hash] values for the path references; an optional
#   :method entry selects the HTTP verb (defaults to :get)
# @param request_options [Hash] options handed to the HTTP client (for
#   example :query); the Authorization header is added to its :headers
# @param callback [String, Symbol] name of the method invoked on success with
#   (queue, uri, parameters, response, execution_time)
def request_async(queue, path, parameters, request_options, callback)
  started = Time.now

  # Work on copies so the hashes handed in by the caller are never modified.
  params = parameters.dup
  method = params.delete(:method) || :get

  uri = "#{@scheme}://#{@hostname}/#{path}" % params

  # Keep headers supplied by the caller and add the Authorization header.
  options = request_options.dup
  options[:headers] = (options[:headers] || {}).merge('Authorization' => @authorization)

  client.parallel.send(method, uri, options).
      on_success { |response| send(callback, queue, uri, params, response, Time.now - started) }.
      on_failure { |exception| handle_failure(queue, uri, params, exception, Time.now - started) }
end
run(queue) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 51
# Validates the schedule configuration, then runs run_once on that schedule
# and blocks until the scheduler finishes.
#
# @param queue [Object] event queue handed to run_once
# @raise [LogStash::ConfigurationError] when @schedule does not contain
#   exactly one key, or that key is not listed in Schedule_types
def run(queue)
  @logger.info('RUN')
  # schedule hash must contain exactly one of the allowed keys
  msg_invalid_schedule = "Invalid config. schedule hash must contain " \
      "exactly one of the following keys - cron, at, every or in"
  raise LogStash::ConfigurationError, msg_invalid_schedule unless @schedule.keys.length == 1
  schedule_type = @schedule.keys.first
  schedule_value = @schedule[schedule_type]
  raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

  # A single worker thread, so scheduled runs are not executed concurrently
  @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
  # as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
  opts = schedule_type == "every" ? {:first_in => 0.01} : {}
  # schedule_type was checked against Schedule_types above, so the send is bounded
  @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
  @scheduler.join
  @logger.info('RUN COMPLETE')
end
run_once(queue) click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 69
# Starts one collection pass: queues the request for the first page of the
# project listing and sends it. handle_projects_response is registered as the
# callback for that request.
#
# @param queue [Object] event queue passed on to the response handlers
def run_once(queue)
  @logger.info('RUN ONCE')

  entry_path = 'rest/api/1.0/projects'
  request_async(queue, entry_path, {}, {}, 'handle_projects_response')

  # Disabled alternative that starts from the repos of a single project:
  # request_async(
  #   queue,
  #   "rest/api/1.0/projects/%{project}/repos",
  #   {:project => 'SOCK', :start => 0},
  #   {:headers => {'Authorization' => @authorization}},
  #   'handle_repos_response')

  client.execute!
end
stop() click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 323
# Shuts the plugin down.
#
# run blocks in @scheduler.join, so the scheduler created there has to be
# stopped for run to return. When run never created a scheduler there is
# nothing to do.
def stop
  @scheduler.stop if @scheduler
end