class Polipus::PolipusCrawler
Constants
- OPTS
Attributes
Public Class Methods
# File lib/polipus.rb, line 161
def self.crawl(*args, &block)
  new(*args, &block).takeover
end
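A minimal usage sketch, assuming a reachable Redis with the default options; the job name and seed URL are illustrative:

  Polipus::PolipusCrawler.crawl('rubygems', ['https://rubygems.org/']) do |crawler|
    crawler.on_page_downloaded do |page|
      puts "Downloaded: #{page.url}"
    end
  end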
# File lib/polipus.rb, line 101
def initialize(job_name = 'polipus', urls = [], options = {})
  @job_name = job_name
  @options = OPTS.merge(options)
  @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
  @logger = @options[:logger] ||= Logger.new(nil)

  unless @logger.class.to_s == 'Log4r::Logger'
    @logger.level = @options[:logger_level] ||= Logger::INFO
  end

  @storage = @options[:storage] ||= Storage.dev_null

  @workers_pool = []
  @follow_links_like = []
  @skip_links_like = []
  @on_page_downloaded = []
  @on_before_save = []
  @on_page_error = []
  @focus_crawl_block = nil
  @on_crawl_start = []
  @on_crawl_end = []
  @redis_factory = nil

  @overflow_manager = nil
  @crawler_name = `hostname`.strip + "-#{@job_name}"

  @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

  @urls = [urls].flatten.map { |url| URI(url) }
  @urls.each { |url| url.path = '/' if url.path.empty? }

  if @options[:obey_robots_txt]
    @robots =
      if @options[:user_agent].respond_to?(:sample)
        Polipus::Robotex.new(@options[:user_agent].sample)
      else
        Polipus::Robotex.new(@options[:user_agent])
      end
  end

  # Attach signal handling if enabled
  SignalHandler.enable if @options[:enable_signal_handler]

  if queue_overflow_adapter
    @on_crawl_start << lambda do |_|
      Thread.new do
        Thread.current[:name] = :overflow_items_controller
        overflow_items_controller.run
      end
    end
  end

  @on_crawl_end << lambda do |_|
    Thread.list.select { |thread| thread.status && thread[:name] == :overflow_items_controller }.each(&:kill)
  end

  execute_plugin 'on_initialize'

  yield self if block_given?
end
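A sketch of building a crawler directly instead of going through crawl; the worker count, Redis connection, and depth limit shown are illustrative values, not the OPTS defaults:

  require 'polipus'

  crawler = Polipus::PolipusCrawler.new(
    'my_job',
    ['https://example.com/'],
    workers: 5,
    redis_options: { host: 'localhost', port: 6379 },
    depth_limit: 3
  )
  crawler.takeover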
Public Instance Methods
# File lib/polipus.rb, line 351
def add_to_queue(page)
  if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
    add_url(page.url, referer: page.referer, depth: page.depth)
  else
    add_url(page)
  end
end
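Because it accepts anything that responds to url, referer, and depth, a handler can push a page back for another attempt. A sketch (illustrative only; unconditional retries can loop forever on permanently broken URLs):

  crawler.on_page_error do |page|
    crawler.add_to_queue(page)
  end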
Enqueue a URL, no matter what
# File lib/polipus.rb, line 360
def add_url(url, params = {})
  page = Page.new(url, params)
  yield(page) if block_given?
  internal_queue << page.to_json
end
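The optional block receives the freshly built Page before it is enqueued; this is how takeover marks its seed URLs. A sketch:

  crawler.add_url('https://example.com/landing') do |page|
    page.user_data.p_seeded = true
  end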
A block of code will be executed on every page downloaded. The block is used to extract the URLs to visit; see the links_for method.
# File lib/polipus.rb, line 317
def focus_crawl(&block)
  @focus_crawl_block = block
  self
end
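A sketch that narrows the crawl to links under a hypothetical /articles path:

  crawler.focus_crawl do |page|
    page.links.select { |link| link.path.to_s.start_with?('/articles') }
  end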
A pattern or an array of patterns can be passed as arguments. A URL will be discarded if it doesn't match any of the patterns.
# File lib/polipus.rb, line 270
def follow_links_like(*patterns)
  @follow_links_like += patterns.uniq.compact
  self
end
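Patterns are matched against the URL path (see should_be_visited?). For example, with illustrative patterns:

  crawler.follow_links_like(/\/articles\//, /\/authors\//)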
A block of code will be executed on every page downloaded, before the page is saved in the registered storage
# File lib/polipus.rb, line 303
def on_before_save(&block)
  @on_before_save << block
  self
end
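A sketch that stamps each page through its user_data (an open structure, as the p_seeded flag used elsewhere suggests) just before storage; the field name is made up for the example:

  crawler.on_before_save do |page|
    page.user_data.processed_at = Time.now.to_i
  end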
A block of code will be executed when the crawl session is over
# File lib/polipus.rb, line 290
def on_crawl_end(&block)
  @on_crawl_end << block
  self
end
A block of code will be executed when the crawl session is starting
# File lib/polipus.rb, line 296
def on_crawl_start(&block)
  @on_crawl_start << block
  self
end
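Both hooks receive the crawler itself (takeover calls e.call(self)). A sketch using the public queue_size reader:

  crawler.on_crawl_start { |c| puts "Starting, queue size: #{c.queue_size}" }
  crawler.on_crawl_end   { |c| puts "Finished, queue size: #{c.queue_size}" }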
A block of code will be executed on every page downloaded. The block takes the page as its argument.
# File lib/polipus.rb, line 284
def on_page_downloaded(&block)
  @on_page_downloaded << block
  self
end
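A sketch that logs each fetched page; code and response_time are the same fields the worker writes to its debug log:

  crawler.on_page_downloaded do |page|
    puts "#{page.code} #{page.url} (#{page.response_time})"
  end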
A block of code will be executed whenever a page contains an error
# File lib/polipus.rb, line 309
def on_page_error(&block)
  @on_page_error << block
  self
end
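A sketch mirroring the error field that takeover checks:

  crawler.on_page_error do |page|
    warn "Error fetching #{page.url}: #{page.error}"
  end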
# File lib/polipus.rb, line 326
def queue_size
  internal_queue.size
end
# File lib/polipus.rb, line 347
def redis
  @redis ||= redis_factory_adapter
end
# File lib/polipus.rb, line 334
def redis_factory(&block)
  @redis_factory = block
  self
end
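The block receives redis_options and must return a client. A sketch overriding the database and timeout (illustrative values; db and timeout are standard redis-rb options):

  crawler.redis_factory do |options|
    Redis.new(options.merge(db: 2, timeout: 10))
  end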
# File lib/polipus.rb, line 322
def redis_options
  @options[:redis_options]
end
A pattern or an array of patterns can be passed as arguments. A URL will be discarded if it matches any of the patterns.
# File lib/polipus.rb, line 277
def skip_links_like(*patterns)
  @skip_links_like += patterns.uniq.compact
  self
end
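For example, to avoid PDFs and login pages (illustrative patterns, matched against the URL path):

  crawler.skip_links_like(/\.pdf$/, /\/login/)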
# File lib/polipus.rb, line 330
def stats_reset!
  ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
end
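The counters live under the keys written by incr_pages and incr_error. A sketch of reading and then clearing them, assuming the default job name 'polipus':

  pages  = crawler.redis.get('polipus:polipus:pages')
  errors = crawler.redis.get('polipus:polipus:errors')
  puts "#{pages} pages, #{errors} errors"
  crawler.stats_reset!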
Request to Polipus to stop its work (gracefully). Pass cler_queue = true if you want to delete all of the pending URLs to visit.
# File lib/polipus.rb, line 368
def stop!(cler_queue = false)
  SignalHandler.terminate
  internal_queue.clear(true) if cler_queue
end
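A sketch that stops the crawl once a hypothetical page budget is reached (the plain counter is not thread-safe across workers, so treat it as approximate):

  downloaded = 0
  crawler.on_page_downloaded do |_page|
    downloaded += 1
    crawler.stop! if downloaded >= 1_000
  end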
# File lib/polipus.rb, line 165
def takeover
  @urls.each do |u|
    add_url(u) { |page| page.user_data.p_seeded = true }
  end
  return if internal_queue.empty?

  @on_crawl_start.each { |e| e.call(self) }
  execute_plugin 'on_crawl_start'

  @options[:workers].times do |worker_number|
    @workers_pool << Thread.new do
      @logger.debug { "Start worker #{worker_number}" }
      http = HTTP.new(@options)
      queue = queue_factory
      queue.process(false, @options[:queue_timeout]) do |message|
        next if message.nil?

        execute_plugin 'on_message_received'
        page = Page.from_json message

        unless should_be_visited?(page.url, false)
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
          queue.commit
          next
        end

        if page_exists? page
          @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
          queue.commit
          next
        end

        url = page.url.to_s
        @logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }

        execute_plugin 'on_before_download'

        pages = http.fetch_pages(url, page.referer, page.depth)
        if pages.count > 1
          rurls = pages.map { |e| e.url.to_s }.join(' --> ')
          @logger.info { "Got redirects! #{rurls}" }
          page = pages.pop
          page.aliases = pages.map { |e| e.url }
          if page_exists? page
            @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
            queue.commit
            next
          end
        else
          page = pages.last
        end

        execute_plugin 'on_after_download'

        if page.error
          @logger.warn { "Page #{page.url} has error: #{page.error}" }
          incr_error
          @on_page_error.each { |e| e.call(page) }
        end

        # Execute on_before_save blocks
        @on_before_save.each { |e| e.call(page) }

        page.storable? && @storage.add(page)

        @logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
        @logger.info { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }

        incr_pages

        # Execute on_page_downloaded blocks
        @on_page_downloaded.each { |e| e.call(page) }

        if @options[:depth_limit] == false || @options[:depth_limit] > page.depth
          links_for(page).each do |url_to_visit|
            next unless should_be_visited?(url_to_visit)
            enqueue url_to_visit, page
          end
        else
          @logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
        end

        @logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
        @overflow_manager.perform if @overflow_manager && queue.empty?
        execute_plugin 'on_message_processed'

        if SignalHandler.terminated?
          @logger.info { 'About to exit! Thanks for using Polipus' }
          queue.commit
          break
        end
        true
      end
    end
  end
  @workers_pool.each { |w| w.join }
  @on_crawl_end.each { |e| e.call(self) }
  execute_plugin 'on_crawl_end'
end
# File lib/polipus.rb, line 339
def url_tracker
  @url_tracker ||=
    @options[:url_tracker] ||=
      UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}", redis: redis_factory_adapter, driver: 'lua')
end
Private Instance Methods
Returns true if we are obeying robots.txt and the link is granted access in it. Always returns true when we are not obeying robots.txt.
# File lib/polipus.rb, line 424
def allowed_by_robot?(link)
  return true if @robots.nil?
  @options[:obey_robots_txt] ? @robots.allowed?(link) : true
end
The URL is enqueued for a later visit
# File lib/polipus.rb, line 430
def enqueue(url_to_visit, current_page)
  page_to_visit = Page.new(url_to_visit.to_s, referer: current_page.url.to_s, depth: current_page.depth + 1)
  internal_queue << page_to_visit.to_json
  to_track = @options[:include_query_string_in_saved_page] ? url_to_visit.to_s : url_to_visit.to_s.gsub(/\?.*$/, '')
  url_tracker.visit to_track
  @logger.debug { "Added (#{url_to_visit}) to the queue" }
end
It invokes the given plugin method on every registered plugin that defines it
# File lib/polipus.rb, line 479
def execute_plugin(method)
  Polipus::Plugin.plugins.each do |k, p|
    next unless p.respond_to?(method)
    @logger.info { "Running plugin method #{method} on #{k}" }
    ret_val = p.send(method, self)
    instance_eval(&ret_val) if ret_val.kind_of? Proc
  end
end
If stats are enabled, it increments the errors counter
# File lib/polipus.rb, line 453
def incr_error
  redis.incr "polipus:#{@job_name}:errors" if @options[:stats_enabled]
end
If stats are enabled, it increments the downloaded pages counter
# File lib/polipus.rb, line 458
def incr_pages
  redis.incr "polipus:#{@job_name}:pages" if @options[:stats_enabled]
end
# File lib/polipus.rb, line 474
def internal_queue
  @internal_queue ||= queue_factory
end
It extracts URLs from the page
# File lib/polipus.rb, line 399
def links_for(page)
  page.domain_aliases = domain_aliases
  @focus_crawl_block.nil? ? page.links : @focus_crawl_block.call(page)
end
It handles the overflow item policy (if any)
# File lib/polipus.rb, line 463
def overflow_items_controller
  @overflow_manager = QueueOverflow::Manager.new(self, queue_factory, @options[:queue_items_limit])

  # Over time, the URL policy may change, so it is re-evaluated here
  @overflow_manager.url_filter do |page|
    should_be_visited?(page.url, false)
  end

  QueueOverflow::Worker.new(@overflow_manager)
end
Whether a page exists or not
# File lib/polipus.rb, line 414
def page_exists?(page)
  return false if page.user_data && page.user_data.p_seeded
  @storage.exists?(page) && !page_expired?(page)
end
Whether a page is expired or not
# File lib/polipus.rb, line 405
def page_expired?(page)
  return false if @options[:ttl_page].nil?
  stored_page = @storage.get(page)
  r = stored_page && stored_page.expired?(@options[:ttl_page])
  @logger.debug { "Page #{page.url} marked as expired" } if r
  r
end
It creates a new distributed queue
# File lib/polipus.rb, line 448
def queue_factory
  Redis::Queue.new("polipus_queue_#{@job_name}", "bp_polipus_queue_#{@job_name}", redis: redis_factory_adapter)
end
It creates a Redis client
# File lib/polipus.rb, line 439
def redis_factory_adapter
  if @redis_factory
    @redis_factory.call(redis_options)
  else
    Redis.new(redis_options)
  end
end
URL enqueue policy
# File lib/polipus.rb, line 376
def should_be_visited?(url, with_tracker = true)
  case
  # robots.txt
  when !allowed_by_robot?(url)
    false
  # Check against whitelist pattern matching
  when !@follow_links_like.empty? && @follow_links_like.none? { |p| url.path =~ p }
    false
  # Check against blacklist pattern matching
  when @skip_links_like.any? { |p| url.path =~ p }
    false
  # Page is marked as expired
  when page_expired?(Page.new(url))
    true
  # Check against url tracker
  when with_tracker && url_tracker.visited?(@options[:include_query_string_in_saved_page] ? url.to_s : url.to_s.gsub(/\?.*$/, ''))
    false
  else
    true
  end
end