class BundleRequests::Consumer

Public Class Methods

new(app , config) click to toggle source
# File lib/bundle_requests/consumer.rb, line 3
# Stores the wrapped application, creates the global queue of waiting
# threads, populates the global configuration and starts the background
# thread that runs consumer_code.
#
# app    - object kept in @app (call_bundle_api invokes #call on it)
# config - option overrides handed to generate_config_hash
def initialize(app, config)
  @app = app
  $waiting_threads = Queue.new
  generate_config_hash(config)
  Thread.new do
    consumer_code
  end
end

Public Instance Methods

call_bundle_api(rack_input,default_env={}) click to toggle source
# File lib/bundle_requests/consumer.rb, line 62
# Builds a POST request for the configured bundle endpoint and dispatches it
# to the wrapped application.
#
# rack_input  - payloads to bundle; serialised as the "requests" member of
#               the JSON request body
# default_env - Rack environment used as the template for the bundled
#               request; it is copied and never modified
#
# Returns whatever @app.call returns for the bundled request.
def call_bundle_api(rack_input,default_env={})
  # Work on a copy so the template env (consumer_code passes a waiting
  # thread's own request env) keeps its original path, method and body.
  env = default_env.dup
  # Serialise once so the declared length always matches the actual body.
  payload = {'requests' => rack_input}.to_json
  env['PATH_INFO'] = $configuration['bundle_api']
  env['QUERY_STRING'] = ''
  env['REQUEST_METHOD'] = 'POST'
  # Rack expects CONTENT_LENGTH to be a string of digits that counts bytes,
  # not characters (they differ as soon as the JSON holds multibyte text).
  env['CONTENT_LENGTH'] = payload.bytesize.to_s
  env['rack.input'] = StringIO.new(payload)
  request = Rack::Request.new(env)
  Rails.logger.info("new Environment is #{env}")
  Rails.logger.info("New request content are #{request}")
  result = @app.call(env)
  Rails.logger.info("Result are #{result}")
  result
end
consumer_code() click to toggle source

Variables used by this class, listed here for reference (@ marks an instance variable, $ marks a global variable)

  • @app

  • $configuration

  • @consumer

  • $waiting_threads

# File lib/bundle_requests/consumer.rb, line 23
# Endless consumer loop: waits for requests to queue up, then processes
# everything queued as one bundled call and wakes the waiting threads.
#
# Each iteration:
#   1. If the queue is empty, or holds fewer threads than
#      $configuration["max_waiting_thread"], sleep for
#      $configuration["wait_time"] and start over if it is still empty.
#   2. Pop the waiting threads, gather their request bodies, send them
#      through call_bundle_api and hand the result to
#      distribute_result_and_wakeup_those_threads.
#
# Never returns normally.
def consumer_code
  loop do
    waiting = $waiting_threads.length
    # An empty queue always sleeps, even when max_waiting_thread is 0 or
    # negative; otherwise the loop would process an empty batch and fail on
    # the first element's request.
    if waiting.zero? || waiting < $configuration["max_waiting_thread"]
      puts "#{waiting} threads are waiting so sleeping"
      sleep($configuration["wait_time"])
      next if $waiting_threads.empty?
    end
    puts "Started request processing----------------------------------"
    threads = pop_some_waiting_threads
    # Defensive: there is no request env to use as a template without at
    # least one waiting thread.
    next if threads.empty?
    rack_input = gather_all_requests(threads)
    # The first waiting thread's request env is the template for the bundle.
    result = call_bundle_api(rack_input, threads.first['request'])
    distribute_result_and_wakeup_those_threads(threads, result)
    puts "Completed proccessing requests------------------------------"
  end
end
distribute_result_and_wakeup_those_threads(threads, result) click to toggle source
# File lib/bundle_requests/consumer.rb, line 77
# Stores a response on every waiting thread and wakes it up.
#
# threads - indexable collection of waiting threads
# result  - outcome of the bundled call; currently unused because every
#           thread receives the same hard-coded development response
#
# Returns the index range that was iterated.
def distribute_result_and_wakeup_those_threads(threads, result)
  (0...threads.length).each do |position|
    waiter = threads[position]
    # hardcoded response for development. Replace it with result[position]
    waiter["response"] = [200, { "Content-Type" => "application/json" }, ["Status Ok"]]
    waiter.wakeup
  end
end
gather_all_requests(threads) click to toggle source
# File lib/bundle_requests/consumer.rb, line 50
# Reads and JSON-decodes the request body of every waiting thread.
#
# threads - collection whose elements expose a Rack env under 'request'
#
# Returns an Array holding one parsed body per element, in iteration order.
def gather_all_requests(threads)
  parsed_bodies = threads.map do |waiter|
    rack_request = Rack::Request.new(waiter['request'])
    Rails.logger.debug rack_request.inspect
    JSON.parse(rack_request.body.read)
  end
  Rails.logger.info parsed_bodies
  parsed_bodies
end
generate_config_hash(options) click to toggle source
# File lib/bundle_requests/consumer.rb, line 86
# Populates the global $configuration once, from built-in defaults
# overridden by the supplied options.
#
# options - key/value overrides; entries whose value is nil are ignored.
#           nil is accepted and treated as "no overrides".
#
# Returns the new configuration hash, or nil when $configuration was
# already set (in which case nothing is changed).
def generate_config_hash(options)
  return unless $configuration.nil?

  config = {
    "incoming_request" => "/api",
    "bundle_api" => "/bundle_api",
    "wait_time" => 10,
    "thread_wait_after_closing_entrance" => 2,
    "max_waiting_thread" => 16
  }

  # A nil options argument must not abort setup; fall back to the defaults.
  (options || {}).each do |key, value|
    config[key] = value unless value.nil?
  end

  $configuration = config
end
get_configuration() click to toggle source
# File lib/bundle_requests/consumer.rb, line 14
# Returns the global $configuration hash (the value generate_config_hash
# assigns; nil if it has not been assigned yet).
def get_configuration
  $configuration
end
get_waiting_threads() click to toggle source
# File lib/bundle_requests/consumer.rb, line 10
# Returns the global $waiting_threads queue (created in initialize) that
# holds the threads waiting to be processed by consumer_code.
def get_waiting_threads
  $waiting_threads
end
pop_some_waiting_threads() click to toggle source
# File lib/bundle_requests/consumer.rb, line 40
# Drains the threads that are queued in $waiting_threads at the moment of
# the call: the queue length is sampled once and that many entries are
# popped in order.
#
# Returns an Array of the popped entries (empty when nothing was queued).
def pop_some_waiting_threads
  batch = Array.new($waiting_threads.length) { $waiting_threads.pop }
  Rails.logger.info("Currently proccessing #{batch.length} threads")
  batch
end