class Rpush::Daemon::Adm::Delivery

See https://developer.amazon.com/sdk/adm/sending-message.html

Constants

ACCESS_TOKEN_REQUEST_DATA

Data used to request authorization tokens.

AMAZON_ADM_URL

ADM services endpoint. This endpoint is used to perform ADM requests.

AMAZON_TOKEN_URI

OAuth 2.0 token endpoint. This endpoint is used to request authorization tokens.

Public Class Methods

new(app, http, notification, batch) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 17
# Sets up the state for delivering one notification.
#
# @param app stored as @app
# @param http client whose #request is used for token and message posts
# @param notification the notification whose registration ids are delivered to
# @param batch notified via #notification_processed once delivery has run
def initialize(app, http, notification, batch)
  @batch = batch
  @notification = notification
  @http = http
  @app = app

  # Per-delivery bookkeeping: ids that went through, and id => reason for
  # those that did not.
  @sent_registration_ids = []
  @failed_registration_ids = {}
end

Public Instance Methods

perform() click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 26
def perform
  @notification.registration_ids.each do |registration_id|
    handle_response(do_post(registration_id), registration_id)
  end
  if @sent_registration_ids.empty?
    fail Rpush::DeliveryError.new(nil, @notification.id, describe_errors)
  else
    unless @failed_registration_ids.empty?
      @notification.error_description = describe_errors
      Rpush::Daemon.store.update_notification(@notification)
    end
    mark_delivered
  end
rescue Rpush::RateLimitError => error
  handle_rate_limited(error)
rescue Rpush::RetryableError => error
  handle_retryable(error)
rescue SocketError => error
  mark_retryable(@notification, Time.now + 10.seconds, error)
  raise
rescue StandardError => error
  mark_failed(error)
  raise
ensure
  @batch.notification_processed
end

Protected Instance Methods

access_token() click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 198
# Returns the app's ADM access token, requesting a new one first when the
# stored token is missing or reported as expired.
#
# The token request is a form-encoded POST to AMAZON_TOKEN_URI carrying
# ACCESS_TOKEN_REQUEST_DATA plus the app's client id and secret; the response
# is passed to #handle_access_token.
#
# @return the app's access token after any refresh attempt (the value is
#   whatever the app holds at that point)
def access_token
  app = @notification.app
  needs_refresh = app.access_token.nil? || app.access_token_expired?

  if needs_refresh
    token_request = Net::HTTP::Post.new(AMAZON_TOKEN_URI.path, 'Content-Type' => 'application/x-www-form-urlencoded')
    credentials = { 'client_id' => app.client_id, 'client_secret' => app.client_secret }
    token_request.set_form_data(ACCESS_TOKEN_REQUEST_DATA.merge(credentials))

    token_response = @http.request(AMAZON_TOKEN_URI, token_request)
    handle_access_token(token_response)
  end

  # Re-read through the notification so a token stored by the handler is seen.
  @notification.app.access_token
end
bad_request(response, current_registration_id) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 121
# Handles a 400 response for a single registration id.
#
# When the decoded body carries a 'reason', the failure is logged, stored
# against the registration id and reflected as :adm_failed_to_recipient.
# A body without a 'reason' is ignored.
#
# @param response the HTTP response; its body is decoded with multi_json_load
# @param current_registration_id the registration id the request was sent for
def bad_request(response, current_registration_id)
  payload = multi_json_load(response.body)

  if payload.key?('reason')
    failure_reason = payload['reason']

    log_warn("bad_request: #{current_registration_id} (#{failure_reason})")
    @failed_registration_ids[current_registration_id] = failure_reason

    reflect(:adm_failed_to_recipient, @notification, current_registration_id, failure_reason)
  end
end
create_new_notification(response, registration_ids) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 153
# Creates a follow-up ADM notification through the store for the given
# registration ids, copying app_id, collapse_key, delay_while_idle, data and
# app from the current notification.
#
# @param response HTTP response whose retry-after header supplies the
#   deliver-after value passed to the store
# @param registration_ids registration ids the new notification should target
def create_new_notification(response, registration_ids)
  attributes = {
    'app_id' => @notification.app_id,
    'collapse_key' => @notification.collapse_key,
    'delay_while_idle' => @notification.delay_while_idle
  }
  deliver_after = deliver_after_header(response)

  Rpush::Daemon.store.create_adm_notification(
    attributes,
    @notification.data,
    registration_ids,
    deliver_after,
    @notification.app
  )
