class Cassandra::Protocol::V3::Decoder
Constants
- READY
Public Class Methods
new(handler, compressor = nil)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 53
# Sets up a decoder in the :initial state with an empty byte buffer and no
# partially-read frame header.
#
# @param handler    object that decoded responses are delivered to (it is sent
#                   +complete_request+ and +notify_event_listeners+ elsewhere
#                   in this class)
# @param compressor optional object used to decompress compressed frame
#                   bodies; +nil+ when compression is not configured
def initialize(handler, compressor = nil)
  @handler, @compressor = handler, compressor
  @state = :initial
  # Nothing has been parsed yet, so there is no pending frame header.
  @header = @version = @code = @length = nil
  @buffer = CqlByteBuffer.new
end
Public Instance Methods
<<(data)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 64
# Appends +data+ to the internal buffer, then runs the decode step that
# matches the current state (+decode_<state>+), passing it the buffer.
#
# @param data the newly received bytes
# @return whatever the dispatched decode step returns
def <<(data)
  @buffer << data

  step = :"decode_#{@state}"
  __send__(step, @buffer)
end
Private Instance Methods
actual_decode(buffer, fields, frame_length, code)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 155
# Decodes exactly one frame body from +buffer+ and hands the resulting
# response to the handler.
#
# @param buffer       byte buffer positioned at the start of the frame body
# @param fields       first 32-bit word of the frame header (version, flags
#                     and stream id packed together)
# @param frame_length body length taken from the frame header
# @param code         opcode byte from the frame header
# @raise [Errors::DecodingError] when the frame is flagged as compressed but
#   no compressor was configured
def actual_decode(buffer, fields, frame_length, code)
  flags = fields >> 16
  protocol_version = (fields >> 24) & 0x7f
  compressed = (flags & 0x01) == 1
  traced = (flags & 0x02) == 2
  opcode = code & 0xff

  # The stream id is a signed 16-bit value; fold the upper half of the
  # unsigned range onto the negatives.
  raw_stream = fields & 0xffff
  stream_id = raw_stream >= 0x8000 ? raw_stream - 0x10000 : raw_stream

  # A compressed body is read in full, decompressed, and from then on treated
  # as if it had been the body all along, so frame_length becomes the
  # decompressed size.
  if compressed
    unless @compressor
      raise Errors::DecodingError,
            'Compressed frame received, but no compressor configured'
    end

    inflated = @compressor.decompress(buffer.read(frame_length))
    buffer = CqlByteBuffer.new(inflated)
    frame_length = buffer.size
  end

  # The buffer may also hold part or all of the next frame. Record its size
  # up front so that, after parsing, any unread bytes that still belong to
  # this frame can be dropped without touching what follows.
  length_before = buffer.length

  trace_id = traced ? buffer.read_uuid : nil

  consumed_by_trace = length_before - buffer.length
  response = decode_response(opcode, protocol_version, buffer,
                             frame_length - consumed_by_trace, trace_id)

  # Drop whatever is left of this frame.
  leftover = frame_length - (length_before - buffer.length)
  buffer.discard(leftover) if leftover > 0

  # Stream id -1 is routed to the event listeners; every other id completes
  # the matching request.
  if stream_id == -1
    @handler.notify_event_listeners(response)
  else
    @handler.complete_request(stream_id, response)
  end
end
decode_body(buffer)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 123
# Resumes decoding when a frame header has already been read and stored in
# @header/@code/@length. Decodes every frame whose body is fully buffered.
#
# Afterwards either:
# * fewer than 9 bytes (one frame header) remain: the stored header is
#   cleared and the state switches to :header; or
# * a header was read whose body is still incomplete: that header is stored
#   and the state is left as it was.
#
# @param buffer byte buffer positioned at the start of the pending body
# @return [nil]
def decode_body(buffer)
  header, code, length = @header, @code, @length
  available = buffer.length

  while available >= length
    actual_decode(buffer, header, length, code)
    available = buffer.length

    unless available >= 9
      # Not even a full header is left; wait for more data.
      @header = @code = @length = nil
      @state = :header

      return
    end

    header = buffer.read_int
    code = buffer.read_byte
    length = buffer.read_int
    # Account for the 9 header bytes just consumed.
    available -= 9
  end

  @header, @code, @length = header, code, length

  nil
