class Tinkerforge::IPConnection

Constants

CALLBACK_CONNECTED
CALLBACK_DISCONNECTED
CALLBACK_ENUMERATE
CONNECTION_STATE_CONNECTED
CONNECTION_STATE_DISCONNECTED

returned by get_connection_state

CONNECTION_STATE_PENDING
CONNECT_REASON_AUTO_RECONNECT
CONNECT_REASON_REQUEST

connect_reason parameter for CALLBACK_CONNECTED

DISCONNECT_PROBE_INTERVAL
DISCONNECT_REASON_ERROR
DISCONNECT_REASON_REQUEST

disconnect_reason parameter for CALLBACK_DISCONNECTED

DISCONNECT_REASON_SHUTDOWN
ENUMERATION_TYPE_AVAILABLE

enumeration_type parameter for CALLBACK_ENUMERATE

ENUMERATION_TYPE_CONNECTED
ENUMERATION_TYPE_DISCONNECTED
FUNCTION_DISCONNECT_PROBE
FUNCTION_ENUMERATE
QUEUE_KIND_EXIT
QUEUE_KIND_META
QUEUE_KIND_PACKET

Attributes

timeout[RW]

Public Class Methods

new() click to toggle source

Creates an IP Connection object that can be used to enumerate the available devices. It is also required for the constructor of Bricks and Bricklets.

# File lib/tinkerforge/ip_connection.rb, line 582
def initialize
  @host = nil
  @port = 0

  @timeout = 2.5

  @auto_reconnect = true
  @auto_reconnect_allowed = false
  @auto_reconnect_pending = false

  @next_sequence_number = 0 # protected by sequence_number_mutex
  @sequence_number_mutex = Mutex.new

  @next_authentication_nonce = 0 # protected by authentication_mutex
  @authentication_mutex = Mutex.new # protects authentication handshake

  @devices = {}
  @replace_mutex = Mutex.new # used to synchronize replacements in the devices dict

  @registered_callbacks = {}

  @socket_mutex = Mutex.new
  @socket_send_mutex = Mutex.new
  @socket = nil # protected by socket_mutex
  @socket_id = 0 # protected by socket_mutex

  @receive_flag = false
  @receive_thread = nil

  @callback = nil

  @disconnect_probe_flag = false
  @disconnect_probe_queue = nil
  @disconnect_probe_thread = nil # protected by socket_mutex

  @waiter_queue = Queue.new

  @brickd = BrickDaemon.new '2', self
end

Public Instance Methods

add_device(device) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 794
def add_device(device)
  @replace_mutex.synchronize {
    replaced_device = @devices.fetch device.uid, nil

    if replaced_device != nil
      replaced_device.replaced = true
    end

    @devices[device.uid] = device # FIXME: use a weakref here
  }
end
authenticate(secret) click to toggle source

Performs an authentication handshake with the connected Brick Daemon or WIFI/Ethernet Extension. If the handshake succeeds the connection switches from non-authenticated to authenticated state and communication can continue as normal. If the handshake fails then the connection gets closed. Authentication can fail if the wrong secret was used or if authentication is not enabled at all on the Brick Daemon or the WIFI/Ethernet Extension.

For more information about authentication see www.tinkerforge.com/en/doc/Tutorials/Tutorial_Authentication/Tutorial.html

# File lib/tinkerforge/ip_connection.rb, line 689
def authenticate(secret)
  if not secret.ascii_only?
    raise ArgumentError, "Authentication secret contains non-ASCII characters"
  end
  @authentication_mutex.synchronize {
    if @next_authentication_nonce == 0
      @next_authentication_nonce = SecureRandom.random_number(1 << 32)
    end

    server_nonce = @brickd.get_authentication_nonce
    client_nonce = Packer.unpack(Packer.pack([@next_authentication_nonce], 'L'), 'C4')[0]
    @next_authentication_nonce += 1
    nonce_bytes = Packer.pack [server_nonce, client_nonce], 'C4 C4'
    digest_bytes = OpenSSL::HMAC.digest 'sha1', secret, nonce_bytes
    digest = Packer.unpack(digest_bytes, 'C20')[0]

    @brickd.authenticate client_nonce, digest
  }
end
connect(host, port) click to toggle source

Creates a TCP/IP connection to the given host and port. The host and port can point to a Brick Daemon or to a WIFI/Ethernet Extension.

