class IB::Connection

Encapsulates API connection to TWS or Gateway

Constants

DEFAULT_OPTIONS

Please note: we implement only the most recent TWS protocol versions, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older gem versions.

Attributes

current[RW]
client_id[RW]
client_version[RW]
local_connect_time[RW]
next_local_id[RW]
next_order_id[RW]
next_order_id=[RW]
options[RW]
reader[RW]
remote_connect_time[RW]
server_version[RW]
socket[RW]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/ib/connection.rb, line 44
# Builds a connection object from DEFAULT_OPTIONS overridden by +opts+.
#
# Option keys read here:
#   :logger  - when set, assigned to default_logger
#   :connect - when truthy, #connect is called right away
#
# The new instance is also stored as Connection.current.
def initialize opts = {}
  @options = DEFAULT_OPTIONS.merge(opts)

  # A couple of locks to avoid race conditions in JRuby
  @subscribe_lock = Mutex.new
  @receive_lock = Mutex.new

  custom_logger = options[:logger]
  self.default_logger = custom_logger if custom_logger

  @connected = false
  self.next_local_id = nil

  # Locks and state must exist before connecting, since #connect subscribes.
  connect if options[:connect]
  Connection.current = self
end

Public Instance Methods

cancel_order(*local_ids) click to toggle source

Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).

# File lib/ib/connection.rb, line 314
# Sends one :CancelOrder message per given local id.
# Each id is converted with #to_i before being sent.
# Returns the array of ids exactly as passed in.
def cancel_order *local_ids
  local_ids.each { |order_ref| send_message(:CancelOrder, :local_id => order_ref.to_i) }
end
clear_received(*message_types) click to toggle source

Clear received messages Hash

# File lib/ib/connection.rb, line 172
# Empties the stored message containers while holding @receive_lock.
# With no arguments every container in #received is cleared; otherwise only
# the containers for the given message types are cleared.
def clear_received *message_types
  wipe_everything = message_types.empty?

  @receive_lock.synchronize do
    next received.each_value(&:clear) if wipe_everything

    message_types.each { |type| received[type].clear }
  end
end
close()
Alias for: disconnect
connect() click to toggle source

Working with connection

# File lib/ib/connection.rb, line 61
# Opens the socket to options[:host] / options[:port], exchanges client and
# server versions, sends the client id and marks the connection as connected.
# When options[:reader] is set, the background reader is started afterwards.
#
# Calls +error+ when already connected, or when the server reports a version
# lower than options[:server_version].
# NOTE(review): +error+ is defined outside this view - presumably it raises;
# confirm, since the code after each call assumes it did not continue.
def connect
  error "Already connected!" if connected?

  # TWS always sends NextValidId message at connect - save this id
  # NOTE(review): a new subscriber is registered on every call and nothing
  # here removes it, so reconnecting adds another one - confirm intended.
  self.subscribe(:NextValidId) do |msg|
    self.next_local_id = msg.local_id
    log.info "Got next valid order id: #{next_local_id}."
  end

  @socket = IBSocket.open(options[:host], options[:port])

  # Secret handshake
  # Order matters: client version is written first, then the server version
  # and the server's connect time are read back from the socket.
  @client_version = options[:client_version]
  socket.write_data @client_version
  @server_version = socket.read_int
  if @server_version < options[:server_version]
    error "Server version #{@server_version}, #{options[:server_version]} required."
  end
  @remote_connect_time = socket.read_string
  @local_connect_time = Time.now

  # Sending (arbitrary) client ID to identify subsequent communications.
  # The client with a client_id of 0 can manage the TWS-owned open orders.
  # Other clients can only manage their own open orders.
  # Falls back to random_id when options[:client_id] is nil or false.
  @client_id = options[:client_id] || random_id
  socket.write_data @client_id

  @connected = true
  log.info "Connected to server, ver: #{@server_version}, connection time: " +
    "#{@local_connect_time} local, " +
    "#{@remote_connect_time} remote."

  start_reader if options[:reader] # Allows reconnect
end
Also aliased as: open
connected?() click to toggle source
# File lib/ib/connection.rb, line 111
# Returns the current value of @connected.
# The flag is set to false in #initialize, to true at the end of #connect,
# and back to false by #disconnect after the socket is closed.
def connected?
  @connected
end
disconnect() click to toggle source
# File lib/ib/connection.rb, line 98
# Shuts the connection down.
# First asks a running reader thread to stop and joins it, then closes the
# socket and clears the connected flag. Does nothing for parts that are not
# active. Returns false after closing the socket, nil when not connected.
def disconnect
  if reader_running?
    # Clearing the flag lets the reader loop finish; join waits for that.
    @reader_running = false
    @reader.join
  end

  return unless connected?

  socket.close
  @connected = false
end
Also aliased as: close
dispatch(what, *args)
Alias for: send_message
modify_order(order, contract) click to toggle source

Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.

