class Gossiperl::Client::Messaging
Public Class Methods
new(worker, &block)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 9
# Sets up the messaging layer for a worker.
#
# worker - assigned through the +worker=+ writer; the other methods of this
#          class read +worker.options+ and +worker.state+ and call
#          +worker.process_event+ on it.
# block  - optional block, kept as-is in @callback_block and handed back by
#          #get_callback_block.
def initialize(worker, &block)
  self.worker = worker
  @callback_block = block
end
Public Instance Methods
digest_ack(digest)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 72
# Answers an incoming digest with a DigestAck.
#
# The ack carries the configured :client_name (stringified), the current
# Unix time as heartbeat, the id of the digest being answered and an empty
# membership list. It is then passed to #send.
#
# digest - any object responding to #id (#start passes the received
#          Thrift::Digest here).
#
# Returns whatever #send returns.
def digest_ack(digest)
  reply = ::Gossiperl::Client::Thrift::DigestAck.new.tap do |ack|
    ack.name       = worker.options[:client_name].to_s
    ack.heartbeat  = Time.now.to_i
    ack.reply_id   = digest.id
    ack.membership = []
  end
  send(reply)
end
digest_exit()
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 109
# Sends a DigestExit and then marks the worker as no longer working.
#
# The digest carries the configured :client_name and :client_secret (both
# stringified) and the current Unix time as heartbeat. The worker's
# +working+ flag is cleared only after #send has returned.
#
# Returns false (the value assigned to +worker.working+).
def digest_exit
  farewell = ::Gossiperl::Client::Thrift::DigestExit.new
  settings = worker.options
  farewell.name      = settings[:client_name].to_s
  farewell.heartbeat = Time.now.to_i
  farewell.secret    = settings[:client_secret].to_s
  send(farewell)
  worker.working = false
end
digest_forwarded_ack(digest_id)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 81
# Acknowledges a forwarded digest.
#
# Builds a DigestForwardedAck with the configured :client_name and
# :client_secret (both stringified) and +digest_id+ as reply_id, then passes
# it to #send. No heartbeat is set on this message.
#
# digest_id - id of the forwarded envelope being acknowledged.
#
# Returns whatever #send returns.
def digest_forwarded_ack(digest_id)
  reply = ::Gossiperl::Client::Thrift::DigestForwardedAck.new.tap do |ack|
    ack.name     = worker.options[:client_name].to_s
    ack.secret   = worker.options[:client_secret].to_s
    ack.reply_id = digest_id
  end
  send(reply)
end
digest_subscribe(event_types)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 89
# Sends a DigestSubscribe for the given event types.
#
# The digest carries the configured :client_name and :client_secret (both
# stringified), a freshly generated UUID as id, the current Unix time as
# heartbeat and the event types converted to strings.
#
# event_types - collection responding to #map; every element is converted
#               with #to_s before being sent.
#
# Returns whatever #send returns.
def digest_subscribe(event_types)
  request  = ::Gossiperl::Client::Thrift::DigestSubscribe.new
  settings = worker.options
  request.name        = settings[:client_name].to_s
  request.secret      = settings[:client_secret].to_s
  request.id          = SecureRandom.uuid # already a String
  request.heartbeat   = Time.now.to_i
  request.event_types = event_types.map(&:to_s)
  send(request)
end
digest_unsubscribe(event_types)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 99
# Sends a DigestUnsubscribe for the given event types.
#
# The digest carries the configured :client_name and :client_secret (both
# stringified), a freshly generated UUID as id, the current Unix time as
# heartbeat and the event types converted to strings.
#
# event_types - collection responding to #map; every element is converted
#               with #to_s before being sent.
#
# Returns whatever #send returns.
def digest_unsubscribe(event_types)
  request  = ::Gossiperl::Client::Thrift::DigestUnsubscribe.new
  settings = worker.options
  request.name        = settings[:client_name].to_s
  request.secret      = settings[:client_secret].to_s
  request.id          = SecureRandom.uuid # already a String
  request.heartbeat   = Time.now.to_i
  request.event_types = event_types.map(&:to_s)
  send(request)
