class Bio::BaseSpace::Consumer

Multipart file upload consumer class.

TODO: This file has not been ported yet, because the multipartFileUpload class is only mentioned in the comments of the BaseSpaceAPI file.

Public Class Methods

new(task_queue, result_queue, pause_event, halt_event) click to toggle source
# File lib/basespace/model/multipart_upload.rb, line 85
# Stores the queues and control events the consumer's run loop works with.
#
# @param task_queue   queue of pending tasks; `run` calls `get`, `put` and
#                     `task_done` on it
# @param result_queue queue that receives successful answers; `run` calls
#                     `put` on it
# @param pause_event  event object; `run` polls it with `is_set`
# @param halt_event   event object; `run` polls it with `is_set` and raises
#                     it with `set`
def initialize(task_queue, result_queue, pause_event, halt_event)
  # TODO: pick a multi-processing mechanism for the Ruby port. Background:
  #       http://stackoverflow.com/questions/710785/working-with-multiple-processes-in-ruby
  #       http://stackoverflow.com/questions/855805/please-introduce-a-multi-processing-library-in-perl-or-ruby
  #       http://docs.python.jp/2.6/library/multiprocessing.html
  # The Python original called multiprocessing.Process.__init__(self) here.
  @task_queue   = task_queue
  @result_queue = result_queue
  # Use the snake_case parameters; the camelCase names left over from the
  # Python source (pauseEvent / haltEvent) are undefined here and raised
  # NameError on every construction.
  @pause        = pause_event
  @halt         = halt_event
end

Public Instance Methods

run() click to toggle source

TODO

# File lib/basespace/model/multipart_upload.rb, line 97
# Worker loop: takes tasks from the task queue and executes them until a
# nil task (the "poison pill") is received or the halt event is set.
#
# Collaborator protocol, kept from the Python original:
# * @task_queue   responds to `get`, `put(task)` and `task_done`
# * @result_queue responds to `put(answer)`
# * @pause        responds to `is_set`
# * @halt         responds to `is_set` and `set`
# * each task responds to `call` (returning an object with `state`) and to
#   `attempt`
#
# A task whose answer has `state == 1` is pushed to the result queue. Any
# other state re-queues the task while `attempt < 3`; otherwise the halt
# event is set. NOTE(review): nothing here increments `attempt`, so the task
# itself presumably does so when called -- confirm.
#
# @return [nil]
def run
  # Label for the log lines. No `name` method is defined in the visible part
  # of this class, so fall back to the class name when it is missing.
  proc_name = respond_to?(:name) ? name : self.class.name

  loop do
    # Check for a halt before anything else so that a halted worker never
    # blocks on another `get`.
    if @halt.is_set
      puts "#{proc_name}: Exiting"
      break
    end

    # While paused, sleep for a bit and check back without taking a task.
    # This keeps "no task fetched yet" from being mistaken for the nil
    # poison pill.
    if @pause.is_set
      puts "#{proc_name}: Paused"
      sleep(3)
      next
    end

    next_task = @task_queue.get

    # Out of jobs (poison pill) or halted while waiting for one: acknowledge
    # the item that was taken and shut down.
    if next_task.nil? || @halt.is_set
      puts "#{proc_name}: Exiting"
      @task_queue.task_done
      break
    end

    # Do some work.
    puts "#{proc_name}: #{next_task}"
    answer = next_task.call
    @task_queue.task_done

    if answer.state == 1            # everything went well
      @result_queue.put(answer)
    elsif next_task.attempt < 3     # something went wrong: queue a retry
      @task_queue.put(next_task)
    else                            # repeated failures: halt all workers
      @halt.set
    end
  end
end