class BundleRequests::Consumer
Public Class Methods
new(app , config)
click to toggle source
# File lib/bundle_requests/consumer.rb, line 3
# Sets up the consumer around the downstream app.
#
# Replaces the global $waiting_threads with a fresh Queue, keeps +app+ in
# @app, passes +config+ to generate_config_hash, and finally starts a
# background thread that runs consumer_code.
#
# app    - object stored in @app (call_bundle_api invokes @app.call(env))
# config - options forwarded unchanged to generate_config_hash
def initialize(app, config)
  $waiting_threads = Queue.new
  @app = app
  generate_config_hash(config)
  # Started last so the queue and configuration exist before the loop runs.
  Thread.new { consumer_code }
end
Public Instance Methods
call_bundle_api(rack_input,default_env={})
click to toggle source
# File lib/bundle_requests/consumer.rb, line 62
# Posts the gathered request payloads to the configured bundle endpoint.
#
# Builds a POST env for $configuration['bundle_api'] whose body is the JSON
# document {"requests": rack_input}, hands it to @app.call and returns
# whatever @app.call returns.
#
# rack_input  - value serialized under the 'requests' key of the JSON body
# default_env - env hash used as the template for the new request; it is
#               copied, so the caller's hash is left untouched
def call_bundle_api(rack_input, default_env = {})
  # Work on a copy: the template is the env of a waiting request thread and
  # must not have its path, method or body rewritten underneath it.
  env = default_env.dup
  # Serialize once so the body and its advertised length cannot disagree.
  payload = { 'requests' => rack_input }.to_json

  env['PATH_INFO'] = $configuration['bundle_api']
  env['QUERY_STRING'] = ''
  env['REQUEST_METHOD'] = 'POST'
  # CONTENT_LENGTH is a CGI-style key: a String of digits holding the number
  # of bytes (not characters) in the body.
  env['CONTENT_LENGTH'] = payload.bytesize.to_s
  env['rack.input'] = StringIO.new(payload)

  request = Rack::Request.new(env)
  Rails.logger.info("new Environment is #{env}")
  Rails.logger.info("New request content are #{request}")
  result = @app.call(env)
  Rails.logger.info("Result are #{result}")
  result
end
consumer_code()
click to toggle source
Instance and global variables used by this class, listed here for reference
-
@app
-
$configuration
-
@consumer
-
$waiting_threads
# File lib/bundle_requests/consumer.rb, line 23
# Endless worker loop; never returns.
#
# Each pass looks at how many entries are in $waiting_threads. When there
# are fewer than $configuration["max_waiting_thread"], it sleeps for
# $configuration["wait_time"] and starts over if the queue is still empty.
# Otherwise it drains the queue, gathers the request bodies, sends them to
# the bundle API and hands the outcome back to the drained threads.
def consumer_code
  loop do
    backlog = $waiting_threads.length
    if backlog < $configuration["max_waiting_thread"]
      puts "#{backlog} threads are waiting so sleeping"
      sleep($configuration["wait_time"])
      next if $waiting_threads.length == 0
    end

    puts "Started request processing----------------------------------"
    batch = pop_some_waiting_threads
    payloads = gather_all_requests(batch)
    # The env of the first thread in the batch is the template for the
    # bundle request; this would raise if the batch were ever empty.
    bundle_response = call_bundle_api(payloads, batch[0]['request'])
    distribute_result_and_wakeup_those_threads(batch, bundle_response)
    puts "Completed proccessing requests------------------------------"
  end
end
distribute_result_and_wakeup_those_threads(threads, result)
click to toggle source
# File lib/bundle_requests/consumer.rb, line 77
# Stores a response on every entry of +threads+ and wakes it up.
#
# threads - indexable collection whose entries accept ["response"]= and
#           respond to wakeup
# result  - currently unused: every entry receives the same hardcoded
#           200 / "Status Ok" response, a development placeholder
#
# Returns the index range that was walked (0...threads.length).
def distribute_result_and_wakeup_those_threads(threads, result)
  positions = 0...threads.length
  positions.each do |position|
    waiter = threads[position]
    # Development placeholder response; +result+ is not consulted yet.
    waiter["response"] = [200, { "Content-Type" => "application/json" }, ["Status Ok"]]
    waiter.wakeup
  end
end
gather_all_requests(threads)
click to toggle source
# File lib/bundle_requests/consumer.rb, line 50
# Collects the parsed JSON body of every waiting request.
#
# For each entry of +threads+ the env stored under 'request' is wrapped in a
# Rack::Request, logged at debug level, and its body is read and parsed as
# JSON. The full list is logged at info level before being returned.
#
# threads - enumerable whose entries expose a Rack env via ['request']
#
# Returns an Array with one parsed JSON value per entry, in input order.
# JSON.parse raises if a body is not valid JSON.
def gather_all_requests(threads)
  payloads = threads.map do |waiter|
    rack_request = Rack::Request.new(waiter['request'])
    Rails.logger.debug rack_request.inspect
    JSON.parse(rack_request.body.read)
  end
  Rails.logger.info payloads
  payloads
end
generate_config_hash(options)
click to toggle source
# File lib/bundle_requests/consumer.rb, line 86
# Populates the global $configuration once, merging +options+ over defaults.
#
# Does nothing when $configuration is already set. Otherwise starts from the
# built-in defaults and overlays every option whose value is not nil.
#
# options - Hash of overrides, or nil for "defaults only". Keys may be
#           Strings or Symbols; they are stored as Strings because every
#           lookup of $configuration in this class uses String keys.
#
# Returns the new configuration Hash, or nil when $configuration was
# already set.
def generate_config_hash(options)
  return unless $configuration.nil?

  config = {
    "incoming_request" => "/api",
    "bundle_api" => "/bundle_api",
    "wait_time" => 10,
    "thread_wait_after_closing_entrance" => 2,
    "max_waiting_thread" => 16
  }
  # A nil option value means "keep the default", so it is skipped.
  (options || {}).each do |key, value|
    config[key.to_s] = value unless value.nil?
  end
  $configuration = config
end
get_configuration()
click to toggle source
# File lib/bundle_requests/consumer.rb, line 14
# Accessor for the global configuration.
#
# Returns whatever $configuration currently holds (the same object, not a
# copy).
def get_configuration
  $configuration
end
get_waiting_threads()
click to toggle source
# File lib/bundle_requests/consumer.rb, line 10
# Accessor for the global queue of waiting threads.
#
# Returns whatever $waiting_threads currently holds (the same object, not a
# copy).
def get_waiting_threads
  $waiting_threads
end
pop_some_waiting_threads()
click to toggle source
# File lib/bundle_requests/consumer.rb, line 40
# Drains the entries that are in $waiting_threads at the time of the call.
#
# The queue length is sampled once and exactly that many entries are popped,
# so anything pushed while draining is left for a later call. The batch size
# is logged at info level.
#
# Returns an Array of the popped entries in queue order.
def pop_some_waiting_threads
  batch = Array.new($waiting_threads.length) { $waiting_threads.pop }
  Rails.logger.info("Currently proccessing #{batch.length} threads")
  batch
end