class OML4R::Channel
Measurement Channel
Attributes
url[R]
Public Class Methods
[](name = :default, domain = :default)
click to toggle source
# File lib/oml4r.rb, line 577
# Look up a registered channel by name and domain.
#
# Channels are stored under the key "name:domain". When that key is missing
# but the request is for a non-default domain and a "name:default" channel
# exists, a new channel is created for the requested domain, reusing the
# default channel's URL.
#
# @param name [Symbol, String] channel name
# @param domain [Symbol, String] domain name
# @return the channel stored (or newly created) under "name:domain"
# @raise [OML4RException] if no channel is known and none can be derived
def self.[](name = :default, domain = :default)
  key = "#{name}:#{domain}"
  return @@channels[key] if @@channels.key?(key)

  # Only non-default domains may fall back to the channel of the default domain.
  template = (domain != :default) && @@channels["#{name}:default"]
  return self._create(key, domain, template.url) if template

  raise OML4RException.new("OML4R: Unknown channel '#{name}'")
end
_connect(fquri)
click to toggle source
Parse the given fully-qualified collection URI and return a suitably connected object.
Supported URIs are:
tcp:host:port and file:/P/A/T/H (file:- writes to standard output)
@param fquri [String] a fully qualified collection URI @return [IO] an object suitably connected to the required URL
@raise [OML4RException] in case of an unknown scheme
# File lib/oml4r.rb, line 563
# Parse the given fully-qualified collection URI and return a suitably
# connected object.
#
# Supported URIs are
#   tcp:host:port
#   file:/P/A/T/H   (file:- selects $stdout)
#
# @param fquri [String] a fully qualified collection URI
# @return [IO] an object suitably connected to the required URL
# @raise [OML4RException] in case of an unknown scheme
def self._connect(fquri)
  # Split off the scheme only, so that the remainder (in particular a file
  # path containing ':') is not truncated.
  scheme, target = fquri.split(':', 2)
  case scheme
  when 'tcp'
    host, port = target.to_s.split(':')
    TCPSocket.new(host, port)
  when 'file'
    # target is the file name here; '-' stands for standard output
    target == '-' ? $stdout : File.open(target, 'w+')
  else
    raise OML4RException, "OML4R: Unknown scheme '#{scheme}'"
  end
end
_create(key, domain, url)
click to toggle source
# File lib/oml4r.rb, line 548
# Instantiate a channel and register it under the given key.
#
# @param key [String] registry key (callers in this class use "name:domain")
# @param domain [Symbol, String] domain passed to the new channel
# @param url [String] collection URL passed to the new channel
# @return the newly created channel
def self._create(key, domain, url)
  @@channels.store(key, new(url, domain))
end
close_all()
click to toggle source
# File lib/oml4r.rb, line 611
# Close every registered channel, forget all of them, and ask MPBase to
# unfreeze.
def self.close_all
  @@channels.values.each(&:close)
  @@channels = {}
  MPBase.__unfreeze__
end
create(name, url, domain = :default)
click to toggle source
# File lib/oml4r.rb, line 537
# Return the channel registered as "name:domain", creating it if necessary.
#
# @param name [Symbol, String] channel name
# @param url [String] collection URL for the channel
# @param domain [Symbol, String] domain name
# @return the existing or newly created channel
# @raise [OML4RException] if the channel already exists with a different url
def self.create(name, url, domain = :default)
  key = "#{name}:#{domain}"
  existing = @@channels[key]
  return self._create(key, domain, url) unless existing

  if url != existing.url
    raise OML4RException.new("OML4R: Channel '#{name}' already defined with different url")
  end
  existing
end
init_all(domain, nodeID, appName, startTime, protocol)
click to toggle source
# File lib/oml4r.rb, line 591
# Record the session-wide settings, initialise every registered channel and
# have each measurement point class print its meta information.
#
# @param domain default domain, stored for channels created with :default
# @param nodeID sender identifier
# @param appName application name
# @param startTime start time of the session
# @param protocol protocol version
def self.init_all(domain, nodeID, appName, startTime, protocol)
  @@default_domain, @@nodeID, @@appName = domain, nodeID, appName
  @@startTime, @@protocol = startTime, protocol

  MPBase.__freeze__(appName, startTime)

  # send channel header
  @@channels.values.each do |channel|
    channel.init(nodeID, appName, startTime, protocol)
  end

  # send schema definitions
  MPBase.each_mp { |klass, _defs| klass.__print_meta__(appName) }

  MPBase.__useOML__
