class Rubbis::Handler
Attributes
buffer[R]
client[R]
server[R]
tx[R]
Public Class Methods
new(socket, server)
click to toggle source
# File lib/rubbis/handler.rb, line 7
#
# Sets up a handler for a single client connection.
#
# socket - connection object for the client; exposed through +client+.
# server - the owning server object; exposed through +server+.
#
# The input buffer starts out empty and a fresh transaction is installed
# via reset_tx!.
def initialize(socket, server)
  @server = server
  @client = socket
  @buffer = ""
  reset_tx!
end
Public Instance Methods
active?()
click to toggle source
# File lib/rubbis/handler.rb, line 56
#
# Tells whether this handler still has a client attached.
#
# Returns the client object itself (truthy) while one is set, or nil once
# disconnect! has cleared it. Callers should treat the result as
# truthy/falsy rather than as a strict boolean.
def active?
  client
end
disconnect!(state)
click to toggle source
# File lib/rubbis/handler.rb, line 60
#
# Detaches this handler from its client.
#
# state - object that receives unsubscribe_all(self) for this handler.
#
# After the call +client+ is nil, so active? is falsy and respond!
# becomes a no-op. Returns nil.
def disconnect!(state)
  state.unsubscribe_all(self)
  @client = nil
end
dispatch(state, cmd)
click to toggle source
# File lib/rubbis/handler.rb, line 65
#
# Routes one parsed command either to connection-level handling or to
# the shared state.
#
# state - object responding to watch(key) { ... } and
#         apply_command(handler, cmd).
# cmd   - command as an indexable sequence; cmd[0] is the command name
#         (matched case-insensitively) and cmd[1] the first argument.
#
# Returns :ok for 'bgsave' and 'multi', whatever state.watch returns for
# 'watch', and whatever state.apply_command returns for anything else.
def dispatch(state, cmd)
  case cmd[0].downcase
  when 'bgsave'
    server.bgsave
    :ok
  when 'multi'
    tx.start!
    :ok
  when 'watch'
    # Remember which transaction was current when the watch was set up;
    # the callback only marks the transaction dirty if it is still the
    # current one when the callback fires.
    watched_tx = tx
    state.watch(cmd[1]) { tx.dirty! if watched_tx == tx }
  else
    state.apply_command(self, cmd)
  end
end
process!(state)
click to toggle source
# File lib/rubbis/handler.rb, line 18
#
# Reads whatever the client has sent, parses complete commands out of the
# buffer and runs each of them in order.
#
# state - shared state object; passed through to dispatch and asked to
#         process_list_watches! after every command.
#
# Per command:
# * outside a transaction the command is dispatched immediately;
# * inside a transaction 'exec' (case-insensitive) runs every queued
#   command - unless the transaction is dirty, in which case the result
#   is nil - and then installs a fresh transaction;
# * any other command inside a transaction is queued and answered with
#   :queued.
#
# A response of :block is not written back to the client.
#
# Nothing is rescued here: any exception raised by read_nonblock
# propagates to the caller.
def process!(state)
  buffer << client.read_nonblock(1024)

  cmds, processed = Rubbis::Protocol.unmarshal(buffer)
  # Keep only the bytes that were not consumed by the parser.
  @buffer = buffer[processed..-1]

  cmds.each do |cmd|
    response =
      if !tx.active?
        dispatch(state, cmd)
      elsif cmd[0].downcase == 'exec'
        # A dirty transaction is abandoned: nothing runs, result is nil.
        results = tx.dirty? ? nil : tx.buffer.map { |queued| dispatch(state, queued) }
        reset_tx!
        results
      else
        tx.queue(cmd)
        :queued
      end

    respond!(response) unless response == :block
    state.process_list_watches!
  end
end
reset_tx!()
click to toggle source
# File lib/rubbis/handler.rb, line 14
#
# Discards the current transaction object and installs a brand-new
# Transaction in its place (exposed through +tx+).
#
# Returns the new Transaction.
def reset_tx!
  @tx = Transaction.new
end
respond!(response)
click to toggle source
# File lib/rubbis/handler.rb, line 49
#
# Sends a response back to the client, if there still is one.
#
# response - value handed to Rubbis::Protocol.marshal before writing.
#
# When the handler is no longer active this does nothing and returns nil.
# Otherwise server.commit! is called first and the marshalled response is
# then written to the client; the result of that write is returned.
def respond!(response)
  return unless active?

  server.commit!
  client.write(Rubbis::Protocol.marshal(response))
end