class Cassandra::Protocol::V4::Decoder

Constants

READY

Public Class Methods

new(handler, compressor = nil, custom_type_handlers = {}) click to toggle source
   # File lib/cassandra/protocol/v4.rb
63 def initialize(handler, compressor = nil, custom_type_handlers = {})
64   @handler    = handler
65   @compressor = compressor
66   @state      = :initial
67   @header     = nil
68   @version    = nil
69   @code       = nil
70   @length     = nil
71   @buffer     = CqlByteBuffer.new
72   @custom_type_handlers = custom_type_handlers
73 end

Public Instance Methods

<<(data) click to toggle source
   # File lib/cassandra/protocol/v4.rb
75 def <<(data)
76   @buffer << data
77 
78   __send__(:"decode_#{@state}", @buffer)
79 end

Private Instance Methods

actual_decode(buffer, fields, frame_length, code) click to toggle source
    # File lib/cassandra/protocol/v4.rb
166 def actual_decode(buffer, fields, frame_length, code)
167   protocol_version = (fields >> 24) & 0x7f
168   compression      = ((fields >> 16) & 0x01) == 0x01
169   tracing          = ((fields >> 16) & 0x02) == 0x02
170   payload          = ((fields >> 16) & 0x04) == 0x04
171   warning          = ((fields >> 16) & 0x08) == 0x08
172   stream_id        = fields & 0xffff
173   stream_id        = (stream_id & 0x7fff) - (stream_id & 0x8000)
174   opcode           = code & 0xff
175 
176   # If we're dealing with a compressed body, read the whole body, decompress,
177   # and treat the uncompressed body as if that's what we got in the first place.
178   # This means, reset frame_length to that uncompressed size.
179   if compression
180     if @compressor
181       buffer = CqlByteBuffer.new(
182         @compressor.decompress(buffer.read(frame_length))
183       )
184       frame_length = buffer.size
185     else
186       raise Errors::DecodingError,
187             'Compressed frame received, but no compressor configured'
188     end
189   end
190 
191   # We want to read one full frame; but after we read/parse chunks of the body
192   # there may be more cruft left in the frame that we don't care about. So,
193   # we save off the current size of the buffer, do all our reads for the
194   # frame, get the final remaining size, and based on that discard possible
195   # remaining bytes in the frame. In particular, we account for the possibility
196   # that the buffer contains some/all of a subsequent frame as well, and we
197   # don't want to mess with that.
198 
199   buffer_starting_length = buffer.length
200 
201   trace_id = (buffer.read_uuid if tracing)
202 
203   warnings = (buffer.read_string_list if warning)
204 
205   custom_payload = (buffer.read_bytes_map.freeze if payload)
206 
207   remaining_frame_length = frame_length -
208                            (buffer_starting_length - buffer.length)
209   response = decode_response(opcode, protocol_version, buffer,
210                              remaining_frame_length, trace_id, custom_payload,
211                              warnings)
212 
213   # Calculate and discard remaining cruft in the frame.
214   extra_length = frame_length - (buffer_starting_length - buffer.length)
215   buffer.discard(extra_length) if extra_length > 0
216 
217   if stream_id == -1
218     @handler.notify_event_listeners(response)
219   else
220     @handler.complete_request(stream_id, response)
221   end
222 end
decode_body(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
134 def decode_body(buffer)
135   frame_header  = @header
136   frame_code    = @code
137   frame_length  = @length
138   buffer_length = buffer.length
139 
140   until buffer_length < frame_length
141     actual_decode(buffer, frame_header, frame_length, frame_code)
142     buffer_length = buffer.length
143 
144     if buffer_length < 9
145       @header = nil
146       @code   = nil
147       @length = nil
148       @state  = :header
149 
150       return
151     end
152 
153     frame_header   = buffer.read_int
154     frame_code     = buffer.read_byte
155     frame_length   = buffer.read_int
156     buffer_length -= 9
157   end
158 
159   @header = frame_header
160   @code   = frame_code
161   @length = frame_length
162 
163   nil
164 end
decode_header(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
110 def decode_header(buffer)
111   buffer_length = buffer.length
112 
113   while buffer_length >= 9
114     frame_header = buffer.read_int
115     frame_code   = buffer.read_byte
116     frame_length = buffer.read_int
117 
118     if (buffer_length - 9) < frame_length
119       @header = frame_header
120       @code   = frame_code
121       @length = frame_length
122       @state  = :body
123 
124       return
125     end
126 
127     actual_decode(buffer, frame_header, frame_length, frame_code)
128     buffer_length = buffer.length
129   end
130 
131   nil
132 end
decode_initial(buffer) click to toggle source
    # File lib/cassandra/protocol/v4.rb
 85 def decode_initial(buffer)
 86   return if buffer.length < 9
 87 
 88   frame_header     = buffer.read_int
 89   protocol_version = (frame_header >> 24) & 0x7f
 90 
 91   if protocol_version < 3
 92     stream_id  = (frame_header >> 8) & 0xff
 93     stream_id  = (stream_id & 0x7f) - (stream_id & 0x80)
 94 
 95     error_response = ErrorResponse.new(nil, nil, 0x000A,
 96                                        'Invalid or unsupported protocol version')
 97     @handler.complete_request(stream_id, error_response)
 98 
 99     return
100   end
101 
102   @header = frame_header
103   @code   = buffer.read_byte
104   @length = buffer.read_int
105   @state  = :body
106 
107   decode_body(buffer)
108 end
decode_response(opcode, protocol_version, buffer, size, trace_id, custom_payload, warnings) click to toggle source
    # File lib/cassandra/protocol/v4.rb
224 def decode_response(opcode,
225                     protocol_version,
226                     buffer,
227                     size,
228                     trace_id,
229                     custom_payload,
230                     warnings)
231   case opcode
232   when 0x00 # ERROR
233     code = buffer.read_int
234     message = buffer.read_string
235 
236     case code
237     when 0x1000
238       UnavailableErrorResponse.new(custom_payload,
239                                    warnings,
240                                    code,
241                                    message,
242                                    buffer.read_consistency,
243                                    buffer.read_int,
244                                    buffer.read_int)
245     when 0x1100
246       WriteTimeoutErrorResponse.new(custom_payload,
247                                     warnings,
248                                     code,
249                                     message,
250                                     buffer.read_consistency,
251                                     buffer.read_int,
252                                     buffer.read_int,
253                                     buffer.read_string)
254     when 0x1200
255       ReadTimeoutErrorResponse.new(custom_payload,
256                                    warnings,
257                                    code,
258                                    message,
259                                    buffer.read_consistency,
260                                    buffer.read_int,
261                                    buffer.read_int,
262                                    (buffer.read_byte != 0))
263     when 0x1300
264       cl = buffer.read_consistency
265       received = buffer.read_int
266       block_for = buffer.read_int
267       if protocol_version < 5
268         ReadFailureErrorResponse.new(custom_payload,
269                                      warnings,
270                                      code,
271                                      message,
272                                      cl,
273                                      received,
274                                      block_for,
275                                      buffer.read_int,
276                                      (buffer.read_byte != 0),
277                                      nil)
278       else
279         failures_by_node = buffer.read_reason_map
280         ReadFailureErrorResponse.new(custom_payload,
281                                      warnings,
282                                      code,
283                                      message,
284                                      cl,
285                                      received,
286                                      block_for,
287                                      nil,
288                                      (buffer.read_byte != 0),
289                                      failures_by_node)
290       end
291     when 0x1400
292       FunctionFailureErrorResponse.new(custom_payload,
293                                        warnings,
294                                        code,
295                                        message,
296                                        buffer.read_string,
297                                        buffer.read_string,
298                                        buffer.read_string_list)
299     when 0x1500
300       cl = buffer.read_consistency
301       received = buffer.read_int
302       block_for = buffer.read_int
303       if protocol_version < 5
304         WriteFailureErrorResponse.new(custom_payload,
305                                       warnings,
306                                       code,
307                                       message,
308                                       cl,
309                                       received,
310                                       block_for,
311                                       buffer.read_int,
312                                       buffer.read_string,
313                                       nil)
314       else
315         failures_by_node = buffer.read_reason_map
316         WriteFailureErrorResponse.new(custom_payload,
317                                       warnings,
318                                       code,
319                                       message,
320                                       cl,
321                                       received,
322                                       block_for,
323                                       nil,
324                                       buffer.read_string,
325                                       failures_by_node)
326       end
327     when 0x2400
328       AlreadyExistsErrorResponse.new(custom_payload,
329                                      warnings,
330                                      code,
331                                      message,
332                                      buffer.read_string,
333                                      buffer.read_string)
334     when 0x2500
335       UnpreparedErrorResponse.new(custom_payload,
336                                   warnings,
337                                   code,
338                                   message,
339                                   buffer.read_short_bytes)
340     else
341       ErrorResponse.new(custom_payload, warnings, code, message)
342     end
343   when 0x02 # READY
344     READY
345   when 0x03 # AUTHENTICATE
346     AuthenticateResponse.new(buffer.read_string)
347   when 0x06 # SUPPORTED
348     SupportedResponse.new(buffer.read_string_multimap)
349   when 0x08 # RESULT
350     result_type = buffer.read_int
351     case result_type
352     when 0x0001 # Void
353       VoidResultResponse.new(custom_payload, warnings, trace_id)
354     when 0x0002 # Rows
355       original_buffer_length = buffer.length
356       column_specs, paging_state = Coder.read_metadata_v4(buffer)
357 
358       if column_specs.nil?
359         consumed_bytes = original_buffer_length - buffer.length
360         remaining_bytes =
361           CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))
362         RawRowsResultResponse.new(custom_payload,
363                                   warnings,
364                                   protocol_version,
365                                   remaining_bytes,
366                                   paging_state,
367                                   trace_id,
368                                   @custom_type_handlers)
369       else
370         RowsResultResponse.new(custom_payload,
371                                warnings,
372                                Coder.read_values_v4(buffer, column_specs, @custom_type_handlers),
373                                column_specs,
374                                paging_state,
375                                trace_id)
376       end
377     when 0x0003 # SetKeyspace
378       SetKeyspaceResultResponse.new(custom_payload,
379                                     warnings,
380                                     buffer.read_string,
381                                     trace_id)
382     when 0x0004 # Prepared
383       id = buffer.read_short_bytes
384       pk_idx, params_metadata = Coder.read_prepared_metadata_v4(buffer)
385       result_metadata = Coder.read_metadata_v4(buffer).first
386 
387       PreparedResultResponse.new(custom_payload,
388                                  warnings,
389                                  id,
390                                  params_metadata,
391                                  result_metadata,
392                                  pk_idx,
393                                  trace_id)
394     when 0x0005 # SchemaChange
395       change = buffer.read_string
396       target = buffer.read_string
397       name = nil
398       arguments = EMPTY_LIST
399 
400       case target
401       when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
402         keyspace = buffer.read_string
403       when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
404             Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
405         keyspace = buffer.read_string
406         name = buffer.read_string
407       when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
408             Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
409         keyspace = buffer.read_string
410         name = buffer.read_string
411         arguments = buffer.read_string_list
412       else
413         raise Errors::DecodingError,
414               "Unsupported event target: #{target.inspect}"
415       end
416 
417       SchemaChangeResultResponse.new(custom_payload,
418                                      warnings,
419                                      change,
420                                      keyspace,
421                                      name,
422                                      target,
423                                      arguments,
424                                      trace_id)
425     else
426       raise Errors::DecodingError,
427             "Unsupported result type: #{result_type.inspect}"
428     end
429   when 0x0C # EVENT
430     event_type = buffer.read_string
431     case event_type
432     when 'SCHEMA_CHANGE'
433       change = buffer.read_string
434       target = buffer.read_string
435       arguments = EMPTY_LIST
436 
437       case target
438       when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
439         keyspace = buffer.read_string
440         name = nil
441       when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
442             Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT,
443             Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
444             Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
445         keyspace = buffer.read_string
446         name = buffer.read_string
447       else
448         raise Errors::DecodingError,
449               "Unsupported event target: #{target.inspect}"
450       end
451 
452       if target == Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION \
453   || target == Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
454         arguments = buffer.read_string_list
455       end
456 
457       SchemaChangeEventResponse.new(change, keyspace, name, target, arguments)
458     when 'STATUS_CHANGE'
459       StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
460     when 'TOPOLOGY_CHANGE'
461       TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
462     else
463       raise Errors::DecodingError,
464             "Unsupported event type: #{event_type.inspect}"
465     end
466   when 0x0E # AUTH_CHALLENGE
467     AuthChallengeResponse.new(buffer.read_bytes)
468   when 0x10 # AUTH_SUCCESS
469     AuthSuccessResponse.new(buffer.read_bytes)
470   else
471     raise Errors::DecodingError,
472           "Unsupported response opcode: #{opcode.inspect}"
473   end
474 end