class Fluent::Plugin::SakuraIOOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sakuraio.rb, line 17
# Applies the configuration via the superclass, then prepares the
# WebSocket machinery this output relies on.
#
# After `super`, it makes sure an EventMachine reactor is running and then
# hands `run` to `thread_create` under the title :out_sakuraio.
#
# @param conf configuration object, forwarded unchanged to the superclass
# @return whatever `thread_create` returns
def configure(conf)
  super

  ensure_reactor_running
  thread_create(:out_sakuraio) { run }
end
encode_record(mod, record) click to toggle source
# File lib/fluent/plugin/out_sakuraio.rb, line 67
# Builds the "channels" message for one record addressed to one module and
# serializes it with Yajl.
#
# Each entry of @channels is read as `channel id => [record key, type]`;
# the channel id is converted with `to_i` and the value is looked up in
# `record` by that key (nil when the record has no such key).
#
# @param mod value placed in the message's 'module' field
# @param record object indexed with `[]` by each channel's key
# @return the result of `Yajl::Encoder.encode` for the message hash
def encode_record(mod, record)
  # One payload element per configured channel, in @channels' own order.
  channels = @channels.map do |ch, (key, type)|
    { 'channel' => ch.to_i, 'type' => type, 'value' => record[key] }
  end

  Yajl::Encoder.encode(
    'type' => 'channels',
    'module' => mod,
    'payload' => { 'channels' => channels }
  )
end
ensure_reactor_running() click to toggle source
# File lib/fluent/plugin/out_sakuraio.rb, line 24
# Starts an EventMachine reactor if none is running yet.
#
# When `EM.reactor_running?` is already true this does nothing and returns
# nil; otherwise `EM.run` is handed to `thread_create` under the title
# :out_sakuraio_reactor.
#
# @return nil when a reactor is already running, else `thread_create`'s result
def ensure_reactor_running
  unless EM.reactor_running?
    thread_create(:out_sakuraio_reactor) { EM.run }
  end
end
process(_tag, events) click to toggle source
# File lib/fluent/plugin/out_sakuraio.rb, line 56
# Sends every record of the event stream to every configured module.
#
# For each record, and for each entry of `modules`, the record is encoded
# with `encode_record`, the JSON is logged at debug level, and the result
# is passed to `@client.send`.
#
# @param _tag event tag (unused)
# @param events enumerable yielding (time, record) pairs
# @return the value of `events.each`
def process(_tag, events)
  events.each do |_time, record|
    log.debug("sakuraio: process record #{record}")

    modules.each do |target|
      json = encode_record(target, record)
      log.debug("sakuraio: encoded json #{json}")
      @client.send(json)
    end
  end
end
run() click to toggle source
# File lib/fluent/plugin/out_sakuraio.rb, line 32
# Opens a WebSocket client for @url and wires up its event handlers.
#
# A :ping option is passed to the client only when @ping is positive. The
# handlers are registered inside `EM.next_tick`: open, message and error
# events are only logged, while a close event is logged and then triggers
# another call to `run`, which replaces @client with a new connection
# (no delay or retry limit is applied here).
#
# @return the value of `EM.next_tick`
def run
  client_options = @ping.positive? ? { ping: @ping } : {}
  @client = Faye::WebSocket::Client.new(@url, nil, client_options)

  EM.next_tick do
    @client.on(:open) { log.info "sakuraio: starting websocket connection for #{@url}." }
    @client.on(:message) { |event| log.debug "sakuraio: received message #{event.data}" }
    @client.on(:error) { |event| log.warn "sakuraio: #{event.message}" }

    @client.on(:close) do |event|
      log.warn "sakuraio: #{event.code} #{event.reason}"
      # Reconnect by building a fresh client.
      run
    end
  end
end