# File lib/fluent/plugin/out_hash_forward.rb, line 179 def primary_available?(nodes) nodes.size > 1 && nodes.first.available? end
class Fluent::HashForwardOutput
Attributes
hash_key_slice_lindex[RW]
hash_key_slice_rindex[RW]
regular_nodes[R]
for test
regular_weight_array[R]
standby_nodes[R]
standby_weight_array[R]
watcher_interval[RW]
Public Instance Methods
build_weight_array(nodes)
click to toggle source
This is just a partial copy from ForwardOuput#rebuild_weight_array
# File lib/fluent/plugin/out_hash_forward.rb, line 156 def build_weight_array(nodes) weight_array = [] gcd = nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) } nodes.each {|n| (n.weight / gcd).times { weight_array << n } } weight_array end
cache_sock(node, sock)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 301 def cache_sock(node, sock) if sock get_sock[node] = sock get_sock_expired_at[node] = Time.now + @keepalive_time if @keepalive_time log.info "out_hash_forward: keepalive connection opened", :host=>node.host, :port=>node.port else get_sock[node] = nil get_sock_expired_at[node] = nil end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hash_forward.rb, line 28 def configure(conf) super if @hash_key_slice lindex, rindex = @hash_key_slice.split('..', 2) if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ raise Fluent::ConfigError, "out_hash_forard: hash_key_slice must be formatted like [num]..[num]" else @hash_key_slice_lindex = lindex.to_i @hash_key_slice_rindex = rindex.to_i end end if @heartbeat_type == :none @nodes = @nodes.map {|node| NonHeartbeatNode.new(node) } end @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? } @regular_weight_array = build_weight_array(@regular_nodes) @standby_weight_array = build_weight_array(@standby_nodes) @cache_nodes = {} @sock = {} @sock_expired_at = {} @mutex = {} @watcher_interval = 1 end
get_index(key, size)
click to toggle source
hashing(key) mod N
# File lib/fluent/plugin/out_hash_forward.rb, line 184 def get_index(key, size) str_hash(key) % size end
get_mutex(node)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 295 def get_mutex(node) thread_id = Thread.current.object_id @mutex[thread_id] ||= {} @mutex[thread_id][node] ||= Mutex.new end
get_sock()
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 312 def get_sock @sock[Thread.current.object_id] ||= {} end
get_sock_expired_at()
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 316 def get_sock_expired_at @sock_expired_at[Thread.current.object_id] ||= {} end
nodes(tag)
click to toggle source
Get nodes (a regular_node and a standby_node if available) using hash algorithm
# File lib/fluent/plugin/out_hash_forward.rb, line 168 def nodes(tag) if nodes = @cache_nodes[tag] return nodes end hash_key = @hash_key_slice ? perform_hash_key_slice(tag) : tag regular_index = @regular_weight_array.size > 0 ? get_index(hash_key, @regular_weight_array.size) : 0 standby_index = @standby_weight_array.size > 0 ? get_index(hash_key, @standby_weight_array.size) : 0 nodes = [@regular_weight_array[regular_index], @standby_weight_array[standby_index]].compact @cache_nodes[tag] = nodes end
perform_hash_key_slice(tag)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 194 def perform_hash_key_slice(tag) tags = tag.split('.') sliced = tags[@hash_key_slice_lindex..@hash_key_slice_rindex] return sliced.nil? ? "" : sliced.join('.') end
primary_available?(nodes)
click to toggle source
rebuild_weight_array()
click to toggle source
Override: I change weight algorithm
# File lib/fluent/plugin/out_hash_forward.rb, line 152 def rebuild_weight_array end
reconnect(node)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 229 def reconnect(node) sock = connect(node) opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) sock end
run()
click to toggle source
Override to disable heartbeat
Calls superclass method
# File lib/fluent/plugin/out_hash_forward.rb, line 92 def run unless @heartbeat_type == :none super end end
send_data(node, tag, chunk)
click to toggle source
Override for keepalive
# File lib/fluent/plugin/out_hash_forward.rb, line 201 def send_data(node, tag, chunk) sock = nil get_mutex(node).synchronize do sock = get_sock[node] if @keepalive unless sock sock = reconnect(node) cache_sock(node, sock) if @keepalive end begin sock_write(sock, tag, chunk) node.heartbeat(false) log.debug "out_hash_forward: write to", :host=>node.host, :port=>node.port rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT => e log.warn "out_hash_forward: send_data failed #{e.class} #{e.message}", :host=>node.host, :port=>node.port if @keepalive sock.close rescue IOError cache_sock(node, nil) end raise e ensure unless @keepalive sock.close if sock end end end end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 69 def shutdown @finished = true @loop.watchers.each {|w| w.detach } @loop.stop unless @heartbeat_type == :none # custom @thread.join @usock.close if @usock stop_watcher end
sock_close(node)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 284 def sock_close(node) get_mutex(node).synchronize do if sock = get_sock[node] sock.close rescue IOError log.info "out_hash_forward: keepalive connection closed", :host=>node.host, :port=>node.port end get_sock[node] = nil get_sock_expired_at[node] = nil end end
sock_write(sock, tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 240 def sock_write(sock, tag, chunk) # beginArray(2) sock.write FORWARD_HEADER # writeRaw(tag) sock.write tag.to_msgpack # tag # beginRaw(size) sz = chunk.size #if sz < 32 # # FixRaw # sock.write [0xa0 | sz].pack('C') #elsif sz < 65536 # # raw 16 # sock.write [0xda, sz].pack('Cn') #else # raw 32 sock.write [0xdb, sz].pack('CN') #end # writeRawBody(packed_es) chunk.write_to(sock) end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hash_forward.rb, line 64 def start super start_watcher end
start_watcher()
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 78 def start_watcher if @keepalive and @keepalive_time @watcher = Thread.new(&method(:watch_keepalive_time)) end end
stop_watcher()
click to toggle source
# File lib/fluent/plugin/out_hash_forward.rb, line 84 def stop_watcher if @watcher @watcher.terminate @watcher.join end end
str_hash(key)
click to toggle source
the simplest hashing ever gist.github.com/sonots/7263495
# File lib/fluent/plugin/out_hash_forward.rb, line 190 def str_hash(key) key.bytes.inject(&:+) end
watch_keepalive_time()
click to toggle source
watcher thread callback
# File lib/fluent/plugin/out_hash_forward.rb, line 265 def watch_keepalive_time while true sleep @watcher_interval thread_ids = @sock.keys thread_ids.each do |thread_id| @sock[thread_id].each do |node, sock| @mutex[thread_id][node].synchronize do next unless sock_expired_at = @sock_expired_at[thread_id][node] next unless Time.now >= sock_expired_at sock.close rescue IOError if sock @sock[thread_id][node] = nil @sock_expired_at[thread_id][node] = nil log.debug "out_hash_forward: keepalive connection closed", :host=>node.host, :port=>node.port, :thread_id=>thread_id end end end end end
write_objects(tag, chunk)
click to toggle source
Override
# File lib/fluent/plugin/out_hash_forward.rb, line 123 def write_objects(tag, chunk) return if chunk.empty? error = nil nodes = nodes(tag) if @keepalive and primary_available?(nodes) sock_close(nodes.last) # close standby end # below is just copy from out_forward nodes.each do |node| if node.available? begin send_data(node, tag, chunk) return rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end end if error raise error else raise "no nodes are available" # TODO message end end