class Redis::Stream::Wrapper
Constants
- VERSION
Public Class Methods
Public Instance Methods
Acknowledges a stream message.
@param group - The consumer group acknowledging the message
@param message - The message to acknowledge
# File lib/redis/stream/wrapper.rb, line 79
def ack_message(group, message)
  @redis.xack(message.stream, group, message.id)
end
Adds a new message to the stream.
@param message - The message to add to the stream
@return - Message with new id (if default was used)
# File lib/redis/stream/wrapper.rb, line 31
def add_message(message)
  copy_message(message, @redis.xadd(message.stream, message.payload, id: message.id))
end
Deletes the stream.
@param stream_name - The name of the stream to delete
# File lib/redis/stream/wrapper.rb, line 22
def clear_stream!(stream_name)
  @redis.del(stream_name)
end
Creates a consumer group on a stream.
@param name - The group name
@param stream - The stream the group is attached to
@param start - The id the group starts reading from ($ delivers only new messages)
@param create_default_stream - Whether to create the stream if it does not exist
# File lib/redis/stream/wrapper.rb, line 98
def create_group(name, stream, start = '$', create_default_stream = true)
  @redis.xgroup(:create, stream, name, start, mkstream: create_default_stream)
end
Deletes a consumer group from a stream.
@param name - The group name
@param stream - The stream the group belongs to
# File lib/redis/stream/wrapper.rb, line 117
def delete_group(name, stream)
  @redis.xgroup(:destroy, stream, name)
end
Deletes a consumer from a consumer group.
@param name - The group name
@param stream - The stream the group belongs to
@param consumer - The name of the consumer to delete
# File lib/redis/stream/wrapper.rb, line 127
def delete_group_consumer(name, stream, consumer)
  @redis.xgroup(:delconsumer, stream, name, consumer)
end
Deletes a message from its stream.
@param message - The message to delete
# File lib/redis/stream/wrapper.rb, line 87
def delete_message(message)
  @redis.xdel(message.stream, message.id)
end
Gets information about a stream, its groups, or its consumers.
@param type - The info type (:stream, :groups or :consumers)
@param key - The stream name
@param group - The group name (needed for the consumers type)
# File lib/redis/stream/wrapper.rb, line 108
def info(type, key, group = nil)
  @redis.xinfo(type, key, group)
end
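Starts reading stream messages in a loop, yielding each message to the given block until stop_listening is called.
@param group - The consumer group to read as
@param consumer_name - The consumer name
@param streams - A hash {:stream_name => 'stream_begin_value'}
@param opts - A hash of options passed to xreadgroup (:block defaults to @read_timeout_ms)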
# File lib/redis/stream/wrapper.rb, line 42
def listen(group, consumer_name, streams, opts = {})
  # Name the requested streams in the error; a bare `stream` is not defined in this method.
  raise StreamReadError, "Already listening [#{streams.keys.join(', ')}] stream" if @listening

  @listening = true
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  while @listening
    results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
    # A nil result means the blocking read timed out; try again.
    next unless results

    parse_read_response(results).each do |message|
      yield message
    end
  end
end
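The interaction between listen and stop_listening can be sketched without Redis. In the sketch below, SketchListener and its canned batches are hypothetical stand-ins for the wrapper and for xreadgroup replies; only the flag-controlled loop mirrors the source above.

```ruby
# Hypothetical stand-in for the wrapper: canned batches replace Redis reads.
class SketchListener
  def initialize(batches)
    @batches = batches
    @listening = false
  end

  def listen
    raise 'Already listening' if @listening

    @listening = true
    while @listening
      # Stop when the canned data runs out so the sketch cannot spin forever.
      break if @batches.empty?

      batch = @batches.shift
      next unless batch # nil models a read that timed out

      batch.each { |message| yield message }
    end
  end

  def stop_listening
    @listening = false
  end
end

listener = SketchListener.new([nil, %w[a b], %w[c d], %w[e]])
seen = []
listener.listen do |message|
  seen << message
  listener.stop_listening if message == 'c'
end
p seen
```

Because the flag is only checked between reads, the messages remaining in the batch being processed are still yielded after stop_listening is called; here 'd' is seen but 'e' is not.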
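Reads stream messages once.
@param group - The consumer group to read as
@param consumer_name - The consumer name
@param streams - A hash {:stream_name => 'stream_begin_value'}
@param opts - A hash of options passed to xreadgroup (:block defaults to @read_timeout_ms)
@return - An array of messages, or nil when the read returns nothing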
# File lib/redis/stream/wrapper.rb, line 64
def read(group, consumer_name, streams, opts = {})
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
  return unless results

  parse_read_response(results)
end
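Both listen and read split the streams hash into two parallel arrays before calling xreadgroup. A minimal sketch of that split follows; the stream names are hypothetical, and '>' is the Redis id that requests messages not yet delivered to any consumer of the group.

```ruby
# Hypothetical stream names, each paired with the id to start reading from.
streams = { 'orders' => '>', 'payments' => '>' }

# The wrapper hands these two arrays to xreadgroup, matched by position.
stream_names = streams.keys
start_ids = streams.values

# Pair them back up to show which id applies to which stream.
pairs = stream_names.zip(start_ids)
pairs.each { |name, id| puts "#{name} starts at #{id}" }
```

Ruby hashes preserve insertion order, so keys and values stay aligned.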
Stops reading message stream(s). The listen loop exits once the current read and its messages have been processed.
# File lib/redis/stream/wrapper.rb, line 133
def stop_listening
  @listening = false
end
Private Instance Methods
Returns a copy of the given message with its id set to new_id.
# File lib/redis/stream/wrapper.rb, line 161
def copy_message(message, new_id)
  # Build the copy through the message's class; `new` is a class method.
  message.class.new(
    id: new_id,
    stream: message.stream,
    payload: message.payload
  )
end
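The copy step can be illustrated in isolation. This is a sketch only: SketchMessage and copy_with_id are hypothetical names, the real Message class is not shown in this document, and '1-0' stands in for an id that Redis would generate when a message is added with the auto-generate id '*'.

```ruby
# Hypothetical message type; the library's Message class may differ.
SketchMessage = Struct.new(:id, :stream, :payload, keyword_init: true)

# Build a new message of the same class, changing only the id.
def copy_with_id(message, new_id)
  message.class.new(id: new_id, stream: message.stream, payload: message.payload)
end

original = SketchMessage.new(id: '*', stream: 'orders', payload: { 'sku' => '42' })
# '1-0' stands in for an id generated by Redis.
stored = copy_with_id(original, '1-0')

puts original.id
puts stored.id
```

Returning a copy leaves the caller's original message untouched while still exposing the id the message was stored under.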
# File lib/redis/stream/wrapper.rb, line 151
def parse_payload(payload)
  # A payload given as a flat [field, value, ...] array is converted to a Hash.
  if payload.is_a? Array
    Hash[payload.each_slice(2).to_a]
  else
    payload
  end
end
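The payload conversion pairs up consecutive array elements as field and value. A small sketch of the same logic on hypothetical sample data:

```ruby
# Same conversion as above, repeated so the sketch runs on its own.
def parse_payload(payload)
  payload.is_a?(Array) ? Hash[payload.each_slice(2).to_a] : payload
end

# Hypothetical flat payload: field, value, field, value.
flat = %w[sku 42 qty 3]
from_array = parse_payload(flat)

# A payload that is already a Hash passes through unchanged.
from_hash = parse_payload({ 'sku' => '42' })

p from_array
p from_hash
```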
# File lib/redis/stream/wrapper.rb, line 139
def parse_read_response(results)
  results.map do |stream_name, messages|
    messages.map do |id, payload|
      Message.new(
        stream: stream_name,
        payload: parse_payload(payload),
        id: id
      )
    end
  end.flatten.compact
end
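The flattening of a multi-stream reply can be sketched as follows. The reply shape (stream name mapped to a list of [id, payload] pairs) is an assumption inferred from the block parameters above, and ParsedMessage is a hypothetical stand-in for the library's Message class.

```ruby
# Hypothetical message type standing in for the library's Message class.
ParsedMessage = Struct.new(:stream, :id, :payload, keyword_init: true)

# Assumed reply shape: stream name => list of [id, payload] pairs.
results = {
  'orders' => [['1-0', { 'sku' => '42' }], ['2-0', { 'sku' => '43' }]],
  'payments' => [['1-1', { 'amount' => '10' }]]
}

# One nested array per stream, flattened into a single list of messages.
messages = results.map do |stream_name, entries|
  entries.map do |id, payload|
    ParsedMessage.new(stream: stream_name, id: id, payload: payload)
  end
end.flatten

messages.each { |m| puts "#{m.stream} #{m.id}" }
```

Each message keeps the name of the stream it came from, so a single block can handle messages from several streams.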