end
get_callback_block()
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 14
# Reader for the block captured by #initialize.
#
# Returns the value of @callback_block, which is nil when no block was
# given to the constructor.
def get_callback_block
  @callback_block
end
send(digest)
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 68
# Passes +digest+ to the transport's #send and returns its result.
#
# NOTE: this definition replaces Object#send for instances of this class,
# so dynamic dispatch on a messaging object must go through #__send__ or
# #public_send instead.
#
# digest - the message object to deliver; all digest_* methods of this class
#          route their messages through here.
def send(digest)
  transport.send(digest)
end
start()
click to toggle source
# File lib/gossiperl_client/messaging.rb, line 18
# Creates the UDP transport for the worker and starts a thread that routes
# everything the transport yields.
#
# When the worker options contain :thrift_window, its value is applied as
# the transport's recv_buf_size before the thread is started.
#
# Events passed to +worker.process_event+ (always as a single Hash):
#   :failed        - the transport yielded a Hash with :error, a Hash that has
#                    neither :error nor :forward, or an unsupported digest
#   :forwarded     - the transport yielded a Hash with :forward; a
#                    DigestForwardedAck is sent right after the event
#   :event         - DigestEvent received
#   :subscribed    - DigestSubscribeAck received
#   :unsubscribed  - DigestUnsubscribeAck received
#   :forwarded_ack - DigestForwardedAck received
# A plain Digest is answered with #digest_ack, and a DigestAck is handed to
# +worker.state.receive+; neither produces an event here.
#
# Returns the Thread in which +transport.handle+ runs.
def start
  self.transport = Gossiperl::Client::Transport::Udp.new(worker)

  settings = worker.options
  transport.recv_buf_size = settings[:thrift_window] if settings.key?(:thrift_window)

  # The block keeps +self+, so the private helpers below can be called
  # directly from inside the thread.
  Thread.new do
    transport.handle { |data| dispatch_incoming(data) }
  end
end

# Routes one item yielded by the transport: Hash payloads are treated as
# transport-level responses, anything else as a received digest.
def dispatch_incoming(data)
  if data.is_a?(Hash)
    dispatch_hash_response(data)
  else
    dispatch_digest(data)
  end
end

# Handles a Hash payload: reports :error, reports and acknowledges :forward,
# and reports any other Hash as unsupported.
def dispatch_hash_response(data)
  if data.key?(:error)
    worker.process_event({ event: :failed, error: data[:error] })
  elsif data.key?(:forward)
    envelope = data[:envelope]
    worker.process_event({ event: :forwarded, digest: envelope, digest_type: data[:type] })
    digest_forwarded_ack(envelope.id)
  else
    worker.process_event({ event: :failed, error: { unsupported_hash_response: data } })
  end
end

# Handles a received digest according to its Thrift class. The branches are
# checked in the same order as the original is_a? chain.
def dispatch_digest(data)
  case data
  when Gossiperl::Client::Thrift::Digest
    digest_ack(data)
  when Gossiperl::Client::Thrift::DigestAck
    worker.state.receive(data)
  when Gossiperl::Client::Thrift::DigestEvent
    worker.process_event({ event: :event,
                           details: { type: data.event_type,
                                      member: data.event_object,
                                      heartbeat: data.heartbeat } })
  when Gossiperl::Client::Thrift::DigestSubscribeAck
    worker.process_event({ event: :subscribed,
                           details: { types: data.event_types.map(&:to_sym),
                                      heartbeat: data.heartbeat } })
  when Gossiperl::Client::Thrift::DigestUnsubscribeAck
    worker.process_event({ event: :unsubscribed,
                           details: { types: data.event_types.map(&:to_sym),
                                      heartbeat: data.heartbeat } })
  when Gossiperl::Client::Thrift::DigestForwardedAck
    worker.process_event({ event: :forwarded_ack, details: { reply_id: data.reply_id } })
  else
    worker.process_event({ event: :failed, error: { unsupported_digest: data } })
  end
end

private :dispatch_incoming, :dispatch_hash_response, :dispatch_digest