class Emptyd::Connection
Constants
- EXPIRE_INTERVAL
- HAPPY_RATIO
- MAX_CONNECTIONS
Attributes
error[R]
failed_at[R]
key[R]
updated_at[R]
Public Class Methods
[](key, logger)
click to toggle source
# File lib/emptyd.rb, line 21 def self.[](key, logger) @@connections[key] or Connection.new(key, logger) end
new(key, logger)
click to toggle source
# File lib/emptyd.rb, line 25 def initialize(key, logger) raise IOError, "already registered" if @@connections[key] @key = key @logger = logger @sessions = [] @user, @host = key.split('@', 2) @user, @host = "root", @user if @host.nil? @@connections[key] = self @run_queue = [] self.start @timer = EM::PeriodicTimer.new(rand(5..15)) do start destroy if old? and free? end end
Public Instance Methods
bind(session)
click to toggle source
# File lib/emptyd.rb, line 195 def bind(session) raise IOError, "already bound" if @sessions.include? session @sessions << session end
connecting?()
click to toggle source
# File lib/emptyd.rb, line 274 def connecting? @connecting end
dead?()
click to toggle source
# File lib/emptyd.rb, line 278 def dead? @failed_at end
destroy()
click to toggle source
# File lib/emptyd.rb, line 41 def destroy raise IOError, "sessions are still alive" unless @sessions.empty? @timer.cancel @start_timer.cancel if @start_timer @@connections.delete @key if @conn @@count.delete self.key conn = @conn @conn = nil Fiber.new do conn.close end.resume end @logger.debug "Destroying connection #{@key}" end
errback(err)
click to toggle source
# File lib/emptyd.rb, line 176 def errback err @@count.delete self.key had_valid_conn = !!@conn @conn = nil @logger.warn "Connection to #{@key} is broken: #{err}" @error = err @failed_at = Time.now @connecting = false if had_valid_conn @sessions.each do |session| session.queue.push [self, :error, @error] end else @run_queue.each do |cmd,session,callback| callback.call self, :error, @error end end end
free?()
click to toggle source
# File lib/emptyd.rb, line 270 def free? @sessions.empty? end
old?()
click to toggle source
# File lib/emptyd.rb, line 266 def old? @updated_at and Time.now - @updated_at > EXPIRE_INTERVAL end
run(cmd, session, &callback)
click to toggle source
# File lib/emptyd.rb, line 206 def run cmd, session, &callback unless @conn @run_queue << [cmd, session, callback] return end setup = proc do |ch| callback.call self, :init, ch ch.on_data do |c, data| EM.next_tick do @updated_at = Time.now session.queue.push [@key,nil,data] end end ch.on_extended_data do |c, type, data| EM.next_tick do @updated_at = Time.now session.queue.push [@key,type,data] @logger.debug [type,data] end end ch.on_request "exit-status" do |ch, data| EM.next_tick do @updated_at = Time.now session.queue.push [@key,:exit,data.read_long] end end ch.on_close do @updated_at = Time.now callback.call self, :close end ch.on_open_failed do |ch, code, desc| EM.next_tick do callback.call self, :error, desc end end end @conn.open_channel do |ch| if session.interactive? ch.request_pty do |ch, success| ch.exec cmd do |ch, success| @logger.warn "exec failed: #{cmd}" unless success setup[ch] end end else ch.exec cmd do |ch, success| @logger.warn "exec failed: #{cmd}" unless success setup[ch] end end end end
start()
click to toggle source
# File lib/emptyd.rb, line 57 def start return if @conn or @connecting @connecting = true unless @@dns @@dns = EM::Udns::Resolver.new EM::Udns.run @@dns end pressure = proc do if @@count.size >= MAX_CONNECTIONS # pressure c = @@count.select{|k,c| c.free?}.values.sample if c c.destroy else @logger.debug "pressure: no free connections: #{@@count.keys}" end end end starter = proc do begin pressure[] if @@count.size >= MAX_CONNECTIONS @logger.debug "Quota exceeded by #{@key}: #{@@count.size}" @start_timer = EM::PeriodicTimer.new(rand(1..10)) do pressure[] if @@count.size < MAX_CONNECTIONS @start_timer.cancel EM.next_tick starter else @logger.debug "No more connection quota, deferring #{@key}..." end end else @@count[self.key] = self @logger.debug "Created new conn: #{key}, quota = #{@@count.size}" options = { :user_known_hosts_file => [], :number_of_password_prompts => 0 } options[:password] = $PASSWORD if $PASSWORD options[:forward_agent] = true if $FORWARD_AGENT EM::Ssh.start(@ip, @user, options) do |conn| conn.errback { |err| errback err } conn.on(:closed) { errback "closed" } conn.callback do |ssh| @conn = ssh @error = nil @failed_at = nil @updated_at = Time.now @connecting = false @run_queue.each do |cmd,session,callback| if session.dead? @logger.debug "Dropping pending run request from a dead session" @error = "session is dead" else EM.next_tick { run cmd, session, &callback } end end @run_queue.clear if @error ssh.close @@count.delete self.key end end end end rescue EventMachine::ConnectionError => e @@count.delete self.key @conn = nil @error = e @failed_at = Time.now @connecting = false @run_queue.each do |cmd,session,callback| callback.call self, :error end end end resolver = nil resfail = proc do |err| @error = err @logger.error "DNS Error: #{err}: #{@key}" @failed_at = Time.now EM::Timer.new(rand(1..10)) do EM.next_tick resolver end end if @host =~ /^[0-9a-f:.]+$/i p @host resolver = proc do @ip = @host starter[] end else resolver = proc do query = @@dns.submit_AAAA @host query.callback do |result| @ip = result.sample starter[] end query.errback do |err| if err == :dns_error_nodata query = @@dns.submit_A @host query.callback do |result| @ip = result.sample starter[] end query.errback { |err| resfail[err] } else resfail[err] end end end end EM.next_tick resolver end
unbind(session)
click to toggle source
# File lib/emptyd.rb, line 200 def unbind(session) raise IOError, "not bound" unless @sessions.include? session @run_queue.delete_if{|cmd,sess,cb| sess == session} @sessions.delete session end