class MogileFS::Backend

This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS.

Attributes

lasterr[R]

The last error

lasterrstr[R]

The string attached to the last error

Public Class Methods

add_command(*names) click to toggle source

Adds the MogileFS commands given in names, defining one instance method per command.

# File lib/mogilefs/backend.rb, line 9
# Defines one instance method for every command name in +names+.
#
# Each generated method forwards to #do_request, passing its own name as the
# command, the first argument it received (or an empty Hash when that
# argument is missing or nil) as the request arguments, and +false+ as the
# idempotent flag.
def self.add_command(*names)
  names.each do |command|
    define_method(command) do |*params|
      request_args = params.first || {}
      do_request(command, request_args, false)
    end
  end
end
add_error(err_snake) click to toggle source

Converts an error code from a mogilefsd tracker into an exception class.

Examples of some exceptions that get created:

class AfterMismatchError < MogileFS::Error; end
class DomainNotFoundError < MogileFS::Error; end
class InvalidCharsError < MogileFS::Error; end
# File lib/mogilefs/backend.rb, line 35
# Maps a snake_case error code from a mogilefsd tracker to an exception
# class, creating that class if necessary, and records the mapping in
# BACKEND_ERRORS.
#
# The code is camel-cased and suffixed with "Error" unless it already ends
# that way, e.g. "domain_not_found" => DomainNotFoundError and
# "after_mismatch_error" => AfterMismatchError. Newly created classes
# inherit from MogileFS::Error.
#
# Constant lookups are restricted to the receiver itself (inherit = false).
# Without that, a code such as "key", "name" or "type" would resolve to
# Ruby's top-level KeyError / NameError / TypeError through Object and the
# tracker error would be reported as an unrelated core exception.
#
# Returns the exception class stored for +err_snake+.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel
  unless const_defined?(err_camel, false)
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel, false)
end
add_idempotent_command(*names) click to toggle source

Adds the idempotent MogileFS commands given in names. These commands may be retried transparently on a different tracker if there is a network/server error.

# File lib/mogilefs/backend.rb, line 19
# Defines one instance method for every command name in +names+, marking
# each as idempotent.
#
# Each generated method forwards to #do_request with its own name as the
# command, its first argument (or an empty Hash when that argument is
# missing or nil) as the request arguments, and +true+ as the idempotent
# flag.
def self.add_idempotent_command(*names)
  names.each do |cmd_name|
    define_method(cmd_name) { |*given| do_request(cmd_name, given.first || {}, true) }
  end
end
new(args) click to toggle source

Creates a new MogileFS::Backend.

:hosts is a required argument and must be an Array containing one or more 'hostname:port' pairs as Strings.

:timeout adjusts the request timeout before an error is returned.

# File lib/mogilefs/backend.rb, line 70
# Creates a new MogileFS::Backend.
#
# +args+ is a Hash understanding these keys:
#
# :hosts::           required; a non-empty Array of 'hostname:port' Strings
# :timeout::         request timeout (default: 3)
# :connect_timeout:: connect timeout (default: the value of :timeout)
# :fail_timeout::    stored in @fail_timeout (default: 5); #socket uses it
#                    to skip trackers that recently failed to connect
#
# Raises ArgumentError when :hosts is missing, empty, or contains an entry
# that does not end in ':<digits>'.
def initialize(args)
  @hosts = args[:hosts]
  @fail_timeout = args[:fail_timeout] || 5
  raise ArgumentError, "must specify at least one host" unless @hosts
  raise ArgumentError, "must specify at least one host" if @hosts.empty?

  # \z anchors at the very end of the string.  The previous `$` anchor
  # matches at any end-of-line, so an entry like "host:7001\njunk" (or one
  # with a stray trailing newline) passed validation and only failed later
  # when the port was handed to the socket layer.
  unless @hosts.all? { |h| h =~ /:\d+\z/ }
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end

  @mutex = Mutex.new
  @timeout = args[:timeout] || 3
  @connect_timeout = args[:connect_timeout] || @timeout
  @socket = nil
  @lasterr = nil
  @lasterrstr = nil
  @pending = []

  @dead = {}
end

Public Instance Methods

clear_cache(types = %w(all)) click to toggle source

