class Akasha::Storage::HttpEventStore::Client
Eventstore HTTP client.
Constants
- MAX_RETRY_INTERVAL
An upper limit for a retry interval.
- MIN_RETRY_INTERVAL
A lower limit for a retry interval.
Public Class Methods
new(host: 'localhost', port: 2113, username: nil, password: nil)
click to toggle source
Creates a new client for the host and port with optional username and password for authenticating certain requests.
# File lib/akasha/storage/http_event_store/client.rb, line 29
# Creates a client talking to the Event Store at `host`:`port`.
#
# `username` and `password` are optional; they are only stored here and are
# turned into an `Authorization` header by `auth_headers` when both are set.
def initialize(host: 'localhost', port: 2113, username: nil, password: nil)
  @conn = connection(host, port)
  @serializer = EventSerializer.new
  @username = username
  @password = password
end
Public Instance Methods
merge_all_by_event(name, event_names, namespace: nil, max_retries: 0)
click to toggle source
Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.
Arguments:
- `name` - name of the projection stream
- `event_names` - array of event names
- `namespace` - optional namespace; if provided, the resulting stream will only contain events with the same metadata.namespace
- `max_retries` - how many times to retry in case of network failures
# File lib/akasha/storage/http_event_store/client.rb, line 62
# Delegates to ProjectionManager#merge_all_by_event, wrapping the call so it
# is retried on network failures.
#
# `name`        - name of the projection stream
# `event_names` - array of event names
# `namespace`   - optional namespace, forwarded to the projection manager
# `max_retries` - how many times to retry in case of network failures
def merge_all_by_event(name, event_names, namespace: nil, max_retries: 0)
  retrying_on_network_failures(max_retries) do
    # A fresh manager is built on every attempt.
    projections = ProjectionManager.new(self)
    projections.merge_all_by_event(name, event_names, namespace: namespace)
  end
end
request(method, path, body = nil, headers = {})
click to toggle source
Issues a generic request against the API.
# File lib/akasha/storage/http_event_store/client.rb, line 82
# Issues a generic request against the API.
#
# `method`  - name of the connection method to invoke (e.g. `:get`)
# `path`    - request path
# `body`    - optional value passed to the connection method after the path
# `headers` - extra headers; merged over the authentication headers, so an
#             explicitly passed header wins over one from `auth_headers`
#
# Returns the response body, or an empty Hash when the body is nil or empty.
def request(method, path, body = nil, headers = {})
  response = @conn.public_send(method, path, body, auth_headers.merge(headers))
  payload = response.body
  # A response middleware may leave nil in place of a blank body; treat that
  # the same as an empty body instead of failing on `nil.empty?`.
  return {} if payload.nil? || payload.empty?

  payload
end
retry_append_to_stream(stream_name, events, expected_revision = nil, max_retries: 0)
click to toggle source
Appends events to the stream, idempotently retrying on network failures
up to `max_retries` times.
# File lib/akasha/storage/http_event_store/client.rb, line 37
# Appends `events` to `stream_name`, re-running the append on network
# failures up to `max_retries` times.
#
# `expected_revision` is forwarded unchanged to `append_to_stream`.
def retry_append_to_stream(stream_name, events, expected_revision = nil, max_retries: 0)
  retrying_on_network_failures(max_retries) { append_to_stream(stream_name, events, expected_revision) }
end
retry_read_events_forward(stream_name, start, count, poll = 0, max_retries: 0)
click to toggle source
Reads events from the stream, retrying
up to `max_retries` times in case of network failures. Reads `count` events starting from `start`, inclusive. Can long-poll for events if `poll` is specified.
# File lib/akasha/storage/http_event_store/client.rb, line 46
# Reads `count` events from `stream_name` beginning at `start`, re-running
# the read on network failures up to `max_retries` times.
#
# `poll` is forwarded to `safe_read_events`, which uses it for long-polling
# when it is positive.
def retry_read_events_forward(stream_name, start, count, poll = 0, max_retries: 0)
  retrying_on_network_failures(max_retries) { safe_read_events(stream_name, start, count, poll) }
end
retry_read_metadata(stream_name, max_retries: 0)
click to toggle source
Reads stream metadata.
# File lib/akasha/storage/http_event_store/client.rb, line 69
# Reads the metadata of `stream_name`, re-running the read on network
# failures up to `max_retries` times.
def retry_read_metadata(stream_name, max_retries: 0)
  retrying_on_network_failures(max_retries) { safe_read_metadata(stream_name) }
end
retry_write_metadata(stream_name, metadata, max_retries: 0)
click to toggle source
Updates stream metadata.
# File lib/akasha/storage/http_event_store/client.rb, line 76
# Updates stream metadata by appending a `:stream_metadata_changed` event
# carrying `metadata` to the "<stream_name>/metadata" stream.
#
# The event (and its UUID) is created once, before any retrying starts, so
# every attempt appends the same event.
def retry_write_metadata(stream_name, metadata, max_retries: 0)
  change = Akasha::Event.new(:stream_metadata_changed, SecureRandom.uuid, metadata)
  metadata_stream = "#{stream_name}/metadata"
  retry_append_to_stream(metadata_stream, [change], max_retries: max_retries)
