class ConcurrentWorker::Worker
Attributes
req_counter[R]
Worker: the worker class.
A worker is built from the following blocks:
  + cncr_block : concurrent processing block, run as a thread (ConcurrentThread) or as a process (ConcurrentProcess)
  + base_block : user-defined preparation for executing the 'work block'
  + loop_block : loop that receives requests and executes the 'work block'
  + work_block : the work requested by the user
These blocks are executed with the worker's 'instance_exec' method, so they can share instance variables (@xxxx).
snd_queue_max[R]
Worker: the worker class.
A worker is built from the following blocks:
  + cncr_block : concurrent processing block, run as a thread (ConcurrentThread) or as a process (ConcurrentProcess)
  + base_block : user-defined preparation for executing the 'work block'
  + loop_block : loop that receives requests and executes the 'work block'
  + work_block : the work requested by the user
These blocks are executed with the worker's 'instance_exec' method, so they can share instance variables (@xxxx).
undone_requests[R]
Worker: the worker class.
A worker is built from the following blocks:
  + cncr_block : concurrent processing block, run as a thread (ConcurrentThread) or as a process (ConcurrentProcess)
  + base_block : user-defined preparation for executing the 'work block'
  + loop_block : loop that receives requests and executes the 'work block'
  + work_block : the work requested by the user
These blocks are executed with the worker's 'instance_exec' method, so they can share instance variables (@xxxx).
Public Class Methods
new(*args, **options, &work_block)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 22
#
# Builds a worker in the :idle state.
#
# args       - positional arguments, kept in @args.
# options    - keyword options, kept in @options. Keys read here:
#              :snd_queue_max              (defaults to 2)
#              :result_callback_interrupt  (defaults to :immediate)
#              :retired_callback_interrupt (defaults to :immediate)
#              :type                       (:process selects ConcurrentProcess;
#                                           any other value, including nil,
#                                           selects ConcurrentThread)
# work_block - optional block; when given it is registered as :work_block.
def initialize(*args, **options, &work_block)
  @args = args
  @options = options
  set_block(:work_block, &work_block) if work_block

  @state = :idle
  @result_callbacks = []
  @retired_callbacks = []
  @snd_queue_max = @options[:snd_queue_max] || 2
  @req_mutex = Mutex.new
  @req_counter = RequestCounter.new
  @options[:result_callback_interrupt] ||= :immediate
  @options[:retired_callback_interrupt] ||= :immediate
  @undone_requests = []

  # The concurrency behaviour is mixed into this instance's singleton class,
  # so each worker object carries exactly one of the two implementations.
  concurrency = @options[:type] == :process ? ConcurrentProcess : ConcurrentThread
  singleton_class.include(concurrency)
end
Public Instance Methods
add_callback(&callback)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 54
#
# Registers a block to be invoked by call_result_callbacks.
#
# callback - the block to register (required).
#
# Returns the @result_callbacks array.
# Raises RuntimeError ("block is nil") when no block is given.
def add_callback(&callback)
  raise "block is nil" if callback.nil?

  @result_callbacks << callback
end
add_retired_callback(&callback)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 73
#
# Registers a block to be invoked by call_retired_callbacks.
#
# callback - the block to register (required).
#
# Returns the @retired_callbacks array.
# Raises RuntimeError ("block is nil") when no block is given.
def add_retired_callback(&callback)
  raise "block is nil" if callback.nil?

  @retired_callbacks << callback
end
call_result_callbacks(args)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 62
#
# Hands one result to every registered result callback, in registration
# order, then pops one entry from @req_counter.
#
# args - array that is splatted into each callback invocation.
#
# Asynchronous interrupts are masked (:never) for the whole method; only the
# callback loop runs with the timing configured in
# @options[:result_callback_interrupt]. The pop therefore happens under the
# :never mask. There is no ensure here: if a callback raises, the pop is
# skipped and the exception propagates.
#
# Returns whatever @req_counter.pop returns.
def call_result_callbacks(args)
  Thread.handle_interrupt(Object => :never) do
    callback_timing = @options[:result_callback_interrupt]
    Thread.handle_interrupt(Object => callback_timing) do
      @result_callbacks.each { |handler| handler.call(*args) }
    end
    @req_counter.pop
  end
