class AllQ::Connection

Represents a connection to a allq instance.

Constants

DEFAULT_PORT

Default port value for beanstalk connection

DEFAULT_RETRY_INTERVAL

Default retry interval

MAX_RETRIES

Default number of retries to send a command to a connection

Attributes

address[R]
connection[R]
host[R]
port[R]

Public Class Methods

new(address = '') click to toggle source

Initializes new connection.

@param [String] address allq instance address. @example

AllQ::Connection.new('127.0.0.1')
AllQ::Connection.new('127.0.0.1:11300')

ENV['ALLQ_CLIENT_URL'] = '127.0.0.1:11300'
@b = AllQ.new
@b.connection.host # => '127.0.0.1'
@b.connection.port # => '11300'
# File lib/allq/connection.rb, line 31
def initialize(address = '')
  @address = address || _host_from_env
rescue
  _raise_not_connected!
end

Public Instance Methods

call_socat(data, timeout = 3.0) click to toggle source
# File lib/allq/connection.rb, line 47
def call_socat(data, timeout = 3.0)
  cmd_string = "echo '#{data}' | socat -t #{timeout} - tcp4-connect:#{@address}"
  output = `#{cmd_string}`
  return output
end
close() click to toggle source

Close connection with allq server.

@example

@conn.close
# File lib/allq/connection.rb, line 75
def close
end
inspect()
Alias for: to_s
socat(command, options={}) { |call| ... } click to toggle source
# File lib/allq/connection.rb, line 37
def socat(command, options={}, &block)
  send_string = command.to_s
  if send_string.include?("'")
    puts "Single quotes not allow in JSON. This will probably error."
  end
  res = call_socat(send_string)
  _raise_not_connected if res.include?("Connection refused")
  yield block.call(res)
end
to_s() click to toggle source

Returns string representation of job.

@example

@conn.inspect
# File lib/allq/connection.rb, line 83
def to_s
  "#<AllQ::Connection host=#{host.inspect} port=#{port.inspect}>"
end
Also aliased as: inspect
transmit(command, options={}) { |call| ... } click to toggle source

Send commands to allq server via connection.

@param [String] command AllQ command @return [Array<Hash{String => String, Number}>] AllQ command response @example

@conn = AllQ::Connection.new
@conn.transmit('bury 123')
@conn.transmit('stats')
# File lib/allq/connection.rb, line 62
def transmit(command, options={}, &block)
  _with_retry(options[:retry_interval], options[:init]) do
    res = call_socat(command.to_s, 20.0)
    raise "Socat failed after 20 seconds" if res.to_s == ""
    yield block.call(res)
  end
end

Protected Instance Methods

establish_connection() click to toggle source

Establish a connection based on beanstalk address.

@return [Net::TCPSocket] connection for specified address. @raise [AllQ::NotConnected] Could not connect to specified allq instance. @example

establish_connection('localhost:3005')
# File lib/allq/connection.rb, line 97
def establish_connection
end

Private Instance Methods

_host_from_env() click to toggle source

The host provided by ALLQ_CLIENT_URL environment variable, if available.

@return [String] A allq host address @example

ENV['ALLQ_CLIENT_URL'] = "localhost:1212"
 # => 'localhost:1212'
# File lib/allq/connection.rb, line 141
def _host_from_env
  ENV['ALLQ_CLIENT_URL'].respond_to?(:length) && ENV['ALLQ_CLIENT_URL'].length > 0 && ENV['ALLQ_CLIENT_URL'].strip
end
_raise_not_connected!() click to toggle source

Raises an error to be triggered when the connection has failed @raise [AllQ::NotConnected] AllQ is no longer connected

# File lib/allq/connection.rb, line 147
def _raise_not_connected!
  raise "Connection to allq '#{@host}:#{@port}' is closed!"
end
_reconnect(original_exception, retry_interval, tries=MAX_RETRIES) click to toggle source

Tries to re-establish connection to the allq

@param [Exception] original_exception The exception caused the retry @param [Integer] retry_interval The time to wait before the next reconnect @param [Integer] tries The maximum number of attempts to reconnect

# File lib/allq/connection.rb, line 122
def _reconnect(original_exception, retry_interval, tries=MAX_RETRIES)
  close
  establish_connection
rescue Errno::ECONNREFUSED
  tries -= 1
  if tries.zero?
    _raise_not_connected!
  end
  sleep(retry_interval || DEFAULT_RETRY_INTERVAL)
  retry
end
_with_retry(retry_interval, init=true, tries=MAX_RETRIES) { || ... } click to toggle source

Wrapper method for capturing certain failures and retry the payload block

@param [Proc] block The command to execute. @param [Integer] retry_interval The time to wait before the next retry @param [Integer] tries The maximum number of tries in draining mode @return [Object] Result of the block passed

# File lib/allq/connection.rb, line 109
def _with_retry(retry_interval, init=true, tries=MAX_RETRIES, &block)
  yield
rescue => ex
  sleep(tries) if tries > 0
  _reconnect(ex, retry_interval)
  retry
end