class Emptyd::Connection

Constants

EXPIRE_INTERVAL
HAPPY_RATIO
MAX_CONNECTIONS

Attributes

error[R]
failed_at[R]
key[R]
updated_at[R]

Public Class Methods

[](key, logger) click to toggle source
# File lib/emptyd.rb, line 21
# Looks up the connection registered under +key+, creating one when none
# exists yet.
#
# key    - connection key, passed through to Connection.new on a miss.
# logger - logger handed to a newly created connection; unused on a hit.
#
# Returns the existing or newly created connection.
def self.[](key, logger)
  existing = @@connections[key]
  return existing if existing

  Connection.new(key, logger)
end
new(key, logger) click to toggle source
# File lib/emptyd.rb, line 25
# Registers a new connection under +key+ and immediately starts connecting.
#
# key    - "user@host" or just "host"; a key without "@" is treated as a
#          host name and the user defaults to "root".
# logger - logger used for all diagnostics of this connection.
#
# Raises IOError when a connection is already registered under +key+.
def initialize(key, logger)
  raise IOError, "already registered" if @@connections[key]

  @key = key
  @logger = logger
  @sessions = []

  user, host = key.split('@', 2)
  # No "@" in the key: the only component is the host.
  user, host = "root", user if host.nil?
  @user = user
  @host = host

  @@connections[key] = self
  @run_queue = []
  start

  # Housekeeping on a randomised 5..15 second period: retry connecting if
  # needed, and tear the connection down once it is both stale and unused.
  @timer = EM::PeriodicTimer.new(rand(5..15)) do
    start
    destroy if old? && free?
  end
end

Public Instance Methods

bind(session) click to toggle source
# File lib/emptyd.rb, line 195
# Attaches +session+ to this connection.
#
# Returns the list of bound sessions.
# Raises IOError when +session+ is already bound.
def bind(session)
  if @sessions.include?(session)
    raise IOError, "already bound"
  end

  @sessions.push(session)
end
connecting?() click to toggle source
# File lib/emptyd.rb, line 274
# Tells whether a connection attempt is currently in flight.
#
# Returns the value of @connecting, which #start sets to true and the
# success/failure handlers reset to false.
def connecting?
  @connecting
end
dead?() click to toggle source
# File lib/emptyd.rb, line 278
# Tells whether the most recent connection attempt (or the connection
# itself) has failed.
#
# Returns @failed_at as-is: the Time of the last recorded failure (truthy),
# or nil when no failure is recorded. Not a strict boolean.
def dead?
  @failed_at
end
destroy() click to toggle source
# File lib/emptyd.rb, line 41
# Unregisters this connection and closes the underlying SSH link, if any.
#
# Cancels the housekeeping timer and any pending quota-retry timer, removes
# the connection from the registry and, when a live link exists, releases
# its quota slot and closes it.
#
# Raises IOError when sessions are still bound.
def destroy
  unless @sessions.empty?
    raise IOError, "sessions are still alive"
  end

  @timer.cancel
  @start_timer.cancel if @start_timer
  @@connections.delete(@key)

  live_link = @conn
  if live_link
    @@count.delete(self.key)
    @conn = nil
    # close is invoked from its own Fiber, as in the original code
    # (presumably because the SSH close call expects a fiber context - confirm).
    Fiber.new { live_link.close }.resume
  end

  @logger.debug "Destroying connection #{@key}"
end
errback(err) click to toggle source
# File lib/emptyd.rb, line 176
# Records a connection failure and notifies whoever is waiting on it.
#
# err - the error (or reason string) describing the failure.
#
# Releases the quota slot, drops the connection handle and stores the error
# and failure time. If a working connection existed, every bound session
# gets an [self, :error, err] entry pushed on its queue; otherwise (the
# failure happened while connecting) each queued run request has its
# callback invoked with (self, :error, err).
def errback err
  @@count.delete(self.key)
  was_connected = @conn ? true : false
  @conn = nil
  @logger.warn "Connection to #{@key} is broken: #{err}"
  @error = err
  @failed_at = Time.now
  @connecting = false

  unless was_connected
    return @run_queue.each { |_cmd, _session, callback| callback.call self, :error, @error }
  end

  @sessions.each { |session| session.queue.push [self, :error, @error] }
