class RedisClient::RubyConnection::BufferedIO
Constants
- ENCODING
- EOL
- EOL_SIZE
Attributes
read_timeout[RW]
write_timeout[RW]
Public Class Methods
new(io, read_timeout:, write_timeout:, chunk_size: 4096)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 16 def initialize(io, read_timeout:, write_timeout:, chunk_size: 4096) @io = io @buffer = +"" @offset = 0 @chunk_size = chunk_size @read_timeout = read_timeout @write_timeout = write_timeout @blocking_reads = false end
Public Instance Methods
close()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 89 def close @io.to_io.close end
closed?()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 93 def closed? @io.to_io.closed? end
ensure_line()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 44 def ensure_line fill_buffer(false) if @offset >= @buffer.bytesize until @buffer.byteindex(EOL, @offset) fill_buffer(false) end end
eof?()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 97 def eof? @offset >= @buffer.bytesize && @io.eof? end
getbyte()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 150 def getbyte unless byte = @buffer.getbyte(@offset) ensure_remaining(1) byte = @buffer.getbyte(@offset) end @offset += 1 byte end
gets_chomp()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 26 def gets_chomp fill_buffer(false) if @offset >= @buffer.bytesize until eol_index = @buffer.byteindex(EOL, @offset) fill_buffer(false) end line = @buffer.byteslice(@offset, eol_index - @offset) @offset = eol_index + EOL_SIZE line end
gets_integer()
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 159 def gets_integer int = 0 offset = @offset while true chr = @buffer.getbyte(offset) if chr if chr == 13 # "\r".ord @offset = offset + 2 break else int = (int * 10) + chr - 48 end offset += 1 else ensure_line return gets_integer end end int end
read_chomp(bytes)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 37 def read_chomp(bytes) ensure_remaining(bytes + EOL_SIZE) str = @buffer.byteslice(@offset, bytes) @offset += bytes + EOL_SIZE str end
skip(offset)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 121 def skip(offset) ensure_remaining(offset) @offset += offset nil end
with_timeout(new_timeout) { || ... }
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 101 def with_timeout(new_timeout) new_timeout = false if new_timeout == 0 previous_read_timeout = @read_timeout previous_blocking_reads = @blocking_reads if new_timeout @read_timeout = new_timeout else @blocking_reads = true end begin yield ensure @read_timeout = previous_read_timeout @blocking_reads = previous_blocking_reads end end
write(string)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 127 def write(string) total = remaining = string.bytesize loop do case bytes_written = @io.write_nonblock(string, exception: false) when Integer remaining -= bytes_written if remaining > 0 string = string.byteslice(bytes_written..-1) else return total end when :wait_readable @io.to_io.wait_readable(@read_timeout) or raise(ReadTimeoutError, "Waited #{@read_timeout} seconds") when :wait_writable @io.to_io.wait_writable(@write_timeout) or raise(WriteTimeoutError, "Waited #{@write_timeout} seconds") when nil raise Errno::ECONNRESET else raise "Unexpected `write_nonblock` return: #{bytes.inspect}" end end end
Private Instance Methods
ensure_remaining(bytes)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 184 def ensure_remaining(bytes) needed = bytes - (@buffer.bytesize - @offset) if needed > 0 fill_buffer(true, needed) end end
fill_buffer(strict, size = @chunk_size)
click to toggle source
# File lib/redis_client/ruby_connection/buffered_io.rb, line 191 def fill_buffer(strict, size = @chunk_size) remaining = size buffer_size = @buffer.bytesize start = @offset - buffer_size empty_buffer = start >= 0 loop do bytes = if empty_buffer @io.read_nonblock([remaining, @chunk_size].max, @buffer, exception: false) else @io.read_nonblock([remaining, @chunk_size].max, exception: false) end case bytes when :wait_readable # Ref: https://github.com/redis-rb/redis-client/issues/190 # SSLSocket always clear the provided buffer, even when it didn't # read anything. So we need to reset the offset accordingly. if empty_buffer && @buffer.empty? @offset -= buffer_size end unless @io.to_io.wait_readable(@read_timeout) raise ReadTimeoutError, "Waited #{@read_timeout} seconds" unless @blocking_reads end when :wait_writable # Ref: https://github.com/redis-rb/redis-client/issues/190 # SSLSocket always clear the provided buffer, even when it didn't # read anything. So we need to reset the offset accordingly. if empty_buffer && @buffer.empty? @offset -= buffer_size end @io.to_io.wait_writable(@write_timeout) or raise(WriteTimeoutError, "Waited #{@write_timeout} seconds") when nil raise EOFError else if empty_buffer @offset = start empty_buffer = false @buffer.force_encoding(ENCODING) unless @buffer.encoding == ENCODING else @buffer << bytes.force_encoding(ENCODING) end remaining -= bytes.bytesize return if !strict || remaining <= 0 end end end