class RTCBX::Candles

Attributes

bucket_thread[R]

Thread that reads messages from the websocket queue and puts each match message into the proper bucket. It creates and maintains the buckets hash.

buckets[R]

A hash of buckets. Each key is an epoch marking the start of a minute, and its value stores every match message received during that minute (the epoch plus 60 seconds). Each one-minute interval is a bucket, which is used to calculate that minute's Candle.
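To make the keying concrete, here is a minimal sketch of how match timestamps map to bucket epochs. The helper name `bucket_epoch` and the sample messages are illustrative only; the flooring arithmetic mirrors the `timestamp.to_i - timestamp.sec` expression used in the class source.

```ruby
require 'time'

# Hypothetical helper: floor a timestamp to the start of its minute.
def bucket_epoch(time)
  time.to_i - time.sec
end

# Illustrative match messages; only the fields used here are shown.
messages = [
  { 'type' => 'match', 'time' => '2020-01-01T00:01:05Z' },
  { 'type' => 'match', 'time' => '2020-01-01T00:01:59Z' },
  { 'type' => 'match', 'time' => '2020-01-01T00:02:00Z' }
]

# Group the messages by the epoch of the minute they fall in.
buckets = Hash.new { |hash, key| hash[key] = [] }
messages.each do |message|
  buckets[bucket_epoch(Time.parse(message.fetch('time')))] << message
end

buckets.each do |epoch, matches|
  puts "#{Time.at(epoch).utc} -> #{matches.size} match(es)"
end
```

The first two messages share one key and the third opens a new one, so the hash ends up with two buckets.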

buckets_lock[R]

Mutex to allow our two threads to produce and consume buckets
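The producer/consumer arrangement can be sketched with plain Ruby threads. This is a simplified illustration and not the library's code: the `buckets` hash, the single key `0`, and the item count are stand-ins chosen for the example.

```ruby
# Stand-ins for the shared state; names and sizes are illustrative.
buckets = { 0 => [] }
lock = Mutex.new

# Producer: appends items to the shared bucket while holding the lock.
producer = Thread.new do
  100.times do |i|
    lock.synchronize { buckets[0] << i }
  end
end

# Consumer: repeatedly takes everything collected so far, under the same lock.
consumed = []
consumer = Thread.new do
  until consumed.size == 100
    batch = lock.synchronize { buckets[0].shift(buckets[0].size) }
    consumed.concat(batch)
    Thread.pass
  end
end

[producer, consumer].each(&:join)
puts consumed.size
```

Because every read and write of the shared hash happens inside `synchronize`, the consumer never observes a half-updated bucket.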

candle_thread[R]

The candle_thread consumes the buckets that the bucket_thread stores in buckets and turns each completed bucket into a Candle object, which is appended to the candles array. This functionality could be improved. Ideally, you consume this array into a database to keep history in real time.
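As a rough illustration of what turning a bucket into a candle involves, the sketch below derives open, high, low, close, and volume from a bucket of match messages. `SketchCandle` and `build_candle` are hypothetical names and do not describe the library's Candle class; the `price` and `size` string fields are assumed to be present on each match message.

```ruby
require 'bigdecimal'

# Hypothetical stand-in for a candle; field names are assumptions.
SketchCandle = Struct.new(:epoch, :open, :high, :low, :close, :volume)

# Build one candle from a bucket of match messages (assumed to be in arrival order).
def build_candle(epoch, matches)
  prices = matches.map { |m| BigDecimal(m.fetch('price')) }
  volume = matches.sum { |m| BigDecimal(m.fetch('size')) }
  SketchCandle.new(epoch, prices.first, prices.max, prices.min, prices.last, volume)
end

# Illustrative bucket contents.
bucket = [
  { 'price' => '100.0', 'size' => '0.5' },
  { 'price' => '101.5', 'size' => '1.0' },
  { 'price' => '99.0',  'size' => '0.25' }
]

candle = build_candle(1_577_836_860, bucket)
puts candle.to_a.map(&:to_s).join(', ')
```

BigDecimal is used here so that decimal strings are not rounded the way floats can be.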

candles[R]

An array of generated candles. You should process these by putting them into a database and removing them from the array. If you want to help me abstract this to a pluggable database system, open an issue.
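One way to process and remove candles is to shift them off the front of the array and hand each one to a storage routine. The sketch below uses a plain array and a lambda as stand-ins; `persist` is a hypothetical placeholder for a database insert, and the symbols stand in for Candle objects.

```ruby
# Stand-in for the candles array; in practice this would be the object's `candles`.
candles = [:candle_a, :candle_b, :candle_c]

# Hypothetical sink standing in for a database insert.
stored = []
persist = ->(candle) { stored << candle }

# Remove candles from the front one at a time so the array does not grow unbounded.
until candles.empty?
  candle = candles.shift
  next if candle.nil? # guard against nil entries

  persist.call(candle)
end

puts "stored #{stored.size}, remaining #{candles.size}"
```

Shifting from the front keeps candles in the order they were generated.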

current_bucket[R]

The epoch representing the current bucket

first_bucket[R]

The epoch of the first bucket

initial_time[R]

The time at which start! was called (Time object). The first full minute that we can collect for begins at first_bucket.

Public Class Methods

new(options = {}, &block)

Create a new Candles object to start and track candles. Pass a block to have it run whenever a candle is created.

Calls superclass method RTCBX::new
# File lib/rtcbx/candles.rb, line 44
def initialize(options = {}, &block)
  super(options, &block)
  @buckets_lock = Mutex.new
end

Public Instance Methods

start!()

Start tracking candles

Calls superclass method RTCBX#start!
# File lib/rtcbx/candles.rb, line 50
def start!
  super
  #
  # Calculate the first minute to start relying on just the websocket for
  # data.
  #
  @initial_time = Time.now
  @first_bucket = initial_time.to_i + (60 - initial_time.sec)

  start_bucket_thread
  start_candle_thread
end
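The first-bucket arithmetic is easier to follow with a fixed time in place of `Time.now`. This is a worked example only; the sample timestamp is arbitrary.

```ruby
require 'time'

# Same arithmetic as start!, applied to a fixed time instead of Time.now.
initial_time = Time.parse('2020-01-01T00:00:42Z')
first_bucket = initial_time.to_i + (60 - initial_time.sec)

puts Time.at(first_bucket).utc.iso8601
# prints "2020-01-01T00:01:00Z"
```

Note that when the seconds component is 0 the expression adds a full 60 seconds, so the first bucket is always the start of the following minute.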

Private Instance Methods

start_bucket_thread()

Start the thread to create buckets

# File lib/rtcbx/candles.rb, line 66
def start_bucket_thread
  @bucket_thread = Thread.new do
    @buckets = {}
    @current_bucket = first_bucket
    @buckets[current_bucket.to_i] = []

    loop do
      message = queue.pop
      next unless message.fetch('type') == 'match'

      timestamp = Time.parse(message.fetch('time'))
      next unless timestamp >= Time.at(first_bucket)

      message_bucket = timestamp.to_i - timestamp.sec
      @buckets_lock.synchronize do
        # Open a new bucket only when the message starts a later minute;
        # resetting on an equal epoch would discard matches already stored.
        if message_bucket > current_bucket
          @current_bucket = message_bucket
          @buckets[current_bucket.to_i] = []
        end
        @buckets[current_bucket.to_i] << message
      end
    end
  end
end

start_candle_thread()

Start the thread that consumes buckets and turns them into Candle objects

# File lib/rtcbx/candles.rb, line 94
def start_candle_thread
  @candle_thread = Thread.new do
    @candles = []
    sleep(60 - Time.now.sec)
    loop do
      buckets.keys.each do |key|
        next unless key + 60 <= Time.now.to_i

        @buckets_lock.synchronize do
          # Skip empty buckets so that nil is never stored or passed to callbacks.
          unless buckets[key].empty?
            candle = Candle.new(key, buckets[key])
            @candles << candle
            # Run candle callback
            @message_callbacks.each { |c| c.call(candle) }
          end
          buckets.delete(key)
        end
      end

      sleep(60 - Time.now.sec)
    end
  end
end
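The check `key + 60 <= Time.now.to_i` selects only buckets whose minute has fully elapsed. A minimal sketch of that selection, with a fixed clock value so the result is reproducible, might look like this; `completed_keys` is a hypothetical helper and the bucket contents are placeholders.

```ruby
# Hypothetical helper: epochs whose minute has fully elapsed at `now`.
def completed_keys(buckets, now)
  buckets.keys.select { |key| key + 60 <= now.to_i }
end

# Two illustrative buckets, one minute apart.
buckets = { 1_577_836_860 => [:match_a], 1_577_836_920 => [:match_b] }

# Five seconds into the second bucket's minute.
now = Time.at(1_577_836_925)

ready = completed_keys(buckets, now)
puts ready.inspect
```

Only the first bucket is selected, since the second one is still collecting matches at the chosen time.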