class Tinkerforge::IPConnection
Constants
- CALLBACK_CONNECTED
- CALLBACK_DISCONNECTED
- CALLBACK_ENUMERATE
- CONNECTION_STATE_CONNECTED
- CONNECTION_STATE_DISCONNECTED
returned by
get_connection_state
- CONNECTION_STATE_PENDING
- CONNECT_REASON_AUTO_RECONNECT
- CONNECT_REASON_REQUEST
connect_reason parameter for
CALLBACK_CONNECTED
- DISCONNECT_PROBE_INTERVAL
- DISCONNECT_REASON_ERROR
- DISCONNECT_REASON_REQUEST
disconnect_reason parameter for
CALLBACK_DISCONNECTED
- DISCONNECT_REASON_SHUTDOWN
- ENUMERATION_TYPE_AVAILABLE
enumeration_type parameter for
CALLBACK_ENUMERATE
- ENUMERATION_TYPE_CONNECTED
- ENUMERATION_TYPE_DISCONNECTED
- FUNCTION_DISCONNECT_PROBE
- FUNCTION_ENUMERATE
- QUEUE_KIND_EXIT
- QUEUE_KIND_META
- QUEUE_KIND_PACKET
Attributes
Public Class Methods
Creates an IP Connection object that can be used to enumerate the available devices. It is also required for the constructor of Bricks and Bricklets.
# File lib/tinkerforge/ip_connection.rb, line 582 def initialize @host = nil @port = 0 @timeout = 2.5 @auto_reconnect = true @auto_reconnect_allowed = false @auto_reconnect_pending = false @next_sequence_number = 0 # protected by sequence_number_mutex @sequence_number_mutex = Mutex.new @next_authentication_nonce = 0 # protected by authentication_mutex @authentication_mutex = Mutex.new # protects authentication handshake @devices = {} @replace_mutex = Mutex.new # used to synchronize replacements in the devices dict @registered_callbacks = {} @socket_mutex = Mutex.new @socket_send_mutex = Mutex.new @socket = nil # protected by socket_mutex @socket_id = 0 # protected by socket_mutex @receive_flag = false @receive_thread = nil @callback = nil @disconnect_probe_flag = false @disconnect_probe_queue = nil @disconnect_probe_thread = nil # protected by socket_mutex @waiter_queue = Queue.new @brickd = BrickDaemon.new '2', self end
Public Instance Methods
internal
# File lib/tinkerforge/ip_connection.rb, line 794 def add_device(device) @replace_mutex.synchronize { replaced_device = @devices.fetch device.uid, nil if replaced_device != nil replaced_device.replaced = true end @devices[device.uid] = device # FIXME: use a weakref here } end
Performs an authentication handshake with the connected Brick Daemon or WIFI/Ethernet Extension. If the handshake succeeds the connection switches from non-authenticated to authenticated state and communication can continue as normal. If the handshake fails then the connection gets closed. Authentication can fail if the wrong secret was used or if authentication is not enabled at all on the Brick Daemon or the WIFI/Ethernet Extension.
For more information about authentication see www.tinkerforge.com/en/doc/Tutorials/Tutorial_Authentication/Tutorial.html
# File lib/tinkerforge/ip_connection.rb, line 689 def authenticate(secret) if not secret.ascii_only? raise ArgumentError, "Authentication secret contains non-ASCII characters" end @authentication_mutex.synchronize { if @next_authentication_nonce == 0 @next_authentication_nonce = SecureRandom.random_number(1 << 32) end server_nonce = @brickd.get_authentication_nonce client_nonce = Packer.unpack(Packer.pack([@next_authentication_nonce], 'L'), 'C4')[0] @next_authentication_nonce += 1 nonce_bytes = Packer.pack [server_nonce, client_nonce], 'C4 C4' digest_bytes = OpenSSL::HMAC.digest 'sha1', secret, nonce_bytes digest = Packer.unpack(digest_bytes, 'C20')[0] @brickd.authenticate client_nonce, digest } end
Creates a TCP/IP connection to the given host
and port
. The host and port can point to a Brick Daemon or to a WIFI/Ethernet Extension.
Devices can only be controlled when the connection was established successfully.
Blocks until the connection is established and throws an exception if there is no Brick Daemon or WIFI/Ethernet Extension listening at the given host and port.
# File lib/tinkerforge/ip_connection.rb, line 632 def connect(host, port) @socket_mutex.synchronize { if @socket != nil raise AlreadyConnectedException, "Already connected to #{@host}:#{@port}" end @host = host @port = port connect_unlocked false } end
internal
# File lib/tinkerforge/ip_connection.rb, line 816 def create_packet_header(device, length, function_id) uid = 0 sequence_number = get_next_sequence_number response_expected = false r_bit = 0 if device != nil uid = device.uid response_expected = device.get_response_expected function_id end if response_expected r_bit = 1 end sequence_number_and_options = (sequence_number << 4) | (r_bit << 3) header = Packer.pack [uid, length, function_id, sequence_number_and_options, 0], 'L C C C C' [header, response_expected, sequence_number] end
Disconnects the TCP/IP connection from the Brick Daemon or the WIFI/Ethernet Extension.
# File lib/tinkerforge/ip_connection.rb, line 647 def disconnect callback = nil @socket_mutex.synchronize { @auto_reconnect_allowed = false if @auto_reconnect_pending # Abort pending auto reconnect @auto_reconnect_pending = false else if @socket == nil raise NotConnectedException, 'Not connected' end disconnect_unlocked end # Destroy callback thread callback = @callback @callback = nil } # Do this outside of socket_mutex to allow calling (dis-)connect from # the callbacks while blocking on the join call here callback.queue.push [QUEUE_KIND_META, [CALLBACK_DISCONNECTED, DISCONNECT_REASON_REQUEST, nil]] callback.queue.push [QUEUE_KIND_EXIT, nil] if Thread.current != callback.thread callback.thread.join end end
Broadcasts an enumerate request. All devices will respond with an enumerate callback.
# File lib/tinkerforge/ip_connection.rb, line 761 def enumerate request, _, _ = create_packet_header nil, 8, FUNCTION_ENUMERATE send_request request end
Returns true
if auto-reconnect is enabled, false
otherwise.
# File lib/tinkerforge/ip_connection.rb, line 742 def get_auto_reconnect @auto_reconnect end
Can return the following states:
-
CONNECTION_STATE_DISCONNECTED
: No connection is established. -
CONNECTION_STATE_CONNECTED
: A connection to the Brick Daemon or the WIFI/Ethernet Extension is established. -
CONNECTION_STATE_PENDING
: IP Connection is currently trying to connect.
# File lib/tinkerforge/ip_connection.rb, line 716 def get_connection_state if @socket != nil CONNECTION_STATE_CONNECTED elsif @auto_reconnect_pending CONNECTION_STATE_PENDING else CONNECTION_STATE_DISCONNECTED end end
internal
# File lib/tinkerforge/ip_connection.rb, line 807 def get_next_sequence_number @sequence_number_mutex.synchronize { sequence_number = @next_sequence_number + 1 @next_sequence_number = sequence_number % 15 sequence_number } end
Returns the timeout as set by set_timeout.
# File lib/tinkerforge/ip_connection.rb, line 755 def get_timeout @timeout end
Registers a callback with ID id
to the block block
.
# File lib/tinkerforge/ip_connection.rb, line 788 def register_callback(id, &block) callback = block @registered_callbacks[id] = callback end
internal
# File lib/tinkerforge/ip_connection.rb, line 838 def send_request(request) @socket_mutex.synchronize { if @socket == nil raise NotConnectedException, 'Not connected' end begin @socket_send_mutex.synchronize { @socket.send request, 0 } rescue IOError handle_disconnect_by_peer DISCONNECT_REASON_ERROR, @socket_id, true raise NotConnectedException, 'Not connected' rescue Errno::ECONNRESET handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, @socket_id, true raise NotConnectedException, 'Not connected' end @disconnect_probe_flag = false } end
Enables or disables auto-reconnect. If auto-reconnect is enabled, the IP Connection will try to reconnect to the previously given host and port, if the connection is lost.
Default value is true
.
# File lib/tinkerforge/ip_connection.rb, line 731 def set_auto_reconnect(auto_reconnect) @auto_reconnect = auto_reconnect if not @auto_reconnect # Abort potentially pending auto reconnect @auto_reconnect_allowed = false end end
Sets the timeout in seconds for getters and for setters for which the response expected flag is activated.
Default timeout is 2.5.
# File lib/tinkerforge/ip_connection.rb, line 750 def set_timeout(timeout) @timeout = timeout end
Unwaits the thread previously stopped by wait.
Wait and unwait act in the same way as “acquire” and “release” of a semaphore.
# File lib/tinkerforge/ip_connection.rb, line 783 def unwait @waiter_queue.push nil end
Stops the current thread until unwait is called.
This is useful if you rely solely on callbacks for events, if you want to wait for a specific callback or if the IP Connection was created in a thread.
Wait and unwait act in the same way as “acquire” and “release” of a semaphore.
# File lib/tinkerforge/ip_connection.rb, line 775 def wait @waiter_queue.pop end
Private Instance Methods
internal
# File lib/tinkerforge/ip_connection.rb, line 1214 def callback_loop(callback) alive = true while alive kind, data = callback.queue.pop # FIXME: Cannot lock callback mutex here because this can # deadlock due to an ordering problem with the socket mutex # callback.mutex.synchronize { if kind == QUEUE_KIND_EXIT alive = false elsif kind == QUEUE_KIND_META function_id, parameter, socket_id = data dispatch_meta function_id, parameter, socket_id elsif kind == QUEUE_KIND_PACKET # don't dispatch callbacks when the receive thread isn't running if callback.packet_dispatch_allowed dispatch_packet data end end #} end end
internal
# File lib/tinkerforge/ip_connection.rb, line 872 def connect_unlocked(is_auto_reconnect) # NOTE: Assumes that the socket mutex is locked # Create callback queue and thread if @callback == nil @callback = CallbackContext.new @callback.queue = Queue.new @callback.mutex = Mutex.new @callback.packet_dispatch_allowed = false @callback.thread = Thread.new(@callback) do |callback| callback_loop callback end @callback.thread.abort_on_exception = true end # Create socket @socket = TCPSocket.new @host, @port @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) @socket_id += 1 # Create disconnect probe thread @disconnect_probe_flag = true @disconnect_probe_queue = Queue.new @disconnect_probe_thread = Thread.new(@disconnect_probe_queue) do |disconnect_probe_queue| disconnect_probe_loop disconnect_probe_queue end @disconnect_probe_thread.abort_on_exception = true # Create receive thread @callback.packet_dispatch_allowed = true @receive_flag = true @receive_thread = Thread.new(@socket_id) do |socket_id| receive_loop socket_id end @receive_thread.abort_on_exception = true # Trigger connected callback if is_auto_reconnect connect_reason = CONNECT_REASON_AUTO_RECONNECT else connect_reason = CONNECT_REASON_REQUEST end @auto_reconnect_allowed = false @auto_reconnect_pending = false @callback.queue.push [QUEUE_KIND_META, [CALLBACK_CONNECTED, connect_reason, nil]] end
internal
# File lib/tinkerforge/ip_connection.rb, line 1240 def disconnect_probe_loop(disconnect_probe_queue) # NOTE: the disconnect probe thread is not allowed to hold the socket_mutex at any # time because it is created and joined while the socket_mutex is locked request, _, _ = create_packet_header nil, 8, FUNCTION_DISCONNECT_PROBE while true begin Timeout::timeout(DISCONNECT_PROBE_INTERVAL) { disconnect_probe_queue.pop } rescue Timeout::Error if @disconnect_probe_flag begin @socket_send_mutex.synchronize { @socket.send request, 0 } rescue IOError handle_disconnect_by_peer DISCONNECT_REASON_ERROR, @socket_id, false break rescue Errno::ECONNRESET handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, @socket_id, false break end else @disconnect_probe_flag = true end next end break end end
internal
# File lib/tinkerforge/ip_connection.rb, line 924 def disconnect_unlocked # NOTE: Assumes that the socket mutex is locked # Destroy disconnect probe thread @disconnect_probe_queue.push true @disconnect_probe_thread.join @disconnect_probe_thread = nil # Stop dispatching packet callbacks before ending the receive # thread to avoid timeout exceptions due to callback functions # trying to call getters if Thread.current != @callback.thread # FIXME: Cannot lock callback mutex here because this can # deadlock due to an ordering problem with the socket mutex #@callback.mutex.synchronize { @callback.packet_dispatch_allowed = false #} else @callback.packet_dispatch_allowed = false end # Destroy receive thread @receive_flag = false @socket.shutdown(Socket::SHUT_RDWR) if @receive_thread != nil @receive_thread.join @receive_thread = nil end # Destroy socket @socket.close @socket = nil end
internal
# File lib/tinkerforge/ip_connection.rb, line 1024 def dispatch_meta(function_id, parameter, socket_id) if function_id == CALLBACK_CONNECTED cb = @registered_callbacks[CALLBACK_CONNECTED] if cb != nil cb.call parameter end elsif function_id == CALLBACK_DISCONNECTED if parameter != DISCONNECT_REASON_REQUEST # Need to do this here, the receive_loop is not allowed to # hold the socket_mutex because this could cause a deadlock # with a concurrent call to the (dis-)connect function @socket_mutex.synchronize { # Don't close the socket if it got disconnected or # reconnected in the meantime if @socket != nil and @socket_id == socket_id # Destroy disconnect probe thread @disconnect_probe_queue.push true @disconnect_probe_thread.join @disconnect_probe_thread = nil # Destroy socket @socket.close @socket = nil end } end # FIXME: Wait a moment here, otherwise the next connect # attempt will succeed, even if there is no open server # socket. the first receive will then fail directly sleep 0.1 cb = @registered_callbacks[CALLBACK_DISCONNECTED] if cb != nil cb.call parameter end if parameter != DISCONNECT_REASON_REQUEST and @auto_reconnect and @auto_reconnect_allowed @auto_reconnect_pending = true retry_connect = true # Block here until reconnect. this is okay, there is no # callback to deliver when there is no connection while retry_connect retry_connect = false @socket_mutex.synchronize { if @auto_reconnect_allowed and @socket == nil begin connect_unlocked true rescue retry_connect = true end else @auto_reconnect_pending = false end } if retry_connect sleep 0.1 end end end end end
internal
# File lib/tinkerforge/ip_connection.rb, line 1093 def dispatch_packet(packet) uid = Packer.get_uid_from_data packet function_id = Packer.get_function_id_from_data packet if function_id == CALLBACK_ENUMERATE cb = @registered_callbacks[CALLBACK_ENUMERATE] if cb == nil return end if packet.length != 34 return # silently ignoring callback with wrong length end payload = Packer.unpack packet[8..-1], 'Z8 Z8 k C3 C3 S C' cb.call(*payload) return end device = @devices[uid] if device != nil begin device.check_validity rescue TinkerforgeException return # silently ignoring callback for invalid device end if device.high_level_callbacks.has_key?(-function_id) hlcb = device.high_level_callbacks[-function_id] # [roles, options, data] format = device.callback_formats[function_id] # FIXME: currently assuming that low-level callback has more than one element if packet.length != format[0] return # silently ignoring callback with wrong length end payload = Packer.unpack packet[8..-1], format[1] has_data = false data = nil if hlcb[1]['fixed_length'] != nil length = hlcb[1]['fixed_length'] else length = payload[hlcb[0].index 'stream_length'] end if not hlcb[1]['single_chunk'] chunk_offset = payload[hlcb[0].index 'stream_chunk_offset'] else chunk_offset = 0 end chunk_data = payload[hlcb[0].index 'stream_chunk_data'] if hlcb[2] == nil # no stream in-progress if chunk_offset == 0 # stream starts hlcb[2] = chunk_data if hlcb[2].length >= length # stream complete has_data = true data = hlcb[2][0, length] hlcb[2] = nil else # ignore tail of current stream, wait for next stream start end else # stream in-progress if chunk_offset != hlcb[2].length # stream out-of-sync has_data = true data = nil hlcb[2] = nil else # stream in-sync hlcb[2] += chunk_data if hlcb[2].length >= length # stream complete has_data = true data = hlcb[2][0, length] hlcb[2] = nil end end end cb = device.registered_callbacks[-function_id] if has_data and cb != nil result = [] hlcb[0].zip(payload).each do |role, value| if role == 'stream_chunk_data' result << data elsif role == nil result << value end end cb.call(*result) end end end cb = device.registered_callbacks[function_id] if cb != nil format = device.callback_formats[function_id] if format == nil return # silently ignore registered but unknown callback end if packet.length != format[0] return # silently ignoring callback with wrong length end payload = Packer.unpack packet[8..-1], format[1] cb.call(*payload) end end end
internal
# File lib/tinkerforge/ip_connection.rb, line 1274 def handle_disconnect_by_peer(disconnect_reason, socket_id, disconnect_immediately) # NOTE: assumes that socket_mutex is locked if disconnect_immediately is true @auto_reconnect_allowed = true if disconnect_immediately disconnect_unlocked end @callback.queue.push [QUEUE_KIND_META, [CALLBACK_DISCONNECTED, disconnect_reason, socket_id]] end
internal
# File lib/tinkerforge/ip_connection.rb, line 1288 def handle_response(packet) @disconnect_probe_flag = false function_id = Packer.get_function_id_from_data packet sequence_number = Packer.get_sequence_number_from_data packet if sequence_number == 0 and function_id == CALLBACK_ENUMERATE if @registered_callbacks.has_key? CALLBACK_ENUMERATE @callback.queue.push [QUEUE_KIND_PACKET, packet] end return end uid = Packer.get_uid_from_data packet device = @devices[uid] if device != nil if sequence_number == 0 if device.registered_callbacks.has_key? function_id or \ device.high_level_callbacks.has_key?(-function_id) @callback.queue.push [QUEUE_KIND_PACKET, packet] end elsif device.expected_response_function_id == function_id and \ device.expected_response_sequence_number == sequence_number device.enqueue_response packet else end end end
internal
# File lib/tinkerforge/ip_connection.rb, line 961 def receive_loop(socket_id) pending_data = '' while @receive_flag begin result = IO.select [@socket], [], [], 1 rescue IOError # FIXME: handle this error? break end if result == nil or result[0].length < 1 next end begin data = @socket.recv 8192 rescue IOError handle_disconnect_by_peer DISCONNECT_REASON_ERROR, socket_id, false break rescue Errno::ECONNRESET handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, socket_id, false break rescue Errno::ESHUTDOWN # shutdown was called from disconnect_unlocked break end if not @receive_flag break end if data.length == 0 if @receive_flag handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, socket_id, false end break end pending_data += data while @receive_flag if pending_data.length < 8 # Wait for complete header break end length = Packer.get_length_from_data pending_data if pending_data.length < length # Wait for complete packet break end packet = pending_data[0, length] pending_data = pending_data[length..-1] handle_response packet end end end