class Fluent::Plugin::SakuraIOInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 17
# Applies the plugin configuration and prepares the timestamp parser.
#
# Delegates to the superclass implementation first, then builds the
# Fluent::TimeParser that the parse_* methods use to convert the
# 'datetime' fields of incoming messages. The parser is constructed
# with a nil format argument.
#
# @param conf the configuration object handed in by the caller
# @return [Fluent::TimeParser] the parser stored in @time_parser
def configure(conf)
  super(conf)
  @time_parser = Fluent::TimeParser.new(nil)
end
ensure_reactor_running()
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 30
# Starts the EventMachine reactor in a dedicated thread unless one is
# already running.
#
# @return [Object, nil] nil when a reactor is already running, otherwise
#   whatever thread_create returns
def ensure_reactor_running
  unless EM.reactor_running?
    # EM.run blocks, so it gets its own plugin-managed thread.
    thread_create(:in_sakuraio_reactor) { EM.run }
  end
end
handle_message(event)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 68
# Handles one websocket message: logs it, parses it into records and
# emits each record through the router.
#
# @param event an object responding to #data with the raw message text
# @return [Array<Hash>, nil] the emitted entries, or nil when the message
#   produced no records
def handle_message(event)
  log.debug "sakuraio: received message #{event.data}"
  entries = parse(event.data)
  unless entries.empty?
    entries.each do |entry|
      router.emit(entry['tag'], entry['time'], entry['record'])
    end
  end
end
parse(text)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 78
# Decodes one JSON message and converts it into a list of emit-ready
# entries (hashes with 'tag', 'time' and 'record' keys).
#
# Dispatches on the message's 'type' field to parse_connection,
# parse_location or parse_channels. Messages with any other type are
# logged at debug level and yield no records.
#
# Malformed JSON and payloads that do not decode to a Hash (for example
# JSON null, arrays or scalars) are logged and yield an empty list
# instead of raising, so one bad message does not propagate an exception
# into the websocket callback that invoked this method.
#
# @param text [String] raw message text
# @return [Array<Hash>] the parsed entries, possibly empty
def parse(text)
  records = []

  begin
    j = Yajl::Parser.new.parse(text)
  rescue Yajl::ParseError => e
    log.warn "sakuraio: failed to parse message: #{e.message}: #{text}"
    return records
  end

  # Only JSON objects carry a 'type' field; anything else cannot be routed.
  unless j.is_a?(Hash)
    log.debug "sakuraio: ignoring non-object message: #{text}"
    return records
  end

  case j['type']
  when 'connection'
    parse_connection(records, j)
  when 'location'
    parse_location(records, j)
  when 'channels'
    parse_channels(records, j)
  else
    log.debug "unknown type: #{j['type']}: #{text}"
  end
  records
end
parse_channel(mod, msg_time, chan)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 135
# Builds the emit-ready entry for a single channel value.
#
# The entry's time is the channel's own 'datetime' when present,
# otherwise the enclosing message's time. The channel timestamp is only
# handed to the time parser when it exists, so a channel without a
# 'datetime' falls back to msg_time instead of passing nil to the parser
# (NOTE(review): Fluent::TimeParser#parse appears to reject non-String
# input - confirm against the Fluentd version in use).
#
# @param mod [String] module identifier, used as the tag prefix
# @param msg_time time of the enclosing message, used as fallback
# @param chan [Hash] channel payload with 'channel', 'type', 'value' and
#   optionally 'datetime'
# @return [Hash] entry with 'tag', 'record' and 'time' keys
def parse_channel(mod, msg_time, chan)
  stamp = chan['datetime']
  {
    'tag' => mod + '.channels.' + chan['channel'].to_s,
    'record' => {
      'module' => mod,
      'channel' => chan['channel'],
      'type' => chan['type'],
      'value' => chan['value']
    },
    'time' => (stamp && @time_parser.parse(stamp)) || msg_time
  }
end
parse_channels(records, data)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 126
# Appends one entry per channel of a 'channels' message to records.
#
# The message-level 'datetime' is parsed once and passed to
# parse_channel together with the module identifier.
#
# @param records [Array<Hash>] accumulator, mutated in place
# @param data [Hash] decoded message with 'datetime', 'module' and
#   'payload' => { 'channels' => [...] }
# @return [Array<Hash>] the same records array
def parse_channels(records, data)
  message_time = @time_parser.parse(data['datetime'])
  module_id = data['module']
  data['payload']['channels'].each_with_object(records) do |channel, acc|
    acc.push(parse_channel(module_id, message_time, channel))
  end
end
parse_connection(records, data)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 95
# Appends the entry for a 'connection' message to records.
#
# The entry is tagged "<module>.connection" and carries the module
# identifier and the payload's 'is_online' value.
#
# @param records [Array<Hash>] accumulator, mutated in place
# @param data [Hash] decoded message with 'module', 'datetime' and
#   'payload' => { 'is_online' => ... }
# @return [Array<Hash>] the same records array
def parse_connection(records, data)
  module_id = data['module']
  records << {
    'tag' => module_id + '.connection',
    'record' => {
      'module' => module_id,
      'is_online' => data['payload']['is_online']
    },
    'time' => @time_parser.parse(data['datetime'])
  }
end
parse_location(records, data)
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 108
# Appends the entry for a 'location' message to records, if the message
# carries a coordinate.
#
# A JSON null coordinate is decoded to Ruby nil, so nil is treated as
# "no location available" and the message is skipped. The literal string
# 'null' is also skipped, as before.
#
# @param records [Array<Hash>] accumulator, mutated in place
# @param data [Hash] decoded message with 'module', 'datetime' and
#   'payload' => { 'coordinate' => Hash or nil }
# @return [Array<Hash>] the same records array
def parse_location(records, data)
  coordinate = data['payload']['coordinate']
  return records if coordinate.nil? || coordinate == 'null'

  records.push(
    'tag' => data['module'] + '.location',
    'record' => {
      'module' => data['module'],
      'latitude' => coordinate['latitude'],
      'longitude' => coordinate['longitude'],
      'range_m' => coordinate['range_m']
    },
    'time' => @time_parser.parse(data['datetime'])
  )
  records
end
run()
click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 44
# Opens a websocket connection to @url and wires up its event handlers.
#
# The :ping option is only passed to the client when @ping is positive.
# Handlers are registered on the next reactor tick. Incoming messages go
# to handle_message, errors and closes are logged as warnings, and a
# close triggers a new call to run (i.e. a reconnect).
#
# NOTE(review): the reconnect on :close happens immediately with no
# delay or retry limit visible here - confirm that is intended.
def run
  options = @ping.positive? ? { ping: @ping } : {}
  socket = Faye::WebSocket::Client.new(@url, nil, options)

  EM.next_tick do
    socket.on(:open) { log.info "sakuraio: starting websocket connection for #{@url}." }
    socket.on(:message) { |event| handle_message(event) }
    socket.on(:error) { |event| log.warn "sakuraio: #{event.message}" }
    socket.on(:close) do |event|
      log.warn "sakuraio: #{event.code} #{event.reason}"
      run
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 38
# Stops the EventMachine reactor if it is running, then delegates to the
# superclass shutdown.
def shutdown
  if EM.reactor_running?
    EM.stop
  end
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 23
# Starts the plugin: runs the superclass start, makes sure the
# EventMachine reactor thread exists, then launches run in its own
# thread.
def start
  super
  ensure_reactor_running
  thread_create(:in_sakuraio) { run }
end