class Vines::Stream
The base class for the various XMPP streams (c2s, s2s, component, http). It contains behavior common to all streams, such as rate limiting, stanza parsing, and stream error handling.
Constants
- ERROR
- PAD
Attributes
Public Class Methods
# File lib/vines/stream.rb, line 16
def initialize(config)
  @config = config
end
Public Instance Methods
Advance the stream's state machine to the new state. XML nodes received by the stream will be passed to this state's node method.
# File lib/vines/stream.rb, line 143
def advance(state)
  @state = state
end
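As a minimal sketch of how a state machine driven by advance might be wired together, the example below uses hypothetical SketchStream, Start, and Ready classes (none of them are part of Vines). Each state receives nodes through its node method and moves the stream forward by calling advance.

```ruby
# Hypothetical stand-ins for illustration only; these are not Vines classes.
class SketchStream
  attr_reader :state, :seen

  def initialize
    @seen = []
    @state = Start.new(self)
  end

  # Swap in the state that will receive the next node.
  def advance(state)
    @state = state
  end

  # Hand an incoming node to whichever state is current.
  def receive(node)
    @state.node(node)
  end
end

class Start
  def initialize(stream)
    @stream = stream
  end

  # Record the node, then move the stream on to the Ready state.
  def node(node)
    @stream.seen << [:start, node]
    @stream.advance(Ready.new(@stream))
  end
end

class Ready
  def initialize(stream)
    @stream = stream
  end

  # Record the node and stay in this state.
  def node(node)
    @stream.seen << [:ready, node]
  end
end
```

In this sketch the first node is handled by Start and every later node by Ready, because the stream only ever dispatches to its current state.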
# File lib/vines/stream.rb, line 93
def available_resources(*jid)
  router.available_resources(*jid, user.jid)
end
# File lib/vines/stream.rb, line 110
def cert_domain_matches?(domain)
  @store.domain?(get_peer_cert, domain)
end
Advance the state machine into the Closed state so any remaining queued nodes are not processed while we're waiting for EM to actually close the connection.
# File lib/vines/stream.rb, line 47
def close_connection(after_writing=false)
  super
  @closed = true
  advance(Client::Closed.new(self))
end
# File lib/vines/stream.rb, line 89
def connected_resources(jid)
  router.connected_resources(jid, user.jid)
end
Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).
# File lib/vines/stream.rb, line 36
def create_parser
  @parser = Parser.new.tap do |p|
    p.stream_open {|node| @nodes.push(node) }
    p.stream_close { close_connection }
    p.stanza {|node| @nodes.push(node) }
  end
end
# File lib/vines/stream.rb, line 123
def encrypt
  cert, key = @store.files_for_domain(domain)
  start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true)
end
Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
# File lib/vines/stream.rb, line 130
def encrypt?
  !@store.files_for_domain(domain).nil?
end
Stream-level errors close the stream, while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.
# File lib/vines/stream.rb, line 150
def error(e)
  case e
  when SaslError, StanzaError
    write(e.to_xml)
  when StreamError
    send_stream_error(e)
    close_stream
  else
    log.error(e)
    send_stream_error(StreamErrors::InternalServerError.new)
    close_stream
  end
end
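The three-way split made by error can be sketched in isolation. The error classes and the classify_error helper below are hypothetical names invented for this example; Vines defines its own error hierarchy, and the symbols returned here only label which branch would run.

```ruby
# Hypothetical error hierarchy for illustration; Vines defines its own.
class SketchSaslError < StandardError; end
class SketchStanzaError < StandardError; end
class SketchStreamError < StandardError; end

# Classify an exception the same way the case statement in error does.
def classify_error(e)
  case e
  when SketchSaslError, SketchStanzaError
    :write_and_keep_open      # recoverable: report it, leave the stream open
  when SketchStreamError
    :send_error_and_close     # stream-level: report it, then close
  else
    :log_and_close_as_internal_server_error  # anything unexpected
  end
end
```

The else branch is what keeps unexpected exceptions from leaking details to the peer: in the original method they are logged locally and reported as a generic internal server error.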
# File lib/vines/stream.rb, line 97
def interested_resources(*jid)
  router.interested_resources(*jid, user.jid)
end
# File lib/vines/stream.rb, line 20
def post_init
  @remote_addr, @local_addr = addresses
  @user, @closed, @stanza_size = nil, false, 0
  @bucket = TokenBucket.new(100, 10)
  @store = Store.new(@config.certs)
  @nodes = EM::Queue.new
  process_node_queue
  create_parser
  log.info { "%s %21s -> %s" % ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] }
end
# File lib/vines/stream.rb, line 53
def receive_data(data)
  return if @closed
  @stanza_size += data.bytesize
  if @stanza_size < max_stanza_size
    @parser << data rescue error(StreamErrors::NotWellFormed.new)
  else
    error(StreamErrors::PolicyViolation.new('max stanza size reached'))
  end
end
Reset the connection's XML parser when a new <stream:stream> header is received.
# File lib/vines/stream.rb, line 65
def reset
  create_parser
end
# File lib/vines/stream.rb, line 164
def router
  @config.router
end
# File lib/vines/stream.rb, line 101
def ssl_verify_peer(pem)
  # EM is supposed to close the connection when this returns false,
  # but it only does that for inbound connections, not when we
  # make a connection to another server.
  @store.trusted?(pem).tap do |trusted|
    close_connection unless trusted
  end
end
Returns the storage system for the domain. If no domain is given, the stream's storage mechanism is returned.
# File lib/vines/stream.rb, line 71
def storage(domain=nil)
  @config.storage(domain || self.domain)
end
# File lib/vines/stream.rb, line 134
def unbind
  router.delete(self)
  log.info { "%s %21s -> %s" % ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
  log.info { "Streams connected: #{router.size}" }
end
Reload the user's information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.
# File lib/vines/stream.rb, line 83
def update_user_streams(user)
  connected_resources(user.jid.bare).each do |stream|
    stream.user.update_from(user)
  end
end
Returns the Vines::Config::Host virtual host for the stream's domain.
# File lib/vines/stream.rb, line 76
def vhost
  @config.vhost(domain)
end
Send the data over the wire to this client.
# File lib/vines/stream.rb, line 115
def write(data)
  log_node(data, :out)
  if data.respond_to?(:to_xml)
    data = data.to_xml(:indent => 0)
  end
  send_data(data)
end
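The respond_to?(:to_xml) check lets write accept either an XML node object or an already serialized string. A rough sketch of that duck-typing step follows; SketchNode and serialize are hypothetical names used only here, and SketchNode merely stands in for whatever node type the stream is given.

```ruby
# Hypothetical node type for illustration; it only mimics the to_xml call.
class SketchNode
  def initialize(xml)
    @xml = xml
  end

  # Accepts and ignores formatting options such as :indent.
  def to_xml(options = {})
    @xml
  end
end

# Serialize anything that responds to to_xml; pass plain strings through.
def serialize(data)
  data.respond_to?(:to_xml) ? data.to_xml(:indent => 0) : data
end
```

Under these assumptions, serialize(SketchNode.new('<presence/>')) and serialize('<presence/>') produce the same string, which is why callers such as close_stream can pass a raw closing tag.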
Private Instance Methods
Return the remote and local socket addresses used by this connection.
# File lib/vines/stream.rb, line 171
def addresses
  [get_peername, get_sockname].map do |addr|
    addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
  end
end
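The address formatting can be tried outside EventMachine with Ruby's standard socket library. In this sketch, format_sockaddr is a hypothetical helper that repeats the expression from addresses, and the packed address is built by hand to stand in for the value get_peername would return.

```ruby
require 'socket'

# Format a packed sockaddr as "host:port", or 'unknown' when it is nil.
def format_sockaddr(addr)
  # unpack_sockaddr_in returns [port, host]; reverse puts the host first.
  addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
end

# Build a packed address by hand, standing in for get_peername's result.
packed = Socket.pack_sockaddr_in(5222, '127.0.0.1')
puts format_sockaddr(packed)  # prints 127.0.0.1:5222
puts format_sockaddr(nil)     # prints unknown
```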
Write a closing stream tag to the stream then close the stream. Subclasses can override this method for custom close behavior.
# File lib/vines/stream.rb, line 185
def close_stream
  write('</stream:stream>')
  close_connection_after_writing
end
# File lib/vines/stream.rb, line 220
def enforce_rate_limit
  unless @bucket.take(1)
    raise StreamErrors::PolicyViolation.new('rate limit exceeded')
  end
end
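enforce_rate_limit relies on a token bucket created in post_init with TokenBucket.new(100, 10). The sketch below shows one common way such a bucket can work, assuming the two arguments mean capacity and refill rate per second; SketchTokenBucket is a hypothetical class written for this example and the real TokenBucket may differ. The injectable clock is also an assumption, added so the behavior can be checked without sleeping.

```ruby
# Hypothetical token bucket; not the implementation used by Vines.
class SketchTokenBucket
  # capacity: maximum tokens held; rate: tokens added per second.
  # clock: callable returning the current time in seconds.
  def initialize(capacity, rate, clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @capacity = capacity
    @rate = rate
    @clock = clock
    @tokens = capacity.to_f
    @timestamp = clock.call
  end

  # Remove tokens from the bucket; return false if too few remain.
  def take(tokens)
    refill
    return false if tokens > @tokens
    @tokens -= tokens
    true
  end

  private

  # Add tokens for the time elapsed since the last call, up to capacity.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @timestamp) * @rate].min
    @timestamp = now
  end
end
```

With a bucket like this, a short burst of stanzas up to the capacity is allowed, and sustained traffic is held to the refill rate; a false return from take is what would trigger the PolicyViolation above.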
# File lib/vines/stream.rb, line 190
def error?(node)
  ns = node.namespace ? node.namespace.href : nil
  node.name == ERROR && ns == NAMESPACES[:stream]
end
# File lib/vines/stream.rb, line 226
def log_node(node, direction)
  return unless log.debug?
  from, to = @remote_addr, @local_addr
  from, to = to, from if direction == :out
  label = (direction == :out) ? 'Sent' : 'Received'
  log.debug("%s %21s -> %s\n%s\n" % ["#{label} stanza:".ljust(PAD), from, to, node])
end
# File lib/vines/stream.rb, line 207
def process_node(node)
  log_node(node, :in)
  @stanza_size = 0
  enforce_rate_limit
  if error?(node)
    close_stream
  else
    state.node(node)
  end
rescue => e
  error(e)
end
Schedule a queue pop on the EM thread to handle the next element. This guarantees all stanzas received on this stream are processed in order. tools.ietf.org/html/rfc6120#section-10.1
# File lib/vines/stream.rb, line 198
def process_node_queue
  @nodes.pop do |node|
    Fiber.new do
      process_node(node)
      process_node_queue
    end.resume unless @closed
  end
end
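The ordering guarantee comes from keeping only one pop outstanding at a time: the next pop is scheduled only after the current node has been handled. The sketch below imitates that pattern without EventMachine. SketchQueue is a hypothetical single-threaded stand-in for EM::Queue, SketchProcessor is likewise invented for this example, and the Fiber wrapper from the original is left out for brevity.

```ruby
# Hypothetical stand-in for EM::Queue: pop registers a one-shot callback
# that fires as soon as an item is available.
class SketchQueue
  def initialize
    @items = []
    @waiters = []
  end

  def push(item)
    @items << item
    drain
  end

  def pop(&block)
    @waiters << block
    drain
  end

  private

  # Pair waiting callbacks with queued items, oldest first.
  def drain
    @waiters.shift.call(@items.shift) until @items.empty? || @waiters.empty?
  end
end

class SketchProcessor
  attr_reader :handled

  def initialize
    @queue = SketchQueue.new
    @handled = []
    @closed = false
    process_node_queue
  end

  def push(node)
    @queue.push(node)
  end

  def close
    @closed = true
  end

  private

  # Handle one node, then register for the next. Only one pop is ever
  # outstanding, so nodes are handled in arrival order.
  def process_node_queue
    @queue.pop do |node|
      unless @closed
        @handled << node
        process_node_queue
      end
    end
  end
end
```

Once close is called, the pending callback drops the node it receives and does not register again, so anything still queued is never processed, which mirrors the unless @closed guard above.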
Write the StreamError's xml to the stream. Subclasses can override this method with custom error writing behavior.
# File lib/vines/stream.rb, line 179
def send_stream_error(e)
  write(e.to_xml)
end
Returns the current State of the stream's state machine. Provided as a method so subclasses can override the behavior.
# File lib/vines/stream.rb, line 237
def state
  @state
end
Return true if this is a valid domain-only JID that can be used in stream initiation stanza headers.
# File lib/vines/stream.rb, line 243
def valid_address?(jid)
  JID.new(jid).domain? rescue false
end