class Cassandra::Protocol::V1::Decoder

Constants

CODE_AUTHENTICATE
CODE_AUTH_CHALLENGE
CODE_AUTH_SUCCESS
CODE_ERROR
CODE_EVENT
CODE_READY
CODE_RESULT
CODE_SUPPORTED
READY

Public Class Methods

new(handler, compressor = nil) click to toggle source
   # File lib/cassandra/protocol/v1.rb
# Builds a decoder that starts out waiting for a frame header.
#
# @param handler [Object] receiver of decoded responses; elsewhere in this
#   class it is sent +complete_request+ and +notify_event_listeners+
# @param compressor [Object, nil] optional object used to +decompress+
#   the bodies of compressed frames
def initialize(handler, compressor = nil)
  # Collaborators supplied by the caller.
  @handler, @compressor = handler, compressor

  # Parsing state: nothing has been read yet, so the next bytes are a header.
  @buffer = CqlByteBuffer.new
  @state  = :header
  @header = @length = nil
end

Public Instance Methods

<<(data) click to toggle source
   # File lib/cassandra/protocol/v1.rb
# Appends newly received bytes to the internal buffer and resumes decoding
# from the current state.
#
# @param data [String] bytes to append to the internal buffer
# @return whatever the state-specific decode method returns
def <<(data)
  @buffer << data

  # The state name (:header or :body) selects the private decode_* method.
  decoder = :"decode_#{@state}"
  __send__(decoder, @buffer)
end

Private Instance Methods

actual_decode(buffer, fields, size) click to toggle source
    # File lib/cassandra/protocol/v1.rb
# Decodes one complete frame and hands the resulting response to the handler.
#
# The 32-bit +fields+ word is split as follows: bits 24-30 protocol version,
# bits 16-23 flags (0x01 compression, 0x02 tracing), bits 8-15 stream id
# (interpreted as a signed byte) and bits 0-7 the opcode.
#
# @param buffer [CqlByteBuffer] buffer positioned at the start of the body
# @param fields [Integer] the first 32-bit word of the frame header
# @param size [Integer] body length in bytes as announced by the header
# @raise [Errors::DecodingError] if the frame is compressed but no
#   compressor was configured
def actual_decode(buffer, fields, size)
  version    = (fields >> 24) & 0x7f
  flags      = (fields >> 16) & 0xff
  raw_stream = (fields >> 8) & 0xff
  # Reinterpret the unsigned byte as a two's-complement signed value.
  stream     = (raw_stream & 0x7f) - (raw_stream & 0x80)
  opcode     = fields & 0xff

  compressed = (flags & 0x01) != 0
  traced     = (flags & 0x02) != 0

  if compressed
    unless @compressor
      raise Errors::DecodingError,
            'Compressed frame received, but no compressor configured'
    end

    # From here on we work on a private buffer holding the inflated body.
    buffer = CqlByteBuffer.new(@compressor.decompress(buffer.read(size)))
    size   = buffer.size
  end

  trace_id = nil
  if traced
    trace_id = buffer.read_uuid
    size    -= 16
  end

  # Bytes that belong to later frames and must survive this decode.
  trailing = buffer.length - size
  response = decode_response(opcode, version, buffer, size, trace_id)

  # Skip whatever part of this frame's body the response decoder left unread.
  unread = buffer.length - trailing
  buffer.discard(unread) if unread > 0

  # A stream id of -1 is routed to the event listeners, not to a request.
  if stream == -1
    @handler.notify_event_listeners(response)
  else
    @handler.complete_request(stream, response)
  end
