class BackgrounDRb::Connection
Attributes
cluster_conn[RW]
connection_status[RW]
server_ip[RW]
server_port[RW]
Public Class Methods
new(ip,port,cluster_conn)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 5
# Builds a connection handle for a single BackgrounDRb server.
#
# ip           - value stored in @server_ip
# port         - value stored in @server_port
# cluster_conn - value stored in @cluster_conn
#
# No socket is opened here: the status flag just starts out as true and a
# mutex is created for later use around socket reads and writes.
def initialize(ip, port, cluster_conn)
  @server_ip, @server_port = ip, port
  @cluster_conn = cluster_conn
  @connection_status = true
  @mutex = Mutex.new
end
Public Instance Methods
all_worker_info()
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 109
# Requests information about all workers from the server.
#
# Sends a request hash whose only entry is :type => :all_worker_info, reads
# the reply (default read timeout) while holding the mutex, closes the
# connection and returns whatever read_from_bdrb produced.
def all_worker_info
  dump_object({ :type => :all_worker_info })
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
ask_result(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 145
# Fetches a stored result.
#
# When BDRB_CONFIG[:backgroundrb][:result_storage] is 'memcache' the lookup is
# delegated to return_result_from_memcache. Otherwise p_data is tagged with
# :type => :get_result (p_data is mutated), sent to the server, and the :data
# entry of the reply is returned, or nil when no reply was read.
def ask_result(p_data)
  if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    return return_result_from_memcache(p_data)
  end

  p_data[:type] = :get_result
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply ? reply[:data] : nil
end
ask_work(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 84
# Sends an :async_invoke request.
#
# p_data is mutated (its :type entry is overwritten), written to the server,
# and the reply read under the mutex with the default read timeout is
# returned after the connection has been closed.
def ask_work(p_data)
  p_data[:type] = :async_invoke
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
close_connection()
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 79
# Closes the current socket, if any, and forgets it.
#
# Safe to call when no connection is held (for example twice in a row):
# in that case nothing is closed. The reference is cleared even when
# closing raises, so a broken socket is never kept around; the exception
# from close itself still propagates to the caller.
#
# Returns nil.
def close_connection
  @connection.close if @connection
  nil
ensure
  @connection = nil
end
delete_worker(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 119
# Sends a :delete_worker request.
#
# p_data is mutated (its :type entry is overwritten) and written to the
# server; no reply is read before the connection is closed.
def delete_worker(p_data)
  p_data[:type] = :delete_worker
  dump_object(p_data)
  close_connection
end
dump_object(data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 68
# Opens a connection and writes one framed, marshalled object to it.
#
# Frame layout: the length of the Marshal dump as a decimal number,
# left-padded with '0' to 9 characters, followed by the dump itself.
#
# Raises BackgrounDRb::BdrbConnError when establish_connection leaves
# @connection_status falsy. The write itself happens under the mutex.
def dump_object(data)
  establish_connection
  unless @connection_status
    raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}")
  end

  payload = Marshal.dump(data)
  header = payload.length.to_s.rjust(9, '0')
  @mutex.synchronize { write_data(header + payload) }
end
establish_connection()
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 14
# Opens a TCP connection to server_ip:server_port with TCP_NODELAY set.
#
# The attempt is bounded to 3 seconds. On success @connection holds the
# socket and @connection_status becomes true; on a timeout or any other
# ordinary error @connection_status becomes false and nothing is raised.
#
# Timeout.timeout is called explicitly: the bare top-level `timeout` helper
# is not available on current Rubies, and the resulting NoMethodError used
# to be swallowed, making every connection attempt look like a failure.
#
# Only StandardError (plus Timeout::Error, which was not a StandardError on
# very old Rubies) is rescued, so signals and exit requests propagate
# instead of being reported as a failed connection.
#
# Returns the new value of @connection_status.
def establish_connection
  Timeout.timeout(3) do
    @connection = TCPSocket.open(server_ip, server_port)
    @connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  end
  @connection_status = true
rescue Timeout::Error, StandardError
  @connection_status = false
end
flush_in_loop(data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 56
# Writes all of data to @connection, flushing after every write and
# repeating until nothing is left.
#
# The write call reports how many BYTES it accepted, so the remaining
# payload is tracked in bytes (bytesize / byteslice). Counting characters
# instead would skip or resend parts of a payload containing multibyte
# characters whenever a write is partial.
#
# data - String to send; an empty string results in no write at all.
#
# Raises RuntimeError ("Error writing to socket") when a write accepts
# no bytes. Returns nil.
def flush_in_loop(data)
  pending = data
  until pending.bytesize.zero?
    written = @connection.write(pending)
    raise "Error writing to socket" if written <= 0
    @connection.flush
    # to_s guards against a writer that claims more bytes than it was given.
    pending = pending.byteslice(written, pending.bytesize - written).to_s
  end
end
gen_key(options)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 136
# Derives the key for a job from options.
#
# With 'memcache' result storage the key is the non-nil values of :worker,
# :worker_key and :job_key joined with '_'; with any other storage it is
# simply options[:job_key].
def gen_key(options)
  unless BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    return options[:job_key]
  end

  options.values_at(:worker, :worker_key, :job_key).compact.join('_')
end
new_worker(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 93
# Sends a :start_worker request.
#
# p_data is mutated (its :type entry is overwritten) and written to the
# server; the connection is closed without reading a reply.
def new_worker(p_data)
  p_data[:type] = :start_worker
  dump_object(p_data)
  close_connection
  # RailsWorkerProxy.worker(p_data[:worker],p_data[:worker_key],self)
end
read_from_bdrb(timeout = 3)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 158
# Waits for the server to answer and decodes the reply.
#
# timeout - seconds handed to select as its timeout (default 3); callers may
#           pass nil, which gives select no timeout.
#
# Returns the unmarshalled reply, or nil when select reports nothing
# readable or when any StandardError occurs while reading or decoding.
#
# NOTE(review): Marshal.load is applied to bytes read from the socket;
# this is only safe if the peer is trusted - confirm for the deployment.
def read_from_bdrb(timeout = 3)
  ready = select([@connection], nil, nil, timeout)
  ready ? Marshal.load(read_object) : nil
rescue StandardError
  nil
end
read_object()
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 125
# Reads one length-prefixed message from @connection.
#
# The first 9 characters are read and converted with to_i to get the body
# length; that many bytes are then read and returned as-is.
#
# Any StandardError raised along the way is replaced by
# BackgrounDRb::BdrbConnError.
def read_object
  header = @connection.read(9)
  @connection.read(header.to_i)
rescue StandardError
  raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}")
end
send_request(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 170
# Sends a :sync_invoke request and returns the reply.
#
# p_data is mutated (its :type entry is overwritten). Unlike the other
# request methods, nil is passed as the read timeout, so the read is not
# bounded by the default 3 seconds. The connection is closed before the
# reply is returned.
def send_request(p_data)
  p_data[:type] = :sync_invoke
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb(nil) }
  close_connection
  reply
end
server_info()
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 52
# Returns the server address as "ip:port", used in error messages.
def server_info
  format('%s:%s', server_ip, server_port)
end
worker_info(p_data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 100
# Sends a :worker_info request and returns the reply.
#
# p_data is mutated (its :type entry is overwritten), written to the server,
# and the reply read under the mutex with the default read timeout is
# returned after the connection has been closed.
def worker_info(p_data)
  p_data[:type] = :worker_info
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
write_data(data)
click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 28
# Writes data to the server, reconnecting once if the first attempt fails.
#
# data - the already framed String to send.
#
# Errno::EAGAIN on the first attempt is ignored and nil is returned.
# NOTE(review): in that case the data may not have been (fully) sent -
# confirm this best-effort behaviour is intended.
#
# Any other StandardError (Errno::EPIPE included - both cases were handled
# identically) triggers one reconnect-and-retry. Errors raised by the retry
# itself are not rescued.
#
# Raises BackgrounDRb::BdrbConnError when the reconnect fails.
def write_data(data)
  flush_in_loop(data)
rescue Errno::EAGAIN
  nil
rescue StandardError
  retry_write_on_fresh_connection(data)
end

# Drops the socket that just failed, reconnects and writes data again.
def retry_write_on_fresh_connection(data)
  release_failed_socket
  establish_connection
  unless @connection_status
    @connection_status = false
    raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}")
  end
  flush_in_loop(data)
end

# Closes the socket a write just failed on, so its descriptor is released
# right away instead of lingering until garbage collection once
# establish_connection overwrites the reference. Errors while closing are
# ignored: the socket is already considered unusable.
def release_failed_socket
  @connection.close if @connection && !@connection.closed?
rescue StandardError
  nil
end