module NoBrainer::Streams
Private Class Methods
streams_connection()
click to toggle source
# File lib/nobrainer_streams.rb, line 61 def self.streams_connection @@streams_connection ||= NoBrainer::ConnectionManager.get_new_connection.raw end
Private Instance Methods
nobrainer_cursors()
click to toggle source
# File lib/nobrainer_streams.rb, line 57 def nobrainer_cursors @_nobrainer_cursors ||= [] end
nobrainer_stop_all_streams()
click to toggle source
# File lib/nobrainer_streams.rb, line 46 def nobrainer_stop_all_streams nobrainer_cursors.each do |cursor| begin logger.info "Closing cursor: #{cursor.inspect}" cursor.close rescue => e logger.error "Could not close cursor: #{e.message}\n#{e.backtrace.join("\n")}" end end end
nobrainer_stream_from(query, options = {}, callback = nil)
click to toggle source
# File lib/nobrainer_streams.rb, line 26 def nobrainer_stream_from(query, options = {}, callback = nil) callback ||= -> (changes) do transmit changes, via: "streamed from #{query.inspect}" end deserialize = -> (changes) do klass = query.model old_val = changes['old_val'] new_val = changes['new_val'] changes['old_val'] = klass.new(old_val) if old_val changes['new_val'] = klass.new(new_val) if new_val callback.call(changes) end # defer_subscription_confirmation! connection = NoBrainer::Streams::streams_connection cursor = query.to_rql.changes(options).async_run(connection, ConcurrentAsyncHandler, &deserialize) nobrainer_cursors << cursor end
stop_all_streams()
click to toggle source
Calls superclass method
# File lib/nobrainer_streams.rb, line 21 def stop_all_streams nobrainer_stop_all_streams super end
stream_from(query, options = {}, callback = nil)
click to toggle source
Calls superclass method
# File lib/nobrainer_streams.rb, line 13 def stream_from(query, options = {}, callback = nil) if query.respond_to? :to_rql nobrainer_stream_from(query, options, callback) else super(query, options, callback) end end