class Akasha::Storage::HttpEventStore::Stream

HTTP Eventstore stream.

Attributes

name[R]

Public Class Methods

new(client, stream_name, max_retries: 0) click to toggle source

Create a stream object for accessing a ES stream. Does not create the underlying stream itself. Use the `max_retries` option to choose how many times to retry in case of network failures.

# File lib/akasha/storage/http_event_store/stream.rb, line 12
def initialize(client, stream_name, max_retries: 0)
  @client = client
  @name = stream_name
  @max_retries = max_retries
end

Public Instance Methods

metadata() click to toggle source

Reads stream metadata.

# File lib/akasha/storage/http_event_store/stream.rb, line 49
def metadata
  @client.retry_read_metadata(@name, max_retries: @max_retries)
end
metadata=(metadata) click to toggle source

Updates stream metadata.

# File lib/akasha/storage/http_event_store/stream.rb, line 54
def metadata=(metadata)
  @client.retry_write_metadata(@name, metadata, max_retries: @max_retries)
end
read_events(start, page_size, poll: 0) { |events| ... } click to toggle source

Reads events from the stream starting from `start` inclusive. If block given, reads all events from the position in pages of `page_size`. If block not given, reads `size` events from the position. You can also turn on long-polling using `poll` and setting it to the number of seconds to wait for.

# File lib/akasha/storage/http_event_store/stream.rb, line 34
def read_events(start, page_size, poll: 0)
  if block_given?
    position = start
    loop do
      events = read_events(position, page_size, poll: poll)
      return if events.empty?
      yield(events)
      position += events.size
    end
  else
    @client.retry_read_events_forward(@name, start, page_size, poll, max_retries: @max_retries)
  end
end
write_events(events, revision: nil) click to toggle source

Appends `events` to the stream. You can specify `revision` to use optimistic concurrency control:

- nil  - just append, no concurrency control,
- -1   - the stream doesn't exist,
- >= 0 - expected revision of the last event in stream.
# File lib/akasha/storage/http_event_store/stream.rb, line 23
def write_events(events, revision: nil)
  return if events.empty?
  expected_version = revision.nil? ? -2 : revision
  @client.retry_append_to_stream(@name, events, expected_version, max_retries: @max_retries)
end