end
decode_body(buffer) click to toggle source
    # File lib/cassandra/protocol/v1.rb
 # Resumes decoding when a header has already been consumed but its body
 # was not fully buffered yet, then keeps decoding any following frames.
 #
 # Leaves the decoder in the +:header+ state when fewer than 8 bytes remain
 # after a frame; otherwise stores the pending header/length and stays in
 # the current state until more data arrives.
 #
 # @param buffer [CqlByteBuffer] the decoder's receive buffer
 # @return [nil]
 def decode_body(buffer)
   fields    = @header
   body_size = @length
   available = buffer.length

   while available >= body_size
     actual_decode(buffer, fields, body_size)
     available = buffer.length

     # Not enough bytes for another 8-byte header: wait for one.
     if available < 8
       @header = @length = nil
       @state  = :header
       return
     end

     fields     = buffer.read_int
     body_size  = buffer.read_int
     available -= 8
   end

   # The last header read has an incomplete body; remember it for next time.
   @header, @length = fields, body_size

   nil
 end
decode_header(buffer) click to toggle source
   # File lib/cassandra/protocol/v1.rb
# Decodes as many complete frames as the buffer holds, starting at a frame
# header. If a header is read but its body is not fully buffered, the header
# word and length are stored and the decoder switches to the +:body+ state.
#
# @param buffer [CqlByteBuffer] the decoder's receive buffer
# @return [nil]
def decode_header(buffer)
  available = buffer.length

  # A header is two 32-bit integers, i.e. 8 bytes.
  until available < 8
    fields    = buffer.read_int
    body_size = buffer.read_int

    if available - 8 >= body_size
      actual_decode(buffer, fields, body_size)
      available = buffer.length
    else
      # Body not complete yet: remember the header for decode_body.
      @header, @length, @state = fields, body_size, :body
      return
    end
  end

  nil
end
decode_response(opcode, protocol_version, buffer, size, trace_id) click to toggle source
    # File lib/cassandra/protocol/v1.rb
# Decodes the body of one response frame into a response object.
#
# @param opcode [Integer] opcode taken from the frame header
# @param protocol_version [Integer] protocol version taken from the header
# @param buffer [CqlByteBuffer] buffer positioned at the start of the body
# @param size [Integer] number of body bytes belonging to this frame
# @param trace_id [Object, nil] tracing id read from the frame, if any
# @return the decoded response object (+READY+ for a READY frame)
# @raise [Errors::DecodingError] on an unsupported opcode, result type or
#   event type
def decode_response(opcode, protocol_version, buffer, size, trace_id)
  case opcode
  when CODE_READY          then READY
  when CODE_AUTHENTICATE   then AuthenticateResponse.new(buffer.read_string)
  when CODE_AUTH_CHALLENGE then AuthChallengeResponse.new(buffer.read_bytes)
  when CODE_AUTH_SUCCESS   then AuthSuccessResponse.new(buffer.read_bytes)
  when CODE_SUPPORTED      then SupportedResponse.new(buffer.read_string_multimap)
  when CODE_ERROR          then decode_error_response(buffer)
  when CODE_RESULT
    decode_result_response(protocol_version, buffer, size, trace_id)
  when CODE_EVENT          then decode_event_response(buffer)
  else
    raise Errors::DecodingError, "Unsupported response opcode: #{opcode.inspect}"
  end
end

# Decodes an ERROR body: an int code, a message, then code-specific fields.
def decode_error_response(buffer)
  code    = buffer.read_int
  message = buffer.read_string

  # Arguments are evaluated left to right, which is the order the extra
  # fields are read from the buffer.
  case code
  when 0x1000 # Unavailable
    UnavailableErrorResponse.new(nil, nil, code, message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int)
  when 0x1100 # Write timeout
    WriteTimeoutErrorResponse.new(nil, nil, code, message,
                                  buffer.read_consistency,
                                  buffer.read_int,
                                  buffer.read_int,
                                  buffer.read_string)
  when 0x1200 # Read timeout
    ReadTimeoutErrorResponse.new(nil, nil, code, message,
                                 buffer.read_consistency,
                                 buffer.read_int,
                                 buffer.read_int,
                                 (buffer.read_byte != 0))
  when 0x2400 # Already exists
    AlreadyExistsErrorResponse.new(nil, nil, code, message,
                                   buffer.read_string,
                                   buffer.read_string)
  when 0x2500 # Unprepared
    UnpreparedErrorResponse.new(nil, nil, code, message, buffer.read_short_bytes)
  else
    ErrorResponse.new(nil, nil, code, message)
  end
