class Diameter::Stack
Public Class Methods
Stack
constructor.
@note The stack does not advertise any applications to peers by
default - {#add_handler} must be called early on.
@param host [String] The Diameter
Identity of this stack (for
the Origin-Host AVP).
@param realm [String] The Diameter
realm of this stack (for
the Origin-Realm AVP).
@option opts [Fixnum] timeout (60)
The number of seconds to wait for an answer before notifying the caller of a timeout and forgetting about the request.
# File lib/diameter/stack.rb, line 28 def initialize(host, realm, opts={}) @local_host = host @local_realm = realm @auth_apps = [] @acct_apps = [] @pending_ete = {} @tcp_helper = TCPStackHelper.new(self) @peer_table = {} @handlers = {} @answer_timeout = opts.fetch(:timeout, 60) @threadpool = Concurrent::ThreadPoolExecutor.new( min_threads: 5, max_threads: 5, max_queue: 1, fallback_policy: :caller_runs ) @res = Dnsruby::Resolver.new Diameter.logger.log(Logger::INFO, 'Stack initialized') end
Public Instance Methods
Adds a handler for a specific Diameter
application.
@note If you expect to only send requests for this application,
not receive them, the block can be a no-op (e.g. `{ nil }`)
@param app_id [Fixnum] The Diameter
application ID. @option opts [true, false] auth
Whether we should advertise support for this application in the Auth-Application-ID AVP. Note that at least one of auth or acct must be specified.
@option opts [true, false] acct
Whether we should advertise support for this application in the Acct-Application-ID AVP. Note that at least one of auth or acct must be specified.
@option opts [Fixnum] vendor
If we should advertise support for this application in a Vendor-Specific-Application-Id AVP, this specifies the associated Vendor-Id.
@yield [req, cxn] Passes a Diameter
message (and its originating
connection) for application-specific handling.
@yieldparam [Message] req The parsed Diameter
message from the peer. @yieldparam [Socket] cxn The TCP connection to the peer, to be
passed to {Stack#send_answer}.
# File lib/diameter/stack.rb, line 91 def add_handler(app_id, opts={}, &blk) vendor = opts.fetch(:vendor, 0) auth = opts.fetch(:auth, false) acct = opts.fetch(:acct, false) raise ArgumentError.new("Must specify at least one of auth or acct") unless auth or acct @acct_apps << [app_id, vendor] if acct @auth_apps << [app_id, vendor] if auth @handlers[app_id] = blk end
Closes the given connection, blanking out any internal data structures associated with it.
Likely to be moved to the Peer
object in a future release/
@param connection [Socket] The connection to close.
# File lib/diameter/stack.rb, line 124 def close(connection) @tcp_helper.close(connection) end
Creates a Peer
connection to a Diameter
agent at the specific network location indicated by peer_uri.
@param peer_uri [URI] The aaa:// URI identifying the peer. Should
contain a hostname/IP; may contain a port (default 3868).
@param peer_host [String] The DiameterIdentity of this peer, which
will uniquely identify it in the peer table.
@param realm [String] The Diameter
realm of this peer. @return [Peer] The Diameter
peer chosen.
# File lib/diameter/stack.rb, line 161 def connect_to_peer(peer_uri, peer_host, realm) @peer_table[peer_host] = Peer.new(peer_host, realm) @peer_table[peer_host].state = :WAITING # Will move to :UP when the CEA is received uri = URI(peer_uri) cxn = @tcp_helper.setup_new_connection(uri.host, uri.port) @peer_table[peer_host].cxn = cxn avps = [AVP.create('Origin-Host', @local_host), AVP.create('Origin-Realm', @local_realm), AVP.create('Host-IP-Address', IPAddr.new('127.0.0.1')), AVP.create('Vendor-Id', 100), AVP.create('Product-Name', 'ruby-diameter') ] avps += app_avps cer_bytes = Message.new(version: 1, command_code: 257, app_id: 0, request: true, proxyable: false, retransmitted: false, error: false, avps: avps).to_wire @tcp_helper.send(cer_bytes, cxn) @peer_table[peer_host] end
Looks up the given Diameter
realm with DNS-SRV, and establishes a connection to one peer in that realm.
@param realm [String] The Diameter
realm to connect to.
@return [Peer] The Diameter
peer chosen.
# File lib/diameter/stack.rb, line 135 def connect_to_realm(realm) possible_peers = [] @res.query("_diameter._tcp.#{realm}", "SRV").each_answer do |a| possible_peers << {name: a.target.to_s, port: a.port, priority: a.priority, weight: a.weight} end # Prefer the lowest priority and the highest weight possible_peers.sort!{ |a, b| (a[:priority] <=> b[:priority]) || (b[:weight] <=> a[:weight])} Diameter.logger.debug("Sorted list of peers for realm #{realm} is #{possible_peers.inspect}") primary = possible_peers[0] url = "aaa://#{primary[:name]}:#{primary[:port]}" Diameter.logger.info("Primary peer for realm #{realm} is #{primary[:name]}, (#{url})") connect_to_peer(url, primary[:name], realm) end
@private Handles a Diameter
request straight from a network connection. Intended to be called by TCPStackHelper after it retrieves a message, not directly by users.
# File lib/diameter/stack.rb, line 282 def handle_message(msg_bytes, cxn) # Common processing - ensure that this message has come in on this # peer's expected connection, and update the last time we saw # activity on this peer msg = Message.from_bytes(msg_bytes) Diameter.logger.debug("Handling message #{msg}") peer = msg.avp_by_name('Origin-Host').octet_string if @peer_table[peer] @peer_table[peer].reset_timer unless @peer_table[peer].cxn == cxn Diameter.logger.log(Logger::WARN, "Ignoring message - claims to be from #{peer} but comes from #{cxn} not #{@peer_table[peer].cxn}") end end if msg.command_code == 257 && msg.answer handle_cea(msg, cxn) elsif msg.command_code == 257 && msg.request handle_cer(msg, cxn) elsif msg.command_code == 280 && msg.request handle_dwr(msg, cxn) elsif msg.command_code == 280 && msg.answer # No-op - we've already updated our timestamp elsif msg.answer handle_other_answer(msg, cxn) elsif @handlers.has_key? msg.app_id @handlers[msg.app_id].call(msg, cxn) else Diameter.logger.warn("Ignoring message from unrecognised application #{msg.app_id} (Command-Code #{msg.command_code})") end end
Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.
@param id [String] The Diameter
identity of the peer. @return [Keyword] The state of the peer (:UP, :WAITING or :CLOSED).
# File lib/diameter/stack.rb, line 268 def peer_state(id) if !@peer_table.key? id :CLOSED else @peer_table[id].state end end
Sends a Diameter
answer. This is sent over the same connection the request was received on (which needs to be passed into to this method).
This adds this stack's Origin-Host and Origin-Realm AVPs, if those AVPs don't already exist.
@param ans [Message] The Diameter
answer @param original_cxn [Socket] The connection which the request
came in on. This will have been passed to the block registered with {Stack#add_handler}.
# File lib/diameter/stack.rb, line 257 def send_answer(ans, original_cxn) fail "Must pass an answer" unless ans.answer ans.add_origin_host_and_realm(@local_host, @local_realm) @tcp_helper.send(ans.to_wire, original_cxn) end
Sends a Diameter
request. This is routed to an appropriate peer based on the Destination-Host AVP
(or, if that is absent, on the Destination-Realm AVP
).
This adds this stack's Origin-Host and Origin-Realm AVPs, if those AVPs don't already exist.
@param req [Message] The request to send. @param peer [Peer] (Optional) A peer to use as the first hop for the message
# File lib/diameter/stack.rb, line 192 def send_request(req, options={}) fail "Must pass a request" unless req.request req.add_origin_host_and_realm(@local_host, @local_realm) peer = options[:peer] if peer.nil? peer = if req['Destination-Host'] peer_identity = req['Destination-Host'].octet_string Diameter.logger.debug("Selecting peer by Destination-Host (#{peer_identity})") @peer_table[peer_identity] elsif req['Destination-Realm'] realm = req['Destination-Realm'].octet_string Diameter.logger.debug("Selecting peer by Destination-Realm (#{realm})") @peer_table.values.select { |p| p.realm == realm }.sample else fail "Request must have Destination-Host or Destination-Realm" end else Diameter.logger.debug("Peer selection forced to #{peer.identity}") end if peer.nil? Diameter.logger.warn("No peer is available to send message - cannot route") fail "No acceptable peer" elsif peer.state == :UP q = Queue.new @pending_ete[req.ete] = q @tcp_helper.send(req.to_wire, peer.cxn) =begin # Time this request out if no answer is received Diameter.logger.debug("Scheduling timeout for #{@answer_timeout}s time") Concurrent::timer(@answer_timeout) do Diameter.logger.debug("Timing out message with EtE #{req.ete}") q = @pending_ete.delete(req.ete) if q q.push(:timeout) end end =end p = Concurrent::Promise.execute(executor: @threadpool) { Diameter.logger.debug("Waiting for answer to message with EtE #{req.ete}, queue #{q}") val = q.pop Diameter.logger.debug("Promise fulfilled for message with EtE #{req.ete}") val } return p else Diameter.logger.warn("Peer #{peer.identity} is in state #{peer.state} - cannot route") end end
This shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.
# File lib/diameter/stack.rb, line 108 def shutdown @tcp_helper.shutdown @pending_ete.each do |ete, q| Diameter.logger.debug("Shutting down queue #{q} as no answer has been received with EtE #{ete}") q.push :shutdown end @threadpool.kill @threadpool.wait_for_termination(5) end
Complete the stack initialization and begin reading from the TCP connections.
# File lib/diameter/stack.rb, line 55 def start @tcp_helper.start_main_loop end
Private Instance Methods
# File lib/diameter/stack.rb, line 315 def app_avps avps = [] @auth_apps.each do |app_id, vendor| avps << if vendor == 0 AVP.create("Auth-Application-Id", app_id) else AVP.create("Vendor-Specific-Application-Id", [AVP.create("Auth-Application-Id", app_id), AVP.create("Vendor-Id", vendor)]) end end @acct_apps.each do |app_id, vendor| avps << if vendor == 0 AVP.create("Acct-Application-Id", app_id) else AVP.create("Vendor-Specific-Application-Id", [AVP.create("Acct-Application-Id", app_id), AVP.create("Vendor-Id", vendor)]) end end avps end
# File lib/diameter/stack.rb, line 387 def handle_cea(cea, cxn) host = cea.avp_by_name('Origin-Host').octet_string if @peer_table.has_key? host @peer_table[host].state = :UP @peer_table[host].reset_timer else entry = @peer_table.find { |h, p| p.cxn == cxn } if entry.nil? Diameter.logger.warn("Ignoring CEA from unknown peer #{host}") Diameter.logger.debug("Known peers are #{@peer_table.keys}") else old_host, peer = entry Diameter.logger.warn("Peer identity changed #{old_host} => #{host}") @peer_table.delete(old_host) peer.identity = host @peer_table[host] = peer @peer_table[host].state = :UP @peer_table[host].reset_timer end end end
# File lib/diameter/stack.rb, line 361 def handle_cer(cer, cxn) if shared_apps(cer).empty? rc = 5010 else rc = 2001 end cea = cer.create_answer(rc, avps: [AVP.create('Origin-Host', @local_host), AVP.create('Origin-Realm', @local_realm)] + app_avps) @tcp_helper.send(cea.to_wire, cxn) if rc == 2001 peer = cer.avp_by_name('Origin-Host').octet_string realm = cer.avp_by_name('Origin-Realm').octet_string Diameter.logger.debug("Creating peer table entry for peer #{peer} in realm #{realm}") @peer_table[peer] = Peer.new(peer, realm) @peer_table[peer].state = :UP @peer_table[peer].reset_timer @peer_table[peer].cxn = cxn else @tcp_helper.close(cxn) end end
# File lib/diameter/stack.rb, line 413 def handle_dpa end
# File lib/diameter/stack.rb, line 410 def handle_dpr end
# File lib/diameter/stack.rb, line 425 def handle_dwa end
# File lib/diameter/stack.rb, line 416 def handle_dwr(dwr, cxn) dwa = dwr.create_answer(2001, avps: [AVP.create('Origin-Host', @local_host), AVP.create('Origin-Realm', @local_realm)]) @tcp_helper.send(dwa.to_wire, cxn) # send DWA end
# File lib/diameter/stack.rb, line 431 def handle_other_answer(msg, _cxn) Diameter.logger.debug("Handling answer with End-to-End identifier #{msg.ete}") q = @pending_ete[msg.ete] q.push msg Diameter.logger.debug("Passed answer to fulfil sender's Promise object'") @pending_ete.delete msg.ete end
# File lib/diameter/stack.rb, line 428 def handle_other_request end