class SeapigClient
Attributes
connected[R]
error[R]
socket[R]
uri[R]
Public Class Methods
new(uri, options={})
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 30
#
# Builds a client for the seapig server at +uri+ and starts connecting
# right away.
#
# uri     - address handed to the WebSocket client when connecting.
# options - Hash of client options; +:debug+ turns on console logging, and
#           the whole hash is sent to the server once the socket opens.
def initialize(uri, options={})
  @uri, @options = uri, options

  # Registries of consumed (slave) and produced (master) objects, keyed by
  # the id/pattern they were registered under.
  @slave_objects, @master_objects = {}, {}

  # Connection state exposed through the read-only attributes.
  @connected = false
  @socket, @error = nil, nil

  connect
end
Public Instance Methods
connect()
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 42
#
# Opens a WebSocket connection to the seapig server at @uri and installs
# the socket callbacks. Any existing socket is disconnected first.
#
# Callbacks installed on the socket:
# * onopen    - marks the client connected, sends the client options and
#               re-registers every known slave and master object.
# * onclose   - marks the client disconnected, invalidates slave objects
#               and, unless #disconnect was requested, retries after 1s.
# * onerror   - records the error in @error and runs the close handler.
# * onmessage - dispatches 'object-update', 'object-destroy' and
#               'object-produce' messages to the matching objects.
# * onping    - refreshes the last-communication timestamp.
#
# Raises RuntimeError from the message handler when the server sends an
# unknown action and the +:debug+ option is set.
def connect
  disconnect if @socket

  @reconnect_on_close = true

  # Watchdog: checked every 10 seconds; if a socket exists but nothing has
  # been received for 20 seconds, the connection is re-established.
  @timeout_timer ||= EM.add_periodic_timer(10) {
    next if not @socket
    next if Time.new.to_f - @last_communication_at < 20
    puts "Seapig ping timeout, reconnecting" if @options[:debug]
    connect
  }

  @last_communication_at = Time.new.to_f

  puts 'Connecting to seapig server' if @options[:debug]
  @socket = WebSocket::EventMachine::Client.connect(uri: @uri)

  @socket.onopen {
    puts 'Connected to seapig server' if @options[:debug]
    @connected = true
    @error = nil
    @onstatuschange_proc.call(self) if @onstatuschange_proc
    @socket.send JSON.dump(action: 'client-options-set', options: @options)
    @slave_objects.each_pair { |id, object|
      @socket.send JSON.dump(action: 'object-consumer-register', pattern: id, :"version-known" => object.version)
      object.validate
    }
    @master_objects.each_pair { |id, object|
      @socket.send JSON.dump(action: 'object-producer-register', pattern: id, :"version-known" => object.version)
      # Wildcard masters re-upload each of their children after registering.
      object.children.each { |child_id, child| child.upload(0, {}, child.version, true) } if id.include?('*')
    }
    @last_communication_at = Time.new.to_f
  }

  # The close handler is kept in @socket_onclose so that #disconnect and the
  # error handler below can invoke it directly.
  @socket.onclose(&(@socket_onclose = Proc.new { |code, reason|
    puts 'Seapig connection closed (code:'+code.inspect+', reason:'+reason.inspect+')' if @options[:debug]
    @connected = false
    @socket = nil
    @timeout_timer.cancel if @timeout_timer
    @timeout_timer = nil
    @slave_objects.values.each { |object| object.invalidate }
    @onstatuschange_proc.call(self) if @onstatuschange_proc
    EM.cancel_timer(@reconnection_timer) if @reconnection_timer
    @reconnection_timer = nil
    @reconnection_timer = EM.add_timer(1) { connect } if @reconnect_on_close
  }))

  @socket.onerror { |error|
    puts 'Seapig socket error: '+error.inspect if @options[:debug]
    # @connected is only true between onopen and onclose, so it is what
    # tells the two phases apart (the socket itself is non-nil in both).
    @error = { while: (@connected ? "connected" : "connecting"), error: error }
    @socket_onclose.call(nil, error) if @socket
  }

  @socket.onmessage { |data|
    # JSON.parse (unlike JSON.load) never instantiates arbitrary classes from
    # "json_class" payloads; the options keep JSON.load's lenient limits.
    message = JSON.parse(data, max_nesting: false, allow_nan: true)
    case message['action']
    when 'object-update'
      @slave_objects.each_pair { |id, object|
        object.patch(message) if object.matches(message['id'])
      }
    when 'object-destroy'
      @slave_objects.each_pair { |id, object|
        object.destroy(message['id']) if object.matches(message['id'])
      }
      @master_objects.each_pair { |id, object|
        object.destroy(message['id']) if object.matches(message['id'])
      }
    when 'object-produce'
      handler = @master_objects.values.find { |object| object.matches(message['id']) }
      puts 'Seapig server submitted invalid "produce" request: '+message.inspect if (not handler) and @options[:debug]
      handler.produce(message['id'],message['version-inferred']) if handler
    else
      raise 'Seapig server submitted an unsupported message: '+message.inspect if @options[:debug]
    end
    @last_communication_at = Time.new.to_f
  }

  @socket.onping {
    @last_communication_at = Time.new.to_f
  }
