class Cassandra::Protocol::V4::Decoder
Constants
- READY
Public Class Methods
new(handler, compressor = nil, custom_type_handlers = {})
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 63
# Builds a decoder that starts in the :initial state with an empty buffer.
#
# @param handler [Object] receives decoded responses through
#   +complete_request+ and +notify_event_listeners+
# @param compressor [Object, nil] used to decompress frame bodies that are
#   flagged as compressed; +nil+ when compression is not configured
# @param custom_type_handlers [Hash] custom type handlers keyed by type. An
#   entry already present for the duration type is kept as is.
def initialize(handler, compressor = nil, custom_type_handlers = {})
  # Work on a copy: registering the default duration handler below must not
  # mutate the caller's hash, and must not fail when that hash is frozen.
  custom_type_handlers = custom_type_handlers.dup

  # In v4 the duration type is represented as a custom type so we always want to have
  # a handler included here. This handler can be overridden via the connection options.
  custom_type_handlers[Cassandra::Types::Duration.type] ||= Cassandra::Types::Duration

  @handler = handler
  @compressor = compressor
  @state = :initial
  @header = nil
  @version = nil
  @code = nil
  @length = nil
  @buffer = CqlByteBuffer.new
  @custom_type_handlers = custom_type_handlers
end
Public Instance Methods
<<(data)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 78
# Appends the incoming bytes to the internal buffer, then runs the decode
# step named after the current state (+decode_<state>+) on that buffer.
#
# @param data [Object] bytes to append to the buffer
# @return [Object] whatever the selected decode step returns
def <<(data)
  @buffer << data

  state_handler = :"decode_#{@state}"
  __send__(state_handler, @buffer)
end
Private Instance Methods
actual_decode(buffer, fields, frame_length, code)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 169
# Decodes one complete frame whose header has already been consumed and
# hands the resulting response to the handler.
#
# @param buffer [CqlByteBuffer] buffer positioned at the start of the frame body
# @param fields [Integer] first four header bytes (version, flags, stream id)
# @param frame_length [Integer] body length announced by the header
# @param code [Integer] opcode byte from the header
# @raise [Errors::DecodingError] when the frame is flagged as compressed but
#   no compressor is configured
def actual_decode(buffer, fields, frame_length, code)
  protocol_version = (fields >> 24) & 0x7f
  flags = (fields >> 16) & 0xff
  compression = (flags & 0x01) != 0
  tracing = (flags & 0x02) != 0
  payload = (flags & 0x04) != 0
  warning = (flags & 0x08) != 0
  opcode = code & 0xff

  # The stream id is a signed 16-bit value stored in the low two bytes.
  stream_id = fields & 0xffff
  stream_id -= 0x10000 if stream_id >= 0x8000

  # A compressed body is read in full and decompressed; from here on the
  # decompressed bytes are treated as if they were the body we received,
  # so frame_length becomes the decompressed size.
  if compression
    unless @compressor
      raise Errors::DecodingError,
            'Compressed frame received, but no compressor configured'
    end

    buffer = CqlByteBuffer.new(@compressor.decompress(buffer.read(frame_length)))
    frame_length = buffer.size
  end

  # Exactly one frame must be consumed, but parsing may stop before the end
  # of the body. Remember the buffer size up front so the unread remainder
  # of this frame can be dropped afterwards without touching any bytes of a
  # following frame that may already be buffered.
  length_before = buffer.length

  trace_id = tracing ? buffer.read_uuid : nil
  warnings = warning ? buffer.read_string_list : nil
  custom_payload = payload ? buffer.read_bytes_map.freeze : nil

  consumed = length_before - buffer.length
  response = decode_response(opcode,
                             protocol_version,
                             buffer,
                             frame_length - consumed,
                             trace_id,
                             custom_payload,
                             warnings)

  # Drop whatever part of the frame the response decoder did not read.
  leftover = frame_length - (length_before - buffer.length)
  buffer.discard(leftover) if leftover > 0

  # Stream id -1 goes to the event listeners; any other id completes a request.
  if stream_id == -1
    @handler.notify_event_listeners(response)
  else
    @handler.complete_request(stream_id, response)
  end
