class Moneta::Server::Connection
@api private
Public Class Methods
new(io, store, max_size)
click to toggle source
# File lib/moneta/server.rb, line 14 def initialize(io, store, max_size) @io = io @store = store @max_size = max_size @fiber = Fiber.new { run } end
Public Instance Methods
resume(result = nil)
click to toggle source
# File lib/moneta/server.rb, line 21 def resume(result = nil) @fiber.resume result end
Private Instance Methods
dispatch(method, args)
click to toggle source
# File lib/moneta/server.rb, line 41 def dispatch(method, args) case method when :key?, :load, :delete, :increment, :create, :features @store.public_send(method, *args) when :store, :clear @store.public_send(method, *args) nil when :each_key yield_each(@store.each_key) nil end rescue => ex ex end
pack(obj)
click to toggle source
# File lib/moneta/server.rb, line 127 def pack(obj) s = Marshal.dump(obj) [s.bytesize].pack('N') << s end
read(len)
click to toggle source
# File lib/moneta/server.rb, line 68 def read(len) buffer = '' loop do begin case received = @io.recv_nonblock(len) when '', nil throw :closed, 'Closed during read' else buffer << received len -= received.bytesize end rescue IO::WaitReadable, IO::WaitWritable yield_to_reactor(:read) rescue Errno::ECONNRESET throw :closed, 'Closed during read' rescue IOError => ex if ex.message =~ /closed stream/ throw :closed, 'Closed during read' else raise end end break if len == 0 end buffer end
read_msg()
click to toggle source
# File lib/moneta/server.rb, line 62 def read_msg size = read(4).unpack1('N') throw :closed, 'Message too big' if size > @max_size Marshal.load(read(size)) end
run()
click to toggle source
The return value of this function will be sent to the reactor.
@return [:closed,Exception]
# File lib/moneta/server.rb, line 30 def run catch :closed do loop { write_dispatch(read_msg) } end :closed rescue => ex ex ensure @io.close unless @io.closed? end
sendmsg(msg)
click to toggle source
# File lib/moneta/server.rb, line 112 def sendmsg(msg) @io.sendmsg_nonblock(msg) end
write(obj)
click to toggle source
# File lib/moneta/server.rb, line 95 def write(obj) buffer = pack(obj) until buffer.empty? begin len = sendmsg(buffer) buffer = buffer.byteslice(len...buffer.length) rescue IO::WaitWritable, Errno::EINTR yield_to_reactor(:write) end end nil end
write_dispatch(msg)
click to toggle source
# File lib/moneta/server.rb, line 56 def write_dispatch(msg) method, *args = msg result = dispatch(method, args) write(result) end
yield_each(enumerator)
click to toggle source
# File lib/moneta/server.rb, line 132 def yield_each(enumerator) received_break = false loop do case msg = read_msg when %w{NEXT} # This will raise a StopIteration at the end of the enumeration, # which will exit the loop. write(enumerator.next) when %w{BREAK} # This is received when the client wants to stop the enumeration. received_break = true break else # Otherwise, the client is attempting to call another method within # an `each` block. write_dispatch(msg) end end ensure # This tells the client to stop enumerating write(StopIteration.new("Server initiated stop")) unless received_break end
yield_to_reactor(mode = :read)
click to toggle source
# File lib/moneta/server.rb, line 121 def yield_to_reactor(mode = :read) if Fiber.yield(mode) == :close throw :closed, 'Closed by reactor' end end