end

# Decodes a RESULT body, dispatching on the leading int result type.
def decode_result_response(protocol_version, buffer, size, trace_id)
  result_type = buffer.read_int

  case result_type
  when 0x0001 # Void
    VoidResultResponse.new(nil, nil, trace_id)
  when 0x0002 # Rows
    decode_rows_result(protocol_version, buffer, size, trace_id)
  when 0x0003 # SetKeyspace
    SetKeyspaceResultResponse.new(nil, nil, buffer.read_string, trace_id)
  when 0x0004 # Prepared
    decode_prepared_result(protocol_version, buffer, trace_id)
  when 0x0005 # SchemaChange
    change, keyspace, name, target = read_schema_change(buffer)
    # NOTE(review): the final argument is nil rather than trace_id, exactly
    # as before; confirm against SchemaChangeResultResponse's signature.
    SchemaChangeResultResponse.new(nil, nil, change, keyspace, name, target,
                                   EMPTY_LIST, nil)
  else
    raise Errors::DecodingError,
          "Unsupported result type: #{result_type.inspect}"
  end
end

# Decodes a Rows result. When the metadata carries no column specs the row
# data is kept undecoded in a RawRowsResultResponse.
def decode_rows_result(protocol_version, buffer, size, trace_id)
  original_buffer_length = buffer.length
  column_specs, paging_state = Coder.read_metadata_v1(buffer)

  if column_specs.nil?
    consumed_bytes = original_buffer_length - buffer.length
    # The extra 4 accounts for the result-type int already read from the body.
    remaining_bytes =
      CqlByteBuffer.new(buffer.read(size - consumed_bytes - 4))
    RawRowsResultResponse.new(nil, nil, protocol_version, remaining_bytes,
                              paging_state, trace_id, nil)
  else
    RowsResultResponse.new(nil, nil,
                           Coder.read_values_v1(buffer, column_specs),
                           column_specs, paging_state, trace_id)
  end
end

# Decodes a Prepared result; result metadata is only read when the protocol
# version is greater than 1.
def decode_prepared_result(protocol_version, buffer, trace_id)
  id = buffer.read_short_bytes
  params_metadata = Coder.read_metadata_v1(buffer).first
  result_metadata =
    protocol_version > 1 ? Coder.read_metadata_v1(buffer).first : nil

  PreparedResultResponse.new(nil, nil, id, params_metadata, result_metadata,
                             nil, trace_id)
end

# Decodes an EVENT body, dispatching on the leading event-type string.
def decode_event_response(buffer)
  event_type = buffer.read_string

  case event_type
  when 'SCHEMA_CHANGE'
    change, keyspace, name, target = read_schema_change(buffer)
    SchemaChangeEventResponse.new(change, keyspace, name, target, EMPTY_LIST)
  when 'STATUS_CHANGE'
    StatusChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  when 'TOPOLOGY_CHANGE'
    TopologyChangeEventResponse.new(buffer.read_string, *buffer.read_inet)
  else
    raise Errors::DecodingError, "Unsupported event type: #{event_type.inspect}"
  end
end

# Reads the change/keyspace/name triple shared by schema-change results and
# events. An empty name becomes nil and yields the keyspace target; a
# non-empty name yields the table target.
#
# @return [Array] +[change, keyspace, name, target]+
def read_schema_change(buffer)
  change   = buffer.read_string
  keyspace = buffer.read_string
  name     = buffer.read_string

  if name.empty?
    [change, keyspace, nil, Constants::SCHEMA_CHANGE_TARGET_KEYSPACE]
  else
    [change, keyspace, name, Constants::SCHEMA_CHANGE_TARGET_TABLE]
  end
end