end
free?() click to toggle source
# File lib/emptyd.rb, line 270
# Tells whether no sessions are currently bound to this connection.
#
# Returns true when the list of bound sessions is empty, false otherwise.
def free?
  @sessions.empty?
end
old?() click to toggle source
# File lib/emptyd.rb, line 266
# Tells whether the connection has been idle for longer than
# EXPIRE_INTERVAL seconds.
#
# Returns nil when no activity timestamp has been recorded yet, otherwise
# true or false.
def old?
  return @updated_at unless @updated_at

  (Time.now - @updated_at) > EXPIRE_INTERVAL
end
run(cmd, session, &callback) click to toggle source
# File lib/emptyd.rb, line 206
# Executes +cmd+ over this connection on behalf of +session+.
#
# cmd      - command line to exec on the remote side.
# session  - owning session; must respond to #interactive? and #queue.
# callback - block invoked with (self, :init, channel) once the exec request
#            has been answered, (self, :close) when the channel closes, and
#            (self, :error, description) on an open failure.
#
# Without a live connection the request is only appended to @run_queue and
# nil is returned. Otherwise a channel is opened (with a PTY requested first
# for interactive sessions) and output is forwarded to session.queue as
# [key, nil, data] for normal data, [key, type, data] for extended data and
# [key, :exit, status] for the exit status.
def run cmd, session, &callback
  if !@conn
    @run_queue << [cmd, session, callback]
    return
  end

  # Installs all channel event handlers; invoked once exec has been answered.
  attach_handlers = proc do |channel|
    callback.call self, :init, channel

    channel.on_data do |_chan, payload|
      EM.next_tick do
        @updated_at = Time.now
        session.queue.push [@key, nil, payload]
      end
    end

    channel.on_extended_data do |_chan, stream, payload|
      EM.next_tick do
        @updated_at = Time.now
        session.queue.push [@key, stream, payload]
        @logger.debug [stream, payload]
      end
    end

    channel.on_request "exit-status" do |_chan, packet|
      EM.next_tick do
        @updated_at = Time.now
        session.queue.push [@key, :exit, packet.read_long]
      end
    end

    # Unlike the other handlers, close is reported immediately rather than
    # on the next reactor tick.
    channel.on_close do
      @updated_at = Time.now
      callback.call self, :close
    end

    channel.on_open_failed do |_chan, _code, reason|
      EM.next_tick do
        callback.call self, :error, reason
      end
    end
  end

  # Shared exec reply handler: a failed exec is only logged, the handlers
  # are attached regardless.
  after_exec = proc do |channel, ok|
    @logger.warn "exec failed: #{cmd}" unless ok
    attach_handlers[channel]
  end

  @conn.open_channel do |channel|
    if session.interactive?
      channel.request_pty { |pty_channel, _granted| pty_channel.exec(cmd, &after_exec) }
    else
      channel.exec(cmd, &after_exec)
    end
  end
