class DpStmMap::DistributedPersistentStmMap

Public Class Methods

new(host, port, local_storage) click to toggle source
# File lib/dp_stm_map/Client.rb, line 16
# Prepares a client for the server at +host+:+port+. Only in-memory state is
# set up here; the TCP connection is opened by #start.
#
# host          - server host, handed to TCPSocket.new by #start.
# port          - server port, handed to TCPSocket.new by #start.
# local_storage - passed unchanged to ClientLocalStore.new.
def initialize(host, port, local_storage)
  # Where to connect, and who to tell about connection changes.
  @host = host
  @port = port
  @connect_state = :disconnected
  @connect_listeners = []
  @disconnect_listeners = []

  # Synchronisation primitives shared by the public API and the reader thread.
  @mutex = Mutex.new
  @transaction_id_condition_variable = ConditionVariable.new

  # Local view of the map.
  @content = {}
  @state = ClientLocalStore.new(local_storage)

  # Callbacks registered through #validate_atomic and #on_atomic.
  @validators = []
  @listeners = []

  # Messages waiting to be written, and per-transaction outcome queues
  # keyed by transaction id.
  @outgoing_queue = Queue.new
  @outcome_futures = {}
end

Public Instance Methods

atomic(timeout=nil) { |view| ... } click to toggle source
# File lib/dp_stm_map/Client.rb, line 186
# Runs the given block as a transaction and submits its changes to the server.
#
# The block is yielded a fresh AtomicView of the local state while @mutex is
# held. Afterwards every registered validator is called with the view's
# changes, the changes are converted into digest transitions plus the new
# content keyed by digest, and a ClientTransactionMessage is queued for
# sending. The call then blocks until a value is pushed onto the outcome queue
# registered under this transaction's id. If that value is a
# ClientTransactionSuccessfulMessage, it additionally waits until the local
# state's transaction sequence has reached the outcome's sequence.
#
# timeout - accepted for interface compatibility; this method does not use it.
#
# Returns the value returned by the block.
# Exceptions raised by the block or by a validator propagate to the caller.
def atomic(timeout=nil)
  outcome_future = Queue.new
  tx_id = SecureRandom.uuid
  view = AtomicView.new @state
  result = nil

  @mutex.synchronize do
    result = yield view
    @outcome_futures[tx_id] = outcome_future
  end

  outcome = nil
  delivered = false
  begin
    changes = view.changes
    @validators.each { |validator| validator.call changes }

    # Re-read after validation, exactly as before, so the message is built
    # from whatever the view reports once the validators have run.
    changes = view.changes

    transitions = {}
    new_content = {}
    changes.each do |key, (old, new)|
      new_digest = content_digest(new)
      transitions[key] = [content_digest(old), new_digest]
      new_content[new_digest] = new
    end

    send_to_server ClientTransactionMessage.new(tx_id, transitions, new_content)

    outcome = outcome_future.pop
    delivered = true
  ensure
    # If validation, digesting, sending or the wait itself was aborted, nobody
    # will ever consume the registration made above; drop it so that
    # @outcome_futures does not grow without bound.
    @mutex.synchronize { @outcome_futures.delete tx_id } unless delivered
  end

  if ClientTransactionSuccessfulMessage === outcome
    @mutex.synchronize do
      # Block until the local state has caught up with our own transaction.
      while @state.current_transaction_sequence < outcome.transaction_sequence
        @transaction_id_condition_variable.wait(@mutex)
      end
    end
  end

  result
end
atomic_read() { |atomic_read_view state| ... } click to toggle source
# File lib/dp_stm_map/Client.rb, line 249
# Yields a new AtomicReadView of the local state to the block while @mutex is
# held, and returns the block's value.
def atomic_read
  @mutex.synchronize { yield AtomicReadView.new(@state) }
end
content_digest(content) click to toggle source
# File lib/dp_stm_map/Client.rb, line 241
# Returns the hexadecimal Digest::SHA2 digest of +content+, or nil when
# +content+ is nil.
def content_digest(content)
  return nil if content.nil?

  Digest::SHA2.hexdigest(content)
end
on_atomic(&block) click to toggle source
# File lib/dp_stm_map/Client.rb, line 176
# Registers +block+ as a listener. Listeners are called by the reader thread
# with the changes produced by each applied transaction; errors they raise
# are ignored there.
def on_atomic(&block)
  @listeners.push(block)
end
on_connected(&block) click to toggle source
# File lib/dp_stm_map/Client.rb, line 166
# Registers +block+ to be called, without arguments, each time a connection
# to the server has been established.
def on_connected(&block)
  @connect_listeners.push(block)
end
on_disconnected(&block) click to toggle source
# File lib/dp_stm_map/Client.rb, line 170
# Registers +block+ to be called, without arguments, when the client goes from
# connected to disconnected.
def on_disconnected(&block)
  @disconnect_listeners.push(block)
