class TableSync::Publishing::Publisher

Constants

DEBOUNCE_TIME

Attributes

debounce_time[R]
original_attributes[R]
state[R]

Public Class Methods

new(object_class, original_attributes, **opts) click to toggle source

'original_attributes' are not published; they are used to resolve the routing key.

# File lib/table_sync/publishing/publisher.rb, line 7
# Builds a publisher for a single record.
#
# object_class        - resolved to a constant via +constantize+
#                       (presumably a class-name String; confirm with callers).
# original_attributes - hash-like attributes; keys are deep-symbolized and the
#                       result is passed through +filter_safe_for_serialization+.
# opts                - :confirm       (defaults to true)
#                       :debounce_time (converted with +seconds+; DEBOUNCE_TIME
#                                       is used when the option is absent)
#                       :state         (defaults to :updated, coerced with +to_sym+)
#
# Raises RuntimeError (from +validate_state+) when the state is not one of
# :created, :updated or :destroyed.
def initialize(object_class, original_attributes, **opts)
  @object_class = object_class.constantize

  symbolized_attributes = original_attributes.deep_symbolize_keys
  @original_attributes = filter_safe_for_serialization(symbolized_attributes)

  @confirm = opts.fetch(:confirm) { true }
  @debounce_time = opts[:debounce_time]&.seconds || DEBOUNCE_TIME
  @state = opts.fetch(:state) { :updated }.to_sym

  validate_state
end

Public Instance Methods

publish() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 16
# Schedules publication of the record, debouncing repeated requests.
#
# Destroy events, and publishers whose debounce time is zero, are enqueued
# immediately. Otherwise the time stored under +cache_key+ (written by
# +enqueue_job+) decides what happens:
#   * stored time is in the future -> nothing is enqueued, nil is returned;
#   * stored time + debounce_time has already passed -> enqueue right away;
#   * otherwise -> enqueue for stored time + debounce_time.
def publish
  return enqueue_job if destroyed? || debounce_time.zero?

  last_sync_time = Rails.cache.read(cache_key)
  # Nothing cached: act as if the last sync fell just outside the debounce window.
  last_sync_time ||= current_time - debounce_time - 1.second

  return if last_sync_time > current_time

  earliest_allowed = last_sync_time + debounce_time
  if earliest_allowed <= current_time
    enqueue_job
  else
    enqueue_job(earliest_allowed)
  end
end
publish_now() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 26
# Publishes +params+ through Rabbit immediately, then reports the publication
# to TableSync::Instrument together with the model's table and schema names.
#
# Returns nil without publishing when the record cannot be found and the
# state is not :destroyed.
def publish_now
  # Update request and object does not exist -> skip publishing
  return unless object || destroyed?

  Rabbit.publish(params)

  naming = TableSync.publishing_adapter.model_naming(object_class)
  TableSync::Instrument.notify(
    table: naming.table,
    schema: naming.schema,
    event: event,
    direction: :publish,
  )
end

Private Instance Methods

attributes_for_sync() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 88
        # Attributes to publish for the record.
        #
        # * :destroyed state - the class-level +table_sync_destroy_attributes+
        #   hook applied to +original_attributes+ when the class provides it,
        #   otherwise +original_attributes+ as they are.
        # * any other state - the record's own +attributes_for_sync+ when
        #   +attributes_for_sync_defined?+ is true, otherwise whatever the
        #   publishing adapter extracts from the record.
        def attributes_for_sync
  unless destroyed?
    return object.attributes_for_sync if attributes_for_sync_defined?

    return TableSync.publishing_adapter.attributes(object)
  end

  return original_attributes unless object_class.respond_to?(:table_sync_destroy_attributes)

  object_class.table_sync_destroy_attributes(original_attributes)
end
attrs_for_callables() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 42
# Returns +attributes_for_sync+ unchanged. Used as the fallback by
# +attrs_for_routing_key+ and +attrs_for_metadata+ when the record does not
# provide its own values.
def attrs_for_callables
  attributes_for_sync
end
attrs_for_metadata() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 54
# Attributes used for metadata: the record's own +attrs_for_metadata+ when it
# publicly responds to that method, otherwise +attrs_for_callables+.
# A missing record (nil) takes the fallback path.
def attrs_for_metadata
  return attrs_for_callables unless object.respond_to?(:attrs_for_metadata)

  object.attrs_for_metadata