Devices can only be controlled when the connection was established successfully.

Blocks until the connection is established and throws an exception if there is no Brick Daemon or WIFI/Ethernet Extension listening at the given host and port.

# File lib/tinkerforge/ip_connection.rb, line 632
def connect(host, port)
  @socket_mutex.synchronize {
    if @socket != nil
      raise AlreadyConnectedException, "Already connected to #{@host}:#{@port}"
    end

    @host = host
    @port = port

    connect_unlocked false
  }
end
create_packet_header(device, length, function_id) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 816
def create_packet_header(device, length, function_id)
  uid = 0
  sequence_number = get_next_sequence_number
  response_expected = false
  r_bit = 0

  if device != nil
    uid = device.uid
    response_expected = device.get_response_expected function_id
  end

  if response_expected
    r_bit = 1
  end

  sequence_number_and_options = (sequence_number << 4) | (r_bit << 3)
  header = Packer.pack [uid, length, function_id, sequence_number_and_options, 0], 'L C C C C'

  [header, response_expected, sequence_number]
end
disconnect() click to toggle source

Disconnects the TCP/IP connection from the Brick Daemon or the WIFI/Ethernet Extension.

# File lib/tinkerforge/ip_connection.rb, line 647
def disconnect
  callback = nil

  @socket_mutex.synchronize {
    @auto_reconnect_allowed = false

    if @auto_reconnect_pending
      # Abort pending auto reconnect
      @auto_reconnect_pending = false
    else
      if @socket == nil
        raise NotConnectedException, 'Not connected'
      end

      disconnect_unlocked
    end

    # Destroy callback thread
    callback = @callback
    @callback = nil
  }

  # Do this outside of socket_mutex to allow calling (dis-)connect from
  # the callbacks while blocking on the join call here
  callback.queue.push [QUEUE_KIND_META, [CALLBACK_DISCONNECTED,
                                         DISCONNECT_REASON_REQUEST, nil]]
  callback.queue.push [QUEUE_KIND_EXIT, nil]

  if Thread.current != callback.thread
    callback.thread.join
  end
end
enumerate() click to toggle source

Broadcasts an enumerate request. All devices will respond with an enumerate callback.

# File lib/tinkerforge/ip_connection.rb, line 761
def enumerate
  request, _, _ = create_packet_header nil, 8, FUNCTION_ENUMERATE

  send_request request
end
get_auto_reconnect() click to toggle source

Returns true if auto-reconnect is enabled, false otherwise.

# File lib/tinkerforge/ip_connection.rb, line 742
def get_auto_reconnect
  @auto_reconnect
end
get_connection_state() click to toggle source

Can return the following states:

# File lib/tinkerforge/ip_connection.rb, line 716
def get_connection_state
  if @socket != nil
    CONNECTION_STATE_CONNECTED
  elsif @auto_reconnect_pending
    CONNECTION_STATE_PENDING
  else
    CONNECTION_STATE_DISCONNECTED
  end
end
get_next_sequence_number() click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 807
def get_next_sequence_number
  @sequence_number_mutex.synchronize {
    sequence_number = @next_sequence_number + 1
    @next_sequence_number = sequence_number % 15
    sequence_number
  }
end
get_timeout() click to toggle source

Returns the timeout as set by set_timeout.

# File lib/tinkerforge/ip_connection.rb, line 755
def get_timeout
  @timeout
end
register_callback(id, &block) click to toggle source

Registers a callback with ID id to the block block.

# File lib/tinkerforge/ip_connection.rb, line 788
def register_callback(id, &block)
  callback = block
  @registered_callbacks[id] = callback
end
send_request(request) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 838
def send_request(request)
  @socket_mutex.synchronize {
    if @socket == nil
      raise NotConnectedException, 'Not connected'
    end

    begin
      @socket_send_mutex.synchronize {
        @socket.send request, 0
      }
    rescue IOError
      handle_disconnect_by_peer DISCONNECT_REASON_ERROR, @socket_id, true
      raise NotConnectedException, 'Not connected'
    rescue Errno::ECONNRESET
      handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, @socket_id, true
      raise NotConnectedException, 'Not connected'
    end

    @disconnect_probe_flag = false
  }
end
set_auto_reconnect(auto_reconnect) click to toggle source

Enables or disables auto-reconnect. If auto-reconnect is enabled, the IP Connection will try to reconnect to the previously given host and port, if the connection is lost.

