class Fluent::WebSocketOutput

Public Instance Methods

buffer(data) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 89
# Remembers +data+ in @buffer, keeping at most @buffered_messages entries.
#
# Does nothing when @buffered_messages is zero or negative. Once the buffer
# grows past the limit, only the most recent @buffered_messages entries are
# retained.
#
# data - the already-encoded message to remember.
def buffer(data)
  limit = @buffered_messages
  return if limit <= 0

  @buffer.push(data)
  overflowing = @buffer.size > limit
  # Drop the oldest entries so that only the newest +limit+ remain.
  @buffer = @buffer.last(limit) if overflowing
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 32
# Applies the plugin configuration (via the superclass) and launches the
# em-websocket server on a background thread.
#
# Every client that connects is subscribed to the global $channel and is then
# sent each message currently held in @buffer. Subscription, replay and
# unsubscription all happen while holding the global $lock.
#
# conf - the configuration object, forwarded to the superclass by bare `super`.
#
# Returns the Thread that runs the EventMachine reactor.
def configure(conf)
  super
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run do
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen do |handshake|
          # msgpack payloads are sent as binary frames, everything else as text.
          callback = @use_msgpack ? proc { |msg| ws.send_binary(msg) } : proc { |msg| ws.send(msg) }
          $lock.synchronize do
            sid = $channel.subscribe callback
            $log.trace "WebSocket connection: ID " + sid.to_s
            ws.onclose do
              $log.trace "Connection closed: ID " + sid.to_s
              $lock.synchronize do
                $channel.unsubscribe(sid)
              end
            end
            # Replay the backlog through the same callback used for live
            # traffic, so msgpack-encoded messages are not sent as text frames.
            # @buffer is only assigned in #start, which may not have run yet
            # when the first client connects; Array() turns nil into [].
            Array(@buffer).each do |msg|
              callback.call(msg)
            end
          end

          # Messages sent by the client are ignored: no onmessage handler is
          # registered.
        end
      end
    end
  end
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 75
# Encodes every record of an event stream and publishes it to the WebSocket
# subscribers.
#
# Each message is an array whose last element is the record, optionally
# preceded by the time (when @add_time is set) and, before that, the tag
# (when @add_tag is set). The array is serialized with msgpack when
# @use_msgpack is set and with Yajl otherwise, handed to #buffer, and then
# pushed onto the global $channel while holding $lock.
#
# tag   - the tag of the event stream.
# es    - the event stream; must yield (time, record) pairs from #each.
# chain - the output chain; #next is called on it once, before any record is
#         processed.
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = []
    payload << tag if @add_tag
    payload << time if @add_time
    payload << record

    encoded = if @use_msgpack
                payload.to_msgpack
              else
                Yajl::Encoder.encode(payload)
              end

    buffer(encoded)
    $lock.synchronize { $channel.push encoded }
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 68
# Shuts the plugin down: runs the superclass shutdown, stops the EventMachine
# reactor and kills the background server thread created in #configure.
#
# Both teardown steps are guarded so that shutdown does not itself raise when
# the reactor is not running or when no server thread was ever created.
def shutdown
  super
  # EM.stop raises if the reactor is not running, so only stop a live reactor.
  EM.stop if EM.reactor_running?
  # @thread is assigned in #configure; nothing to kill if that never happened.
  return unless @thread

  Thread.kill(@thread)
  $log.trace "Killed em-websocket thread."
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 63
# Starts the plugin: resets the message backlog held in @buffer to an empty
# array, then hands over to the superclass implementation.
#
# Returns whatever the superclass #start returns.
def start
  # The backlog is reset before the superclass start runs.
  @buffer = Array.new
  super
end