end
decode_body(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 137
# Decode step for the :body state: the header of the current frame is held
# in @header/@code/@length and the body may not be fully buffered yet.
#
# Decodes as many complete frames as the buffer holds. When fewer than nine
# bytes (one header) remain after a frame, the saved header is cleared and
# the state switches to :header. Otherwise the last header read is saved so
# decoding can resume once more data arrives.
#
# @param buffer [CqlByteBuffer] buffer positioned at the start of a frame body
# @return [nil]
def decode_body(buffer)
  header = @header
  opcode = @code
  body_length = @length
  available = buffer.length

  while available >= body_length
    actual_decode(buffer, header, body_length, opcode)
    available = buffer.length

    if available < 9
      # Not even a full header left: wait for more data in the :header state.
      @header = nil
      @code = nil
      @length = nil
      @state = :header

      return
    end

    header = buffer.read_int
    opcode = buffer.read_byte
    body_length = buffer.read_int
    available -= 9
  end

  # The body of the most recently read header is still incomplete.
  @header = header
  @code = opcode
  @length = body_length

  nil
end
decode_header(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 113
# Decode step for the :header state: the buffer starts at a frame boundary.
#
# While at least nine bytes (one header) are buffered, reads a header and
# decodes the frame if its body is fully available. If the body is
# incomplete, the header is saved and the state switches to :body.
#
# @param buffer [CqlByteBuffer] buffer positioned at the start of a frame header
# @return [nil]
def decode_header(buffer)
  available = buffer.length

  until available < 9
    header = buffer.read_int
    opcode = buffer.read_byte
    body_length = buffer.read_int

    if body_length > available - 9
      # Body not fully buffered yet: remember the header and wait for more data.
      @header = header
      @code = opcode
      @length = body_length
      @state = :body

      return
    end

    actual_decode(buffer, header, body_length, opcode)
    available = buffer.length
  end

  nil
end
decode_initial(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 88
# Decode step for the :initial state, i.e. before any frame has been decoded.
#
# Waits until nine bytes are buffered, then checks the protocol version in
# the first header word. Versions below 3 complete the request with an
# error response; in that case the remaining buffered bytes are left in
# place and the state is not changed. Otherwise the header is saved, the
# state switches to :body and decoding continues in decode_body.
#
# @param buffer [CqlByteBuffer] buffer holding the bytes received so far
# @return [nil, Object] nil when waiting for data or after reporting the
#   version error, otherwise the result of decode_body
def decode_initial(buffer)
  return if buffer.length < 9

  frame_header = buffer.read_int
  protocol_version = (frame_header >> 24) & 0x7f

  if protocol_version >= 3
    @header = frame_header
    @code = buffer.read_byte
    @length = buffer.read_int
    @state = :body

    decode_body(buffer)
  else
    # Signed 8-bit stream id taken from the third header byte
    # (presumably the one-byte stream id of the older header layout).
    stream_id = (frame_header >> 8) & 0xff
    stream_id -= 0x100 if stream_id >= 0x80

    error_response = ErrorResponse.new(nil,
                                       nil,
                                       0x000A,
                                       'Invalid or unsupported protocol version')
    @handler.complete_request(stream_id, error_response)

    nil
  end
end
decode_response(opcode, protocol_version, buffer, size, trace_id, custom_payload, warnings)
click to toggle source
# File lib/cassandra/protocol/v4.rb, line 227
# Builds the response object for one frame body, dispatching on the opcode.
#
# @param opcode [Integer] response opcode from the frame header
# @param protocol_version [Integer] protocol version from the frame header
# @param buffer [CqlByteBuffer] buffer positioned at the start of the response body
# @param size [Integer] number of bytes of this frame that are still unread
# @param trace_id [Object, nil] trace id read from the frame, if any
# @param custom_payload [Object, nil] frozen custom payload read from the frame, if any
# @param warnings [Object, nil] warnings read from the frame, if any
# @return [Object] the decoded response
# @raise [Errors::DecodingError] for an unsupported opcode, result type,
#   event type or schema change target
def decode_response(opcode,
                    protocol_version,
                    buffer,
                    size,
                    trace_id,
                    custom_payload,
                    warnings)
  case opcode
  when 0x00 # ERROR
    decode_error_response(protocol_version, buffer, custom_payload, warnings)
  when 0x02 # READY
    READY
  when 0x03 # AUTHENTICATE
    AuthenticateResponse.new(buffer.read_string)
  when 0x06 # SUPPORTED
    SupportedResponse.new(buffer.read_string_multimap)
  when 0x08 # RESULT
    decode_result_response(protocol_version,
                           buffer,
                           size,
                           trace_id,
                           custom_payload,
                           warnings)
  when 0x0C # EVENT
    decode_event_response(buffer)
  when 0x0E # AUTH_CHALLENGE
    AuthChallengeResponse.new(buffer.read_bytes)
  when 0x10 # AUTH_SUCCESS
    AuthSuccessResponse.new(buffer.read_bytes)
  else
    raise Errors::DecodingError,
          "Unsupported response opcode: #{opcode.inspect}"
  end
end

# Decodes the body of an ERROR response: error code, message, then the
# code-specific fields. Codes without a dedicated class yield ErrorResponse.
def decode_error_response(protocol_version, buffer, custom_payload, warnings)
  code = buffer.read_int
  message = buffer.read_string

  # Arguments are evaluated left to right, so the buffer reads below happen
  # in the order in which the fields are listed.
  case code
  when 0x1000
    UnavailableErrorResponse.new(custom_payload,
                                 warnings,
                                 code,
                                 message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int)
  when 0x1100
    WriteTimeoutErrorResponse.new(custom_payload,
                                  warnings,
                                  code,
                                  message,
                                  buffer.read_consistency,
                                  buffer.read_int,
                                  buffer.read_int,
                                  buffer.read_string)
  when 0x1200
    ReadTimeoutErrorResponse.new(custom_payload,
                                 warnings,
                                 code,
                                 message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int,
                                 (buffer.read_byte != 0))
  when 0x1300
    cl = buffer.read_consistency
    received = buffer.read_int
    block_for = buffer.read_int
    num_failures, failures_by_node = read_failure_details(protocol_version, buffer)

    ReadFailureErrorResponse.new(custom_payload,
                                 warnings,
                                 code,
                                 message,
                                 cl,
                                 received,
                                 block_for,
                                 num_failures,
                                 (buffer.read_byte != 0),
                                 failures_by_node)
  when 0x1400
    FunctionFailureErrorResponse.new(custom_payload,
                                     warnings,
                                     code,
                                     message,
                                     buffer.read_string,
                                     buffer.read_string,
                                     buffer.read_string_list)
  when 0x1500
    cl = buffer.read_consistency
    received = buffer.read_int
    block_for = buffer.read_int
    num_failures, failures_by_node = read_failure_details(protocol_version, buffer)

    WriteFailureErrorResponse.new(custom_payload,
                                  warnings,
                                  code,
                                  message,
                                  cl,
                                  received,
                                  block_for,
                                  num_failures,
                                  buffer.read_string,
                                  failures_by_node)
  when 0x2400
    AlreadyExistsErrorResponse.new(custom_payload,
                                   warnings,
                                   code,
                                   message,
                                   buffer.read_string,
                                   buffer.read_string)
  when 0x2500
    UnpreparedErrorResponse.new(custom_payload,
                                warnings,
                                code,
                                message,
                                buffer.read_short_bytes)
  else
    ErrorResponse.new(custom_payload, warnings, code, message)
  end
end

# Reads the failure part of a read/write failure error and returns
# [num_failures, failures_by_node]: an int count for protocol versions
# below 5, a reason map from version 5 on. The unused element is nil.
def read_failure_details(protocol_version, buffer)
  if protocol_version < 5
    [buffer.read_int, nil]
  else
    [nil, buffer.read_reason_map]
  end
end

# Decodes the body of a RESULT response, dispatching on the result type.
def decode_result_response(protocol_version,
                           buffer,
                           size,
                           trace_id,
                           custom_payload,
                           warnings)
  result_type = buffer.read_int

  case result_type
  when 0x0001 # Void
    VoidResultResponse.new(custom_payload, warnings, trace_id)
  when 0x0002 # Rows
    decode_rows_result(protocol_version,
                       buffer,
                       size,
                       trace_id,
                       custom_payload,
                       warnings)
  when 0x0003 # SetKeyspace
    SetKeyspaceResultResponse.new(custom_payload,
                                  warnings,
                                  buffer.read_string,
                                  trace_id)
  when 0x0004 # Prepared
    id = buffer.read_short_bytes
    pk_idx, params_metadata = Coder.read_prepared_metadata_v4(buffer)
    result_metadata = Coder.read_metadata_v4(buffer).first

    PreparedResultResponse.new(custom_payload,
                               warnings,
                               id,
                               params_metadata,
                               result_metadata,
                               pk_idx,
                               trace_id)
  when 0x0005 # SchemaChange
    change, keyspace, name, target, arguments = read_schema_change(buffer)

    SchemaChangeResultResponse.new(custom_payload,
                                   warnings,
                                   change,
                                   keyspace,
                                   name,
                                   target,
                                   arguments,
                                   trace_id)
  else
    raise Errors::DecodingError,
          "Unsupported result type: #{result_type.inspect}"
  end
end

# Decodes a Rows result. Must be called right after the result type has been
# read. Without column specs the rest of the frame is kept undecoded in a
# RawRowsResultResponse; with them the rows are decoded immediately.
def decode_rows_result(protocol_version,
                       buffer,
                       size,
                       trace_id,
                       custom_payload,
                       warnings)
  original_buffer_length = buffer.length
  column_specs, paging_state = Coder.read_metadata_v4(buffer)

  if column_specs.nil?
    consumed_bytes = original_buffer_length - buffer.length
    # size also covers the four bytes of the result type that were read
    # before this method was called, hence the extra 4.
    remaining_bytes =
      CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))

    RawRowsResultResponse.new(custom_payload,
                              warnings,
                              protocol_version,
                              remaining_bytes,
                              paging_state,
                              trace_id,
                              @custom_type_handlers)
  else
    RowsResultResponse.new(custom_payload,
                           warnings,
                           Coder.read_values_v4(buffer, column_specs, @custom_type_handlers),
                           column_specs,
                           paging_state,
                           trace_id)
  end
end

# Reads a schema change description, shared by the SchemaChange result and
# the SCHEMA_CHANGE event, and returns
# [change, keyspace, name, target, arguments]. name is nil for keyspace
# targets; arguments is EMPTY_LIST unless the target is a function or
# aggregate.
def read_schema_change(buffer)
  change = buffer.read_string
  target = buffer.read_string
  name = nil
  arguments = EMPTY_LIST

  case target
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
    keyspace = buffer.read_string
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
       Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
    keyspace = buffer.read_string
    name = buffer.read_string
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
       Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
    keyspace = buffer.read_string
    name = buffer.read_string
    arguments = buffer.read_string_list
  else
    raise Errors::DecodingError,
          "Unsupported event target: #{target.inspect}"
  end

  [change, keyspace, name, target, arguments]
end

# Decodes the body of an EVENT response, dispatching on the event type.
def decode_event_response(buffer)
  event_type = buffer.read_string

  case event_type
  when 'SCHEMA_CHANGE'
    SchemaChangeEventResponse.new(*read_schema_change(buffer))
  when 'STATUS_CHANGE'
    StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  when 'TOPOLOGY_CHANGE'
    TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  else
    raise Errors::DecodingError,
          "Unsupported event type: #{event_type.inspect}"
  end
end