class Akasha::Storage::HttpEventStore::ProjectionManager

Manages projections in an HTTP Event Store.

Public Class Methods

new(client) click to toggle source
# File lib/akasha/storage/http_event_store/projection_manager.rb, line 6
# Creates a projection manager that sends its requests through `client`.
#
# Arguments:
#
# `client` - object stored in `@client`; the other methods of this class call
#            `request(method, path, body, headers)` on it.
def initialize(client)
  @client = client
end

Public Instance Methods

merge_all_by_event(name, event_names, namespace: nil) click to toggle source

Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.

Arguments:

`name` - name of the projection stream
`event_names` - array of event names
`namespace` - optional namespace; if provided, the resulting stream will
              only contain events with the same metadata.namespace
# File lib/akasha/storage/http_event_store/projection_manager.rb, line 19
# Merges all streams into a single projection stream that only contains
# events with the given names.
#
# Arguments:
#
# `name` - name of the projection stream
# `event_names` - array of event names
# `namespace` - optional namespace; if provided, the resulting stream will
#               only contain events with the same metadata.namespace
#
# Creation is attempted first. Only when that reports a falsy result is the
# query of the existing projection updated instead.
#
# Returns the truthy result of the creation attempt, otherwise whatever
# `update_projection` returns.
def merge_all_by_event(name, event_names, namespace: nil)
  created = attempt_create_projection(name, event_names, namespace)
  return created if created

  update_projection(name, event_names, namespace)
end

Private Instance Methods

attempt_create_projection(name, event_names, namespace) click to toggle source
# File lib/akasha/storage/http_event_store/projection_manager.rb, line 62
# Tries to create a continuous projection called `name` by POSTing the
# generated projection JavaScript to `/projections/continuous`.
#
# Arguments:
#
# `name` - name of the projection (sent as the `name` query parameter)
# `event_names` - array of event names passed to `projection_javascript`
# `namespace` - namespace passed to `projection_javascript` (may be nil)
#
# Returns true when the request completes without raising, and false when
# the client raises HttpClientError with status code 409 (presumably because
# a projection with this name already exists - confirm against the server).
# Any other HttpClientError is re-raised unchanged.
def attempt_create_projection(name, event_names, namespace)
  params = Rack::Utils.build_query(
    { name: name, emit: :yes, checkpoints: :yes, enabled: :yes }
  )
  path = "/projections/continuous?#{params}"
  script = projection_javascript(name, event_names, namespace)
  headers = { 'Content-Type' => 'application/javascript' }

  @client.request(:post, path, script, headers)
  true
rescue HttpClientError => e
  # Only a 409 is an expected outcome; everything else propagates.
  raise unless e.status_code == 409

  false
end
projection_javascript(name, events, namespace) click to toggle source
# File lib/akasha/storage/http_event_store/projection_manager.rb, line 26
        # Builds the JavaScript source of a projection that links matching
        # events into the stream `name`.
        #
        # Arguments:
        #
        # `name` - name of the stream the generated `linkTo` calls target
        # `events` - collection of event names; one handler is emitted per name
        # `namespace` - when nil, every event with a listed name is linked;
        #               otherwise only events whose metadata.namespace equals it
        #
        # Returns the script as a String. The values are substituted into the
        # script verbatim, without any JavaScript escaping.
        def projection_javascript(name, events, namespace)
          # Handler that links every event of the given type.
          link_always = <<~JS
            '%{en}': function(s, e) {
              linkTo('%{name}', e)
            }
          JS
          # Handler that links only events carrying the requested namespace.
          link_in_namespace = <<~JS
            '%{en}': function(s, e) {
              if (e['metadata'] !== null && e['metadata']['namespace'] === '%{namespace}') {
                linkTo('%{name}', e)
              }
            }
          JS
          template = namespace.nil? ? link_always : link_in_namespace
          handlers = events.map do |event_name|
            template % { en: event_name, name: name, namespace: namespace }
          end
          handler_list = handlers.join(', ')

          # Alternative code using internal indexing.
          # It's broken though because it reorders events for aggregates (because the
          # streams it uses are per-event). An alternative would be to use aggregates
          # as streams to pull from.
          # et_streams = events.map { |en| "\"$et-#{en}\"" }
          # "fromStreams([#{et_streams.join(', ')}]).when({ #{callbacks.join(', ')} });"
          <<~JS
            // This is hard to find, so I'm leaving it here:
            // options({
            //   reorderEvents: true,
            //   processingLag: 100 //time in ms
            // });
            fromAll().when({
              #{handler_list}
            });
          JS
        end
update_projection(name, event_names, namespace) click to toggle source
# File lib/akasha/storage/http_event_store/projection_manager.rb, line 79
# Replaces the query of the projection called `name` by PUTting freshly
# generated projection JavaScript to `/projection/<name>/query`.
#
# Arguments:
#
# `name` - name of the projection; interpolated into the request path as-is
# `event_names` - array of event names passed to `projection_javascript`
# `namespace` - namespace passed to `projection_javascript` (may be nil)
#
# Returns whatever the client's `request` call returns; errors raised by the
# client propagate to the caller.
def update_projection(name, event_names, namespace)
  # `emit=yes` matches the `emit: :yes` option used when the projection is
  # created. The previous value `yet` was a typo; presumably the server did
  # not recognise it and left emitting off after an update - verify against
  # the Event Store HTTP API.
  @client.request(:put, "/projection/#{name}/query?emit=yes",
                  projection_javascript(name, event_names, namespace),
                  'Content-Type' => 'application/javascript')
end