class Cassandra::Protocol::V3::Decoder

Constants

READY

Public Class Methods

new(handler, compressor = nil) click to toggle source
   # File lib/cassandra/protocol/v3.rb
53 def initialize(handler, compressor = nil)
54   @handler    = handler
55   @compressor = compressor
56   @state      = :initial
57   @header     = nil
58   @version    = nil
59   @code       = nil
60   @length     = nil
61   @buffer     = CqlByteBuffer.new
62 end

Public Instance Methods

<<(data) click to toggle source
   # File lib/cassandra/protocol/v3.rb
64 def <<(data)
65   @buffer << data
66 
67   __send__(:"decode_#{@state}", @buffer)
68 end

Private Instance Methods

actual_decode(buffer, fields, frame_length, code) click to toggle source
    # File lib/cassandra/protocol/v3.rb
155 def actual_decode(buffer, fields, frame_length, code)
156   protocol_version = (fields >> 24) & 0x7f
157   compression      = (fields >> 16) & 0x01
158   tracing          = (fields >> 16) & 0x02
159   stream_id        = fields & 0xffff
160   stream_id        = (stream_id & 0x7fff) - (stream_id & 0x8000)
161   opcode           = code & 0xff
162 
163   # If we're dealing with a compressed body, read the whole body, decompress,
164   # and treat the uncompressed body as if that's what we got in the first place.
165   # This means we must reset frame_length to that uncompressed size.
166   if compression == 1
167     if @compressor
168       buffer = CqlByteBuffer.new(
169         @compressor.decompress(buffer.read(frame_length))
170       )
171       frame_length = buffer.size
172     else
173       raise Errors::DecodingError,
174             'Compressed frame received, but no compressor configured'
175     end
176   end
177 
178   # We want to read one full frame; but after we read/parse chunks of the body
179   # there may be more cruft left in the frame that we don't care about. So,
180   # we save off the current size of the buffer, do all our reads for the
181   # frame, get the final remaining size, and based on that discard possible
182   # remaining bytes in the frame. In particular, we account for the possibility
183   # that the buffer contains some/all of a subsequent frame as well, and we
184   # don't want to mess with that.
185 
186   buffer_starting_length = buffer.length
187 
188   trace_id = (buffer.read_uuid if tracing == 2)
189 
190   remaining_frame_length = frame_length -
191                            (buffer_starting_length - buffer.length)
192   response = decode_response(opcode, protocol_version, buffer,
193                              remaining_frame_length, trace_id)
194 
195   # Calculate and discard remaining cruft in the frame.
196   extra_length = frame_length - (buffer_starting_length - buffer.length)
197   buffer.discard(extra_length) if extra_length > 0
198 
199   if stream_id == -1
200     @handler.notify_event_listeners(response)
201   else
202     @handler.complete_request(stream_id, response)
203   end
204 end
decode_body(buffer) click to toggle source
    # File lib/cassandra/protocol/v3.rb
123 def decode_body(buffer)
124   frame_header  = @header
125   frame_code    = @code
126   frame_length  = @length
127   buffer_length = buffer.length
128 
129   until buffer_length < frame_length
130     actual_decode(buffer, frame_header, frame_length, frame_code)
131     buffer_length = buffer.length
132 
133     if buffer_length < 9
134       @header = nil
135       @code   = nil
136       @length = nil
137       @state  = :header
138 
139       return
140     end
141 
142     frame_header   = buffer.read_int
143     frame_code     = buffer.read_byte
144     frame_length   = buffer.read_int
145     buffer_length -= 9
146   end
147 
148   @header = frame_header
149   @code   = frame_code
150   @length = frame_length
151 
152   nil
153 end
decode_header(buffer) click to toggle source
    # File lib/cassandra/protocol/v3.rb
 99 def decode_header(buffer)
100   buffer_length = buffer.length
101 
102   while buffer_length >= 9
103     frame_header = buffer.read_int
104     frame_code   = buffer.read_byte
105     frame_length = buffer.read_int
106 
107     if (buffer_length - 9) < frame_length
108       @header = frame_header
109       @code   = frame_code
110       @length = frame_length
111       @state  = :body
112 
113       return
114     end
115 
116     actual_decode(buffer, frame_header, frame_length, frame_code)
117     buffer_length = buffer.length
118   end
119 
120   nil
121 end
decode_initial(buffer) click to toggle source
   # File lib/cassandra/protocol/v3.rb
74 def decode_initial(buffer)
75   return if buffer.length < 9
76 
77   frame_header     = buffer.read_int
78   protocol_version = (frame_header >> 24) & 0x7f
79 
80   if protocol_version < 3
81     stream_id  = (frame_header >> 8) & 0xff
82     stream_id  = (stream_id & 0x7f) - (stream_id & 0x80)
83 
84     error_response = ErrorResponse.new(nil, nil, 0x000A,
85                                        'Invalid or unsupported protocol version')
86     @handler.complete_request(stream_id, error_response)
87 
88     return
89   end
90 
91   @header = frame_header
92   @code   = buffer.read_byte
93   @length = buffer.read_int
94   @state  = :body
95 
96   decode_body(buffer)
97 end
decode_response(opcode, protocol_version, buffer, size, trace_id) click to toggle source
    # File lib/cassandra/protocol/v3.rb
