class LIFX::LAN::GatewayConnection

@api private @private

Constants

DEFAULT_MESSAGE_RATE
MAXIMUM_QUEUE_LENGTH
MAX_TCP_ATTEMPTS

Public Class Methods

new() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 16
# Sets up a connection that has no transports attached yet.
#
# The TCP attempt counter starts at zero, and the thread returned by
# initialize_write_queue is recorded in @threads.
def initialize
  @tcp_attempts = 0
  @threads = [initialize_write_queue]
end

Public Instance Methods

close() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 80
# Shuts the connection down.
#
# Every thread recorded in @threads is terminated, then whichever of the TCP
# and UDP transports are currently set are closed.
#
# @return [Array] the transports that were closed
def close
  # Thread has no public #abort: that name resolves to the private
  # Kernel#abort and raises NoMethodError. Thread#kill is the call that
  # actually terminates a thread.
  @threads.each(&:kill)
  [@tcp_transport, @udp_transport].compact.each(&:close)
end
connect_tcp(ip, port) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 62
# Opens a TCP transport to the given address and starts listening on it.
#
# Messages arriving on the new transport are re-broadcast to this object's
# own observers, always tagged with the current @tcp_transport.
#
# NOTE(review): the guard uses >, so MAX_TCP_ATTEMPTS + 1 attempts get through
# before further connections are refused — confirm that is intended.
#
# @param ip [String] address of the TCP service
# @param port [Integer] port of the TCP service
# @return [nil] when the attempt limit has been exceeded; otherwise whatever
#   the transport's #listen returns
def connect_tcp(ip, port)
  attempts_exhausted = @tcp_attempts > MAX_TCP_ATTEMPTS
  if attempts_exhausted
    logger.info("#{self}: Ignoring TCP service of #{ip}:#{port} due to too many failed attempts.")
    return
  end

  @tcp_attempts += 1
  logger.info("#{self}: Establishing connection to #{ip}:#{port}")
  @tcp_transport = Transport::TCP.new(ip, port)

  # The transport: keyword handed to the callback is ignored on purpose;
  # observers are given @tcp_transport instead.
  relay = proc do |message: nil, ip: nil, transport: nil|
    notify_observers(:message_received, message: message, ip: ip, transport: @tcp_transport)
  end
  @tcp_transport.add_observer(self, :message_received, &relay)
  @tcp_transport.listen
end
connect_udp(ip, port) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 58
# Creates a UDP transport for the given address and keeps it in @udp_transport.
#
# @param ip [String] address of the UDP service
# @param port [Integer] port of the UDP service
# @return the newly created transport
def connect_udp(ip, port)
  udp = Transport::UDP.new(ip, port)
  @udp_transport = udp
end
connected?() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 54
# Tells whether at least one transport is connected.
#
# UDP is checked first; TCP is only consulted when UDP is not connected.
#
# @return the truthy result of the first connected check, otherwise the
#   (falsy) result of tcp_connected?
def connected?
  via_udp = udp_connected?
  return via_udp if via_udp

  tcp_connected?
end
flush(timeout: nil) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 87
# Blocks until the outgoing message queue is empty, polling every 50 ms.
#
# @param timeout [Numeric, nil] seconds to wait at most; a nil or false value
#   waits without a limit
# @raise [TimeoutError] when +timeout+ elapses before the queue drains
# @return [nil]
def flush(timeout: nil)
  wait_for_empty_queue = -> { sleep 0.05 until @queue.empty? }
  return wait_for_empty_queue.call unless timeout

  Timeout.timeout(timeout, TimeoutError) { wait_for_empty_queue.call }
end
handle_message(message, ip, transport) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 22
# Reacts to a message received from the gateway.
#
# Only StateService payloads are handled: depending on the advertised service
# and on which transports are allowed and not yet connected, a UDP or a TCP
# transport is opened towards +ip+. Any other payload is logged as an error.
#
# @param message [#payload] the received message
# @param ip [String] address the message came from
# @param transport the transport the message arrived on (not used here)
# @return the result of the connect call or of the error log, or nil when
#   nothing had to be done
def handle_message(message, ip, transport)
  payload = message.payload
  case payload
  when Protocol::Device::StateService
    # The UDP transport created here is used only to send directly to the
    # bulb; replies arrive through the broadcast UDP listener in Network.
    wants_udp = use_udp? && !udp_connected? && payload.service == Protocol::Device::Service::UDP
    return connect_udp(ip, payload.port.to_i) if wants_udp

    wants_tcp = use_tcp? && !tcp_connected? && payload.service == Protocol::Device::Service::TCP
    return unless wants_tcp

    # A non-positive port is not connected to.
    port = payload.port.snapshot
    connect_tcp(ip, port) if port > 0
  else
    logger.error("#{self}: Unhandled message: #{message}")
  end
end
inspect()
Alias for: to_s
set_message_rate(rate) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 107
# Sets the outgoing message rate for this connection.
#
# The write loop waits up to 1.0 / message_rate seconds between writes, so the
# value acts as a messages-per-second limit.
#
# @param rate [Numeric] the new rate; a nil or false value makes message_rate
#   fall back to DEFAULT_MESSAGE_RATE
# @return the value that was assigned
def set_message_rate(rate)
  @message_rate = rate