end
deliver_after_header(response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 158
# Reads the response's 'retry-after' header and returns whatever
# Rpush::Daemon::RetryHeaderParser.parse makes of it.
#
# @param response HTTP response exposing #header
def deliver_after_header(response)
  retry_after = response.header['retry-after']
  Rpush::Daemon::RetryHeaderParser.parse(retry_after)
end
describe_errors() click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 171
# Builds a human-readable summary of the delivery failures.
#
# @return [String] a fixed message when the number of failed ids equals the
#   number of registration ids, otherwise one "id: reason" line per failure
def describe_errors
  return "Failed to deliver to all recipients." if @failed_registration_ids.size == @notification.registration_ids.size

  details = @failed_registration_ids.map { |registration_id, reason| "#{registration_id}: #{reason}" }
  "Failed to deliver to recipients: \n#{details.join("\n")}"
end
do_post(registration_id) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 185
# Sends the notification to a single registration id.
#
# The target URI is AMAZON_ADM_URL formatted with the registration id; the
# request carries the ADM JSON headers, a bearer token from #access_token and
# the notification's JSON as its body.
#
# @param registration_id registration id substituted into the ADM URL
# @return whatever @http.request returns for the request
def do_post(registration_id)
  target = URI.parse(format(AMAZON_ADM_URL, registration_id))

  headers = {
    'Content-Type' => 'application/json',
    'Accept' => 'application/json',
    'x-amzn-type-version' => 'com.amazon.device.messaging.ADMMessage@1.0',
    'x-amzn-accept-type' => 'com.amazon.device.messaging.ADMSendResult@1.0',
    'Authorization' => "Bearer #{access_token}"
  }

  request = Net::HTTP::Post.new(target.path, headers)
  request.body = @notification.as_json.to_json

  @http.request(target, request)
end
handle_access_token(response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 209
# Processes the response to an access-token request.
#
# On a 200 the parsed body is applied to the app via #update_access_token and
# the app is saved through the store. Any other status is logged as a warning
# together with the response body, and the app is left untouched.
#
# @param response HTTP response to the token request
def handle_access_token(response)
  if response.code.to_i == 200
    update_access_token(JSON.parse(response.body))
    Rpush::Daemon.store.update_app(@notification.app)
    # The bearer token is a credential, so it is deliberately kept out of the
    # log; the expiry is enough to confirm that the refresh happened.
    log_info("ADM access token updated: expires = #{@notification.app.access_token_expiration}")
  else
    log_warn("Could not retrieve access token from ADM: #{response.body}")
  end
end
handle_rate_limited(error) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 100
# Recovers from a rate-limit error raised part-way through delivery.
#
# If nothing has been sent yet, the whole notification is rescheduled using
# the error's response. Otherwise the current notification is trimmed to the
# ids already sent and saved, a new notification is created for the remaining
# ids, and the current one is marked delivered.
#
# @param error the rate-limit error; its #response is used for rescheduling
def handle_rate_limited(error)
  # None sent yet: just resend after the response's retry-after header.
  return retry_delivery(@notification, error.response) if @sent_registration_ids.empty?

  # Remember which ids still need sending before trimming the list.
  pending_ids = @notification.registration_ids.reject { |id| @sent_registration_ids.include?(id) }

  # Trim the current notification, in place, to the ids that were sent.
  @notification.registration_ids.select! { |id| @sent_registration_ids.include?(id) }
  Rpush::Daemon.store.update_notification(@notification)

  # The remaining ids go out in a fresh notification.
  create_new_notification(error.response, pending_ids)

  mark_delivered
end
handle_response(response, current_registration_id) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 55
# Dispatches an ADM response to the handler for its HTTP status.
#
# 200, 400, 401, 429, 500 and 503 each have a dedicated handler; any other
# status raises a Rpush::DeliveryError described by
# Rpush::Daemon::HTTP_STATUS_CODES.
#
# @param response HTTP response for one registration id
# @param current_registration_id the registration id the request was sent for
def handle_response(response, current_registration_id)
  status = response.code.to_i

  case status
  when 200 then ok(response, current_registration_id)
  when 400 then bad_request(response, current_registration_id)
  when 401 then unauthorized(response)
  when 429 then rate_limited(response)
  when 500 then internal_server_error(current_registration_id)
  when 503 then service_unavailable(response)
  else
    fail Rpush::DeliveryError.new(response.code, @notification.id, Rpush::Daemon::HTTP_STATUS_CODES[status])
  end
end
handle_retryable(error) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 87
# Reacts to a retryable error according to its status code.
#
# 401: the app's access token is cleared and requested again; the
# notification is marked retryable right away only if a token is then present.
# 503: delivery is rescheduled from the error's response and a warning is
# logged. Any other code is ignored.
#
# @param error the retryable error, exposing #code and #response
def handle_retryable(error)
  status = error.code

  if status == 401
    # Drop the stored token so that #access_token fetches a new one.
    @notification.app.access_token = nil
    access_token
    mark_retryable(@notification, Time.now) if @notification.app.access_token
  elsif status == 503
    retry_delivery(@notification, error.response)
    log_warn("ADM responded with an Service Unavailable Error. #{retry_message}")
  end
end
internal_server_error(current_registration_id) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 143
# Records a 500 response as a failure for the given registration id and logs
# a warning about it.
#
# @param current_registration_id the registration id the request was sent for
def internal_server_error(current_registration_id)
  reason = "Internal Server Error"

  @failed_registration_ids[current_registration_id] = reason
  log_warn("internal_server_error: #{current_registration_id} (#{reason})")
end
ok(response, current_registration_id) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 74
# Handles a 200 response for a single registration id.
#
# When the decoded body carries a registrationID, the delivery is recorded
# and logged; if that id differs from the one the request was sent for, the
# change is reflected as :adm_canonical_id. A body without a registrationID
# records nothing and reflects nothing.
#
# @param response the HTTP response; its body is decoded with multi_json_load
# @param current_registration_id the registration id the request was sent for
def ok(response, current_registration_id)
  response_body = multi_json_load(response.body)
  canonical_id = response_body['registrationID']

  # Without an id in the response there is nothing to compare against, and
  # reflecting a nil canonical id would only mislead listeners.
  return if canonical_id.nil?

  # Record the id that was requested: #handle_rate_limited matches this list
  # against the notification's own registration ids, so storing the canonical
  # id would make an already-delivered recipient look unsent.
  @sent_registration_ids << current_registration_id
  log_info("#{@notification.id} sent to #{canonical_id}")

  return if current_registration_id == canonical_id

  reflect(:adm_canonical_id, current_registration_id, canonical_id)
end
rate_limited(response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 138
# Raises a Rpush::RateLimitError for a 429 response, which stops the current
# notification from being sent to its remaining registration ids.
#
# @param response the HTTP response that reported the rate limit
# @raise [Rpush::RateLimitError] always
def rate_limited(response)
  status = response.code.to_i
  message = 'Exceeded maximum allowable rate of messages.'

  raise Rpush::RateLimitError.new(status, @notification.id, message, response)
end
retry_delivery(notification, response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 162
# Schedules a notification for another delivery attempt.
#
# Uses the time derived from the response's retry-after header when there is
# one, and falls back to exponential back-off otherwise.
#
# @param notification the notification to reschedule
# @param response HTTP response inspected for a retry-after header
def retry_delivery(notification, response)
  deliver_after = deliver_after_header(response)
  return mark_retryable_exponential(notification) unless deliver_after

  mark_retryable(notification, deliver_after)
end
retry_message() click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 181
# Describes when the notification will next be attempted, for log output.
#
# @return [String] message naming the notification id, its deliver_after time
#   formatted as "YYYY-MM-DD HH:MM:SS", and its retry count
def retry_message
  deliver_after = @notification.deliver_after.strftime('%Y-%m-%d %H:%M:%S')
  "Notification #{@notification.id} will be retried after #{deliver_after} (retry #{@notification.retries})."
end
service_unavailable(response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 148
# Raises a Rpush::RetryableError for a 503 response. ADM takes a separate
# request per push token, so this marks the entire notification for retry.
#
# @param response the HTTP response that reported the outage
# @raise [Rpush::RetryableError] always
def service_unavailable(response)
  status = response.code.to_i
  message = 'ADM responded with an Service Unavailable Error.'

  raise Rpush::RetryableError.new(status, @notification.id, message, response)
end
unauthorized(response) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 133
# Raises a Rpush::RetryableError for a 401 response. ADM takes a separate
# request per push token, so this marks the entire notification for retry.
#
# @param response the HTTP response that reported the authorization failure
# @raise [Rpush::RetryableError] always
def unauthorized(response)
  status = response.code.to_i
  message = 'ADM responded with an Unauthorized Error.'

  raise Rpush::RetryableError.new(status, @notification.id, message, response)
end
update_access_token(data) click to toggle source
# File lib/rpush/daemon/adm/delivery.rb, line 219
# Copies a token response onto the notification's app.
#
# @param data parsed token response; 'access_token' is stored as-is and
#   'expires_in' (converted with to_i) is added to the current time to give
#   the expiration
def update_access_token(data)
  app = @notification.app

  app.access_token = data['access_token']

  lifetime = data['expires_in'].to_i
  app.access_token_expiration = Time.now + lifetime
end