class LogStash::Inputs::Bitbucket
DISCLAIMER: Functions for this plugin are made public for the sake of creating concise unit tests
Constants
- Schedule_types
Public Instance Methods
handle_branch_response(queue, uri, parameters, response, execution_time)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 259 def handle_branch_response(queue, uri, parameters, response, execution_time) body = JSON.parse(response.body) #@logger.info("Handle Branch Response", :uri => uri, :project => parameters[:project], :repo => parameters[:repo], :start => body['start'], :size => body['size']) unless body['isLastPage'] request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/branches", {:project => parameters[:project], :repo => parameters[:repo]}, {:query => {'details' => 'true', 'state' => 'ALL', 'start' => body['nextPageStart']}}, 'handle_branch_response') # Send HTTP requests client.execute! end # Iterate over each Branch body['values'].each { |branch| #@logger.info("Add Branch Request", :project => parameters[:project], :repo => parameters[:repo], :branch => branch['id']) # Push Branch event into queue event = LogStash::Event.new(branch) event.set("[@metadata][index]", "branch") event.set("[@metadata][id]", "#{parameters[:project]}-#{parameters[:repo]}-#{branch['id']}") queue << event } end
handle_commits_response(queue, uri, parameters, response, execution_time)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 290 def handle_commits_response(queue, uri, parameters, response, execution_time) body = JSON.parse(response.body) @logger.info("Handle Commits Response", :uri => uri, :project => parameters[:project], :repo => parameters[:repo], :start => body['start'], :size => body['size']) unless body['isLastPage'] request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/commits", {:project => parameters[:project], :repo => parameters[:repo]}, {:query => {'details' => 'true', 'state' => 'ALL', 'start' => body['nextPageStart']}}, 'handle_commits_response') # Send HTTP requests client.execute! end # Iterate over each Commit body['values'].each { |commit| @logger.info("Add Commit Request", :project => parameters[:project], :repo => parameters[:repo], :commit => commit['id']) # Push Commit event into queue event = LogStash::Event.new(commit) event.set("[@metadata][index]", "commit") event.set("[@metadata][id]", "#{parameters[:project]}-#{parameters[:repo]}-#{commit['id']}") queue << event } end
handle_failure(queue, path, parameters, exception, execution_time)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 319 def handle_failure(queue, path, parameters, exception, execution_time) @logger.error('HTTP Request failed', :path => path, :parameters => parameters, :exception => exception, :backtrace => exception.backtrace); end
handle_projects_response(queue, uri, parameters, response, execution_time)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 107 def handle_projects_response(queue, uri, parameters, response, execution_time) # Decode JSON body = JSON.parse(response.body) #@logger.info("Handle Projects Response", :uri => uri, :start => body['start'], :size => body['size']) #@logger.info("Response Body", :body => response) request_count = 0 # Fetch addition project pages unless body['isLastPage'] request_async( queue, "rest/api/1.0/projects", {}, {:query => {'start' => body['nextPageStart']}}, 'handle_projects_response' ) client.execute! end # Iterate over each project body['values'].each do |project| #@logger.info("Add project", :project => project['key']) # Send get repos request request_async( queue, "rest/api/1.0/projects/%{project}/repos", {:project => project['key']}, {}, 'handle_repos_response') request_count += 1 if request_count > 1 request_count = 0 client.execute! end # Push project event into queue event = LogStash::Event.new(project) event.set('[@metadata][index]', 'project') event.set('[@metadata][id]', project['id']) queue << event end if request_count > 0 # Send HTTP requests client.execute! end end
handle_pull_requests_response(queue, uri, parameters, response, execution_time)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 228 def handle_pull_requests_response(queue, uri, parameters, response, execution_time) # Decode JSON body = JSON.parse(response.body) #@logger.info("Handle Pull Requests Response", :uri => uri, :project => parameters[:project], :repo => parameters[:repo], :start => body['start'], :size => body['size']) # Fetch addition pull request pages unless body['isLastPage'] request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/pull-requests", {:project => parameters[:project], :repo => parameters[:repo]}, {:query => {'state' => 'ALL', 'start' => body['nextPageStart']}}, 'handle_pull_requests_response') # Send HTTP requests client.execute! end # Iterate over each pull request body['values'].each { |pull_request| #@logger.info("Add Pull Request", :project => parameters[:project], :repo => parameters[:repo], :pull_request => pull_request['title']) # Push repo event into queue event = LogStash::Event.new(pull_request) event.set("[@metadata][index]", "pull_request") event.set("[@metadata][id]", "#{parameters[:project]}-#{parameters[:repo]}-#{pull_request['id']}") queue << event } end
handle_repos_response(queue, uri, parameters, response, execution_time)
click to toggle source
Process response from get repos API request
# File lib/logstash/inputs/bitbucket.rb, line 162 def handle_repos_response(queue, uri, parameters, response, execution_time) # Decode JSON body = JSON.parse(response.body) #@logger.info("Handle Repos Response", :uri => uri, :project => parameters[:project], :start => body['start'], :size => body['size']) request_count = 0 # Fetch addition repo pages unless body['isLastPage'] request_async( queue, "rest/api/1.0/projects/%{project}/repos", {:project => parameters[:project]}, {:query => {'start' => body['nextPageStart']}}, 'handle_repos_response' ) client.execute! end # Iterate over each repo body['values'].each { |repo| #@logger.info("Add repo", :project => parameters[:project], :repo => repo['slug']) # Send get pull requests request request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/pull-requests", {:project => parameters[:project], :repo => repo['slug']}, {:query => {'state' => 'ALL'}}, 'handle_pull_requests_response') # Semd Branch requests request request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/branches", {:project => parameters[:project], :repo => repo['slug']}, {:query => {'state' => 'ALL'}}, 'handle_branch_response') request_async( queue, "rest/api/1.0/projects/%{project}/repos/%{repo}/commits", {:project => parameters[:project], :repo => repo['slug']}, {:query => {'state' => 'ALL'}}, 'handle_commits_response') request_count +=1 if request_count > 1 request_count = 0 client.execute! end # Push repo event into queue event = LogStash::Event.new(repo) event.set("[@metadata][index]", "repo") event.set("[@metadata][id]", repo['id']) queue << event } if request_count > 0 # Send HTTP requests client.execute! end end
register()
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 45 def register @host = Socket.gethostname.force_encoding(Encoding::UTF_8) @authorization = "Bearer #{@token}" @logger.info('Register BitBucket Input', :schedule => @schedule, :hostname => @hostname, :port => @port) end
request_async(queue, path, parameters, request_options, callback)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 89 def request_async(queue, path, parameters, request_options, callback) started = Time.now method = parameters[:method] ? parameters.delete(:method) : :get uri = "#{@scheme}://#{@hostname}/#{path}" % parameters request_options[:headers] = {'Authorization' => @authorization} # @logger.info("Fetching URL", :method => method, :request => uri) client.parallel.send(method, uri, request_options). on_success {|response| self.send(callback, queue, uri, parameters, response, Time.now - started)}. on_failure {|exception| handle_failure(queue, uri, parameters, exception, Time.now - started) } end
run(queue)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 51 def run(queue) @logger.info('RUN') #schedule hash must contain exactly one of the allowed keys msg_invalid_schedule = "Invalid config. schedule hash must contain " + "exactly one of the following keys - cron, at, every or in" raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1 schedule_type = @schedule.keys.first schedule_value = @schedule[schedule_type] raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type) @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead opts = schedule_type == "every" ? {:first_in => 0.01} : {} @scheduler.send(schedule_type, schedule_value, opts) {run_once(queue)} @scheduler.join @logger.info('RUN COMPLETE') end
run_once(queue)
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 69 def run_once(queue) @logger.info('RUN ONCE') request_async( queue, 'rest/api/1.0/projects', {}, {}, 'handle_projects_response') # request_async( # queue, # "rest/api/1.0/projects/%{project}/repos", # {:project => 'SOCK', :start => 0}, # {:headers => {'Authorization' => @authorization}}, # 'handle_repos_response') client.execute! end
stop()
click to toggle source
# File lib/logstash/inputs/bitbucket.rb, line 323 def stop # nothing to do in this case so it is not necessary to define stop # examples of common "stop" tasks: # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end