class Rubbis::Server
Attributes
aof_file[R]
clock[R]
command_log[R]
server_file[R]
shutdown_pipe[R]
state[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/rubbis/server.rb, line 22
# Builds a server from an options hash.
#
# Reads +opts[:port]+ and +opts[:server_file]+, creates the clock and the
# state that is constructed from it, and opens the pipe whose write end
# #shutdown closes.
#
# @param opts [Hash] options; only +:port+ and +:server_file+ are read here
def initialize(opts = {})
  @port, @server_file = opts[:port], opts[:server_file]
  @clock = Clock.new
  @state = State.new(@clock)
  @shutdown_pipe = IO.pipe
  # The append-only file is switched off regardless of opts; the assignment
  # that would honour the option is kept for reference.
  # @aof_file = opts[:aof_file]
  @aof_file = false
end
Public Instance Methods
apply_log(contents)
click to toggle source
# File lib/rubbis/server.rb, line 93
# Feeds +contents+ through a Handler, wrapped in an AofClient, against the
# server's state.
#
# @param contents [String] log contents (#listen passes the text read from
#   the append-only file)
# @return whatever Handler#process! returns
def apply_log(contents)
  replay_client = AofClient.new(contents)
  replay_handler = Handler.new(replay_client, self)
  replay_handler.process!(state)
end
bgrewriteaof()
click to toggle source
# File lib/rubbis/server.rb, line 48
# Rewrites the append-only file from +state.minimal_log+ and reopens the
# command log on the rewritten file. Runs synchronously in the caller.
#
# The replacement is written to a temporary file created in the same
# directory as the append-only file, so the final move is a rename within
# one filesystem rather than a copy across filesystems that could leave a
# partially written log behind.
#
# @return [Symbol, nil] +:ok+ on success, +nil+ when no append-only file is
#   configured
def bgrewriteaof
  return unless aof_file

  begin
    tmpf = Tempfile.new(File.basename(aof_file), File.dirname(aof_file))

    state.minimal_log.each do |cmd|
      tmpf.write Rubbis::Protocol.marshal(cmd)
    end

    # Push the rewritten log to disk before it replaces the live one.
    tmpf.fsync
    tmpf.close

    # The log may not have been opened yet (it is opened by #listen).
    command_log.close if command_log
    FileUtils.mv(tmpf.path, aof_file)
    @command_log = File.open(aof_file, 'a')

    :ok
  ensure
    if tmpf
      # Both calls are safe to repeat; unlink ignores a file already moved.
      tmpf.close
      tmpf.unlink
    end
  end
end
bgsave()
click to toggle source
# File lib/rubbis/server.rb, line 32
# Writes +state.serialize+ to the configured snapshot file. Runs
# synchronously in the caller.
#
# The snapshot is first written to a temporary file created in the same
# directory as the target, then moved over it. Keeping both on one
# filesystem lets the move be a rename instead of a cross-device copy, so a
# failure part-way through cannot leave a truncated snapshot in place.
#
# @return [Symbol, nil] +:ok+ on success, +nil+ when no snapshot file is
#   configured
def bgsave
  return unless server_file

  begin
    tmpf = Tempfile.new(File.basename(server_file), File.dirname(server_file))
    tmpf.write state.serialize
    # Push the snapshot to disk before it replaces the previous one.
    tmpf.fsync
    tmpf.close
    FileUtils.mv(tmpf.path, server_file)
    :ok
  ensure
    if tmpf
      # Both calls are safe to repeat; unlink ignores a file already moved.
      tmpf.close
      tmpf.unlink
    end
  end
end
check_background_processes!()
click to toggle source
# File lib/rubbis/server.rb, line 80
# Hook invoked by #listen on every expiry tick. It currently does nothing
# and returns nil.
def check_background_processes!
  nil
end
commit!()
click to toggle source
# File lib/rubbis/server.rb, line 70
# Appends every command in +state.log+ to the command log, fsyncs the log,
# then empties +state.log+. Does nothing when no command log is open.
#
# @return the result of clearing +state.log+, or +nil+ without a command log
def commit!
  log_io = command_log
  return unless log_io

  state.log.each { |cmd| log_io.write(Rubbis::Protocol.marshal(cmd)) }
  log_io.fsync

  state.log.clear
end
listen()
click to toggle source
# File lib/rubbis/server.rb, line 97
# Runs the server's event loop on 127.0.0.1 at the configured port until the
# shutdown pipe becomes readable.
#
# Before serving, opens the command log when an append-only file is
# configured, then restores state: from the append-only file if it exists,
# otherwise from the snapshot file if that exists.
#
# A timer thread writes one byte to an internal pipe roughly every 0.1s;
# each tick triggers key expiry and the background-process check. On exit
# (normal or via exception) every watched IO and client socket is closed and
# the timer thread is joined.
def listen
  readable = []
  clients  = {}
  running  = true
  server = TCPServer.new("127.0.0.1", port.to_i)
  expire_pipe = IO.pipe
  readable << server
  readable << shutdown_pipe[0]
  readable << expire_pipe[0]

  @command_log = File.open(aof_file, 'a') if aof_file

  # File.exist? rather than File.exists?: the latter was deprecated and has
  # been removed from current Ruby.
  if aof_file && File.exist?(aof_file)
    apply_log File.read(aof_file)
  elsif server_file && File.exist?(server_file)
    @state.deserialize File.read(server_file)
  end

  timer_thread = Thread.new do
    begin
      while running
        sleep 0.1
        expire_pipe[1].write '.'
      end
    rescue Errno::EPIPE, IOError
      # The pipe was closed during teardown; let the thread finish.
    end
  end

  while running
    ready_to_read, _ = IO.select(readable + clients.keys)

    ready_to_read.each do |socket|
      case socket
      when server
        child_socket = socket.accept_nonblock
        clients[child_socket] = Handler.new(child_socket, self)
      when shutdown_pipe[0]
        running = false
      when expire_pipe[0]
        # Consume the pending tick bytes. Left unread, the pipe stays
        # readable forever, so select would return immediately on every
        # pass and the pipe would eventually fill up.
        begin
          socket.read_nonblock(1024)
        rescue IO::WaitReadable, EOFError
          # Nothing left to drain.
        end
        state.expire_keys!
        check_background_processes!
      else
        begin
          clients[socket].process!(state)
        rescue EOFError
          # Client hung up: drop its handler and close the socket.
          handler = clients.delete(socket)
          handler.disconnect!(state)
          socket.close
        end
      end
    end
  end
ensure
  # Clearing the flag stops the timer thread's loop before it is joined.
  running = false
  (readable + clients.keys).each do |file|
    file.close
  end
  expire_pipe[1].close if expire_pipe
  timer_thread.join if timer_thread
end
shutdown()
click to toggle source
# File lib/rubbis/server.rb, line 84
# Closes the write end of the shutdown pipe. #listen watches the read end
# and leaves its loop once that end becomes readable.
def shutdown
  _read_end, write_end = shutdown_pipe
  write_end.close
end
Private Instance Methods
port()
click to toggle source
# File lib/rubbis/server.rb, line 163
# Reader for the port stored by the constructor (taken from +opts[:port]+).
#
# @return the stored port value, unconverted
def port
  @port
end