Default value is true.

# File lib/tinkerforge/ip_connection.rb, line 731
def set_auto_reconnect(auto_reconnect)
  @auto_reconnect = auto_reconnect

  if not @auto_reconnect
    # Abort potentially pending auto reconnect
    @auto_reconnect_allowed = false
  end
end
set_timeout(timeout) click to toggle source

Sets the timeout in seconds for getters and for setters for which the response expected flag is activated.

Default timeout is 2.5.

# File lib/tinkerforge/ip_connection.rb, line 750
def set_timeout(timeout)
  @timeout = timeout
end
unwait() click to toggle source

Unwaits the thread previously stopped by wait.

Wait and unwait act in the same way as “acquire” and “release” of a semaphore.

# File lib/tinkerforge/ip_connection.rb, line 783
def unwait
  @waiter_queue.push nil
end
wait() click to toggle source

Stops the current thread until unwait is called.

This is useful if you rely solely on callbacks for events, if you want to wait for a specific callback or if the IP Connection was created in a thread.

Wait and unwait act in the same way as “acquire” and “release” of a semaphore.

# File lib/tinkerforge/ip_connection.rb, line 775
def wait
  @waiter_queue.pop
end

Private Instance Methods

callback_loop(callback) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1214
def callback_loop(callback)
  alive = true

  while alive
    kind, data = callback.queue.pop

    # FIXME: Cannot lock callback mutex here because this can
    #        deadlock due to an ordering problem with the socket mutex
    # callback.mutex.synchronize {
      if kind == QUEUE_KIND_EXIT
        alive = false
      elsif kind == QUEUE_KIND_META
        function_id, parameter, socket_id = data

        dispatch_meta function_id, parameter, socket_id
      elsif kind == QUEUE_KIND_PACKET
        # don't dispatch callbacks when the receive thread isn't running
        if callback.packet_dispatch_allowed
          dispatch_packet data
        end
      end
    #}
  end
end
connect_unlocked(is_auto_reconnect) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 872
def connect_unlocked(is_auto_reconnect)
  # NOTE: Assumes that the socket mutex is locked

  # Create callback queue and thread
  if @callback == nil
    @callback = CallbackContext.new
    @callback.queue = Queue.new
    @callback.mutex = Mutex.new
    @callback.packet_dispatch_allowed = false
    @callback.thread = Thread.new(@callback) do |callback|
      callback_loop callback
    end
    @callback.thread.abort_on_exception = true
  end

  # Create socket
  @socket = TCPSocket.new @host, @port
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  @socket_id += 1

  # Create disconnect probe thread
  @disconnect_probe_flag = true
  @disconnect_probe_queue = Queue.new
  @disconnect_probe_thread = Thread.new(@disconnect_probe_queue) do |disconnect_probe_queue|
    disconnect_probe_loop disconnect_probe_queue
  end
  @disconnect_probe_thread.abort_on_exception = true

  # Create receive thread
  @callback.packet_dispatch_allowed = true

  @receive_flag = true
  @receive_thread = Thread.new(@socket_id) do |socket_id|
    receive_loop socket_id
  end
  @receive_thread.abort_on_exception = true

  # Trigger connected callback
  if is_auto_reconnect
    connect_reason = CONNECT_REASON_AUTO_RECONNECT
  else
    connect_reason = CONNECT_REASON_REQUEST
  end

  @auto_reconnect_allowed = false
  @auto_reconnect_pending = false

  @callback.queue.push [QUEUE_KIND_META, [CALLBACK_CONNECTED,
                                          connect_reason, nil]]
end
disconnect_probe_loop(disconnect_probe_queue) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1240
def disconnect_probe_loop(disconnect_probe_queue)
  # NOTE: the disconnect probe thread is not allowed to hold the socket_mutex at any
  #       time because it is created and joined while the socket_mutex is locked

  request, _, _ = create_packet_header nil, 8, FUNCTION_DISCONNECT_PROBE

  while true
    begin
      Timeout::timeout(DISCONNECT_PROBE_INTERVAL) {
        disconnect_probe_queue.pop
      }
    rescue Timeout::Error
      if @disconnect_probe_flag
        begin
          @socket_send_mutex.synchronize {
            @socket.send request, 0
          }
        rescue IOError
          handle_disconnect_by_peer DISCONNECT_REASON_ERROR, @socket_id, false
          break
        rescue Errno::ECONNRESET
          handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, @socket_id, false
          break
        end
      else
        @disconnect_probe_flag = true
      end
      next
    end
    break
  end
