class Rubbis::Server

Attributes

aof_file[R]
clock[R]
command_log[R]
server_file[R]
shutdown_pipe[R]
state[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/rubbis/server.rb, line 22
def initialize(opts = {})
    @port          = opts[:port]
    @shutdown_pipe = IO.pipe
    @clock         = Clock.new
    @state         = State.new(@clock)
    @server_file   = opts[:server_file]
    # @aof_file    = opts[:aof_file]
    @aof_file      = false
end

Public Instance Methods

apply_log(contents) click to toggle source
# File lib/rubbis/server.rb, line 93
def apply_log(contents)
    Handler.new(AofClient.new(contents), self).process!(state)
end
bgrewriteaof() click to toggle source
# File lib/rubbis/server.rb, line 48
def bgrewriteaof
    return unless aof_file
    begin
        tmpf = Tempfile.new(File.basename(aof_file))
        state.minimal_log.each do |cmd|
            tmpf.write Rubbis::Protocol.marshal(cmd)
        end 

        tmpf.close

        command_log.close
        FileUtils.mv(tmpf, aof_file)
        @command_log = File.open(aof_file, 'a')
        :ok
    ensure
        if tmpf
            tmpf.close
            tmpf.unlink
        end
    end
end
bgsave() click to toggle source
# File lib/rubbis/server.rb, line 32
def bgsave
    return unless server_file
    begin
        tmpf = Tempfile.new(File.basename(server_file))
        tmpf.write state.serialize
        tmpf.close
        FileUtils.mv(tmpf, server_file)
        :ok
    ensure
        if tmpf
            tmpf.close
            tmpf.unlink
        end
    end
end
check_background_processes!() click to toggle source
# File lib/rubbis/server.rb, line 80
def check_background_processes!

end
commit!() click to toggle source
# File lib/rubbis/server.rb, line 70
def commit!
    return unless command_log

    state.log.each do |cmd|
        command_log.write Rubbis::Protocol.marshal(cmd)
    end
    command_log.fsync
    state.log.clear
end
listen() click to toggle source
# File lib/rubbis/server.rb, line 97
def listen
    readable = []
    clients = {}
    running = true

    server = TCPServer.new("127.0.0.1", port.to_i)
    
    expire_pipe = IO.pipe
    readable << server
    readable << shutdown_pipe[0]
    readable << expire_pipe[0]

    @command_log = File.open(aof_file, 'a') if aof_file

    if aof_file && File.exists?(aof_file)
        apply_log File.read(aof_file)
    elsif server_file && File.exists?(server_file)
        @state.deserialize File.read(server_file)
    end

    timer_thread = Thread.new do
        begin
            while running
                sleep 0.1
                expire_pipe[1].write '.'
            end
        rescue Errno::EPIPE, IOError
        end
    end

    while running
        ready_to_read, _ = IO.select(readable + clients.keys)
        ready_to_read.each do |socket|
            case socket
            when server
                child_socket = socket.accept_nonblock
                clients[child_socket] = Handler.new(child_socket, self)
            when shutdown_pipe[0]
                running = false
            when expire_pipe[0]
                state.expire_keys!
                check_background_processes!
            else
                begin
                    clients[socket].process!(state)
                rescue EOFError
                    handler = clients.delete(socket)
                    handler.disconnect!(state)
                    socket.close
                end
            end
        end
    end
ensure
    running = false
    (readable + clients.keys).each do |file|
        file.close
    end
    expire_pipe[1].close if expire_pipe
    timer_thread.join if timer_thread
end
shutdown() click to toggle source
# File lib/rubbis/server.rb, line 84
def shutdown
    shutdown_pipe[1].close
end

Private Instance Methods

port() click to toggle source
# File lib/rubbis/server.rb, line 163
def port
    @port
end