class Bitflyer::Realtime::WebSocketClient

Attributes

channel_callbacks[RW]
channel_names[RW]
disconnected[RW]
last_ping_at[RW]
last_pong_at[RW]
ping_interval[RW]
ping_timeout[RW]
ready[RW]
websocket_client[RW]

Public Class Methods

new(host:, key:, secret:, debug: false) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 13
def initialize(host:, key:, secret:, debug: false)
  @host = host
  @key = key
  @secret = secret
  @debug = debug
  @channel_names = []
  @channel_callbacks = {}
  connect
  start_monitoring
end

Public Instance Methods

authenticate() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 127
def authenticate
  debug_log 'Authenticate'
  timestamp = Time.now.to_i
  nonce = Random.new.bytes(16).unpack1('H*')
  signature = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha256'), @secret, timestamp.to_s + nonce)
  auth_params = {
    api_key: @key,
    timestamp: timestamp,
    nonce: nonce,
    signature: signature
  }
  @websocket_client.send "420#{['auth', auth_params].to_json}"
end
authenticated(json:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 141
def authenticated(json:)
  raise "Authentication failed: #{json}" if json != '[null]'

  debug_log 'Authenticated'
  subscribe_channels
  @ready&.call
end
connect() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 31
def connect
  @websocket_client = WebSocket::Client::Simple.connect "#{@host}/socket.io/?transport=websocket"
  this = self
  @websocket_client.on(:message) { |payload| this.handle_message(payload: payload) }
  @websocket_client.on(:error) { |error| this.handle_error(error: error) }
  @websocket_client.on(:close) { |error| this.handle_close(error: error) }
rescue SocketError => e
  puts e
  puts e.backtrace.join("\n")
end
debug_log(message) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 173
def debug_log(message)
  return unless @debug

  p message
end
disconnect() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 161
def disconnect
  debug_log 'Disconnecting from server...'
  @websocket_client.close
end
emit_message(json:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 166
def emit_message(json:)
  channel_name, *messages = JSON.parse json
  return unless channel_name

  messages.each { |message| @channel_callbacks[channel_name.to_sym]&.call(message) }
end
handle_close(error:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 108
def handle_close(error:)
  debug_log error
  @disconnected&.call(error)
end
handle_error(error:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 82
def handle_error(error:)
  debug_log error
  return unless error.is_a? Errno::ECONNRESET

  @disconnected&.call(error)
  reconnect
end
handle_message(payload:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 90
def handle_message(payload:) # rubocop:disable Metrics/CyclomaticComplexity
  debug_log payload.data
  return unless payload.data =~ /^\d+/

  code, body = payload.data.scan(/^(\d+)(.*)$/)[0]

  case code.to_i
  when 0 then setup_by_response(json: body)
  when 3 then receive_pong
  when 41 then disconnect
  when 42 then emit_message(json: body)
  when 430 then authenticated(json: body)
  end
rescue StandardError => e
  puts e
  puts e.backtrace.join("\n")
end
receive_pong() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 156
def receive_pong
  debug_log 'Received pong'
  @last_pong_at = Time.now.to_i
end
reconnect() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 73
def reconnect
  return if @websocket_client&.open?

  debug_log 'Reconnecting...'

  @websocket_client.close if @websocket_client.open?
  connect
end
send_ping() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 56
def send_ping
  return unless @last_ping_at && @ping_interval
  return unless Time.now.to_i - @last_ping_at > @ping_interval / 1000

  debug_log 'Sent ping'
  @websocket_client.send '2'
  @last_ping_at = Time.now.to_i
end
setup_by_response(json:) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 113
def setup_by_response(json:)
  body = JSON.parse json
  @ping_interval = body['pingInterval'].to_i || 25_000
  @ping_timeout  = body['pingTimeout'].to_i || 60_000
  @last_ping_at = Time.now.to_i
  @last_pong_at = Time.now.to_i
  if @key && @secret
    authenticate
  else
    subscribe_channels
    @ready&.call
  end
end
start_monitoring() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 42
def start_monitoring
  Thread.new do
    loop do
      sleep 1
      if @websocket_client&.open?
        send_ping
        wait_pong
      else
        reconnect
      end
    end
  end
end
subscribe(channel_name:, &block) click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 24
def subscribe(channel_name:, &block)
  debug_log "Subscribe #{channel_name}"
  @channel_names = (@channel_names + [channel_name]).uniq
  @channel_callbacks[channel_name] = block
  @websocket_client.send "42#{['subscribe', channel_name].to_json}"
end
subscribe_channels() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 149
def subscribe_channels
  @channel_callbacks.each do |channel_name, _|
    debug_log "42#{{ subscribe: channel_name }.to_json}"
    @websocket_client.send "42#{['subscribe', channel_name].to_json}"
  end
end
wait_pong() click to toggle source
# File lib/bitflyer/realtime/websocket.rb, line 65
def wait_pong
  return unless @last_pong_at && @ping_timeout
  return unless Time.now.to_i - @last_pong_at > (@ping_interval + @ping_timeout) / 1000

  debug_log 'Timed out waiting pong'
  @websocket_client.close
end