end
start() click to toggle source
# File lib/emptyd.rb, line 57
# Begins (or retries) establishing the SSH connection for this key.
#
# Does nothing when a connection already exists or an attempt is in flight.
# Otherwise the work is scheduled on the EventMachine reactor in two stages:
#
# 1. Resolve @host into @ip. A literal address is used as-is; a name is
#    looked up as AAAA first, falling back to A when there is no AAAA data.
#    Other DNS failures are recorded and retried after a random 1..10 s delay.
# 2. Open the SSH connection, subject to the MAX_CONNECTIONS quota tracked
#    in @@count. When the quota is full, one random idle connection is
#    evicted if possible; otherwise the attempt is re-checked periodically.
#
# Once connected, queued run requests are dispatched. If EventMachine raises
# a ConnectionError, every queued callback is invoked with
# (self, :error, error).
def start
  return if @conn or @connecting
  @connecting = true

  # The resolver is shared by all connections and created on first use.
  unless @@dns
    @@dns = EM::Udns::Resolver.new
    EM::Udns.run @@dns
  end

  # When the quota is exhausted, try to free a slot by destroying one
  # randomly chosen connection that has no sessions bound.
  pressure = proc do
    if @@count.size >= MAX_CONNECTIONS # pressure
      victim = @@count.select { |_key, conn| conn.free? }.values.sample
      if victim
        victim.destroy
      else
        @logger.debug "pressure: no free connections: #{@@count.keys}"
      end
    end
  end

  starter = proc do
    begin
      pressure[]
      if @@count.size >= MAX_CONNECTIONS
        @logger.debug "Quota exceeded by #{@key}: #{@@count.size}"
        # Poll until a slot frees up, then re-enter starter from the reactor.
        @start_timer = EM::PeriodicTimer.new(rand(1..10)) do
          pressure[]
          if @@count.size < MAX_CONNECTIONS
            @start_timer.cancel
            EM.next_tick starter
          else
            @logger.debug "No more connection quota, deferring #{@key}..."
          end
        end
      else
        # Claim the quota slot before the connection is actually up.
        @@count[self.key] = self
        @logger.debug "Created new conn: #{key}, quota = #{@@count.size}"
        options = { :user_known_hosts_file => [], :number_of_password_prompts => 0 }
        options[:password] = $PASSWORD if $PASSWORD
        options[:forward_agent] = true if $FORWARD_AGENT
        EM::Ssh.start(@ip, @user, options) do |conn|
          conn.errback { |err| errback err }
          conn.on(:closed) { errback "closed" }
          conn.callback do |ssh|
            @conn = ssh
            @error = nil
            @failed_at = nil
            @updated_at = Time.now
            @connecting = false
            @run_queue.each do |cmd, session, callback|
              if session.dead?
                @logger.debug "Dropping pending run request from a dead session"
                @error = "session is dead"
              else
                EM.next_tick { run cmd, session, &callback }
              end
            end
            @run_queue.clear
            # A dead session among the pending requests makes the fresh
            # connection get closed again and its quota slot released.
            if @error
              ssh.close
              @@count.delete self.key
            end
          end
        end
      end
    rescue EventMachine::ConnectionError => e
      @@count.delete self.key
      @conn = nil
      @error = e
      @failed_at = Time.now
      @connecting = false
      # Pass the error along, matching the (self, :error, error) shape used
      # by every other :error notification.
      @run_queue.each do |_cmd, _session, callback|
        callback.call self, :error, @error
      end
    end
  end

  resolver = nil

  # DNS failure: remember the error and try resolving again a bit later.
  resfail = proc do |err|
    @error = err
    @logger.error "DNS Error: #{err}: #{@key}"
    @failed_at = Time.now
    EM::Timer.new(rand(1..10)) do
      EM.next_tick resolver
    end
  end

  # \A/\z anchor the whole string; ^/$ would accept a multi-line host as
  # long as any single line looked like an address.
  if @host =~ /\A[0-9a-f:.]+\z/i
    resolver = proc do
      @ip = @host
      starter[]
    end
  else
    resolver = proc do
      aaaa_query = @@dns.submit_AAAA @host
      aaaa_query.callback do |result|
        @ip = result.sample
        starter[]
      end
      aaaa_query.errback do |err|
        if err == :dns_error_nodata
          # No AAAA records: fall back to an A lookup.
          a_query = @@dns.submit_A @host
          a_query.callback do |result|
            @ip = result.sample
            starter[]
          end
          a_query.errback { |a_err| resfail[a_err] }
        else
          resfail[err]
        end
      end
    end
  end

  EM.next_tick resolver
end
unbind(session) click to toggle source
# File lib/emptyd.rb, line 200
# Detaches +session+ from this connection and discards any run requests it
# still has waiting in the queue.
#
# Returns the removed session.
# Raises IOError when +session+ is not bound.
def unbind(session)
  unless @sessions.include?(session)
    raise IOError, "not bound"
  end

  @run_queue.reject! { |_cmd, owner, _callback| owner == session }
  @sessions.delete(session)
end