end
tcp_connected?() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 50
# Tells whether a TCP transport is set and reports itself as connected.
#
# @return the falsy @tcp_transport value when no transport is set, otherwise
#   the result of the transport's #connected?
def tcp_connected?
  transport = @tcp_transport
  return transport unless transport

  transport.connected?
end
to_s() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 102
# Human-readable summary showing both transports and the TCP attempt count.
#
# @return [String]
def to_s
  tcp, attempts, udp = @tcp_transport, @tcp_attempts, @udp_transport
  "#<LIFX::GatewayConnection tcp=#{tcp} tcp_attempts=#{attempts} udp=#{udp}>"
end
Also aliased as: inspect
udp_connected?() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 46
# Tells whether a UDP transport is set and reports itself as connected.
#
# @return the falsy @udp_transport value when no transport is set, otherwise
#   the result of the transport's #connected?
def udp_connected?
  transport = @udp_transport
  return transport unless transport

  transport.connected?
end
use_tcp?() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 42
# Tells whether the configuration lists :tcp among the allowed transports.
#
# @return [Boolean]
def use_tcp?
  allowed = Config.allowed_transports
  allowed.include?(:tcp)
end
use_udp?() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 38
# Tells whether the configuration lists :udp among the allowed transports.
#
# @return [Boolean]
def use_udp?
  allowed = Config.allowed_transports
  allowed.include?(:udp)
end
write(message) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 76
# Queues a message for sending; the write thread performs the actual
# transmission later.
#
# @param message the message to enqueue
# @return the queue the message was appended to
def write(message)
  @queue << message
end

Protected Instance Methods

actually_write(message) click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 155
# Sends a message over the first transport that accepts it.
#
# Dropped transports are cleared first. TCP is tried before UDP; UDP is only
# attempted when TCP is not connected or its write did not succeed.
#
# @param message the message to send; must respond to #to_hex for logging
# @return [Boolean] true once a transport has written the message, false
#   when none did
def actually_write(message)
  check_connections

  # TODO: Support force sending over UDP
  if tcp_connected? && @tcp_transport.write(message)
    logger.debug("-> #{self} #{@tcp_transport}: #{message}")
    logger.debug("-> #{message.to_hex}")
    return true
  end

  if udp_connected? && @udp_transport.write(message)
    logger.debug("-> #{self} #{@udp_transport}: #{message}")
    logger.debug("-> #{message.to_hex}")
    return true
  end

  false
end
check_connections() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 143
# Forgets any transport that is set but no longer connected.
#
# Each transport is cleared before its log line is written, so the logged
# description of this object already shows it as gone.
#
# @return the result of the UDP log call when the UDP transport was cleared,
#   otherwise nil
def check_connections
  tcp_dropped = @tcp_transport && !tcp_connected?
  if tcp_dropped
    @tcp_transport = nil
    logger.info("#{self}: TCP connection dropped, clearing.")
  end

  udp_dropped = @udp_transport && !udp_connected?
  return unless udp_dropped

  @udp_transport = nil
  logger.info("#{self}: UDP connection dropped, clearing.")
end
initialize_write_queue() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 118
# Creates the bounded outgoing message queue and starts the thread that
# drains it.
#
# The thread loops forever. While no transport is connected it polls every
# 0.1 s. Otherwise it waits out whatever remains of the 1.0 / message_rate
# interval since the previous write, pops one message and hands it to
# actually_write.
#
# @return [Thread] the thread started to drain the queue
# @raise [ArgumentError] inside the thread, when a popped object is not a
#   Message
def initialize_write_queue
  @queue = SizedQueue.new(MAXIMUM_QUEUE_LENGTH)
  @last_write = Time.now
  Thread.start do
    loop do
      if !connected?
        # Nothing to send through yet; check again shortly.
        sleep 0.1
        next
      end
      # Remaining part of the per-message interval, clamped so it is never negative.
      delay = [(1.0 / message_rate) - (Time.now - @last_write), 0].max
      logger.debug("#{self}: Sleeping for #{delay}")
      sleep(delay)
      # Blocks here until a message is available.
      message = @queue.pop
      if !message.is_a?(Message)
        raise ArgumentError.new("Unexpected object in message queue: #{message.inspect}")
      end
      if !actually_write(message)
        logger.error("#{self}: Couldn't write, pushing back onto queue.")
        # The message goes back to the tail of the queue, i.e. behind anything
        # pushed since it was popped.
        # NOTE(review): SizedQueue#<< blocks while the queue is full — confirm
        # this cannot stall the loop, which is the only consumer visible here.
        @queue << message
      end
      # Updated after failed writes as well, so retries are paced too.
      @last_write = Time.now
    end
  end
end
message_rate() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 114
# The rate currently used to pace outgoing messages.
#
# @return the value given to set_message_rate when one is set (and truthy),
#   otherwise DEFAULT_MESSAGE_RATE
def message_rate
  return @message_rate if @message_rate

  DEFAULT_MESSAGE_RATE
end
observer_callback_definition() click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 178
# Describes the callbacks observers of this object may register.
#
# The only entry is :message_received, whose value is a do-nothing lambda
# declaring the keyword arguments message:, ip: and transport:.
#
# @return [Hash{Symbol => Proc}]
def observer_callback_definition
  no_op = lambda { |message: nil, ip: nil, transport: nil| }
  { message_received: no_op }
end