end
detach_fd()
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 150
#
# Shuts the client down through #disconnect in its "detach" mode.
#
# FIXME: this is, most likely, broken
def detach_fd
  release_descriptor = true
  disconnect(release_descriptor)
end
disconnect(detach_fd = false)
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 126
#
# Tears the connection down and disables automatic reconnection.
#
# detach_fd - when true, the socket is detached and its descriptor closed
#             instead of closing the socket itself.
#
# Pending watchdog and reconnection timers are cancelled. If a socket
# exists, its own close callback is replaced with a no-op and the stored
# close handler is invoked by hand.
def disconnect(detach_fd = false)
  @reconnect_on_close = false

  if (watchdog = @timeout_timer)
    watchdog.cancel
    @timeout_timer = nil
  end

  if (pending_retry = @reconnection_timer)
    EM.cancel_timer(pending_retry)
    @reconnection_timer = nil
  end

  return unless @socket

  if detach_fd
    # FIXME: this is, most likely, broken
    IO.new(@socket.detach).close
    @socket.onclose {}
    why = "fd detach"
  else
    @socket.onclose {}
    @socket.close
    why = "close"
  end

  # The same text is passed as both the close code and the close reason.
  @socket_onclose.call(why, why)
end
master(id, options={})
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 169
#
# Registers this client as the producer of +id+ and returns the new
# master object.
#
# id      - object id; when it contains '*' a wildcard master is created.
# options - passed through to the master object's constructor.
#
# If the client is currently connected, the registration is announced to
# the server immediately.
def master(id, options={})
  producer_class = id.include?('*') ? SeapigWildcardMasterObject : SeapigMasterObject
  producer = producer_class.new(self, id, options)
  @master_objects[id] = producer

  if @connected
    announcement = { action: 'object-producer-register', pattern: id, :"version-known" => producer.version }
    @socket.send(JSON.dump(announcement))
  end

  producer
end
onstatuschange(&block)
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 155
#
# Stores +block+ as the status-change callback (replacing any previous one)
# and returns the client itself so calls can be chained.
def onstatuschange(&block)
  tap { @onstatuschange_proc = block }
end
slave(id, options={})
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 161
#
# Registers this client as a consumer of +id+ and returns the new slave
# object.
#
# id      - object id; when it contains '*' a wildcard slave is created.
# options - passed through to the slave object's constructor. +:object+
#           and +:version+ must be given together or not at all.
#
# Raises RuntimeError when only one of +:object+ / +:version+ is supplied.
#
# If the client is currently connected, the registration is announced to
# the server immediately.
def slave(id, options={})
  object_given = options[:object] ? true : false
  version_given = options[:version] ? true : false
  raise "Both or none of 'object' and 'version' are needed" if object_given != version_given

  consumer_class = id.include?('*') ? SeapigWildcardSlaveObject : SeapigSlaveObject
  consumer = consumer_class.new(self, id, options)
  @slave_objects[id] = consumer

  if @connected
    announcement = { action: 'object-consumer-register', pattern: id, :"version-known" => consumer.version }
    @socket.send(JSON.dump(announcement))
  end

  consumer
end
unlink(id)
click to toggle source
# File lib/seapig-client-ruby/client.rb, line 176
#
# Forgets the slave and/or master object registered under +id+.
#
# id - the id/pattern the object was registered with.
#
# For each registry that holds +id+, the entry is removed and, if the
# client is currently connected, the matching unregister message is sent
# to the server. Unknown ids are ignored.
def unlink(id)
  if @slave_objects[id]
    @slave_objects.delete(id)
    # JSON.dump is Ruby's serializer; JSON.stringify is a JavaScript name
    # and raises NoMethodError here.
    @socket.send(JSON.dump(action: 'object-consumer-unregister', pattern: id)) if @connected
  end
  if @master_objects[id]
    @master_objects.delete(id)
    @socket.send(JSON.dump(action: 'object-producer-unregister', pattern: id)) if @connected
  end
end