class Fluent::Plugin::SakuraIOInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 17
def configure(conf)
  super

  @time_parser = Fluent::TimeParser.new(nil)
end
ensure_reactor_running() click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 30
def ensure_reactor_running
  return if EM.reactor_running?

  thread_create(:in_sakuraio_reactor) do
    EM.run
  end
end
handle_message(event) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 68
def handle_message(event)
  log.debug "sakuraio: received message #{event.data}"
  records = parse(event.data)
  return if records.empty?

  records.each do |r|
    router.emit(r['tag'], r['time'], r['record'])
  end
end
parse(text) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 78
def parse(text)
  parser = Yajl::Parser.new
  j = parser.parse(text)
  records = []
  case j['type']
  when 'connection'
    parse_connection(records, j)
  when 'location'
    parse_location(records, j)
  when 'channels'
    parse_channels(records, j)
  else
    log.debug "unknown type: #{j['type']}: #{text}"
  end
  records
end
parse_channel(mod, msg_time, chan) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 135
def parse_channel(mod, msg_time, chan)
  {
    'tag' => mod + '.channels.' + chan['channel'].to_s,
    'record' => {
      'module' => mod,
      'channel' => chan['channel'],
      'type' => chan['type'],
      'value' => chan['value']
    },
    'time' => @time_parser.parse(chan['datetime']) || msg_time
  }
end
parse_channels(records, data) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 126
def parse_channels(records, data)
  msg_time = @time_parser.parse(data['datetime'])
  mod = data['module']
  data['payload']['channels'].each do |c|
    records.push(parse_channel(mod, msg_time, c))
  end
  records
end
parse_connection(records, data) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 95
def parse_connection(records, data)
  record = {
    'tag' => data['module'] + '.connection',
    'record' => {
      'module' => data['module'],
      'is_online' => data['payload']['is_online']
    },
    'time' => @time_parser.parse(data['datetime'])
  }
  records.push(record)
  records
end
parse_location(records, data) click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 108
def parse_location(records, data)
  c = data['payload']['coordinate']
  if c != 'null'
    record = {
      'tag' => data['module'] + '.location',
      'record' => {
        'module' => data['module'],
        'latitude' => c['latitude'],
        'longitude' => c['longitude'],
        'range_m' => c['range_m']
      },
      'time' => @time_parser.parse(data['datetime'])
    }
    records.push(record)
  end
  records
end
run() click to toggle source
# File lib/fluent/plugin/in_sakuraio.rb, line 44
def run
  options = {}
  options[:ping] = @ping if @ping.positive?
  client = Faye::WebSocket::Client.new(@url, nil, options)
  EM.next_tick do
    client.on :open do
      log.info "sakuraio: starting websocket connection for #{@url}."
    end

    client.on :message do |event|
      handle_message(event)
    end

    client.on :error do |event|
      log.warn "sakuraio: #{event.message}"
    end

    client.on :close do |event|
      log.warn "sakuraio: #{event.code} #{event.reason}"
      run
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 38
def shutdown
  EM.stop if EM.reactor_running?

  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sakuraio.rb, line 23
def start
  super

  ensure_reactor_running
  thread_create(:in_sakuraio, &method(:run))
end