class App42::Message::QueueService

Manages asynchronous queues. Allows you to create and delete queues, purge messages, view pending messages, and get all messages.

@see Queue

Public Class Methods

new(api_key, secret_key, base_url) click to toggle source

This constructor takes the following parameters:

@param api_key @param secret_key @param base_url

# File lib/message/QueueService.rb, line 29
def initialize(api_key, secret_key, base_url)
  puts "Message Service->initialize"
  @api_key = api_key
  @secret_key = secret_key
  @base_url = base_url
  @resource = "queue"
  @messageResource = "message"
  @version = "1.0"
end

Public Instance Methods

create_pull_queue(queueName, queueDescription) click to toggle source

Creates a type Pull Queue

@param queueName

- The name of the queue which has to be created

@param queueDescription

- The description of the queue

@return Queue object containing queue name which has been created

@raise App42Exception

# File lib/message/QueueService.rb, line 51
def create_pull_queue(queueName, queueDescription)
  puts "Create Pull Queue Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(queueDescription, "Queue Description");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    body = {'app42' => {"queue"=> {
      "name" => queueName,
      "description" => queueDescription
      }}}.to_json
    puts "Body #{body}"
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      "type"=>'pull'
    }
    query_params = params.clone
    params.store("body", body)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@resource}/pull"
    response = connection.post(signature, resource_url, query_params, body)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end
delete_pull_queue(queueName) click to toggle source

Deletes the Pull type Queue

@param queueName

- The name of the queue which has to be deleted

@return App42Response if deleted successfully

@raise App42Exception

# File lib/message/QueueService.rb, line 101
def delete_pull_queue(queueName)
  puts "Delete Pull Queue Called "
  puts "Base url #{@base_url}"
  response = nil;
  responseObj = App42Response.new();
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      "type"=>'pull'
    }
    query_params = params.clone
    params.store("queueName", queueName)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@resource}/pull/#{queueName}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse=(response)
    responseObj.isResponseSuccess=(true)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return responseObj
end
get_messages(queueName, receiveTimeOut) click to toggle source

Messages are retrieved and dequeued from the Queue.

@param queueName

- The name of the queue from which messages have to be retrieved

@param receiveTimeOut

- Receive time out

@return Queue object containing messages in the Queue

@raise App42Exception

# File lib/message/QueueService.rb, line 234
def get_messages(queueName, receiveTimeOut)
  puts "Get Messages Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
    }
    query_params = params.clone
    params.store("queueName", queueName)
    params.store("receiveTimeOut", (receiveTimeOut.to_i).to_s)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@resource}/messages/#{queueName}/#{(receiveTimeOut.to_i).to_s}"
    response = connection.get(signature, resource_url, query_params)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end
pending_messages(queueName) click to toggle source

Retrieves the messages which are pending to be dequeued. Note: Calling this method does not dequeue the messages in the Queue. The messages stay in the Queue until they are dequeued.

@param queueName

- The name of the queue from which pending messages have to be fetched

@return Queue object containing pending messages in the Queue

@raise App42Exception

# File lib/message/QueueService.rb, line 189
def pending_messages(queueName)
  puts "Pending Messages Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
    }
    query_params = params.clone
    params.store("queueName", queueName)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@resource}/pending/#{queueName}"
    response = connection.get(signature, resource_url, query_params)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end
purge_pull_queue(queueName) click to toggle source

Purges the messages on the Queue. Note: once the Queue is purged, the messages are removed from the Queue and won't be available for dequeueing.

@param queueName

- The name of the queue which has to be purged

@return App42Response if purged successfully

@raise App42Exception

# File lib/message/QueueService.rb, line 145
def purge_pull_queue(queueName)
  puts "Purge Pull Queue Called "
  puts "Base url #{@base_url}"
  response = nil;
  responseObj = App42Response.new();
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      "type"=>'pull'
    }
    query_params = params.clone
    params.store("queueName", queueName)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@resource}/pull/purge/#{queueName}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse=(response)
    responseObj.isResponseSuccess=(true)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return responseObj
