class DpStmMap::DistributedPersistentStmMap
Public Class Methods
new(host, port, local_storage)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 16
# Builds a client for the server at +host+:+port+. No connection is opened
# here; this only prepares the in-memory bookkeeping used by the other
# methods.
#
# host          - server host name or address handed to TCPSocket by #start
# port          - server port handed to TCPSocket by #start
# local_storage - passed unchanged to ClientLocalStore.new
def initialize(host, port, local_storage)
  # Where to connect.
  @host = host
  @port = port

  # Connection lifecycle: callbacks and the current state flag.
  @connect_listeners = []
  @disconnect_listeners = []
  @connect_state = :disconnected

  # Guards @state and @outcome_futures across the caller and reader threads.
  @mutex = Mutex.new

  # NOTE(review): @content is not read by any method visible here - confirm
  # whether it is still needed.
  @content = {}
  @state = ClientLocalStore.new(local_storage)

  # Callbacks registered through validate_atomic / on_atomic.
  @validators = []
  @listeners = []

  # Messages waiting to be written to the server.
  @outgoing_queue = Queue.new

  # Transaction id => Queue on which the transaction outcome is delivered.
  @outcome_futures = {}

  # Signalled whenever a server transaction has been applied to @state.
  @transaction_id_condition_variable = ConditionVariable.new
end
Public Instance Methods
atomic(timeout=nil) { |view| ... }
click to toggle source
# File lib/dp_stm_map/Client.rb, line 186
# Runs the given block as a transaction against an AtomicView of the local
# state, submits the resulting changes to the server and blocks until the
# server reports an outcome.
#
# timeout - accepted for interface compatibility; this implementation does
#           not use it and waits for the outcome without a time limit.
#
# Yields the AtomicView while holding the client mutex.
#
# Returns whatever the block returned. When the server reports success the
# call additionally waits until the local state has caught up with the
# transaction sequence of that outcome.
#
# Any exception raised by the block or by a validator propagates to the
# caller, and nothing is sent to the server in that case.
def atomic(timeout = nil)
  view = AtomicView.new(@state)

  # The block runs under the mutex so that the reader thread cannot apply
  # server transactions to @state while the view is being used.
  result = @mutex.synchronize { yield view }

  changes = view.changes
  @validators.each { |validator| validator.call(changes) }

  # Re-read the changes after validation, exactly as before, so the data
  # that is sent always comes from the view itself.
  changes = view.changes
  transitions = {}
  new_content = {}
  changes.each do |key, (old_value, new_value)|
    new_digest = content_digest(new_value)
    transitions[key] = [content_digest(old_value), new_digest]
    new_content[new_digest] = new_value
  end

  # Register the outcome future only once the transaction is certain to be
  # sent. Registering it earlier left a stale entry in @outcome_futures
  # whenever a validator raised.
  tx_id = SecureRandom.uuid
  outcome_future = Queue.new
  @mutex.synchronize { @outcome_futures[tx_id] = outcome_future }
  send_to_server ClientTransactionMessage.new(tx_id, transitions, new_content)

  outcome = outcome_future.pop
  if ClientTransactionSuccessfulMessage === outcome
    @mutex.synchronize do
      # Wait until the reader thread has applied our own transaction locally.
      while @state.current_transaction_sequence < outcome.transaction_sequence
        @transaction_id_condition_variable.wait(@mutex)
      end
    end
  end

  result
end
atomic_read() { |atomic_read_view state| ... }
click to toggle source
# File lib/dp_stm_map/Client.rb, line 249
# Yields a fresh AtomicReadView over the local state while holding the
# client mutex, and returns the block's result.
def atomic_read
  @mutex.synchronize { yield AtomicReadView.new(@state) }
end
content_digest(content)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 241
# Returns the hexadecimal SHA-2 digest of +content+, or nil when +content+
# is nil (nil is passed through rather than hashed).
def content_digest(content)
  return nil if content.nil?

  Digest::SHA2.hexdigest(content)
end
on_atomic(&block)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 176
# Registers a listener that the reader loop in #start calls with the changes
# produced by each transaction message it applies to the local state.
# Exceptions raised by the listener are swallowed by that loop.
#
# Raises ArgumentError when called without a block. Previously a nil entry
# was stored and only failed later, inside the reader thread.
#
# Returns the list of registered listeners.
def on_atomic(&block)
  raise ArgumentError, "on_atomic requires a block" unless block

  @listeners << block
end
on_connected(&block)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 166
# Registers a listener that #start calls, without arguments, each time a
# connection to the server has been established.
#
# Raises ArgumentError when called without a block. Previously a nil entry
# was stored; calling it later raised inside the reader thread, which made
# every connection attempt end in a reconnect.
#
# Returns the list of registered connect listeners.
def on_connected(&block)
  raise ArgumentError, "on_connected requires a block" unless block

  @connect_listeners << block
