class Google::Cloud::PubSub::Subscriber::TimedUnaryBuffer
@private
Attributes
interval[R]
max_bytes[R]
Public Class Methods
new(subscriber, max_bytes: 500_000, interval: 1.0)
click to toggle source
Calls superclass method
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 31 def initialize subscriber, max_bytes: 500_000, interval: 1.0 super() # to init MonitorMixin @subscriber = subscriber @max_bytes = max_bytes @interval = interval # Using a Hash ensures there is only one entry for each ack_id in # the buffer. Adding an entry again will overwrite the previous # entry. @register = {} @task = Concurrent::TimerTask.new execution_interval: interval do flush! end end
Public Instance Methods
acknowledge(ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 48 def acknowledge ack_ids return if ack_ids.empty? synchronize do ack_ids.each do |ack_id| # ack has no deadline set, use :ack indicate it is an ack @register[ack_id] = :ack end end true end
flush!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 86 def flush! # Grab requests from the buffer and release synchronize ASAP requests = flush_requests! return if requests.empty? # Perform the RCP calls concurrently with_threadpool do |pool| requests[:acknowledge].each do |ack_req| add_future pool do @subscriber.service.acknowledge ack_req.subscription, *ack_req.ack_ids end end requests[:modify_ack_deadline].each do |mod_ack_req| add_future pool do @subscriber.service.modify_ack_deadline mod_ack_req.subscription, mod_ack_req.ack_ids, mod_ack_req.ack_deadline_seconds end end end true end
modify_ack_deadline(deadline, ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 61 def modify_ack_deadline deadline, ack_ids return if ack_ids.empty? synchronize do ack_ids.each do |ack_id| @register[ack_id] = deadline end end true end
renew_lease(deadline, ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 73 def renew_lease deadline, ack_ids return if ack_ids.empty? synchronize do ack_ids.each do |ack_id| # Don't overwrite pending actions when renewing leased messages. @register[ack_id] ||= deadline end end true end
start()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 109 def start @task.execute self end
started?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 122 def started? @task.running? end
stop()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 115 def stop @task.shutdown flush! self end
stopped?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 126 def stopped? !started? end
Private Instance Methods
add_future(pool) { || ... }
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 217 def add_future pool Concurrent::Promises.future_on pool do yield rescue StandardError => e error! e end end
create_acknowledge_requests(ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 154 def create_acknowledge_requests ack_ids req = Google::Cloud::PubSub::V1::AcknowledgeRequest.new( subscription: subscription_name, ack_ids: ack_ids ) addl_to_create = req.to_proto.bytesize / max_bytes return [req] if addl_to_create.zero? ack_ids.each_slice(addl_to_create + 1).map do |sliced_ack_ids| Google::Cloud::PubSub::V1::AcknowledgeRequest.new( subscription: subscription_name, ack_ids: sliced_ack_ids ) end end
create_modify_ack_deadline_requests(deadline, ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 170 def create_modify_ack_deadline_requests deadline, ack_ids req = Google::Cloud::PubSub::V1::ModifyAckDeadlineRequest.new( subscription: subscription_name, ack_ids: ack_ids, ack_deadline_seconds: deadline ) addl_to_create = req.to_proto.bytesize / max_bytes return [req] if addl_to_create.zero? ack_ids.each_slice(addl_to_create + 1).map do |sliced_ack_ids| Google::Cloud::PubSub::V1::ModifyAckDeadlineRequest.new( subscription: subscription_name, ack_ids: sliced_ack_ids, ack_deadline_seconds: deadline ) end end
error!(error)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 196 def error! error @subscriber.error! error end
flush_requests!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 132 def flush_requests! prev_reg = synchronize do return {} if @register.empty? reg = @register @register = {} reg end groups = prev_reg.each_pair.group_by { |_ack_id, delay| delay } req_hash = groups.transform_values { |v| v.map(&:first) } requests = { acknowledge: [] } ack_ids = Array(req_hash.delete(:ack)) # ack has no deadline set requests[:acknowledge] = create_acknowledge_requests ack_ids if ack_ids.any? requests[:modify_ack_deadline] = req_hash.map do |mod_deadline, mod_ack_ids| create_modify_ack_deadline_requests mod_deadline, mod_ack_ids end.flatten requests end
push_threads()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 192 def push_threads @subscriber.push_threads end
subscription_name()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 188 def subscription_name @subscriber.subscription_name end
with_threadpool() { |pool| ... }
click to toggle source
# File lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb, line 200 def with_threadpool pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.push_threads yield pool pool.shutdown pool.wait_for_termination 60 return if pool.shutdown? pool.kill begin raise "Timeout making subscriber API calls" rescue StandardError => e error! e end end