# File lib/ib/connection.rb, line 309
# Delegates to +order.modify+, passing the contract and this connection.
# Returns whatever +order.modify+ returns.
def modify_order order, contract
  order.modify(contract, self)
end
open()
Alias for: connect
place_order(order, contract) click to toggle source

Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.

# File lib/ib/connection.rb, line 304
# Delegates to +order.place+, passing the contract and this connection, but
# only when +order+ is an IB::Order. Returns the result of +order.place+, or
# nil when +order+ is anything else.
def place_order order, contract
  return unless order.is_a? IB::Order

  order.place(contract, self)
end
process_message() click to toggle source

Process single incoming message (blocking!)

# File lib/ib/connection.rb, line 254
# Reads one incoming message from the socket (blocking), delivers it to the
# subscribers registered for its class and, when options[:received] is set,
# appends it to the #received container for its message_type.
#
# Calls +error+ when the message id has no entry in
# Messages::Incoming::Classes. Logs a warning when nobody is subscribed.
#
# Subscribers are invoked AFTER @subscribe_lock has been released: the lock is
# a plain Mutex, so calling them while holding it made any subscriber that
# itself calls #subscribe or #unsubscribe fail with ThreadError (recursive
# locking). Only the snapshot of the listener list is taken under the lock.
def process_message
  msg_id = socket.read_int # This read blocks!
  message_class = Messages::Incoming::Classes[msg_id]

  # Debug:
  log.debug "Got message #{msg_id} (#{message_class})"

  # Create new instance of the appropriate message type,
  # and have it read the message from socket.
  # NB: Failure here usually means unsupported message type received
  error "Got unsupported message #{msg_id}" unless message_class
  msg = message_class.new(socket)

  # Snapshot the listeners under the lock; the emptiness check uses the same
  # snapshot, so the subscribers Hash is never read outside the lock.
  listeners = @subscribe_lock.synchronize { subscribers[msg.class].values }
  log.warn "No subscribers for message #{msg.class}!" if listeners.empty?

  # Deliver message to all registered subscribers (lock no longer held)
  listeners.each { |subscriber| subscriber.call(msg) }

  # Collect all received messages into a @received Hash
  if options[:received]
    @receive_lock.synchronize do
      received[msg.message_type] << msg
    end
  end
end
process_messages(poll_time = 200) click to toggle source

Process incoming messages during poll_time (200) msecs, nonblocking

# File lib/ib/connection.rb, line 245
# Handles incoming messages for up to +poll_time+ milliseconds.
# Waits on the socket with select (bounded by the remaining time) and calls
# #process_message each time the socket becomes readable. Returns nil.
def process_messages poll_time = 200 # in msec
  deadline = Time.now + poll_time/1000.0

  while true
    remaining = deadline - Time.now
    break unless remaining > 0

    # select returns nil on timeout, so nothing is read in that case
    readable = select([socket], nil, nil, remaining)
    process_message if readable
  end
end
reader_running?() click to toggle source
# File lib/ib/connection.rb, line 240
# Truthy only when the running flag is set, a reader object exists and that
# reader reports itself alive. Otherwise returns the first falsy value found.
def reader_running?
  return @reader_running unless @reader_running
  return @reader unless @reader

  @reader.alive?
end
received() click to toggle source

Hash of received messages, keyed by message type

# File lib/ib/connection.rb, line 183
# Lazily created Hash of received messages keyed by message type.
# Looking up an unknown type stores and returns a fresh empty Array for it.
def received
  @received ||= Hash.new { |store, type| store[type] = [] }
end
received?(message_type, times=1) click to toggle source

Check if messages of the given type were received at least the given number of times (default: 1)

# File lib/ib/connection.rb, line 188
# True when at least +times+ messages of +message_type+ are stored in
# #received. The lookup is done while holding @receive_lock.
def received? message_type, times=1
  @receive_lock.synchronize { received[message_type].length >= times }
end
satisfied?(*conditions) click to toggle source

Check if all given conditions are satisfied

# File lib/ib/connection.rb, line 195
# Evaluates the given wait conditions in order and stops evaluating at the
# first one that is not met. Returns false when no conditions are given;
# otherwise returns the value of the last evaluated condition.
#
# A condition may be:
#   Symbol   - checked with received?(symbol)
#   Array    - splatted into received?, e.g. [type, times]
#   callable - anything responding to :call, invoked without arguments
# Anything else is reported through +error+.
def satisfied? *conditions
  return false if conditions.empty?

  conditions.reduce(true) do |met_so_far, condition|
    # Once a condition failed, later ones are not evaluated at all.
    next met_so_far unless met_so_far

    case condition
    when Symbol then received?(condition)
    when Array then received?(*condition)
    else
      if condition.respond_to?(:call)
        condition.call
      else
        error "Unknown wait condition #{condition}"
      end
    end
  end
end
send_message(what, *args) click to toggle source

Send an outgoing message.