This command is special since the cache is per-tracker, so we connect to all backends and not just one.

# File lib/mogilefs/backend.rb, line 311
# Sends a "clear_cache" request to every host listed in @hosts instead of
# going through the single shared tracker socket.
#
# +types+ is a list of cache names; each one becomes a key with value 1 in
# the request arguments (default: just "all").
#
# The whole operation is best-effort: hosts that cannot be connected to,
# writes that fail and reads that fail are all silently ignored.  Always
# returns nil.
def clear_cache(types = %w(all))
  opts = {}
  types.each { |type| opts[type] = 1 }

  # one dedicated socket per configured host; a host whose connection
  # attempt raises is mapped to nil and dropped by compact! below
  sockets = @hosts.map do |host|
    MogileFS::Socket.start(*(host.split(':'.freeze))) rescue nil
  end
  sockets.compact!

  # wpending: sockets that still need the request written to them
  # rpending: sockets that were written to and still owe us a response
  # (the -= below builds new arrays, so +sockets+ keeps every socket for
  # the ensure clause)
  wpending = sockets
  rpending = []
  request = make_request("clear_cache", opts)
  while wpending[0] || rpending[0]
    # give up (returning nil) if nothing becomes ready within @timeout
    r = IO.select(rpending, wpending, nil, @timeout) or return
    rpending -= r[0]
    wpending -= r[1]
    # readable sockets: read the response and discard it, ignoring errors
    r[0].each { |io| io.timed_gets(0) rescue nil }
    # writable sockets: send the request; only a socket whose write
    # succeeded is queued for reading
    r[1].each do |io|
      begin
        io.timed_write(request, 0)
        rpending << io
      rescue
        # write failed: this socket is simply dropped from both lists
      end
    end
  end
  nil
ensure
  # close every socket we opened, whether or not it completed
  sockets.each { |io| io.close }
end
do_request(cmd, args, idempotent = false) click to toggle source

Performs the cmd request with args.

# File lib/mogilefs/backend.rb, line 236
# Performs the +cmd+ request with +args+ and returns the result of
# #parse_response for the line read back from the tracker.
#
# cmd::        command name, passed to #make_request
# args::       Hash of request arguments.  The :ruby_no_raise key is
#              removed from this Hash (the caller's object is mutated);
#              when its value is truthy the request string is handed to
#              #parse_response, which then returns tracker errors instead
#              of raising them.
# idempotent:: when true, the request is re-sent after a SystemCallError or
#              MogileFS::InvalidResponseError, and also when the read
#              returns nil.
#
# The dispatch and read happen while holding @mutex.
# NOTE(review): shutdown_unlocked is not visible here; shutdown_unlocked(true)
# presumably closes the socket and re-raises the current exception, while
# shutdown_unlocked(false) only closes it -- confirm against its definition.
def do_request(cmd, args, idempotent = false)
  no_raise = args.delete(:ruby_no_raise)
  request = make_request(cmd, args)
  line = nil
  failed = false
  @mutex.synchronize do
    begin
      io = dispatch_unlocked(request)
      line = io.timed_gets(@timeout)
      # a newline-terminated line is a complete response: leave the loop
      break if /\n\z/ =~ line

      # non-nil but unterminated: treat as a truncated response
      line and raise MogileFS::InvalidResponseError,
                     "Invalid response from server: #{line.inspect}"

      # nil line: only an idempotent request may go around the loop again
      idempotent or
        raise EOFError, "end of file reached after: #{request.inspect}"
      # fall through to retry in loop
    rescue SystemCallError,
           MogileFS::InvalidResponseError # truncated response
      # we got a successful timed_write, but not a timed_gets
      if idempotent
        # remember that a retry happened so the socket is shut down once
        # the loop finishes (see below)
        failed = true
        shutdown_unlocked(false)
        retry
      end
      shutdown_unlocked(true)
    rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
      shutdown_unlocked(true)
    rescue
      # we DO NOT want the response we timed out waiting for, to crop up later
      # on, on the same socket, interspersed with a subsequent request!  we
      # close the socket if there's any error.
      shutdown_unlocked(true)
    end while idempotent
    # at least one attempt failed and was retried: shut the socket down
    shutdown_unlocked if failed
  end # @mutex.synchronize
  # pass the request along only when the caller asked for errors to be
  # returned rather than raised
  parse_response(line, no_raise ? request : nil)
