class Cassandra::Protocol::V4::Decoder

Constants

READY

Public Class Methods

new(handler, compressor = nil, custom_type_handlers = {}) click to toggle source
   # File lib/cassandra/protocol/v4.rb
# Builds a decoder that starts in the :initial state with an empty buffer.
#
# In v4 the duration type is represented as a custom type, so a handler for
# it is always registered. A handler supplied by the caller under the same
# type key is kept, which is how connection options can override it.
#
# The handler table is copied before the default is added: the caller's hash
# is never mutated (so it may be frozen), and nil is treated as an empty table.
#
# @param handler object stored in @handler
# @param compressor object stored in @compressor, or nil (the default)
# @param custom_type_handlers [Hash, nil] handlers keyed by custom type
def initialize(handler, compressor = nil, custom_type_handlers = {})
  type_handlers = (custom_type_handlers || {}).dup
  type_handlers[Cassandra::Types::Duration.type] ||= Cassandra::Types::Duration

  @handler    = handler
  @compressor = compressor
  @state      = :initial
  @header     = nil
  @version    = nil
  @code       = nil
  @length     = nil
  @buffer     = CqlByteBuffer.new
  @custom_type_handlers = type_handlers
end

Public Instance Methods

<<(data) click to toggle source
   # File lib/cassandra/protocol/v4.rb
# Appends newly received data to the pending buffer and resumes decoding.
#
# The step to run is chosen from the current state: the method whose name is
# "decode_" followed by the value of @state is invoked with the buffer.
#
# @param data value appended to @buffer
# @return whatever the dispatched decode_* method returns
def <<(data)
  @buffer << data

  decode_step = :"decode_#{@state}"
  __send__(decode_step, @buffer)
end

Private Instance Methods

actual_decode(buffer, fields, frame_length, code) click to toggle source
    # File lib/cassandra/protocol/v4.rb
# Decodes one complete frame and hands the resulting response to @handler.
#
# @param buffer byte buffer positioned at the start of the frame body
# @param fields [Integer] first four header bytes: version, flags, stream id
# @param frame_length [Integer] body length announced by the frame header
# @param code [Integer] opcode byte from the frame header
# @raise [Errors::DecodingError] when the frame is flagged as compressed but
#   no compressor was configured
def actual_decode(buffer, fields, frame_length, code)
  flags            = fields >> 16
  protocol_version = (fields >> 24) & 0x7f
  compressed       = (flags & 0x01) != 0
  traced           = (flags & 0x02) != 0
  has_payload      = (flags & 0x04) != 0
  has_warnings     = (flags & 0x08) != 0
  opcode           = code & 0xff

  # The stream id is a signed 16-bit value stored in the low half of `fields`.
  raw_stream_id = fields & 0xffff
  stream_id     = raw_stream_id >= 0x8000 ? raw_stream_id - 0x10000 : raw_stream_id

  # A compressed body is read in full and decompressed up front; from here on
  # the decompressed bytes are treated as the frame body, so frame_length
  # becomes the decompressed size.
  if compressed
    unless @compressor
      raise Errors::DecodingError,
            'Compressed frame received, but no compressor configured'
    end

    buffer       = CqlByteBuffer.new(@compressor.decompress(buffer.read(frame_length)))
    frame_length = buffer.size
  end

  # The buffer may also hold part or all of a following frame, so consumption
  # is tracked relative to the buffer length at this point. Anything left of
  # this frame after decoding is discarded without touching what comes next.
  length_at_start = buffer.length

  # Optional sections are read in this order: trace id, warnings, payload.
  trace_id       = traced ? buffer.read_uuid : nil
  warnings       = has_warnings ? buffer.read_string_list : nil
  custom_payload = has_payload ? buffer.read_bytes_map.freeze : nil

  consumed_so_far = length_at_start - buffer.length
  response = decode_response(opcode,
                             protocol_version,
                             buffer,
                             frame_length - consumed_so_far,
                             trace_id,
                             custom_payload,
                             warnings)

  # Drop whatever part of the frame the response decoder did not consume.
  leftover = frame_length - (length_at_start - buffer.length)
  buffer.discard(leftover) if leftover > 0

  # Stream id -1 is routed to the event listeners; any other id completes
  # the request registered under that id.
  return @handler.notify_event_listeners(response) if stream_id == -1

  @handler.complete_request(stream_id, response)