end
on_disconnected(&block)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 170
# Registers a listener that #start calls, without arguments, when the
# connection state changes from :connected to :disconnected.
#
# Raises ArgumentError when called without a block. Previously a nil entry
# was stored and only failed later, inside the reader thread's cleanup.
#
# Returns the list of registered disconnect listeners.
def on_disconnected(&block)
  raise ArgumentError, "on_disconnected requires a block" unless block

  @disconnect_listeners << block
end
send_to_server(message)
click to toggle source
private
# File lib/dp_stm_map/Client.rb, line 259
# Appends +message+ to the outgoing queue and returns that queue. The
# message is not written to the socket here; the writer thread created by
# #start pops queued messages and sends them.
def send_to_server(message)
  @outgoing_queue.push(message)
end
start()
click to toggle source
# File lib/dp_stm_map/Client.rb, line 43
# Starts the background reader thread, which connects to the server, spawns
# a writer thread for the outgoing queue, announces itself with a
# ClientHelloMessage and then processes incoming messages until #stop raises
# ShutdownError in it. Any other StandardError closes the socket, notifies
# the disconnect listeners and reconnects after 0.1 seconds.
#
# Blocks the caller until the first connection has been established and
# returns the value popped from the internal latch ("connected").
def start
  latch = Queue.new
  @reading_thread = Thread.new do
    begin
      @client_socket = nil
      @client_socket = TCPSocket.new(@host, @port)
      @client_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      @connect_state = :connected

      # Exactly one writer per connection. The thread is remembered so it can
      # be stopped on reconnect and shutdown; otherwise writers accumulate
      # and two of them can interleave length/payload frames on one socket.
      @writing_thread = Thread.new do
        begin
          loop do
            message = @outgoing_queue.pop
            serialized = message.serialize
            # Frame format: 8-byte big-endian length followed by the payload.
            @client_socket.write([serialized.bytesize].pack("Q>"))
            @client_socket.write(serialized)
            @client_socket.flush
          end
        rescue
          # Best effort, as before: a failed write silently ends this writer.
        end
      end

      send_to_server ClientHelloMessage.new(@state.current_transaction_sequence)
      # Releases the caller of #start. This is pushed again on every
      # reconnect, but only the first value is ever popped.
      latch << "connected"
      @connect_listeners.each do |listener|
        listener.call
      end

      loop do
        header = @client_socket.read(8)
        # IO#read returns nil (or a short string) once the peer has closed
        # the connection; fail explicitly instead of relying on a
        # NoMethodError from nil.unpack to trigger the reconnect.
        raise EOFError, "connection closed by server" if header.nil? || header.bytesize < 8

        len = header.unpack("Q>")[0]
        payload = @client_socket.read(len)
        raise EOFError, "connection closed mid-message" if payload.nil? || payload.bytesize < len

        msg = JsonMessage.deserialize(payload)

        if ClientTransactionOutcomeMessage === msg
          @mutex.synchronize do
            # Hand the outcome to the #atomic call waiting for it, if any.
            if @outcome_futures.key?(msg.transaction_id)
              @outcome_futures.delete(msg.transaction_id).push msg
            end
          end
        end

        if TransactionMessage === msg
          @mutex.synchronize do
            changes = @state.update msg.transaction_sequence, msg.new_content, msg.transitions, msg.delete_content
            @listeners.each do |listener|
              begin
                listener.call changes
              rescue
                # Deliberately ignored so one listener cannot break the reader.
              end
            end
            # Wake up #atomic callers waiting for their transaction to land.
            @transaction_id_condition_variable.broadcast
          end
        end
      end
    rescue ShutdownError
      # Raised by #stop; fall through to the ensure clause and end the thread.
    rescue
      # Stop the writer of the failed connection before reconnecting.
      @writing_thread.kill if @writing_thread
      if @client_socket
        @client_socket.close
      end
      if @connect_state == :connected
        @connect_state = :disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      sleep 0.1
      retry
    ensure
      # Final cleanup when the reader thread terminates.
      @writing_thread.kill if @writing_thread
      if @connect_state == :connected
        @connect_state = :disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      if @client_socket && !@client_socket.closed?
        @client_socket.close
      end
    end
  end
  latch.pop
end
stop()
click to toggle source
# File lib/dp_stm_map/Client.rb, line 156
# Stops the reader thread created by #start by raising ShutdownError in it
# and waiting for it to finish. Does nothing when #start has not been
# called; previously that case failed with NoMethodError on nil.
def stop
  return unless @reading_thread

  @reading_thread.raise ShutdownError
  begin
    @reading_thread.join
  rescue
    # join re-raises the exception that terminated the thread (for example a
    # ShutdownError delivered outside the reader's protected section); the
    # thread is finished either way, so it is ignored here.
  end
end
validate_atomic(&block)
click to toggle source
# File lib/dp_stm_map/Client.rb, line 181
# Registers a validator that #atomic calls with the transaction's changes
# before anything is sent to the server. An exception raised by the
# validator propagates out of #atomic.
#
# Raises ArgumentError when called without a block. Previously a nil entry
# was stored and made every later #atomic call fail with NoMethodError.
#
# Returns the list of registered validators.
def validate_atomic(&block)
  raise ArgumentError, "validate_atomic requires a block" unless block

  @validators << block
end