end
disconnect_unlocked() click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 924
def disconnect_unlocked
  # NOTE: Assumes that the socket mutex is locked

  # Destroy disconnect probe thread
  @disconnect_probe_queue.push true
  @disconnect_probe_thread.join
  @disconnect_probe_thread = nil

  # Stop dispatching packet callbacks before ending the receive
  # thread to avoid timeout exceptions due to callback functions
  # trying to call getters
  if Thread.current != @callback.thread
    # FIXME: Cannot lock callback mutex here because this can
    #        deadlock due to an ordering problem with the socket mutex
    #@callback.mutex.synchronize {
      @callback.packet_dispatch_allowed = false
    #}
  else
    @callback.packet_dispatch_allowed = false
  end

  # Destroy receive thread
  @receive_flag = false

  @socket.shutdown(Socket::SHUT_RDWR)

  if @receive_thread != nil
    @receive_thread.join
    @receive_thread = nil
  end

  # Destroy socket
  @socket.close
  @socket = nil
end
dispatch_meta(function_id, parameter, socket_id) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1024
def dispatch_meta(function_id, parameter, socket_id)
  if function_id == CALLBACK_CONNECTED
    cb = @registered_callbacks[CALLBACK_CONNECTED]

    if cb != nil
      cb.call parameter
    end
  elsif function_id == CALLBACK_DISCONNECTED
    if parameter != DISCONNECT_REASON_REQUEST
      # Need to do this here, the receive_loop is not allowed to
      # hold the socket_mutex because this could cause a deadlock
      # with a concurrent call to the (dis-)connect function
      @socket_mutex.synchronize {
        # Don't close the socket if it got disconnected or
        # reconnected in the meantime
        if @socket != nil and @socket_id == socket_id
          # Destroy disconnect probe thread
          @disconnect_probe_queue.push true
          @disconnect_probe_thread.join
          @disconnect_probe_thread = nil

          # Destroy socket
          @socket.close
          @socket = nil
        end
      }
    end

    # FIXME: Wait a moment here, otherwise the next connect
    # attempt will succeed, even if there is no open server
    # socket. the first receive will then fail directly
    sleep 0.1

    cb = @registered_callbacks[CALLBACK_DISCONNECTED]

    if cb != nil
      cb.call parameter
    end

    if parameter != DISCONNECT_REASON_REQUEST and @auto_reconnect and @auto_reconnect_allowed
      @auto_reconnect_pending = true
      retry_connect = true

      # Block here until reconnect. this is okay, there is no
      # callback to deliver when there is no connection
      while retry_connect
        retry_connect = false

        @socket_mutex.synchronize {
          if @auto_reconnect_allowed and @socket == nil
            begin
              connect_unlocked true
            rescue
              retry_connect = true
            end
          else
            @auto_reconnect_pending = false
          end
        }

        if retry_connect
          sleep 0.1
        end
      end
    end
  end
