module NoBrainer::Streams

Private Class Methods

streams_connection() click to toggle source
# File lib/nobrainer_streams.rb, line 61
def self.streams_connection
  @@streams_connection ||= NoBrainer::ConnectionManager.get_new_connection.raw
end

Private Instance Methods

nobrainer_cursors() click to toggle source
# File lib/nobrainer_streams.rb, line 57
def nobrainer_cursors
  @_nobrainer_cursors ||= []
end
nobrainer_stop_all_streams() click to toggle source
# File lib/nobrainer_streams.rb, line 46
def nobrainer_stop_all_streams
  nobrainer_cursors.each do |cursor|
    begin
      logger.info "Closing cursor: #{cursor.inspect}"
      cursor.close
    rescue => e
      logger.error "Could not close cursor: #{e.message}\n#{e.backtrace.join("\n")}"
    end
  end
end
nobrainer_stream_from(query, options = {}, callback = nil) click to toggle source
# File lib/nobrainer_streams.rb, line 26
def nobrainer_stream_from(query, options = {}, callback = nil)
  callback ||= -> (changes) do
    transmit changes, via: "streamed from #{query.inspect}"
  end

  deserialize = -> (changes) do
    klass = query.model
    old_val = changes['old_val']
    new_val = changes['new_val']
    changes['old_val'] = klass.new(old_val)  if old_val
    changes['new_val'] = klass.new(new_val)  if new_val
    callback.call(changes)
  end

  # defer_subscription_confirmation!
  connection = NoBrainer::Streams::streams_connection
  cursor = query.to_rql.changes(options).async_run(connection, ConcurrentAsyncHandler, &deserialize)
  nobrainer_cursors << cursor
end
stop_all_streams() click to toggle source
Calls superclass method
# File lib/nobrainer_streams.rb, line 21
def stop_all_streams
  nobrainer_stop_all_streams
  super
end
stream_from(query, options = {}, callback = nil) click to toggle source
Calls superclass method
# File lib/nobrainer_streams.rb, line 13
def stream_from(query, options = {}, callback = nil)
  if query.respond_to? :to_rql
    nobrainer_stream_from(query, options, callback)
  else
    super(query, options, callback)
  end
end