end
receive_message(queueName, receiveTimeOut) click to toggle source

Pulls all the messages from the queue

@param queueName

- The name of the queue from which messages have to be pulled

@param receiveTimeOut

- Receive time out

@return Queue object containing messages which have been pulled

@raise App42Exception

# File lib/message/QueueService.rb, line 336
def receive_message(queueName, receiveTimeOut)
  puts "Receive Message Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
    }
    query_params = params.clone
    params.store("queueName", queueName)
    params.store("receiveTimeOut", "" + (receiveTimeOut.to_i).to_s)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{(receiveTimeOut.to_i).to_s}"
    response = connection.get(signature, resource_url, query_params)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end
receive_message_by_correlation_id(queueName, receiveTimeOut, correlationId) click to toggle source

Pull message based on the correlation id

@param queueName

- The name of the queue from which the message has to be pulled

@param receiveTimeOut

- Receive time out

@param correlationId

- Correlation Id for which message has to be pulled

@return Queue object containing the message which has been pulled based on the correlation id

@raise App42Exception

# File lib/message/QueueService.rb, line 385
def receive_message_by_correlation_id(queueName, receiveTimeOut, correlationId)
  puts "Receive Message Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut");
  util.throwExceptionIfNullOrBlank(correlationId, "Correlation Id");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
    }
    query_params = params.clone
    params.store("queueName", queueName)
    params.store("receiveTimeOut", "" + (receiveTimeOut.to_i).to_s)
    params.store("correlationId", "" + correlationId)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{(receiveTimeOut.to_i).to_s}/#{correlationId}"
    response = connection.get(signature, resource_url, query_params)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end
remove_message(queueName, messageId) click to toggle source

Remove message from the queue based on the message id. Note: Once the message is removed it cannot be pulled

@param queueName

- The name of the queue from which the message has to be removed

@param messageId

- The message id of the message which has to be removed.

@return App42Response if removed successfully

@raise App42Exception

# File lib/message/QueueService.rb, line 434
def remove_message(queueName, messageId)
  puts "Remove Message Called "
  puts "Base url #{@base_url}"
  response = nil;
  responseObj = App42Response.new();
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(messageId, "messageId");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
    }
    query_params = params.clone
    params.store("queueName", queueName)
    params.store("messageId", messageId)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{messageId}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse=(response)
    responseObj.isResponseSuccess=(true)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return responseObj
end
send_message(queueName, msg, exp) click to toggle source

Send message on the queue with an expiry. The message will expire if it is not pulled/dequeued before the expiry

@param queueName

- The name of the queue to which the message has to be sent

@param msg

- Message that has to be sent

@param exp

- Message expiry time

@return Queue object containing message which has been sent with its message id and correlation id

@raise App42Exception

# File lib/message/QueueService.rb, line 283
def send_message(queueName, msg, exp)
  puts "Get Messages Called "
  puts "Base url #{@base_url}"
  response = nil;
  messageObj = nil;
  messageObj = Queue.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name");
  util.throwExceptionIfNullOrBlank(msg, "Message");
  util.throwExceptionIfNullOrBlank(exp, "Exipiration");
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    body = {'app42' => {"payLoad"=> {
      "message" => msg,
      "expiration" => exp
      }}}.to_json
    puts "Body #{body}"
    query_params = Hash.new
    params = {
      'apiKey'=> @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      'queueName' => queueName
    }
    query_params = params.clone
    params.store("body", body)
    puts query_params
    signature = util.sign(@secret_key, params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}"
    response = connection.post(signature, resource_url, query_params, body)
    message = QueueResponseBuilder.new()
    messageObj = message.buildResponse(response)
  rescue  App42Exception =>e
    raise e
  rescue  Exception => e
    raise App42Exception.new(e)
  end
  return messageObj
end