# File lib/ib/connection.rb, line 283
# Sends an outgoing message over the socket.
#
# +what+ may be an outgoing message instance (sent as is), an outgoing
# message class (instantiated with +args+) or a Symbol naming a constant in
# Messages::Outgoing (looked up, then instantiated with +args+).
# Anything else is reported through +error+, as is sending while not
# connected. Returns the result of message.send_to.
def send_message what, *args
  message =
    if what.is_a?(Messages::Outgoing::AbstractMessage)
      what
    elsif what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage
      what.new(*args)
    elsif what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new(*args)
    else
      error "Only able to send outgoing IB messages", :args
    end

  # The message is built first; connectivity is only checked before sending.
  error "Not able to send messages, IB not connected!" unless connected?
  message.send_to socket
end
Also aliased as: dispatch
start_reader() click to toggle source

Start reader thread that continuously reads messages from @socket in background. If you don't start reader, you should manually poll @socket for messages or use process_messages(msec) API.

# File lib/ib/connection.rb, line 232
# Spawns the background reader thread and returns it.
# The thread keeps calling #process_messages until @reader_running is
# cleared (see #disconnect).
def start_reader
  # Process-wide setting: an exception in any thread aborts the program.
  Thread.abort_on_exception = true
  @reader_running = true
  @reader = Thread.new do
    while @reader_running
      process_messages
    end
  end
end
subscribe(*args, &block) click to toggle source

Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing

# File lib/ib/connection.rb, line 120
# Registers a listener for one or more incoming message types and returns
# the subscription id (from random_id) under which it was stored.
#
# The listener is the last argument when that responds to :call, otherwise
# the given block; it must be a Proc or +error+ is called.
# Each remaining argument selects message classes:
#   Class  - a subclass of Messages::Incoming::AbstractMessage
#   Symbol - name of a constant in Messages::Incoming
#   Regexp - every class in Messages::Incoming::Classes whose name matches
# Anything else is reported through +error+.
# The whole registration runs while holding @subscribe_lock.
def subscribe *args, &block
  @subscribe_lock.synchronize do
    listener = args.last.respond_to?(:call) ? args.pop : block
    id = random_id

    error "Need subscriber proc or block", :args unless listener.is_a? Proc

    args.each do |what|
      targets =
        if what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        elsif what.is_a?(Symbol)
          [Messages::Incoming.const_get(what)]
        elsif what.is_a?(Regexp)
          Messages::Incoming::Classes.values.find_all { |klass| klass.to_s =~ what }
        else
          error "#{what} must represent incoming IB message class", :args
        end

      # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
      targets.flatten.each { |message_class| subscribers[message_class][id] = listener }
    end

    id
  end
end
subscribers() click to toggle source

Message subscribers. Key is the message class to listen for. Value is a Hash of subscriber Procs, keyed by their subscription id. All subscriber Procs will be called with the message instance as an argument when a message of that type is received.

# File lib/ib/connection.rb, line 165
# Lazily created registry of subscribers: message class => { id => Proc }.
# Looking up an unknown class stores and returns a fresh empty Hash for it.
def subscribers
  @subscribers ||= Hash.new { |registry, message_class| registry[message_class] = {} }
end
unsubscribe(*ids) click to toggle source

Remove all subscribers registered under the given subscriber id(s). Returns the removed subscriber Procs.

# File lib/ib/connection.rb, line 149
# Removes the listeners stored under each given subscription id from every
# message class, while holding @subscribe_lock.
# Calls +error+ for an id that has no listeners.
# Returns a flat array of the removed listeners.
def unsubscribe *ids
  @subscribe_lock.synchronize do
    dropped_per_id = ids.map do |id|
      dropped = subscribers.map { |_, listeners_by_id| listeners_by_id.delete id }.compact
      error "No subscribers with id #{id}" if dropped.empty?
      dropped
    end

    dropped_per_id.flatten
  end
end
wait_for(*args, &block) click to toggle source

Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair. Timeout after given time or 1 second.

# File lib/ib/connection.rb, line 213
# Blocks until all conditions are satisfied or the timeout expires.
#
# The first Numeric argument is the timeout in seconds (default 1); all
# other non-nil arguments plus the optional block are the conditions handed
# to #satisfied?. While waiting, sleeps in 0.05 s steps when @reader is set,
# otherwise polls the socket itself via process_messages(50). Returns nil.
def wait_for *args, &block
  numerics, others = args.partition { |arg| arg.is_a? Numeric }
  deadline = Time.now + (numerics.first || 1) # default timeout 1 sec
  conditions = (others << block).compact

  while Time.now <= deadline && !satisfied?(*conditions)
    @reader ? sleep(0.05) : process_messages(50)
  end
end

Protected Instance Methods

random_id() click to toggle source
# File lib/ib/connection.rb, line 322
# Returns a pseudo-random Integer in the range 0...999999999.
# Used for subscription ids and as the fallback client id.
def random_id
  rand(999_999_999)
end