module PubSubTie::Publisher
Public Instance Methods
batch(event_sym, messages, resource)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 39
# Publishes several events to the topic derived from event_sym, then stops the
# topic's asynchronous publisher and waits for it to finish.
#
# Every entry of messages goes through augmented and validate_data before it
# is queued, so an invalid entry raises and aborts the remaining entries.
# Delivery failures are reported through Rails.logger from the publish callback.
#
# @param event_sym event identifier, resolved with Events.full_name
# @param messages  enumerable of event data objects (one per message)
# @param resource  passed through to payload and used in the failure log line
# @return the value of the publisher's wait!, or nil when nothing was queued
def batch(event_sym, messages, resource)
  topic = @pubsub.topic(Events.full_name(event_sym))

  messages.each do |data|
    message = augmented(data, event_sym)
    topic.publish_async(payload(validate_data(event_sym, message), resource),
                        publish_time: Time.now.utc) do |result|
      unless result.succeeded?
        Rails.logger.error(
          "Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
      end
    end
  end

  # The async publisher only exists once publish_async has been called, so it
  # is nil for an empty batch; in that case there is nothing to stop or await.
  publisher = topic.async_publisher
  publisher.stop.wait! if publisher
end
configure(config)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 7
# Builds the Pub/Sub client from config (via google_pubsub) and stores it in
# @pubsub, where publish and batch look it up.
#
# @param config configuration object forwarded unchanged to google_pubsub
# @return the client that was stored
def configure(config)
  client = google_pubsub(config)
  @pubsub = client
end
google_pubsub(config)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 11
# Creates a Google Cloud Pub/Sub client from the given configuration.
#
# The credentials file is looked up at <PubSubTie.app_root>/config/<keyfile>.
#
# @param config object indexable by the string keys 'keyfile' and 'project_id'
# @return the object returned by Google::Cloud::PubSub.new
def google_pubsub(config)
  key_path = File.join(PubSubTie.app_root, 'config', config['keyfile'])
  credentials = Google::Cloud::PubSub::Credentials.new(key_path)

  Google::Cloud::PubSub.new(project_id: config['project_id'], credentials: credentials)
end
publish(event_sym, data, resource)
click to toggle source
Publishes event data asynchronously to the topic inferred from event_sym. The data is augmented with event_name and event_time and validated against the loaded configuration.
# File lib/pubsub_tie/publisher.rb, line 24
# Queues a single event for asynchronous publication on the topic derived
# from event_sym.
#
# The data is first passed through augmented, then validate_data, and finally
# serialized with payload. A failed delivery is logged through Rails.logger
# from the publish callback; it is not raised to the caller.
#
# @param event_sym event identifier, resolved with Events.full_name
# @param data      event data object
# @param resource  passed through to payload and used in the failure log line
# @return whatever publish_async returns
def publish(event_sym, data, resource)
  message = augmented(data, event_sym)
  topic = @pubsub.topic(Events.full_name(event_sym))
  body = payload(validate_data(event_sym, message), resource)

  topic.publish_async(body, publish_time: Time.now.utc) do |result|
    next if result.succeeded?

    Rails.logger.error(
      "Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
  end
end
Private Instance Methods
augmented(data, event_sym)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 76
# Returns a new hash holding event_name and event_time followed by the
# caller's data. Because the data is merged last, an :event_name or
# :event_time key supplied by the caller wins over the generated value.
#
# NOTE(review): to_options is presumably ActiveSupport's key symbolization;
# confirm against the project's dependencies.
#
# @param data      object responding to to_hash
# @param event_sym event identifier, resolved with Events.name
def augmented(data, event_sym)
  defaults = { event_name: Events.name(event_sym), event_time: Time.current.utc }
  defaults.merge(data.to_hash.to_options)
end
bad_type(field, data)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 111
# Signals that a field's value does not match its declared type.
#
# @param field name of the offending field, interpolated into the message
# @param data  the event data being validated, interpolated into the message
# @raise [ArgumentError] always
def bad_type(field, data)
  raise ArgumentError, "Bad type for field #{field} in event #{data}"
end
missing_required(sym, data)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 72
# Lists the required fields of an event that are absent from data.
#
# @param sym  event identifier, resolved with Events.required
# @param data object responding to keys
# @return the entries of Events.required(sym) that are not among data.keys
def missing_required(sym, data)
  required = Events.required(sym)
  present = data.keys
  required - present
end
payload(data, resource)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 56
# Serializes event data to the JSON string that is sent as the message body.
#
# @param data     object responding to to_json
# @param resource currently unused
# @return the result of data.to_json
def payload(data, resource)
  # TODO: embed resource in message
  serialized = data.to_json
  serialized
end
validate_data(sym, data)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 61
# Checks that data carries every required field of the event, then type-checks
# the known fields.
#
# Only the keys listed in Events.required(sym) and Events.optional(sym) are
# kept; any other key in data is dropped before type validation.
#
# @param sym  event identifier
# @param data event data supporting keys and slice
# @return the result of validate_types for the retained fields
# @raise [ArgumentError] when a required field is missing
def validate_data(sym, data)
  absent = missing_required(sym, data)
  raise ArgumentError, "Missing event required args for #{sym}: #{absent}" unless absent.empty?

  permitted = Events.required(sym) + Events.optional(sym)
  validate_types(sym, data.slice(*permitted))
end
validate_type(field, val, data, sym)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 89
# Verifies that val is compatible with the type declared for field in
# Events.types(sym); calls bad_type when it is not.
#
# The Ruby class of val selects the set of accepted declared type names:
#   String   -> STRING
#   Integer  -> the integer type names plus FLOAT / FLOAT64
#   Numeric  -> DECIMAL, BIGDECIMAL, FLOAT, FLOAT64
#   Time     -> TIMESTAMP
#   DateTime -> DATETIME
# Arrays are accepted only for fields listed in Events.repeated(sym), and each
# element is then validated against the same field. Any other value (including
# nil and booleans) is rejected.
#
# @param field field name; looked up in the declared types via field.to_s
# @param val   the value to check
# @param data  the whole event, used only for the error message
# @param sym   event identifier
def validate_type(field, val, data, sym)
  types = Events.types(sym)

  accepted =
    case val
    when String
      %w[STRING]
    when Integer
      # Integer must be tested before Numeric, since every Integer is a Numeric.
      # FLOAT64 is listed next to FLOAT so that both float spellings accept
      # whole numbers, matching the Numeric branch which treats them alike.
      %w[INT FLOAT FLOAT64 INT64 SMALLINT INTEGER BIGINT TINYINT BYTEINT]
    when Numeric
      %w[DECIMAL BIGDECIMAL FLOAT FLOAT64]
    when Time
      %w[TIMESTAMP]
    when DateTime
      %w[DATETIME]
    when Array
      bad_type(field, data) unless Events.repeated(sym).include?(field)
      return val.each { |elem| validate_type(field, elem, data, sym) }
    else
      bad_type(field, data)
    end

  bad_type(field, data) unless accepted.include?(types[field.to_s])
end
validate_types(sym, data)
click to toggle source
# File lib/pubsub_tie/publisher.rb, line 81
# Runs validate_type on every field/value pair of data.
#
# @param sym  event identifier
# @param data event data, iterated as field/value pairs
# @return data itself, so the call can be used inline
def validate_types(sym, data)
  data.each { |field, val| validate_type(field, val, data, sym) }
  data
end