end
send_to_server(message) click to toggle source

private

# File lib/dp_stm_map/Client.rb, line 259
# Queues +message+ on the outgoing queue; the writer thread started by #start
# pops it from there and writes it to the socket.
def send_to_server(message)
  @outgoing_queue.push(message)
end
start() click to toggle source
# File lib/dp_stm_map/Client.rb, line 43
# Connects to the server and starts the background I/O threads. Returns once
# the first connection has been established.
#
# A reader thread (kept in @reading_thread) opens the socket, starts a writer
# thread that drains @outgoing_queue as length-prefixed frames (8-byte
# big-endian size, then the serialized message), queues a ClientHelloMessage
# with the current local transaction sequence, notifies the connect listeners
# and then reads frames in a loop:
#
# * a ClientTransactionOutcomeMessage is pushed onto the outcome queue
#   registered under its transaction id, if there is one;
# * a TransactionMessage is applied to the local state, reported to the
#   #on_atomic listeners, and waiters on the transaction sequence are woken.
#
# Any StandardError other than ShutdownError closes the socket, notifies the
# disconnect listeners if the client was connected, and reconnects after
# 0.1 seconds. ShutdownError (raised by #stop) ends the reader thread.
def start
  latch=Queue.new

  @reading_thread=Thread.new do
    # Writer thread of the current connection. Declared outside the begin
    # block so that it is still known after `retry` and in the ensure clause.
    writer=nil
    begin
      @client_socket=nil
      @client_socket=TCPSocket.new @host,@port

      @client_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      @connect_state=:connected

      writer=Thread.new do
        begin
          loop do
            message=@outgoing_queue.pop
            serialized=message.serialize
            @client_socket.write([serialized.bytesize].pack("Q>"))
            @client_socket.write(serialized)
            @client_socket.flush
          end
        rescue
          # Best effort: a failed write silently ends this writer thread.
        end
      end

      send_to_server ClientHelloMessage.new(@state.current_transaction_sequence)

      # Releases the caller of #start, which is blocked on latch.pop.
      latch << "connected"

      @connect_listeners.each do |listener|
        listener.call
      end

      loop do
        header=@client_socket.read(8)
        # IO#read returns nil (or fewer bytes than requested) at end of file;
        # treat that as a lost connection instead of failing on nil.
        raise EOFError, "connection closed by server" if header.nil? || header.bytesize < 8
        len=header.unpack("Q>")[0]

        payload=@client_socket.read(len)
        raise EOFError, "connection closed by server" if payload.nil? || payload.bytesize < len

        msg=JsonMessage.deserialize(payload)
        if ClientTransactionOutcomeMessage === msg
          @mutex.synchronize do
            if @outcome_futures.has_key? msg.transaction_id
              @outcome_futures.delete(msg.transaction_id).push msg
            end
          end
        end
        if TransactionMessage === msg
          @mutex.synchronize do

            changes=@state.update msg.transaction_sequence, msg.new_content, msg.transitions, msg.delete_content

            @listeners.each do |listener|
              begin
                listener.call changes
              rescue
                # A failing listener must not break the reader loop.
              end
            end

            @transaction_id_condition_variable.broadcast

          end

        end
      end
    rescue ShutdownError
      # Requested by #stop; fall through to the ensure clause.
    rescue
      # Stop this connection's writer before reconnecting. Otherwise a writer
      # still blocked on the queue would later write to the new socket
      # alongside the new writer and could interleave frames.
      writer.kill if writer
      writer=nil
      if @client_socket
        @client_socket.close
      end
      if @connect_state == :connected
        @connect_state=:disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      sleep 0.1
      retry
    ensure
      if @connect_state == :connected
        @connect_state=:disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      if @client_socket && !@client_socket.closed?
        @client_socket.close
      end
      # Do not leave the writer thread behind once the reader is gone.
      writer.kill if writer
    end

  end

  latch.pop
end
stop() click to toggle source
# File lib/dp_stm_map/Client.rb, line 156
# Stops the reader thread started by #start by raising ShutdownError in it and
# waiting for it to finish. Does nothing if #start has not been called.
def stop
  return unless @reading_thread

  @reading_thread.raise ShutdownError
  begin
    @reading_thread.join
  rescue
    # Thread#join re-raises an exception that terminated the thread; shutdown
    # is best effort, so it is deliberately ignored here.
  end
end
validate_atomic(&block) click to toggle source
# File lib/dp_stm_map/Client.rb, line 181
# Registers +block+ as a validator. #atomic calls every validator with the
# transaction's changes before the transaction is sent to the server.
def validate_atomic(&block)
  @validators.push(block)
end