end
new(url, domain)
click to toggle source
# File lib/oml4r.rb, line 658
# Set up the channel state and start its sender thread.
#
# @param url [String] collection URL this channel will connect to
# @param domain [Symbol, String] domain this channel reports under
def initialize(url, domain)
  @url, @domain = url, domain
  @index = -1          # build_schema increments this before use
  @schemas = []
  @header_sent = false
  @queue = Queue.new
  # Must come last: the runner thread reads @queue.
  start_runner
end
Public Instance Methods
build_schema(mp_name, add_prefix, pdefs)
click to toggle source
# File lib/oml4r.rb, line 627
# Build the schema line for a measurement point and remember it.
#
# The line has the form "<index> <name> <field>:<type> ...", where the name
# is prefixed with "<appName>_" when add_prefix is set and an application
# name is known.
#
# @param mp_name measurement point name
# @param add_prefix whether to prefix the name with the application name
# @param pdefs collection of hashes providing :name and :type
# @return [Array] pair of the assigned schema index and the schema line
def build_schema(mp_name, add_prefix, pdefs)
  @index += 1
  stream_name =
    if !@@appName.nil? && add_prefix
      "#{@@appName}_#{mp_name}"
    else
      mp_name
    end
  fields = pdefs.map { |pdef| "#{pdef[:name]}:#{pdef[:type]}" }
  msg = [@index, stream_name, *fields].join(' ')
  @schemas << msg
  [@index, msg]
end
close()
click to toggle source
# File lib/oml4r.rb, line 652
# Signal the sender thread to finish and wait until it has terminated.
def close
  # nil is the end-of-work marker understood by the runner thread
  @queue << nil
  @runner.join
end
init(nodeID, appName, startTime, protocol)
click to toggle source
# File lib/oml4r.rb, line 647
# Store the session parameters and connect to the channel's URL.
#
# @param nodeID sender identifier
# @param appName application name
# @param startTime start time of the session
# @param protocol protocol version
def init(nodeID, appName, startTime, protocol)
  @nodeID = nodeID
  @appName = appName
  @startTime = startTime
  @protocol = protocol
  @out = _connect(@url)
end
send(msg)
click to toggle source
# File lib/oml4r.rb, line 638
# Queue a message for delivery by the sender thread.
#
# @param msg message to enqueue
def send(msg)
  @queue << msg
end
send_schema_update(msg)
click to toggle source
# File lib/oml4r.rb, line 642
# Queue a schema update message.
#
# The header flag is set before the message is queued.
#
# @param msg schema update message to enqueue
def send_schema_update(msg)
  @header_sent = true
  @queue << msg
end
url=(url)
click to toggle source
# File lib/oml4r.rb, line 619
# Change the channel's URL. Assigning the current URL is a no-op.
#
# @param url [String] new collection URL
# @raise [RuntimeError] if the URL differs and the channel is already connected
def url=(url)
  unless @url == url
    raise "Can't change channel's URL when it is already connected" if @out
    @url = url
  end
end
Protected Instance Methods
_connect(url)
click to toggle source
# File lib/oml4r.rb, line 668
# Open the output stream for the given server url and remember it in @out.
#
# Supported forms:
#   file:NAME        NAME '-' selects $stdout, anything else is opened "w+"
#   tcp:HOST[:PORT]  PORT falls back to DEF_SERVER_PORT
#
# @param url [String] server url
# @return the connected stream (also stored in @out)
# @raise [OML4RException] if the url starts with neither 'file:' nor 'tcp:'
def _connect(url)
  if url.start_with? 'file:'
    # Split once only, so a file name containing ':' is kept intact.
    _proto, fname = url.split(':', 2)
    out = (fname == '-' ? $stdout : File.open(fname, 'w+'))
  elsif url.start_with? 'tcp:'
    # e.g. tcp:norbit.npc.nicta.com.au:3003
    _proto, host, port = url.split(':')
    port ||= DEF_SERVER_PORT
    out = TCPSocket.new(host, port)
  else
    raise OML4RException, "OML4R: Unknown transport in server url '#{url}'"
  end
  @out = out