end
decode_header(buffer)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 99
# Decodes frames starting from a frame boundary. While at least 9 bytes (one
# frame header) are buffered, the header is read; if the whole body is
# buffered too, the frame is decoded, otherwise the header is stored in
# @header/@code/@length and the state switches to :body.
#
# @param buffer byte buffer positioned at the start of a frame header
# @return [nil]
def decode_header(buffer)
  loop do
    available = buffer.length
    break if available < 9

    header = buffer.read_int
    code = buffer.read_byte
    body_length = buffer.read_int

    if body_length > available - 9
      # Body is incomplete; remember the header until more data arrives.
      @header, @code, @length = header, code, body_length
      @state = :body

      return
    end

    actual_decode(buffer, header, body_length, code)
  end

  nil
end
decode_initial(buffer)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 74
# Handles the first frame seen by the decoder. Does nothing until 9 bytes
# (one frame header) are buffered. If the frame's protocol version is lower
# than 3, the request identified by the frame's one-byte stream id is
# completed with an ErrorResponse (code 0x000A); otherwise the header is
# stored, the state switches to :body and decoding continues in decode_body.
#
# @param buffer byte buffer holding the bytes received so far
# @return the result of decode_body, or nil on the other two paths
def decode_initial(buffer)
  return unless buffer.length >= 9

  header = buffer.read_int

  if ((header >> 24) & 0x7f) >= 3
    @header = header
    @code = buffer.read_byte
    @length = buffer.read_int
    @state = :body

    return decode_body(buffer)
  end

  # For versions below 3 the stream id is read from the third header byte,
  # as a signed 8-bit value.
  raw_stream = (header >> 8) & 0xff
  stream_id = raw_stream >= 0x80 ? raw_stream - 0x100 : raw_stream

  rejection = ErrorResponse.new(nil, nil, 0x000A,
                                'Invalid or unsupported protocol version')
  @handler.complete_request(stream_id, rejection)

  nil