end
call_retired_callbacks()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 81
#
# Invokes every registered retired callback, in registration order and with
# no arguments. The loop runs with the interrupt timing configured in
# @options[:retired_callback_interrupt].
#
# Returns the @retired_callbacks array.
def call_retired_callbacks
  timing = @options[:retired_callback_interrupt]
  Thread.handle_interrupt(Object => timing) do
    @retired_callbacks.each(&:call)
  end
end
clear_callbacks()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 58
#
# Removes every registered result callback. The array is emptied in place.
#
# Returns the (now empty) @result_callbacks array.
def clear_callbacks
  @result_callbacks.clear
end
clear_retired_callbacks()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 77
#
# Removes every registered retired callback. The array is emptied in place.
#
# Returns the (now empty) @retired_callbacks array.
def clear_retired_callbacks
  @retired_callbacks.clear
end
define_block(symbol,&block)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 113
#
# Wraps +block+ in a proc that evaluates it with instance_exec, so the block
# body sees this object's instance variables, and stores the wrapper in the
# instance variable named after +symbol+ (e.g. :work_block -> @work_block).
#
# symbol - name of the block slot.
# block  - the block to wrap.
#
# Returns the wrapper proc.
def define_block(symbol, &block)
  wrapper = proc { |*args| instance_exec(*args, &block) }
  instance_variable_set("@#{symbol}", wrapper)
end
define_block_yield(symbol)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 120
#
# Defines a singleton method "yield_<symbol>" on this object. When called, it
# looks up the instance variable named after +symbol+ at call time and invokes
# it with the given arguments.
#
# symbol - name of the block slot (e.g. :work_block -> yield_work_block).
#
# Returns the value of define_singleton_method.
# The generated method raises RuntimeError ("block <symbol> is not defined")
# when the instance variable is nil or unset.
def define_block_yield(symbol)
  block_name = symbol.to_s
  define_singleton_method("yield_#{block_name}") do |*args|
    handler = instance_variable_get("@#{block_name}")
    raise "block #{block_name} is not defined" unless handler

    handler.call(*args)
  end
end
join()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 194
#
# Shuts the worker down after outstanding requests are accounted for.
#
# When @state is not :run this does nothing. Otherwise it calls
# @req_counter.wait_until_less_than(1), then quit, then wait_cncr_proc,
# in that order.
#
# Always returns true.
def join
  return true unless @state == :run

  @req_counter.wait_until_less_than(1)
  quit
  wait_cncr_proc
  true
end
queue_closed?()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 15
#
# Reports whether the request counter has been closed.
#
# Returns the value of @req_counter.closed?.
def queue_closed?
  @req_counter.closed?
end
queue_empty?()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 18
#
# Reports whether the request counter is open and currently holds nothing.
#
# Returns false when queue_closed? is true; otherwise returns whether
# @req_counter.size equals 0.
def queue_empty?
  return false if queue_closed?

  @req_counter.size == 0
end
quit()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 182
#
# Sends a nil request to the worker via send_req. The default loop block
# treats a falsy request as the signal to stop looping.
#
# Returns nil when @state is not :run (nothing is sent), true when the nil
# request was sent, and false when send_req raised ClosedQueueError or IOError.
def quit
  return if @state != :run

  send_req(nil)
  true
rescue ClosedQueueError, IOError
  false
