class GooglePubsubEnhancer
Constants
- VERSION
Public Class Methods
name_by(type, name)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 13 def name_by(type, name) raise unless %w(topics subscriptions).include?(type) "projects/#{pubsub_config['project_id']}/#{type}/#{name}" end
new(logger: Logger.new(STDOUT),&block)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 26 def initialize(logger: Logger.new(STDOUT),&block) @logger = logger @stack = ::Middleware::Builder.new(&block).__send__(:to_app) end
pubsub_config()
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 18 def pubsub_config key = ::Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS.find { |n| !ENV[n].nil? } @pubsub_config ||= JSON.parse(ENV[key]) rescue => ex raise Exception, "Environment not setted properly" end
Public Instance Methods
run(subscription_short_name, opts={})
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 31 def run(subscription_short_name, opts={}) configurated_options = configurate_options(opts) subscription = create_subscription(subscription_short_name) work(subscription, configurated_options) rescue => ex @logger.error "Retry: #{ex} " retry end
Private Instance Methods
acknowledge(subscription,env)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 54 def acknowledge(subscription,env) acked_messages = env[:received_messages] - env[:nacked_messages] begin subscription.acknowledge(acked_messages) rescue acked_messages.each do |msg| @logger.error "Retried acked message was: #{msg}" end retry end end
configurate_options(opts)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 72 def configurate_options(opts) raise unless opts.is_a?(Hash) opts[:shutdown] ||= proc { } opts end
create_subscription(subscription_short_name)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 66 def create_subscription(subscription_short_name) Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name)) rescue => ex raise Exception, 'Problem with subscription. Check spelling or permissions!' end
work(subscription, opts)
click to toggle source
# File lib/google_pubsub_enhancer.rb, line 42 def work(subscription, opts) return if opts[:shutdown].call while received_messages = subscription.pull(:max => GooglePubsubEnhancer::Constants::MAX_PULL_SIZE) break if opts[:shutdown].call || received_messages == nil next if received_messages.empty? @logger.debug{"#{received_messages.length} messages received"} env = {received_messages: received_messages, nacked_messages: []} @stack.call(env) acknowledge(subscription,env) end end