206 def decode_response(opcode, protocol_version, buffer, size, trace_id)
207   case opcode
208   when 0x00 # ERROR
209     code = buffer.read_int
210     message = buffer.read_string
211 
212     case code
213     when 0x1000
214       UnavailableErrorResponse.new(nil,
215                                    nil,
216                                    code,
217                                    message,
218                                    buffer.read_consistency,
219                                    buffer.read_int,
220                                    buffer.read_int)
221     when 0x1100
222       WriteTimeoutErrorResponse.new(nil,
223                                     nil,
224                                     code,
225                                     message,
226                                     buffer.read_consistency,
227                                     buffer.read_int,
228                                     buffer.read_int,
229                                     buffer.read_string)
230     when 0x1200
231       ReadTimeoutErrorResponse.new(nil,
232                                    nil,
233                                    code,
234                                    message,
235                                    buffer.read_consistency,
236                                    buffer.read_int,
237                                    buffer.read_int,
238                                    (buffer.read_byte != 0))
239     when 0x2400
240       AlreadyExistsErrorResponse.new(nil,
241                                      nil,
242                                      code,
243                                      message,
244                                      buffer.read_string,
245                                      buffer.read_string)
246     when 0x2500
247       UnpreparedErrorResponse.new(nil,
248                                   nil,
249                                   code,
250                                   message,
251                                   buffer.read_short_bytes)
252     else
253       ErrorResponse.new(nil, nil, code, message)
254     end
255   when 0x02 # READY
256     READY
257   when 0x03 # AUTHENTICATE
258     AuthenticateResponse.new(buffer.read_string)
259   when 0x06 # SUPPORTED
260     SupportedResponse.new(buffer.read_string_multimap)
261   when 0x08 # RESULT
262     result_type = buffer.read_int
263     case result_type
264     when 0x0001 # Void
265       VoidResultResponse.new(nil, nil, trace_id)
266     when 0x0002 # Rows
267       original_buffer_length = buffer.length
268       column_specs, paging_state = Coder.read_metadata_v3(buffer)
269 
270       if column_specs.nil?
271         consumed_bytes = original_buffer_length - buffer.length
272         remaining_bytes =
273           CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))
274         RawRowsResultResponse.new(nil,
275                                   nil,
276                                   protocol_version,
277                                   remaining_bytes,
278                                   paging_state,
279                                   trace_id,
280                                   nil)
281       else
282         RowsResultResponse.new(nil,
283                                nil,
284                                Coder.read_values_v3(buffer, column_specs),
285                                column_specs,
286                                paging_state,
287                                trace_id)
288       end
289     when 0x0003 # SetKeyspace
290       SetKeyspaceResultResponse.new(nil, nil, buffer.read_string, trace_id)
291     when 0x0004 # Prepared
292       id = buffer.read_short_bytes
293       params_metadata = Coder.read_metadata_v3(buffer).first
294       result_metadata = nil
295       result_metadata = Coder.read_metadata_v3(buffer).first if protocol_version > 1
296 
297       PreparedResultResponse.new(nil,
298                                  nil,
299                                  id,
300                                  params_metadata,
301                                  result_metadata,
302                                  nil,
303                                  trace_id)
304     when 0x0005 # SchemaChange
305       change = buffer.read_string
306       target = buffer.read_string
307       keyspace = buffer.read_string
308       arguments = EMPTY_LIST
309       name = nil
310 
311       name = buffer.read_string if target == Constants::SCHEMA_CHANGE_TARGET_TABLE
312 
313       SchemaChangeResultResponse.new(nil,
314                                      nil,
315                                      change,
316                                      keyspace,
317                                      name,
318                                      target,
319                                      arguments,
320                                      nil)
321     else
322       raise Errors::DecodingError,
323             "Unsupported result type: #{result_type.inspect}"
324     end
325   when 0x0C # EVENT
326     event_type = buffer.read_string
327     case event_type
328     when 'SCHEMA_CHANGE'
329       change = buffer.read_string
330       target = buffer.read_string
331       keyspace = buffer.read_string
332       name = nil
333       arguments = EMPTY_LIST
334 
335       name = buffer.read_string if target == Constants::SCHEMA_CHANGE_TARGET_TABLE
336 
337       SchemaChangeEventResponse.new(change, keyspace, name, target, arguments)
338     when 'STATUS_CHANGE'
339       StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
340     when 'TOPOLOGY_CHANGE'
341       TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
342     else
343       raise Errors::DecodingError,
344             "Unsupported event type: #{event_type.inspect}"
345     end
346   when 0x0E # AUTH_CHALLENGE
347     AuthChallengeResponse.new(buffer.read_bytes)
348   when 0x10 # AUTH_SUCCESS
349     AuthSuccessResponse.new(buffer.read_bytes)
350   else
351     raise Errors::DecodingError,
352           "Unsupported response opcode: #{opcode.inspect}"
353   end
354 end