end
attrs_for_routing_key() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 46
# Attributes used for the routing key: the record's own
# +attrs_for_routing_key+ when it publicly responds to that method, otherwise
# +attrs_for_callables+. A missing record (nil) takes the fallback path.
def attrs_for_routing_key
  return attrs_for_callables unless object.respond_to?(:attrs_for_routing_key)

  object.attrs_for_routing_key
end
cache_key() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 114
# Key under which the scheduled sync time of this record is kept in
# Rails.cache: "<object_class>/<needle>_table_sync_time" with every space
# character removed.
def cache_key
  spaced_key = "#{object_class}/#{needle}_table_sync_time"
  spaced_key.delete(" ")
end
created?() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 122
# True when the publisher was built with state :created. The value is sent
# as the +created+ flag inside the metadata of +publishing_data+.
def created?
  state == :created
end
destroyed?() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 118
# True when the publisher was built with state :destroyed. Destroy requests
# bypass debouncing in +publish+ and are reported as the :destroy event.
def destroyed?
  state == :destroyed
end
enqueue_job(perform_at = current_time) click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 70
# Enqueues the publishing job to run at +perform_at+ (defaults to the current
# time) and records that time under +cache_key+ so later +publish+ calls can
# debounce against it.
#
# The job receives the class name, the original attributes, the state as a
# String and the confirm flag. Returns the result of the cache write.
def enqueue_job(perform_at = current_time)
  job_class
    .set(wait_until: perform_at)
    .perform_later(object_class.name, original_attributes, state: state.to_s, confirm: confirm?)

  Rails.cache.write(cache_key, perform_at)
end
event() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 106
# Event name sent with the published data: :destroy for destroyed records,
# :update for every other state (including :created).
def event
  return :destroy if destroyed?

  :update
end
job_callable() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 62
# Returns the callable configured as TableSync.publishing_job_class_callable.
# NOTE(review): presumably consumed by the +job_class+ lookup, which is
# defined outside this file section - confirm there.
def job_callable
  TableSync.publishing_job_class_callable
end
job_callable_error_message() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 66
# Error text telling the user to set TableSync.publishing_job_class_callable.
# NOTE(review): presumably raised by the +job_class+ lookup when no callable
# is configured; that code is not visible here - confirm.
def job_callable_error_message
  "Can't publish, set TableSync.publishing_job_class_callable"
end
needle() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 110
# The part of +original_attributes+ limited to the +primary_keys+ entries.
# It is the lookup condition passed to the adapter in +object+ and also part
# of +cache_key+.
def needle
  original_attributes.slice(*primary_keys)
end
object() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 102
        # Finds the record through the publishing adapter, using +needle+ as the
        # lookup condition, and remembers the result for this publisher instance.
        #
        # The result is memoized because a single +publish_now+ asks for the
        # record several times (existence check, attributes, routing key,
        # metadata). Memoizing avoids repeating the lookup and ensures all of
        # those steps see the same record. +defined?+ is used rather than
        # +||=+ so that a nil result ("record not found") is remembered as well.
        #
        # Returns whatever the adapter's +find+ returns (nil when nothing matched).
        def object
  return @object if defined?(@object)

  @object = TableSync.publishing_adapter.find(object_class, needle)
end
publishing_data() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 80
# Extends the parent's publishing data with the event name and a metadata
# hash carrying the +created+ flag. Keys already present in the parent's
# data are overwritten by the values set here.
def publishing_data
  inherited_data = super
  current_event = event

  { **inherited_data, event: current_event, metadata: { created: created? } }
end
routing_key() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 76
# Routing key for the published message; delegates to +resolve_routing_key+,
# which is defined outside this file section.
def routing_key
  resolve_routing_key
end
validate_state() click to toggle source
# File lib/table_sync/publishing/publisher.rb, line 126
# Ensures +state+ is one of :created, :updated or :destroyed.
#
# Returns nil for a valid state; raises RuntimeError with the offending
# value's +inspect+ output otherwise.
def validate_state
  return if %i[created updated destroyed].include?(state)

  raise "Unknown state: #{state.inspect}"
end