class Fluent::WebSocketOutput
Public Instance Methods
buffer(data)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 89
# Appends +data+ to @buffer and keeps only the newest @buffered_messages
# entries. Does nothing (returns nil) when @buffered_messages is not positive.
#
# @param data [Object] an already-encoded message to remember
# @return [Array, nil] the trimmed buffer when trimming happened, otherwise nil
def buffer(data)
  limit = @buffered_messages
  return unless limit > 0

  @buffer << data
  # Once the backlog outgrows the limit, swap in a fresh array holding only
  # the most recent +limit+ entries.
  @buffer = @buffer.last(limit) if @buffer.length > limit
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 32
# Runs the superclass configuration, then spawns a thread that runs the
# EventMachine reactor with a WebSocket server listening on @host:@port.
#
# For every connection that opens, a delivery callback is subscribed to
# $channel (binary frames when @use_msgpack is set, text frames otherwise),
# an unsubscribe hook is registered for when the connection closes, and the
# messages currently held in @buffer are replayed to the new client.
#
# @param conf the plugin configuration, passed through to the superclass
# @return [Thread] the thread running the reactor
def configure(conf)
  super
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run do
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen do |handshake|
          callback = @use_msgpack ? proc { |msg| ws.send_binary(msg) } : proc { |msg| ws.send(msg) }
          # Subscribe and replay under the same lock that guards pushes to
          # the channel.
          $lock.synchronize do
            sid = $channel.subscribe callback
            $log.trace "WebSocket connection: ID " + sid.to_s
            ws.onclose do
              $log.trace "Connection closed: ID " + sid.to_s
              $lock.synchronize do
                $channel.unsubscribe(sid)
              end
            end
            # Replay through the same callback used for live messages so
            # msgpack payloads go out as binary frames instead of text.
            # Array() covers a client connecting before @buffer has been
            # assigned (it is only set in #start).
            Array(@buffer).each { |msg| callback.call(msg) }
          end
          #ws.onmessage { |msg|
          #}
        end
      end
    end
  end
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_websocket.rb, line 75
# Advances the chain, then encodes every (time, record) pair of the event
# stream, hands the encoded message to #buffer and pushes it to $channel
# while holding $lock.
#
# Each message is an array ending with the record, preceded by the time when
# @add_time is set, and by the tag before that when @add_tag is set. It is
# encoded with to_msgpack when @use_msgpack is set, otherwise with Yajl.
#
# @param tag the tag of the event stream
# @param es the event stream; must yield (time, record) pairs from #each
# @param chain the output chain; #next is called once before processing
# @return whatever es.each returns
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    # Assemble front-to-back: optional tag, optional time, then the record.
    payload = []
    payload << tag if @add_tag
    payload << time if @add_time
    payload << record

    output = if @use_msgpack
               payload.to_msgpack
             else
               Yajl::Encoder.encode(payload)
             end

    buffer(output)
    $lock.synchronize { $channel.push output }
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 68
# Runs the superclass shutdown, then stops the EventMachine reactor and kills
# the thread stored in @thread.
#
# Both teardown steps are guarded so that shutdown does not raise when there
# is nothing to tear down: EM.stop is only called while the reactor is
# running, and the thread is only killed when @thread is set.
def shutdown
  super
  # EM.stop raises when no reactor is running (e.g. the server thread
  # already died), which would otherwise abort the shutdown.
  EM.stop if EM.reactor_running?
  Thread.kill(@thread) if @thread
  $log.trace "Killed em-websocket thread."
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_websocket.rb, line 63
# Resets @buffer to a fresh empty array, then delegates to the superclass
# implementation of start.
#
# @return whatever the superclass start returns
def start
  @buffer = Array.new
  super
end