end
req(*args, &work_block)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 166
#
# Submits one request to the worker, starting it first if necessary.
#
# args       - arguments for the work block.
# work_block - optional block sent along with the request.
#
# The whole operation runs under @req_mutex. When @snd_queue_max is positive,
# the call first waits on @req_counter.wait_until_less_than(@snd_queue_max).
# The request is recorded in @req_counter before it is handed to send_req.
#
# Returns true when the request was sent, false when ClosedQueueError or
# IOError was raised while recording or sending it.
def req(*args, &work_block)
  @req_mutex.synchronize do
    run if @state != :run
    @req_counter.wait_until_less_than(@snd_queue_max) if @snd_queue_max > 0

    begin
      # Two distinct arrays on purpose: the counter entry and the payload
      # handed to send_req are separate objects.
      @req_counter.push([args, work_block])
      send_req([args, work_block])
      true
    rescue ClosedQueueError, IOError
      false
    end
  end
end
req_counter_close()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 89
#
# Closes @req_counter and moves every entry still in it to @undone_requests,
# in the order pop returns them.
#
# Returns nil.
def req_counter_close
  @req_counter.close
  @undone_requests << @req_counter.pop until @req_counter.empty?
end
result_handle_thread(&recv_block)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 96
#
# Starts a thread that runs +recv_block+ and then always performs the
# shutdown sequence.
#
# recv_block - block executed inside the new thread.
#
# Inside the thread, interrupts are masked (:never) except while recv_block
# itself runs (:immediate). The ensure section therefore runs under the
# :never mask and calls, in order: req_counter_close, channel_close,
# call_retired_callbacks.
#
# Returns the new Thread.
def result_handle_thread(&recv_block)
  Thread.new do
    Thread.handle_interrupt(Object => :never) do
      begin
        Thread.handle_interrupt(Object => :immediate) { recv_block.call }
      ensure
        req_counter_close
        channel_close
        call_retired_callbacks
      end
    end
  end
end
run()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 159
#
# Marks the worker as running and starts the concurrent block.
#
# Sets @state to :run, installs the default loop block and the default base
# block (in that order) for whichever of @loop_block / @base_block is unset
# or falsy, then calls cncr_block.
#
# Returns the value of cncr_block.
def run
  @state = :run

  has_loop_block = defined?(@loop_block) && @loop_block
  set_default_loop_block unless has_loop_block

  # Checked only after the loop block step, matching the original ordering.
  has_base_block = defined?(@base_block) && @base_block
  set_default_base_block unless has_base_block

  cncr_block
end
set_block(symbol, &block)
click to toggle source
# File lib/concurrent_worker/worker.rb, line 131
#
# Registers +block+ under one of the worker block slots and defines the
# matching yield_<symbol> method.
#
# symbol - one of :base_block, :loop_block, :work_block.
# block  - the block to register (required).
#
# Returns the value of define_block_yield(symbol).
# Raises RuntimeError ("block is nil") when no block is given, and
# RuntimeError ("<symbol> is not used as worker block") for any other symbol.
def set_block(symbol, &block)
  raise "block is nil" if block.nil?

  allowed = %i[base_block loop_block work_block]
  raise "#{symbol} is not used as worker block" unless allowed.include?(symbol)

  define_block(symbol, &block)
  define_block_yield(symbol)
end
set_default_base_block()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 153
#
# Installs the default :base_block, which simply runs the loop block via
# yield_loop_block.
#
# Returns the value of set_block.
def set_default_base_block
  set_block(:base_block) { yield_loop_block }
end
set_default_loop_block()
click to toggle source
# File lib/concurrent_worker/worker.rb, line 141
#
# Installs the default :loop_block. The block repeatedly takes a request from
# receive_req until a falsy value arrives. Each request is an
# [args, work_block] pair: when work_block is present it replaces the current
# :work_block via set_block (and stays installed for later requests); the
# work block is then run with args and its result is sent back, wrapped in a
# one-element array, through send_res.
#
# Returns the value of set_block.
def set_default_loop_block
  set_block(:loop_block) do
    # A plain while loop is used deliberately: Kernel#loop would swallow a
    # StopIteration raised by the work block.
    while (request = receive_req)
      args, replacement = request
      set_block(:work_block, &replacement) if replacement
      send_res([yield_work_block(*args)])
    end
  end
end