class MogileFS::NewFile::Stream
Attributes
md5[R]
to_io[R]
Public Class Methods
new(dests, opts)
click to toggle source
# File lib/mogilefs/new_file/stream.rb, line 12 def initialize(dests, opts) @opts = opts @md5 = nil @bytes_uploaded = 0 dests.each do |devid, path| begin uri = URI.parse(path) sock = MogileFS::Socket.tcp(uri.host, uri.port) start_sock(sock, uri) # raise on errors @to_io = sock @uri = uri @devid = devid if ! @md5 && @opts[:content_length] @writer = @to_io else @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5]) end return rescue SystemCallError => e sock.close if sock && ! sock.closed? errors ||= [] errors << "#{path} - #{e.message} (#{e.class})" end end raise NoStorageNodesError, "all paths failed with PUT: #{errors.join(', ')}", [] end
Public Instance Methods
commit()
click to toggle source
# File lib/mogilefs/new_file/stream.rb, line 49 def commit @writer.flush clen = @opts[:content_length] if clen && @bytes_uploaded != clen raise MogileFS::SizeMismatchError, "did not upload expected content_length: #{clen} uploaded: " \ "#@bytes_uploaded" end read_response(@to_io) # raises on errors create_close(@devid, @uri, @bytes_uploaded) ensure @to_io.close if @to_io && ! @to_io.closed? end
start_sock(sock, uri)
click to toggle source
# File lib/mogilefs/new_file/stream.rb, line 64 def start_sock(sock, uri) host_with_port = "#{uri.host}:#{uri.port}" headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \ "Host: #{host_with_port}\r\n" content_md5 = @opts[:content_md5] if String === content_md5 headers << "Content-MD5: #{content_md5}\r\n" elsif content_md5.respond_to?(:call) || :trailer == content_md5 || MD5_TRAILER_NODES[host_with_port] @md5 = Digest::MD5.new headers << "Trailer: Content-MD5\r\n".freeze end if ! @md5 && clen = @opts[:content_length] headers << "Content-Length: #{clen}\r\n" else headers << "Transfer-Encoding: chunked\r\n".freeze end sock.write(headers << "\r\n".freeze) end
write(buf)
click to toggle source
# File lib/mogilefs/new_file/stream.rb, line 41 def write(buf) buf = String buf return 0 if buf.empty? rv = @writer.write(buf) @bytes_uploaded += rv rv end
Also aliased as: syswrite