class Fluent::Plugin::RedisPubOutput

Constants

DEFAULT_BUFFER_TYPE

Attributes

redis[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_pub.rb, line 45
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
  raise Fluent::ConfigError, "'time' in chunk_keys is required." if not @chunk_key_time
  @unpacker = Fluent::Engine.msgpack_factory.unpacker
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_redis_pub.rb, line 73
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  log = {
    :tag => tag,
    :time => time,
    :record => record
  }
  log.to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_redis_pub.rb, line 83
def formatted_to_msgpack_binary
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_pub.rb, line 68
def shutdown
  @redis.quit
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_pub.rb, line 54
def start
  super

  options = {
    host: @host,
    port: @port,
    thread_safe: true,
    db: @db_number
  }
  options[:password] = @password if @password

  @redis = Redis.new(options)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_redis_pub.rb, line 87
def write(chunk)
  @redis.pipelined do
    chunk.msgpack_each do |record|
      @redis.publish @channel, record.to_json
    end
  end
end