class Fluent::PullForwardOutput

Constants

DEFAULT_PULLFORWARD_LISTEN_PORT

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pull_forward.rb, line 42
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pull_forward.rb, line 50
def configure(conf)
  super
  if @users.size < 1
    raise Fluent::ConfigError, "no <user> sections specified"
  end
end
dequeue_chunks() click to toggle source
# File lib/fluent/plugin/out_pull_forward.rb, line 130
def dequeue_chunks
  response = []

  unpacker = MessagePack::Unpacker.new

  @buffer.pull_chunks do |chunk|
    next if chunk.empty?
    unpacker.feed_each(chunk.read) do |ary|
      response << ary
    end
  end

  response.to_json
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_pull_forward.rb, line 126
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
run() click to toggle source
# File lib/fluent/plugin/out_pull_forward.rb, line 78
def run
  cert, key = self.certificate
  realm = "Fluentd fluent-plugin-pullforward server"

  logger = $log
  auth_logger = Fluent::PluginLogger.new(logger)
  auth_logger.level = @auth_loglevel
  server_logger = Fluent::PluginLogger.new(logger)
  server_logger.level = @server_loglevel

  auth_db = HtpasswdDummy.new
  @users.each do |user|
    auth_db.set_passwd(realm, user.username, user.password)
  end
  authenticator = WEBrick::HTTPAuth::BasicAuth.new(
    :UserDB => auth_db,
    :Realm => realm,
    :Logger => Fluent::PullForward::WEBrickLogger.new(auth_logger),
  )

  @server = WEBrick::HTTPServer.new(
    :BindAddress => @bind,
    :Port => @port,
    # :DocumentRoot => '.',
    :Logger => Fluent::PullForward::WEBrickLogger.new(server_logger),
    :AccessLog => [],
    :SSLEnable  => true,
    :SSLCertificate => cert,
    :SSLPrivateKey => key
  )
  @server.logger.info("hogepos")

  @server.mount_proc('/') do |req, res|
    unless req.ssl?
      raise WEBrick::HTTPStatus::Forbidden, "pullforward plugin does not permit non-HTTPS requests"
    end
    if req.path != '/'
      raise WEBrick::HTTPStatus::NotFound, "valid path is only '/'"
    end
    authenticator.authenticate(req, res)
    res.content_type = 'application/json'
    res.body = dequeue_chunks()
  end

  log.info "listening pullforward socket on #{@bind}:#{@port}"
  @server.start
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_pull_forward.rb, line 62
def shutdown
  @server.stop if @server
  @thread.kill
  @thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pull_forward.rb, line 57
def start
  super
  @thread = Thread.new(&method(:run))
end