class LIFX::LAN::GatewayConnection
@api private @private
Constants
- DEFAULT_MESSAGE_RATE
- MAXIMUM_QUEUE_LENGTH
- MAX_TCP_ATTEMPTS
Public Class Methods
new()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 16 def initialize @threads = [] @tcp_attempts = 0 @threads << initialize_write_queue end
Public Instance Methods
close()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 80 def close @threads.each do |thr| thr.abort end [@tcp_transport, @udp_transport].compact.each(&:close) end
connect_tcp(ip, port)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 62 def connect_tcp(ip, port) if @tcp_attempts > MAX_TCP_ATTEMPTS logger.info("#{self}: Ignoring TCP service of #{ip}:#{port} due to too many failed attempts.") return end @tcp_attempts += 1 logger.info("#{self}: Establishing connection to #{ip}:#{port}") @tcp_transport = Transport::TCP.new(ip, port) @tcp_transport.add_observer(self, :message_received) do |message: nil, ip: nil, transport: nil| notify_observers(:message_received, message: message, ip: ip, transport: @tcp_transport) end @tcp_transport.listen end
connect_udp(ip, port)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 58 def connect_udp(ip, port) @udp_transport = Transport::UDP.new(ip, port) end
connected?()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 54 def connected? udp_connected? || tcp_connected? end
flush(timeout: nil)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 87 def flush(timeout: nil) proc = lambda do while !@queue.empty? sleep 0.05 end end if timeout Timeout.timeout(timeout, TimeoutError) do proc.call end else proc.call end end
handle_message(message, ip, transport)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 22 def handle_message(message, ip, transport) payload = message.payload case payload when Protocol::Device::StateService if use_udp? && !udp_connected? && payload.service == Protocol::Device::Service::UDP # UDP transport here is only for sending directly to bulb # We receive responses via UDP transport listening to broadcast in Network connect_udp(ip, payload.port.to_i) elsif use_tcp? && !tcp_connected? && payload.service == Protocol::Device::Service::TCP && (port = payload.port.snapshot) > 0 connect_tcp(ip, port) end else logger.error("#{self}: Unhandled message: #{message}") end end
set_message_rate(rate)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 107 def set_message_rate(rate) @message_rate = rate end
tcp_connected?()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 50 def tcp_connected? @tcp_transport && @tcp_transport.connected? end
to_s()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 102 def to_s "#<LIFX::GatewayConnection tcp=#{@tcp_transport} tcp_attempts=#{@tcp_attempts} udp=#{@udp_transport}>" end
Also aliased as: inspect
udp_connected?()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 46 def udp_connected? @udp_transport && @udp_transport.connected? end
use_tcp?()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 42 def use_tcp? Config.allowed_transports.include?(:tcp) end
use_udp?()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 38 def use_udp? Config.allowed_transports.include?(:udp) end
write(message)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 76 def write(message) @queue.push(message) end
Protected Instance Methods
actually_write(message)
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 155 def actually_write(message) check_connections # TODO: Support force sending over UDP if tcp_connected? if @tcp_transport.write(message) logger.debug("-> #{self} #{@tcp_transport}: #{message}") logger.debug("-> #{message.to_hex}") return true end end if udp_connected? if @udp_transport.write(message) logger.debug("-> #{self} #{@udp_transport}: #{message}") logger.debug("-> #{message.to_hex}") return true end end false end
check_connections()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 143 def check_connections if @tcp_transport && !tcp_connected? @tcp_transport = nil logger.info("#{self}: TCP connection dropped, clearing.") end if @udp_transport && !udp_connected? @udp_transport = nil logger.info("#{self}: UDP connection dropped, clearing.") end end
initialize_write_queue()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 118 def initialize_write_queue @queue = SizedQueue.new(MAXIMUM_QUEUE_LENGTH) @last_write = Time.now Thread.start do loop do if !connected? sleep 0.1 next end delay = [(1.0 / message_rate) - (Time.now - @last_write), 0].max logger.debug("#{self}: Sleeping for #{delay}") sleep(delay) message = @queue.pop if !message.is_a?(Message) raise ArgumentError.new("Unexpected object in message queue: #{message.inspect}") end if !actually_write(message) logger.error("#{self}: Couldn't write, pushing back onto queue.") @queue << message end @last_write = Time.now end end end
message_rate()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 114 def message_rate @message_rate || DEFAULT_MESSAGE_RATE end
observer_callback_definition()
click to toggle source
# File lib/lifx/lan/gateway_connection.rb, line 178 def observer_callback_definition { message_received: -> (message: nil, ip: nil, transport: nil) {} } end