end
dispatch_packet(packet) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1093
def dispatch_packet(packet)
  uid = Packer.get_uid_from_data packet
  function_id = Packer.get_function_id_from_data packet

  if function_id == CALLBACK_ENUMERATE
    cb = @registered_callbacks[CALLBACK_ENUMERATE]

    if cb == nil
      return
    end

    if packet.length != 34
      return # silently ignoring callback with wrong length
    end

    payload = Packer.unpack packet[8..-1], 'Z8 Z8 k C3 C3 S C'

    cb.call(*payload)
    return
  end

  device = @devices[uid]

  if device != nil
    begin
      device.check_validity
    rescue TinkerforgeException
      return # silently ignoring callback for invalid device
    end

    if device.high_level_callbacks.has_key?(-function_id)
      hlcb = device.high_level_callbacks[-function_id] # [roles, options, data]
      format = device.callback_formats[function_id] # FIXME: currently assuming that low-level callback has more than one element

      if packet.length != format[0]
        return # silently ignoring callback with wrong length
      end

      payload = Packer.unpack packet[8..-1], format[1]
      has_data = false
      data = nil

      if hlcb[1]['fixed_length'] != nil
        length = hlcb[1]['fixed_length']
      else
        length = payload[hlcb[0].index 'stream_length']
      end

      if not hlcb[1]['single_chunk']
        chunk_offset = payload[hlcb[0].index 'stream_chunk_offset']
      else
        chunk_offset = 0
      end

      chunk_data = payload[hlcb[0].index 'stream_chunk_data']

      if hlcb[2] == nil # no stream in-progress
        if chunk_offset == 0 # stream starts
          hlcb[2] = chunk_data

          if hlcb[2].length >= length # stream complete
            has_data = true
            data = hlcb[2][0, length]
            hlcb[2] = nil
          else # ignore tail of current stream, wait for next stream start
          end
        else # stream in-progress
          if chunk_offset != hlcb[2].length # stream out-of-sync
            has_data = true
            data = nil
            hlcb[2] = nil
          else # stream in-sync
            hlcb[2] += chunk_data

            if hlcb[2].length >= length # stream complete
              has_data = true
              data = hlcb[2][0, length]
              hlcb[2] = nil
            end
          end
        end

        cb = device.registered_callbacks[-function_id]

        if has_data and cb != nil
          result = []

          hlcb[0].zip(payload).each do |role, value|
            if role == 'stream_chunk_data'
              result << data
            elsif role == nil
              result << value
            end
          end

          cb.call(*result)
        end
      end
    end

    cb = device.registered_callbacks[function_id]

    if cb != nil
      format = device.callback_formats[function_id]

      if format == nil
        return # silently ignore registered but unknown callback
      end

      if packet.length != format[0]
        return # silently ignoring callback with wrong length
      end

      payload = Packer.unpack packet[8..-1], format[1]

      cb.call(*payload)
    end
  end
end
handle_disconnect_by_peer(disconnect_reason, socket_id, disconnect_immediately) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1274
def handle_disconnect_by_peer(disconnect_reason, socket_id, disconnect_immediately)
  # NOTE: assumes that socket_mutex is locked if disconnect_immediately is true

  @auto_reconnect_allowed = true

  if disconnect_immediately
    disconnect_unlocked
  end

  @callback.queue.push [QUEUE_KIND_META, [CALLBACK_DISCONNECTED,
                                          disconnect_reason, socket_id]]
end
handle_response(packet) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 1288
def handle_response(packet)
  @disconnect_probe_flag = false

  function_id = Packer.get_function_id_from_data packet
  sequence_number = Packer.get_sequence_number_from_data packet

  if sequence_number == 0 and function_id == CALLBACK_ENUMERATE
    if @registered_callbacks.has_key? CALLBACK_ENUMERATE
      @callback.queue.push [QUEUE_KIND_PACKET, packet]
    end
    return
  end

  uid = Packer.get_uid_from_data packet
  device = @devices[uid]

  if device != nil
    if sequence_number == 0
      if device.registered_callbacks.has_key? function_id or \
         device.high_level_callbacks.has_key?(-function_id)
        @callback.queue.push [QUEUE_KIND_PACKET, packet]
      end
    elsif device.expected_response_function_id == function_id and \
          device.expected_response_sequence_number == sequence_number
      device.enqueue_response packet
    else
    end
  end
end
receive_loop(socket_id) click to toggle source

internal

# File lib/tinkerforge/ip_connection.rb, line 961
def receive_loop(socket_id)
  pending_data = ''

  while @receive_flag
    begin
      result = IO.select [@socket], [], [], 1
    rescue IOError
      # FIXME: handle this error?
      break
    end

    if result == nil or result[0].length < 1
      next
    end

    begin
      data = @socket.recv 8192
    rescue IOError
      handle_disconnect_by_peer DISCONNECT_REASON_ERROR, socket_id, false
      break
    rescue Errno::ECONNRESET
      handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, socket_id, false
      break
    rescue Errno::ESHUTDOWN
      # shutdown was called from disconnect_unlocked
      break
    end

    if not @receive_flag
      break
    end

    if data.length == 0
      if @receive_flag
        handle_disconnect_by_peer DISCONNECT_REASON_SHUTDOWN, socket_id, false
      end
      break
    end

    pending_data += data

    while @receive_flag
      if pending_data.length < 8
        # Wait for complete header
        break
      end

      length = Packer.get_length_from_data pending_data

      if pending_data.length < length
        # Wait for complete packet
        break
      end

      packet = pending_data[0, length]
      pending_data = pending_data[length..-1]

      handle_response packet
    end
  end
end