class SeapigClient

Attributes

connected[R]
error[R]
socket[R]
uri[R]

Public Class Methods

new(uri, options={}) click to toggle source
# File lib/seapig-client-ruby/client.rb, line 30
def initialize(uri, options={})
        @uri = uri
        @options = options
        @slave_objects = {}
        @master_objects = {}
        @connected = false
        @socket = nil
        @error = nil
        connect
end

Public Instance Methods

connect() click to toggle source
# File lib/seapig-client-ruby/client.rb, line 42
def connect

        disconnect if @socket
        @reconnect_on_close = true

        @timeout_timer ||= EM.add_periodic_timer(10) {
                next if not @socket
                next if Time.new.to_f - @last_communication_at < 20
                puts "Seapig ping timeout, reconnecting" if @options[:debug]
                connect
        }

        @last_communication_at = Time.new.to_f

        puts 'Connecting to seapig server' if @options[:debug]
        @socket = WebSocket::EventMachine::Client.connect(uri: @uri)

        @socket.onopen {
                puts 'Connected to seapig server' if @options[:debug]
                @connected = true
                @error = nil
                @onstatuschange_proc.call(self) if @onstatuschange_proc
                @socket.send JSON.dump(action: 'client-options-set', options: @options)
                @slave_objects.each_pair { |id, object|
                        @socket.send JSON.dump(action: 'object-consumer-register', pattern: id, :"version-known" => object.version)
                        object.validate
                }
                @master_objects.each_pair { |id, object|
                        @socket.send JSON.dump(action: 'object-producer-register', pattern: id, :"version-known" => object.version)
                        object.children.each { |child_id, child| child.upload(0, {}, child.version, true) } if id.include?('*')
                }
                @last_communication_at = Time.new.to_f
        }

        @socket.onclose(&(@socket_onclose = Proc.new { |code, reason|
                puts 'Seapig connection closed (code:'+code.inspect+', reason:'+reason.inspect+')' if @options[:debug]
                @connected = false
                @socket = nil
                @timeout_timer.cancel if @timeout_timer
                @timeout_timer = nil
                @slave_objects.values.each { |object| object.invalidate }
                @onstatuschange_proc.call(self) if @onstatuschange_proc
                EM.cancel_timer(@reconnection_timer) if @reconnection_timer
                @reconnection_timer = nil
                @reconnection_timer = EM.add_timer(1) { connect } if @reconnect_on_close
        }))

        @socket.onerror { |error|
                puts 'Seapig socket error: '+error.inspect if @options[:debug]
                @error = { while: @socket ? "connecting" : "connected", error: error }
                @socket_onclose.call(nil, error) if @socket
        }

        @socket.onmessage { |data|
                message = JSON.load(data)
                case message['action']
                when 'object-update'
                        @slave_objects.each_pair { |id, object|
                                object.patch(message) if object.matches(message['id'])
                        }
                when 'object-destroy'
                        @slave_objects.each_pair { |id, object|
                                object.destroy(message['id']) if object.matches(message['id'])
                        }
                        @master_objects.each_pair { |id, object|
                                object.destroy(message['id']) if object.matches(message['id'])
                        }
                when 'object-produce'
                        handler = @master_objects.values.find { |object| object.matches(message['id']) }
                        puts 'Seapig server submitted invalid "produce" request: '+message.inspect if (not handler) and @options[:debug]
                        handler.produce(message['id'],message['version-inferred']) if handler
                else
                        raise 'Seapig server submitted an unsupported message: '+message.inspect if @options[:debug]
                end
                @last_communication_at = Time.new.to_f
        }

        @socket.onping {
                @last_communication_at = Time.new.to_f
        }

end
detach_fd() click to toggle source
# File lib/seapig-client-ruby/client.rb, line 150
def detach_fd #FIXME: this is, most likely, broken
        disconnect(true)
end
disconnect(detach_fd = false) click to toggle source
# File lib/seapig-client-ruby/client.rb, line 126
def disconnect(detach_fd = false)
        @reconnect_on_close = false
        if @timeout_timer
                @timeout_timer.cancel
                @timeout_timer = nil
        end
        if @reconnection_timer
                EM.cancel_timer(@reconnection_timer)
                @reconnection_timer = nil
        end
        if @socket
                if detach_fd #FIXME: this is, most likely, broken
                        IO.new(@socket.detach).close
                        @socket.onclose {}
                        @socket_onclose.call("fd detach", "fd detach")
                else
                        @socket.onclose {}
                        @socket.close
                        @socket_onclose.call("close","close")
                end
        end
end
master(id, options={}) click to toggle source
# File lib/seapig-client-ruby/client.rb, line 169
def master(id, options={})
        @master_objects[id] = if id.include?('*') then SeapigWildcardMasterObject.new(self, id, options) else SeapigMasterObject.new(self, id, options) end
        @socket.send JSON.dump(action: 'object-producer-register', pattern: id, :"version-known" => @master_objects[id].version) if @connected
        @master_objects[id]
end
onstatuschange(&block) click to toggle source
# File lib/seapig-client-ruby/client.rb, line 155
def onstatuschange(&block)
        @onstatuschange_proc = block
        self
end
slave(id, options={}) click to toggle source
# File lib/seapig-client-ruby/client.rb, line 161
def slave(id, options={})
        raise "Both or none of 'object' and 'version' are needed" if (options[:object] and not options[:version]) or (not options[:object] and options[:version])
        @slave_objects[id] = if id.include?('*') then SeapigWildcardSlaveObject.new(self, id, options) else SeapigSlaveObject.new(self, id, options) end
        @socket.send JSON.dump(action: 'object-consumer-register', pattern: id, :"version-known" => @slave_objects[id].version) if @connected
        @slave_objects[id]
end