end
decode_body(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
# Decodes frames whose header has already been read and stored in @header,
# @code and @length, for as long as the buffer holds a complete body.
#
# After each frame, if fewer than 9 bytes remain, the stored header is
# cleared and the state becomes :header. Otherwise the next 9-byte header is
# read and the loop continues. When a body is only partially buffered, its
# header is stored again and the state is left as it was.
#
# @param buffer byte buffer holding the received data
# @return [nil]
def decode_body(buffer)
  header, code, body_length = @header, @code, @length
  available = buffer.length

  while available >= body_length
    actual_decode(buffer, header, body_length, code)
    available = buffer.length

    if available < 9
      @header = @code = @length = nil
      @state  = :header

      return
    end

    header      = buffer.read_int
    code        = buffer.read_byte
    body_length = buffer.read_int
    available  -= 9
  end

  @header, @code, @length = header, code, body_length

  nil
end
decode_header(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
# Reads 9-byte frame headers and decodes each frame whose body is already
# fully buffered.
#
# When a header is read but its body is incomplete, the header is stored in
# @header, @code and @length and the state switches to :body.
#
# @param buffer byte buffer holding the received data
# @return [nil]
def decode_header(buffer)
  available = buffer.length

  until available < 9
    header      = buffer.read_int
    code        = buffer.read_byte
    body_length = buffer.read_int

    body_available = available - 9
    if body_available < body_length
      @header, @code, @length = header, code, body_length
      @state = :body

      return
    end

    actual_decode(buffer, header, body_length, code)
    available = buffer.length
  end

  nil
end
decode_initial(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
 # Handles the first bytes received: checks the protocol version announced in
 # the first header before regular decoding starts.
 #
 # Nothing happens until at least 9 bytes are buffered. For a version of 3 or
 # higher the rest of the header is read, the state becomes :body and decoding
 # continues in decode_body. For a lower version the request identified by the
 # one-byte stream id is completed with an error response; the state is left
 # unchanged in that case.
 #
 # @param buffer byte buffer holding the received data
 # @return the result of decode_body, or nil when it is not reached
 def decode_initial(buffer)
   return if buffer.length < 9

   frame_header     = buffer.read_int
   protocol_version = (frame_header >> 24) & 0x7f

   if protocol_version >= 3
     @header = frame_header
     @code   = buffer.read_byte
     @length = buffer.read_int
     @state  = :body

     return decode_body(buffer)
   end

   # Here the stream id is a signed single byte in bits 8..15 of the header.
   raw_stream_id = (frame_header >> 8) & 0xff
   stream_id     = raw_stream_id >= 0x80 ? raw_stream_id - 0x100 : raw_stream_id

   error_response = ErrorResponse.new(nil, nil, 0x000A,
                                      'Invalid or unsupported protocol version')
   @handler.complete_request(stream_id, error_response)

   nil
 end
decode_response(opcode, protocol_version, buffer, size, trace_id, custom_payload, warnings) click to toggle source
    # File lib/cassandra/protocol/v4.rb
# Decodes the body of one response frame into a response object.
#
# The opcode selects the response family; ERROR, RESULT and EVENT bodies are
# handled by dedicated helpers below.
#
# @param opcode [Integer] opcode of the frame
# @param protocol_version [Integer] version taken from the frame header
# @param buffer byte buffer positioned at the start of the response body
# @param size [Integer] number of body bytes available to this response
# @param trace_id trace id read from the frame, or nil
# @param custom_payload custom payload read from the frame, or nil
# @param warnings warnings read from the frame, or nil
# @return the decoded response object
# @raise [Errors::DecodingError] for an unsupported opcode, result type,
#   event type or schema change target
def decode_response(opcode,
                    protocol_version,
                    buffer,
                    size,
                    trace_id,
                    custom_payload,
                    warnings)
  case opcode
  when 0x00 # ERROR
    decode_error_response(protocol_version, buffer, custom_payload, warnings)
  when 0x02 # READY
    READY
  when 0x03 # AUTHENTICATE
    AuthenticateResponse.new(buffer.read_string)
  when 0x06 # SUPPORTED
    SupportedResponse.new(buffer.read_string_multimap)
  when 0x08 # RESULT
    decode_result_response(protocol_version, buffer, size, trace_id,
                           custom_payload, warnings)
  when 0x0C # EVENT
    decode_event_response(buffer)
  when 0x0E # AUTH_CHALLENGE
    AuthChallengeResponse.new(buffer.read_bytes)
  when 0x10 # AUTH_SUCCESS
    AuthSuccessResponse.new(buffer.read_bytes)
  else
    raise Errors::DecodingError,
          "Unsupported response opcode: #{opcode.inspect}"
  end
end

# Decodes an ERROR body: an int error code and a message, followed by
# code-specific fields. Codes without a dedicated class yield ErrorResponse.
def decode_error_response(protocol_version, buffer, custom_payload, warnings)
  code    = buffer.read_int
  message = buffer.read_string

  # Constructor arguments are evaluated left to right, which is what keeps
  # the inline buffer reads below in wire order.
  case code
  when 0x1000
    UnavailableErrorResponse.new(custom_payload, warnings, code, message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int)
  when 0x1100
    WriteTimeoutErrorResponse.new(custom_payload, warnings, code, message,
                                  buffer.read_consistency,
                                  buffer.read_int,
                                  buffer.read_int,
                                  buffer.read_string)
  when 0x1200
    ReadTimeoutErrorResponse.new(custom_payload, warnings, code, message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int,
                                 (buffer.read_byte != 0))
  when 0x1300
    cl        = buffer.read_consistency
    received  = buffer.read_int
    block_for = buffer.read_int
    # Before v5 an int is read here; from v5 on a reason map is read instead.
    failure_count, failures_by_node = read_failure_details(protocol_version, buffer)
    ReadFailureErrorResponse.new(custom_payload, warnings, code, message,
                                 cl, received, block_for,
                                 failure_count,
                                 (buffer.read_byte != 0),
                                 failures_by_node)
  when 0x1400
    FunctionFailureErrorResponse.new(custom_payload, warnings, code, message,
                                     buffer.read_string,
                                     buffer.read_string,
                                     buffer.read_string_list)
  when 0x1500
    cl        = buffer.read_consistency
    received  = buffer.read_int
    block_for = buffer.read_int
    failure_count, failures_by_node = read_failure_details(protocol_version, buffer)
    WriteFailureErrorResponse.new(custom_payload, warnings, code, message,
                                  cl, received, block_for,
                                  failure_count,
                                  buffer.read_string,
                                  failures_by_node)
  when 0x2400
    AlreadyExistsErrorResponse.new(custom_payload, warnings, code, message,
                                   buffer.read_string,
                                   buffer.read_string)
  when 0x2500
    UnpreparedErrorResponse.new(custom_payload, warnings, code, message,
                                buffer.read_short_bytes)
  else
    ErrorResponse.new(custom_payload, warnings, code, message)
  end
end

# Reads the version-dependent part of a read/write failure error and returns
# [int, nil] before v5 or [nil, reason map] from v5 on.
def read_failure_details(protocol_version, buffer)
  if protocol_version < 5
    [buffer.read_int, nil]
  else
    [nil, buffer.read_reason_map]
  end
end

# Decodes a RESULT body; the leading int selects the kind of result.
def decode_result_response(protocol_version, buffer, size, trace_id,
                           custom_payload, warnings)
  result_type = buffer.read_int

  case result_type
  when 0x0001 # Void
    VoidResultResponse.new(custom_payload, warnings, trace_id)
  when 0x0002 # Rows
    decode_rows_result(protocol_version, buffer, size, trace_id,
                       custom_payload, warnings)
  when 0x0003 # SetKeyspace
    SetKeyspaceResultResponse.new(custom_payload, warnings,
                                  buffer.read_string, trace_id)
  when 0x0004 # Prepared
    id = buffer.read_short_bytes
    pk_idx, params_metadata = Coder.read_prepared_metadata_v4(buffer)
    result_metadata = Coder.read_metadata_v4(buffer).first

    PreparedResultResponse.new(custom_payload, warnings, id, params_metadata,
                               result_metadata, pk_idx, trace_id)
  when 0x0005 # SchemaChange
    change = buffer.read_string
    target = buffer.read_string
    keyspace, name, arguments = read_schema_change_subject(buffer, target)

    SchemaChangeResultResponse.new(custom_payload, warnings, change, keyspace,
                                   name, target, arguments, trace_id)
  else
    raise Errors::DecodingError,
          "Unsupported result type: #{result_type.inspect}"
  end
end

# Decodes a Rows result. When the metadata carries no column specs, the rest
# of the body is kept undecoded in a RawRowsResultResponse; otherwise the
# values are decoded immediately.
def decode_rows_result(protocol_version, buffer, size, trace_id,
                       custom_payload, warnings)
  length_before_metadata = buffer.length
  column_specs, paging_state = Coder.read_metadata_v4(buffer)

  if column_specs.nil?
    consumed_bytes = length_before_metadata - buffer.length
    # `size` was measured before the 4-byte result type was read, hence the
    # extra 4 subtracted here.
    remaining_bytes =
      CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))
    RawRowsResultResponse.new(custom_payload, warnings, protocol_version,
                              remaining_bytes, paging_state, trace_id,
                              @custom_type_handlers)
  else
    RowsResultResponse.new(custom_payload, warnings,
                           Coder.read_values_v4(buffer, column_specs, @custom_type_handlers),
                           column_specs, paging_state, trace_id)
  end
end

# Decodes an EVENT body; the leading string names the event type.
def decode_event_response(buffer)
  event_type = buffer.read_string

  case event_type
  when 'SCHEMA_CHANGE'
    change = buffer.read_string
    target = buffer.read_string
    keyspace, name, arguments = read_schema_change_subject(buffer, target)

    SchemaChangeEventResponse.new(change, keyspace, name, target, arguments)
  when 'STATUS_CHANGE'
    StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  when 'TOPOLOGY_CHANGE'
    TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  else
    raise Errors::DecodingError,
          "Unsupported event type: #{event_type.inspect}"
  end
end

# Reads what a schema change applies to and returns [keyspace, name, arguments].
# Shared by schema change results and schema change events, which use the
# same layout: a keyspace target has only a keyspace; table and UDT targets
# add a name; function and aggregate targets add a name and a string list.
def read_schema_change_subject(buffer, target)
  case target
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
    [buffer.read_string, nil, EMPTY_LIST]
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
       Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
    [buffer.read_string, buffer.read_string, EMPTY_LIST]
  when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
       Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
    [buffer.read_string, buffer.read_string, buffer.read_string_list]
  else
    raise Errors::DecodingError,
          "Unsupported event target: #{target.inspect}"
  end
end