class Redis::Stream::Wrapper

Constants

VERSION

Public Class Methods

new(redis, read_timeout_ms = 1000) click to toggle source

Creates a new instance if a Stream.

@param redis - An instance of Redis. @param read_timeout_ms - The read timeout granularity.

# File lib/redis/stream/wrapper.rb, line 12
def initialize(redis, read_timeout_ms = 1000)
  @redis = redis
  @listening = false
  @read_timeout_ms = read_timeout_ms
end

Public Instance Methods

ack_message(group, message) click to toggle source

ACK stream message.

@param group - The group that ack @param message - The message to add to the stream

# File lib/redis/stream/wrapper.rb, line 79
def ack_message(group, message)
  @redis.xack(message.stream, group, message.id)
end
add_message(message) click to toggle source

Adds a new message to the stream.

@param message - The message to add to the stream @return - Message with new id (if default was used)

# File lib/redis/stream/wrapper.rb, line 31
def add_message(message)
  copy_message(message, @redis.xadd(message.stream, message.payload, id: message.id))
end
clear_stream!(stream_name) click to toggle source

Deletes the stream.

@param stream_name - The name of the stream to delete

# File lib/redis/stream/wrapper.rb, line 22
def clear_stream!(stream_name)
  @redis.del(stream_name)
end
create_group(name, stream, start = '$', create_default_stream = true) click to toggle source

Create group stream message.

@param name - The group name @param stream - The concerned stream @param start - The start stream ($ is only new messages) @param create_default_stream - Bool to create a stream if it does not exist

# File lib/redis/stream/wrapper.rb, line 98
def create_group(name, stream, start = '$', create_default_stream = true)
  @redis.xgroup(:create, stream, name, start, mkstream: create_default_stream)
end
delete_group(name, stream) click to toggle source

Delete group stream message.

@param name - The group name @param stream - The concerned stream

# File lib/redis/stream/wrapper.rb, line 117
def delete_group(name, stream)
  @redis.xgroup(:destroy, stream, name)
end
delete_group_consumer(name, stream, consumer) click to toggle source

Delete group stream message.

@param name - The group name @param stream - The concerned stream @param consumer - The consumer name

# File lib/redis/stream/wrapper.rb, line 127
def delete_group_consumer(name, stream, consumer)
  @redis.xgroup(:delconsumer, stream, name, consumer)
end
delete_message(message) click to toggle source

Delete stream message.

@param message - The message to add to the stream

# File lib/redis/stream/wrapper.rb, line 87
def delete_message(message)
  @redis.xdel(message.stream, message.id)
end
info(type, key, group = nil) click to toggle source

Get info about streams / groups and consumers.

@param type - The type @param key - The concerned stream / group name @param group - The group name for consumer type

# File lib/redis/stream/wrapper.rb, line 108
def info(type, key, group = nil)
  @redis.xinfo(type, key, group)
end
listen(group, consumer_name, streams, opts = {}) { |message| ... } click to toggle source

Starts reading stream messages looping

@param group - A group to read stream @param consumer_name - A consumer name @param streams - A hash {:stream_name => 'stream_begin_value'} @param opts - A hash of options

# File lib/redis/stream/wrapper.rb, line 42
def listen(group, consumer_name, streams, opts = {})
  raise StreamReadError, "Already listening [#{stream}] stream" if @listening

  @listening = true
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  while @listening
    results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
    next unless results

    parse_read_response(results).each do |message|
      yield message
    end
  end
end
read(group, consumer_name, streams, opts = {}) click to toggle source

Starts reading stream messages.

@param group - A group to read stream @param consumer_name - A consumer name @param streams - A hash {:stream_name => 'stream_begin_value'} @param opts - A hash of options

# File lib/redis/stream/wrapper.rb, line 64
def read(group, consumer_name, streams, opts = {})
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
  return unless results

  parse_read_response(results).each.map do |message|
    message
  end
end
stop_listening() click to toggle source

Stops reading message stream(s)

# File lib/redis/stream/wrapper.rb, line 133
def stop_listening
  @listening = false
end

Private Instance Methods

copy_message(message, new_id) click to toggle source

Returns a copy of the current instance, with the id set.

# File lib/redis/stream/wrapper.rb, line 161
def copy_message(message, new_id)
  message.new(
    id: new_id,
    stream: message.stream,
    payload: message.payload
  )
end
parse_payload(payload) click to toggle source
# File lib/redis/stream/wrapper.rb, line 151
def parse_payload(payload)
  if payload.is_a? Array
    Hash[payload.each_slice(2).to_a]
  else
    payload
  end
end
parse_read_response(results) click to toggle source
# File lib/redis/stream/wrapper.rb, line 139
def parse_read_response(results)
  results.map do |stream_name, messages|
    messages.map do |id, payload|
      Message.new(
        stream: stream_name,
        payload: parse_payload(payload),
        id: id
      )
    end
  end.flatten.compact
end