end
decode_response(opcode, protocol_version, buffer, size, trace_id)
click to toggle source
# File lib/cassandra/protocol/v3.rb, line 206
# Builds the response object for a single frame body.
#
# @param opcode           frame opcode; selects the response class
# @param protocol_version protocol version taken from the frame header
# @param buffer           byte buffer positioned at the start of the body
#                         (after the trace id, if one was present)
# @param size             number of body bytes that belong to this frame;
#                         only used to slice the raw rows of a Rows result
#                         that has no column metadata
# @param trace_id         trace id read from the frame, or nil
# @return the decoded response object (the READY constant for opcode 0x02)
# @raise [Errors::DecodingError] for an unsupported opcode, result type or
#   event type
def decode_response(opcode, protocol_version, buffer, size, trace_id)
  case opcode
  when 0x00 # ERROR
    code = buffer.read_int
    message = buffer.read_string

    # Some error codes carry extra fields after the message.
    case code
    when 0x1000
      UnavailableErrorResponse.new(nil,
                                   nil,
                                   code,
                                   message,
                                   buffer.read_consistency,
                                   buffer.read_int,
                                   buffer.read_int)
    when 0x1100
      WriteTimeoutErrorResponse.new(nil,
                                    nil,
                                    code,
                                    message,
                                    buffer.read_consistency,
                                    buffer.read_int,
                                    buffer.read_int,
                                    buffer.read_string)
    when 0x1200
      ReadTimeoutErrorResponse.new(nil,
                                   nil,
                                   code,
                                   message,
                                   buffer.read_consistency,
                                   buffer.read_int,
                                   buffer.read_int,
                                   (buffer.read_byte != 0))
    when 0x2400
      AlreadyExistsErrorResponse.new(nil,
                                     nil,
                                     code,
                                     message,
                                     buffer.read_string,
                                     buffer.read_string)
    when 0x2500
      UnpreparedErrorResponse.new(nil,
                                  nil,
                                  code,
                                  message,
                                  buffer.read_short_bytes)
    else
      ErrorResponse.new(nil, nil, code, message)
    end
  when 0x02 # READY
    READY
  when 0x03 # AUTHENTICATE
    AuthenticateResponse.new(buffer.read_string)
  when 0x06 # SUPPORTED
    SupportedResponse.new(buffer.read_string_multimap)
  when 0x08 # RESULT
    result_type = buffer.read_int
    case result_type
    when 0x0001 # Void
      VoidResultResponse.new(nil, nil, trace_id)
    when 0x0002 # Rows
      original_buffer_length = buffer.length
      column_specs, paging_state = Coder.read_metadata_v3(buffer)

      if column_specs.nil?
        # Without column metadata the rows cannot be decoded here, so the
        # rest of the frame is kept as raw bytes. The extra 4 accounts for
        # the result type int read above.
        consumed_bytes = original_buffer_length - buffer.length
        remaining_bytes =
          CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))
        RawRowsResultResponse.new(nil,
                                  nil,
                                  protocol_version,
                                  remaining_bytes,
                                  paging_state,
                                  trace_id,
                                  nil)
      else
        RowsResultResponse.new(nil,
                               nil,
                               Coder.read_values_v3(buffer, column_specs),
                               column_specs,
                               paging_state,
                               trace_id)
      end
    when 0x0003 # SetKeyspace
      SetKeyspaceResultResponse.new(nil, nil, buffer.read_string, trace_id)
    when 0x0004 # Prepared
      id = buffer.read_short_bytes
      params_metadata = Coder.read_metadata_v3(buffer).first
      result_metadata = nil
      result_metadata = Coder.read_metadata_v3(buffer).first if protocol_version > 1

      PreparedResultResponse.new(nil,
                                 nil,
                                 id,
                                 params_metadata,
                                 result_metadata,
                                 nil,
                                 trace_id)
    when 0x0005 # SchemaChange
      change = buffer.read_string
      target = buffer.read_string
      keyspace = buffer.read_string
      arguments = EMPTY_LIST
      name = nil

      # In protocol v3 both TABLE and TYPE targets are followed by the name
      # of the affected object; only KEYSPACE targets carry the keyspace
      # alone. The 'TYPE' literal is the target string from the protocol
      # specification.
      if target == Constants::SCHEMA_CHANGE_TARGET_TABLE || target == 'TYPE'
        name = buffer.read_string
      end

      # NOTE(review): the last argument is nil although a trace_id is
      # available and every other result response forwards it -- confirm
      # against SchemaChangeResultResponse#initialize whether this is
      # intentional.
      SchemaChangeResultResponse.new(nil,
                                     nil,
                                     change,
                                     keyspace,
                                     name,
                                     target,
                                     arguments,
                                     nil)
    else
      raise Errors::DecodingError,
            "Unsupported result type: #{result_type.inspect}"
    end
  when 0x0C # EVENT
    event_type = buffer.read_string
    case event_type
    when 'SCHEMA_CHANGE'
      change = buffer.read_string
      target = buffer.read_string
      keyspace = buffer.read_string
      name = nil
      arguments = EMPTY_LIST

      # Same body layout as the SchemaChange result above: TABLE and TYPE
      # targets both carry the object's name.
      if target == Constants::SCHEMA_CHANGE_TARGET_TABLE || target == 'TYPE'
        name = buffer.read_string
      end

      SchemaChangeEventResponse.new(change, keyspace, name, target, arguments)
    when 'STATUS_CHANGE'
      StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
    when 'TOPOLOGY_CHANGE'
      TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
    else
      raise Errors::DecodingError,
            "Unsupported event type: #{event_type.inspect}"
    end
  when 0x0E # AUTH_CHALLENGE
    AuthChallengeResponse.new(buffer.read_bytes)
  when 0x10 # AUTH_SUCCESS
    AuthSuccessResponse.new(buffer.read_bytes)
  else
    raise Errors::DecodingError,
          "Unsupported response opcode: #{opcode.inspect}"
  end
end