class Cassandra::Protocol::V1::Decoder
Constants
- CODE_AUTHENTICATE
- CODE_AUTH_CHALLENGE
- CODE_AUTH_SUCCESS
- CODE_ERROR
- CODE_EVENT
- CODE_READY
- CODE_RESULT
- CODE_SUPPORTED
- READY
Public Class Methods
new(handler, compressor = nil)
click to toggle source
# File lib/cassandra/protocol/v1.rb 52 def initialize(handler, compressor = nil) 53 @handler = handler 54 @compressor = compressor 55 @state = :header 56 @header = nil 57 @length = nil 58 @buffer = CqlByteBuffer.new 59 end
Public Instance Methods
<<(data)
click to toggle source
# File lib/cassandra/protocol/v1.rb 61 def <<(data) 62 @buffer << data 63 64 __send__(:"decode_#{@state}", @buffer) 65 end
Private Instance Methods
actual_decode(buffer, fields, size)
click to toggle source
# File lib/cassandra/protocol/v1.rb 121 def actual_decode(buffer, fields, size) 122 protocol_version = (fields >> 24) & 0x7f 123 compression = (fields >> 16) & 0x01 124 tracing = (fields >> 16) & 0x02 125 stream_id = (fields >> 8) & 0xff 126 stream_id = (stream_id & 0x7f) - (stream_id & 0x80) 127 opcode = fields & 0xff 128 129 if compression == 1 130 if @compressor 131 buffer = CqlByteBuffer.new(@compressor.decompress(buffer.read(size))) 132 size = buffer.size 133 else 134 raise Errors::DecodingError, 135 'Compressed frame received, but no compressor configured' 136 end 137 end 138 139 if tracing == 2 140 trace_id = buffer.read_uuid 141 size -= 16 142 else 143 trace_id = nil 144 end 145 146 extra_length = buffer.length - size 147 response = decode_response(opcode, protocol_version, buffer, size, trace_id) 148 149 buffer.discard(buffer.length - extra_length) if buffer.length > extra_length 150 151 if stream_id == -1 152 @handler.notify_event_listeners(response) 153 else 154 @handler.complete_request(stream_id, response) 155 end 156 end
decode_body(buffer)
click to toggle source
# File lib/cassandra/protocol/v1.rb 93 def decode_body(buffer) 94 frame_header = @header 95 frame_length = @length 96 buffer_length = buffer.length 97 98 until buffer_length < frame_length 99 actual_decode(buffer, frame_header, frame_length) 100 buffer_length = buffer.length 101 102 if buffer_length < 8 103 @header = nil 104 @length = nil 105 @state = :header 106 107 return 108 end 109 110 frame_header = buffer.read_int 111 frame_length = buffer.read_int 112 buffer_length -= 8 113 end 114 115 @header = frame_header 116 @length = frame_length 117 118 nil 119 end
decode_header(buffer)
click to toggle source
# File lib/cassandra/protocol/v1.rb 71 def decode_header(buffer) 72 buffer_length = buffer.length 73 74 while buffer_length >= 8 75 frame_header = buffer.read_int 76 frame_length = buffer.read_int 77 78 if (buffer_length - 8) < frame_length 79 @header = frame_header 80 @length = frame_length 81 @state = :body 82 83 return 84 end 85 86 actual_decode(buffer, frame_header, frame_length) 87 buffer_length = buffer.length 88 end 89 90 nil 91 end
decode_response(opcode, protocol_version, buffer, size, trace_id)
click to toggle source
# File lib/cassandra/protocol/v1.rb 167 def decode_response(opcode, protocol_version, buffer, size, trace_id) 168 case opcode 169 when CODE_READY then 170 READY 171 when CODE_AUTHENTICATE then 172 AuthenticateResponse.new(buffer.read_string) 173 when CODE_AUTH_CHALLENGE then 174 AuthChallengeResponse.new(buffer.read_bytes) 175 when CODE_AUTH_SUCCESS then 176 AuthSuccessResponse.new(buffer.read_bytes) 177 when CODE_SUPPORTED then 178 SupportedResponse.new(buffer.read_string_multimap) 179 when CODE_ERROR 180 code = buffer.read_int 181 message = buffer.read_string 182 183 case code 184 when 0x1000 then 185 UnavailableErrorResponse.new(nil, 186 nil, 187 code, 188 message, 189 buffer.read_consistency, 190 buffer.read_int, 191 buffer.read_int) 192 when 0x1100 then 193 WriteTimeoutErrorResponse.new(nil, 194 nil, 195 code, 196 message, 197 buffer.read_consistency, 198 buffer.read_int, 199 buffer.read_int, 200 buffer.read_string) 201 when 0x1200 then 202 ReadTimeoutErrorResponse.new(nil, 203 nil, 204 code, 205 message, 206 buffer.read_consistency, 207 buffer.read_int, 208 buffer.read_int, 209 (buffer.read_byte != 0)) 210 when 0x2400 then 211 AlreadyExistsErrorResponse.new(nil, 212 nil, 213 code, 214 message, 215 buffer.read_string, 216 buffer.read_string) 217 when 0x2500 then 218 UnpreparedErrorResponse.new(nil, 219 nil, 220 code, 221 message, 222 buffer.read_short_bytes) 223 else 224 ErrorResponse.new(nil, nil, code, message) 225 end 226 when CODE_RESULT 227 result_type = buffer.read_int 228 case result_type 229 when 0x0001 # Void 230 VoidResultResponse.new(nil, nil, trace_id) 231 when 0x0002 # Rows 232 original_buffer_length = buffer.length 233 column_specs, paging_state = Coder.read_metadata_v1(buffer) 234 235 if column_specs.nil? 236 consumed_bytes = original_buffer_length - buffer.length 237 remaining_bytes = 238 CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4)) 239 RawRowsResultResponse.new(nil, 240 nil, 241 protocol_version, 242 remaining_bytes, 243 paging_state, 244 trace_id, 245 nil) 246 else 247 RowsResultResponse.new(nil, 248 nil, 249 Coder.read_values_v1(buffer, column_specs), 250 column_specs, 251 paging_state, 252 trace_id) 253 end 254 when 0x0003 # SetKeyspace 255 SetKeyspaceResultResponse.new(nil, nil, buffer.read_string, trace_id) 256 when 0x0004 # Prepared 257 id = buffer.read_short_bytes 258 params_metadata = Coder.read_metadata_v1(buffer).first 259 result_metadata = nil 260 result_metadata = Coder.read_metadata_v1(buffer).first if protocol_version > 1 261 262 PreparedResultResponse.new(nil, 263 nil, 264 id, 265 params_metadata, 266 result_metadata, 267 nil, 268 trace_id) 269 when 0x0005 # SchemaChange 270 change = buffer.read_string 271 keyspace = buffer.read_string 272 name = buffer.read_string 273 arguments = EMPTY_LIST 274 target = nil 275 276 if name.empty? 277 name = nil 278 target = Constants::SCHEMA_CHANGE_TARGET_KEYSPACE 279 else 280 target = Constants::SCHEMA_CHANGE_TARGET_TABLE 281 end 282 283 SchemaChangeResultResponse.new(nil, 284 nil, 285 change, 286 keyspace, 287 name, 288 target, 289 arguments, 290 nil) 291 else 292 raise Errors::DecodingError, 293 "Unsupported result type: #{result_type.inspect}" 294 end 295 when CODE_EVENT 296 event_type = buffer.read_string 297 case event_type 298 when 'SCHEMA_CHANGE' 299 change = buffer.read_string 300 keyspace = buffer.read_string 301 name = buffer.read_string 302 arguments = EMPTY_LIST 303 304 if name.empty? 305 name = nil 306 target = Constants::SCHEMA_CHANGE_TARGET_KEYSPACE 307 else 308 target = Constants::SCHEMA_CHANGE_TARGET_TABLE 309 end 310 311 SchemaChangeEventResponse.new(change, keyspace, name, target, arguments) 312 when 'STATUS_CHANGE' 313 StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet) 314 when 'TOPOLOGY_CHANGE' 315 TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet) 316 else 317 raise Errors::DecodingError, "Unsupported event type: #{event_type.inspect}" 318 end 319 else 320 raise Errors::DecodingError, "Unsupported response opcode: #{opcode.inspect}" 321 end 322 end