class GooglePubsubEnhancer

Constants

VERSION

Public Class Methods

name_by(type, name) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 13
def name_by(type, name)
  raise unless %w(topics subscriptions).include?(type)
  "projects/#{pubsub_config['project_id']}/#{type}/#{name}"
end
new(logger: Logger.new(STDOUT),&block) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 26
def initialize(logger: Logger.new(STDOUT),&block)
  @logger = logger
  @stack = ::Middleware::Builder.new(&block).__send__(:to_app)
end
pubsub_config() click to toggle source
# File lib/google_pubsub_enhancer.rb, line 18
def pubsub_config
  key = ::Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS.find { |n| !ENV[n].nil? }
  @pubsub_config ||= JSON.parse(ENV[key])
rescue => ex
  raise Exception, "Environment not setted properly"
end

Public Instance Methods

run(subscription_short_name, opts={}) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 31
def run(subscription_short_name, opts={})
  configurated_options = configurate_options(opts)
  subscription = create_subscription(subscription_short_name)
  work(subscription, configurated_options)
rescue => ex
  @logger.error "Retry: #{ex} "
  retry
end

Private Instance Methods

acknowledge(subscription,env) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 54
def acknowledge(subscription,env)
  acked_messages = env[:received_messages] - env[:nacked_messages]
  begin
    subscription.acknowledge(acked_messages)
  rescue 
    acked_messages.each do |msg|
      @logger.error "Retried acked message was: #{msg}"
    end
    retry
  end
end
configurate_options(opts) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 72
def configurate_options(opts)
  raise unless opts.is_a?(Hash)
  opts[:shutdown] ||= proc { }
  opts
end
create_subscription(subscription_short_name) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 66
def create_subscription(subscription_short_name)
  Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name))
rescue => ex
  raise Exception, 'Problem with subscription. Check spelling or permissions!'
end
work(subscription, opts) click to toggle source
# File lib/google_pubsub_enhancer.rb, line 42
def work(subscription, opts)
  return if opts[:shutdown].call
  while received_messages = subscription.pull(:max => GooglePubsubEnhancer::Constants::MAX_PULL_SIZE)
    break if opts[:shutdown].call || received_messages == nil
    next if received_messages.empty?
    @logger.debug{"#{received_messages.length} messages received"}
    env = {received_messages: received_messages, nacked_messages: []}
    @stack.call(env)
    acknowledge(subscription,env)
  end
end