end
error(err_snake) click to toggle source

Converts an error code from a mogilefsd tracker to an exception class. Most of these exceptions should already be defined, but since the MogileFS server code is liable to change and we may not always be able to keep up with the changes, unknown error codes are defined on demand.

# File lib/mogilefs/backend.rb, line 284
# Looks up the exception class registered for the tracker error code
# +err_snake+ in BACKEND_ERRORS.  When nothing is registered yet, the class
# is created and registered through the class-level add_error.
def error(err_snake)
  known = BACKEND_ERRORS[err_snake]
  return known if known

  self.class.add_error(err_snake)
end
make_request(cmd, args) click to toggle source

Makes a new request string for cmd and args.

# File lib/mogilefs/backend.rb, line 276
# Builds the request line sent to a tracker: the command name, a single
# space, the url-encoded form of +args+, and a terminating CRLF.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end
parse_response(line, request = nil) click to toggle source

Turns the line response from the server into a Hash of options, an error, or raises, as appropriate.

# File lib/mogilefs/backend.rb, line 290
# Interprets one response +line+ from a tracker.
#
# * "OK ..." lines: returns the url-decoded payload.
# * "ERR <code> <message>" lines: stores the code in @lasterr and the
#   unescaped message in @lasterrstr, then raises the exception class that
#   #error maps the code to.  When +request+ is given, " request=<request>"
#   is appended to the message and the exception is returned instead of
#   raised.
# * anything else (including nil): raises MogileFS::InvalidResponseError.
def parse_response(line, request = nil)
  case line
  when /\AOK\s+\d*\s*(\S*)\r?\n\z/
    url_decode(Regexp.last_match(1))
  when /\AERR\s+(\w+)\s*([^\r\n]*)/
    code, detail = Regexp.last_match.captures
    @lasterr = code
    @lasterrstr = detail ? url_unescape(detail) : nil
    raise error(@lasterr).new(@lasterrstr) unless request

    # caller asked for errors to be returned: tag the message with the
    # request that triggered it and hand back the exception object
    suffix = " request=#{request.strip}"
    @lasterrstr = @lasterrstr ? (@lasterrstr << suffix) : suffix
    error(@lasterr).new(@lasterrstr)
  else
    raise MogileFS::InvalidResponseError,
          "Invalid response from server: #{line.inspect}"
  end
end
shutdown() click to toggle source

Closes this backend's socket.

# File lib/mogilefs/backend.rb, line 93
# Closes this backend's socket by running shutdown_unlocked while holding
# @mutex.
def shutdown
  @mutex.synchronize do
    shutdown_unlocked
  end
end
socket() click to toggle source

Returns a socket connected to a MogileFS tracker.

# File lib/mogilefs/backend.rb, line 342
# Returns a socket connected to a MogileFS tracker.
#
# An existing, still-open @socket is reused.  Otherwise the hosts are tried
# in random order; a host recorded in @dead more recently than
# @fail_timeout ago is skipped.  The first successful connection is stored
# in @socket (and its host in @active_host) and returned.  A host whose
# connection attempt raises SystemCallError or MogileFS::Timeout is
# recorded in @dead together with the time and the exception.
#
# Raises MogileFS::UnreachableBackendError, listing every entry in @dead,
# when no host yields a connection.
def socket
  return @socket if @socket && !@socket.closed?

  @hosts.shuffle.each do |host|
    failure = @dead[host]
    next if failure && failure[0] > (MogileFS.now - @fail_timeout)

    begin
      addr, port = host.split(':'.freeze)
      @socket = MogileFS::Socket.tcp(addr, port, @connect_timeout)
      @active_host = host
    rescue SystemCallError, MogileFS::Timeout => err
      @dead[host] = [ MogileFS.now, err ]
      next
    end

    return @socket
  end

  failures = @dead.map do |host, (_, cause)|
    "#{host} - #{cause.message} (#{cause.class})"
  end
  raise MogileFS::UnreachableBackendError,
        "couldn't connect to any tracker: #{failures.join(', ')}"
end