end
Private Instance Methods
append_to_stream(stream_name, events, expected_revision)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 114
# POSTs the serialized `events` to "/streams/<stream_name>".
#
# `expected_revision` is sent as the `ES-ExpectedVersion` header.
#
# Raises Akasha::ConflictError when the request fails with status 400; any
# other HttpClientError is re-raised untouched.
def append_to_stream(stream_name, events, expected_revision)
  @conn.post("/streams/#{stream_name}") do |req|
    event_headers = {
      'Content-Type' => 'application/vnd.eventstore.events+json',
      'ES-ExpectedVersion' => expected_revision
    }
    req.headers = event_headers
    req.body = to_event_data(events).to_json
  end
rescue HttpClientError => e
  raise if e.status_code != 400

  # Report the version the server says the stream is actually at.
  current = e.response_headers['ES-CurrentVersion']
  message = "Race condition; expected last event version: #{expected_revision} actual: #{current}"
  raise Akasha::ConflictError, message
end
auth_headers()
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 100
# Builds the HTTP Basic authentication header for the configured credentials.
#
# Returns an empty Hash unless both username and password are set; otherwise
# a Hash with a single `Authorization` entry.
def auth_headers
  return {} unless @username && @password

  # Basic credentials must use the standard Base64 alphabet ('+' and '/');
  # the URL-safe variant ('-' and '_') yields a token servers decode wrongly.
  credentials = Base64.strict_encode64([@username, @password].join(':'))
  { 'Authorization' => "Basic #{credentials}" }
end
connection(host, port)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 90
# Builds the Faraday connection used for all requests to `host`:`port`.
#
# Middleware is registered in this order: JSON response parsing (restricted
# to the 'application/json' content type), ResponseHandler, then the
# Typhoeus adapter.
def connection(host, port)
  Faraday.new do |faraday|
    faraday.host = host
    faraday.port = port
    faraday.response(:json, content_type: 'application/json')
    faraday.use(ResponseHandler)
    faraday.adapter(:typhoeus)
  end
end
handling_read_exceptions(stream_name) { || ... }
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 149
# Runs the given block and normalizes read errors.
#
# Returns the block's value. Returns nil when the block raises an
# HttpClientError with status 404; any other HttpClientError is re-raised.
#
# Raises InvalidStreamNameError when the block raises URI::InvalidURIError.
def handling_read_exceptions(stream_name)
  yield
rescue HttpClientError => e
  raise if e.status_code != 404

  # A 404 is not an error for readers: signal "nothing there" with nil.
  nil
rescue URI::InvalidURIError
  raise InvalidStreamNameError, "Invalid stream name: #{stream_name}"
end
retrying_on_network_failures(max_retries) { || ... }
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 106
# Runs the given block through `with_retries`, retrying only on
# Faraday::TimeoutError and Faraday::ConnectionFailed.
#
# The block is tried at most `1 + max_retries` times, with sleep intervals
# bounded by MIN_RETRY_INTERVAL and MAX_RETRY_INTERVAL.
def retrying_on_network_failures(max_retries)
  retry_options = {
    base_sleep_seconds: MIN_RETRY_INTERVAL,
    max_sleep_seconds: MAX_RETRY_INTERVAL,
    max_tries: 1 + max_retries,
    rescue: [Faraday::TimeoutError, Faraday::ConnectionFailed]
  }
  with_retries(**retry_options) { yield }
end
safe_read_events(stream_name, start, count, poll)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 129
# GETs "/streams/<stream_name>/<start>/forward/<count>" with embedded event
# bodies and converts the returned entries via `to_events`.
#
# Sends the `ES-LongPoll` header only when `poll` is positive.
#
# Returns an empty Array when `handling_read_exceptions` yields nil.
def safe_read_events(stream_name, start, count, poll)
  events = handling_read_exceptions(stream_name) do
    path = "/streams/#{stream_name}/#{start}/forward/#{count}"
    response = @conn.get(path) do |req|
      req.headers = { 'Accept' => 'application/json' }
      req.headers['ES-LongPoll'] = poll if poll&.positive?
      req.params['embed'] = 'body'
    end
    to_events(response.body['entries'])
  end
  events || []
end
safe_read_metadata(stream_name)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 142
# GETs "/streams/<stream_name>/metadata" and returns the result with
# symbolized keys.
#
# Returns an empty Hash when `handling_read_exceptions` yields nil.
def safe_read_metadata(stream_name)
  found = handling_read_exceptions(stream_name) do
    raw = request(:get, "/streams/#{stream_name}/metadata", nil, 'Accept' => 'application/json')
    raw.symbolize_keys
  end
  found || {}
end
to_event_data(events)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 157
# Hands `events` to the client's serializer and returns whatever
# `serialize` produces.
def to_event_data(events)
  serializer = @serializer
  serializer.serialize(events)
end
to_events(es_events)
click to toggle source
# File lib/akasha/storage/http_event_store/client.rb, line 161
# Decodes the JSON strings stored under 'data' and 'metaData' of each entry
# (in place) and passes the entries to the serializer's `deserialize`.
#
# `es_events` - array of entry hashes; nil is treated as an empty array
#
# A field that is nil/absent is left untouched. A blank string is replaced
# with nil rather than parsed, because `JSON.parse('')` raises.
# NOTE(review): assumes the serializer treats a nil 'metaData'/'data' the
# same way it already had to for absent fields; confirm against
# EventSerializer.
def to_events(es_events)
  decoded = Array(es_events).map do |ev|
    %w[data metaData].each do |field|
      raw = ev[field]
      next unless raw

      ev[field] = raw.to_s.strip.empty? ? nil : JSON.parse(raw)
    end
    ev
  end
  @serializer.deserialize(decoded)
end