class FFWD::TCP::Connection

Constants

DEFAULT_FLUSH_PERIOD

Default flush period. If non-zero, the connection will be buffered.

DEFAULT_TCP_OUTBOUND_LIMIT

Default number of bytes that the outbound connection will allow in its application-level buffer.

INITIAL_TIMEOUT

Attributes

log[R]
peer[R]
reporter_meta[R]

Public Class Methods

new(log, host, port, handler, config) click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 37
def initialize log, host, port, handler, config
  @log = log
  @host = host
  @port = port
  @handler = handler
  @config = config

  @tcp_outbound_limit = config[:tcp_outbound_limit]

  @peer = "#{host}:#{port}"
  @closing = false
  @reconnect_timeout = INITIAL_TIMEOUT
  @reporter_meta = {:component => @handler.plugin_type, :peer => peer}

  @timer = nil
  @c = nil
  @open = false
end
prepare(opts) click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 30
def self.prepare opts
  opts[:flush_period] ||= DEFAULT_FLUSH_PERIOD
  opts[:tcp_outbound_limit] ||= DEFAULT_TCP_OUTBOUND_LIMIT
  opts[:ignored] = (opts[:ignored] || []).map{|v| Utils.check_ignored v}
  opts
end

Public Instance Methods

connect() click to toggle source

Start attempting to connect.

# File lib/ffwd/protocol/tcp/connection.rb, line 57
def connect
  @c = EM.connect @host, @port, @handler, self, @config
  log.info "Connect to tcp://#{@host}:#{@port}"
  log.info "  config: #{@config.inspect}"
end
connection_completed() click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 86
def connection_completed
  @open = true
  @log.info "Connected tcp://#{peer}"
  @reconnect_timeout = INITIAL_TIMEOUT

  unless @timer.nil?
    @timer.cancel
    @timer = nil
  end
end
disconnect() click to toggle source

Explicitly disconnect and discard any reconnect attempts.

# File lib/ffwd/protocol/tcp/connection.rb, line 64
def disconnect
  log.info "Disconnecting from tcp://#{@host}:#{@port}"
  @closing = true

  @c.close_connection if @c
  @timer.cancel if @timer
  @c = nil
  @timer = nil
end
send_all(events, metrics) click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 82
def send_all events, metrics
  @c.send_all events, metrics
end
send_event(event) click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 74
def send_event event
  @c.send_event event
end
send_metric(metric) click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 78
def send_metric metric
  @c.send_metric metric
end
unbind() click to toggle source
# File lib/ffwd/protocol/tcp/connection.rb, line 97
def unbind
  @open = false
  @c = nil

  if @closing
    return
  end

  @log.info "Disconnected from tcp://#{peer}, reconnecting in #{@reconnect_timeout}s"

  unless @timer.nil?
    @timer.cancel
    @timer = nil
  end

  @timer = EM::Timer.new(@reconnect_timeout) do
    @reconnect_timeout *= 2
    @timer = nil
    @c = EM.connect @host, @port, @handler, self, *@args
  end
end
writable?() click to toggle source

Check if a connection is writable or not.

# File lib/ffwd/protocol/tcp/connection.rb, line 120
def writable?
  not @c.nil? and @open and @c.get_outbound_data_size < @tcp_outbound_limit
end