end
_send(msg)
click to toggle source
# File lib/oml4r.rb, line 737
# Write a message to the output stream, sending the protocol header first if
# it has not been sent yet.
#
# On Errno::EPIPE the method keeps trying to reconnect (sleeping 5 seconds
# before each attempt, retrying while the connection is refused) and then
# re-sends the header followed by the message.
#
# @param msg message to write
def _send(msg)
  unless @header_sent
    _send_protocol_header(@out)
    @header_sent = true
  end
  @out.puts msg
  @out.flush
rescue Errno::EPIPE
  # Trying to reconnect
  OML4R.logger.info "Trying to reconnect to '#{@url}'"
  loop do
    sleep 5
    begin
      @out = _connect(@url)
      # a fresh connection needs the header again
      @header_sent = false
      OML4R.logger.info "Reconnected to '#{@url}'"
      return _send(msg)
    rescue Errno::ECONNREFUSED => ex
      OML4R.logger.warn "Exception while reconnect '#{@url}' (#{ex.class})"
    end
  end
end
_send_protocol_header(stream)
click to toggle source
# File lib/oml4r.rb, line 684
# Write the protocol header, including all schemas built so far, to stream.
#
# The effective protocol is @protocol, falling back to OML4R::DEF_PROTOCOL
# when @protocol is nil. The same effective value is used for the
# "protocol:" line, for selecting the header layout, and in the error
# message, so the advertised protocol always matches the layout emitted.
#
# @param stream object responding to #puts that receives the header lines
# @raise [MissingArgumentException] if no domain name is available
# @raise [OML4RException] if the effective protocol is not supported
def _send_protocol_header(stream)
  protocol = @protocol || OML4R::DEF_PROTOCOL
  # a channel created for :default uses the domain given to init_all
  domain = (@domain == :default) ? @@default_domain : @domain
  raise MissingArgumentException, "Missing domain name" unless domain

  header = ["protocol: #{protocol}", "content: text"]
  case protocol
  when 4
    header << "domain: #{domain}"
    header << "start-time: #{@startTime.tv_sec}"
    header << "sender-id: #{@nodeID}"
    header << "app-name: #{@appName}"
    @schemas.each { |schema| header << "schema: #{schema}" }
    # the empty element makes puts emit the blank line ending the header
    header << ""
  else
    raise OML4RException, "Unsupported protocol #{protocol}"
  end
  stream.puts header
end
start_runner()
click to toggle source
# File lib/oml4r.rb, line 706
# Start the sender thread.
#
# The thread blocks on @queue, drains whatever else is already queued and
# sends the drained messages as one newline-joined payload via _send. A nil
# entry is the end-of-work marker (see #close): once one has been seen, the
# messages drained together with it are still sent and the thread stops.
#
# The output stream is released when the loop ends, whether it ends
# normally or because sending raised. Nothing is closed when the channel
# was never connected (@out is nil) or writes to $stdout.
def start_runner
  @runner = Thread.new do
    begin
      begin
        active = true
        while active
          batch = [@queue.pop]
          # pick up everything that is already waiting so it goes out at once
          batch << @queue.pop until @queue.empty?
          # a nil anywhere in the batch ends the work; later messages in the
          # same batch must not switch the runner back on
          active = false if batch.include?(nil)
          batch.compact!
          #$stderr.puts ">>>>>>#{@domain}: <#{batch.join("\n")}>"
          _send(batch.join("\n")) unless batch.empty?
        end
      ensure
        @out.close if @out && !(@out == $stdout)
        @out = nil
      end
    rescue Exception => ex
      # NOTE(review): rescuing Exception is kept from the original; this is
      # broader than StandardError.
      OML4R.logger.warn "Exception while sending message to channel '#{@url}' (#{ex})"
    end
    OML4R.logger.info "Channel #{url} closed"
  end
end