class Cassandra::Protocol::CqlProtocolHandler

This class wraps a single connection and translates between request/ response frames and raw bytes.

You send requests with send_request, and receive responses through the returned future.

Instances of this class are thread safe.

@example Sending an OPTIONS request

future = protocol_handler.send_request(Cassandra::Protocol::OptionsRequest.new)
response = future.get
puts "These options are supported: #{response.options}"

Constants

HEARTBEAT
TERMINATED

Attributes

error[R]

@return [Exception] outstanding error, from a failed connection.

keyspace[R]

@return [String] the current keyspace for the underlying connection

protocol_version[R]

@return [Integer] the version of the protocol to use in communicating with C*.

Public Class Methods

new(connection, scheduler, protocol_version, compressor = nil, heartbeat_interval = 30, idle_timeout = 60, requests_per_connection = 128, custom_type_handlers = {}) click to toggle source
   # File lib/cassandra/protocol/cql_protocol_handler.rb
43 def initialize(connection,
44                scheduler,
45                protocol_version,
46                compressor = nil,
47                heartbeat_interval = 30,
48                idle_timeout = 60,
49                requests_per_connection = 128,
50                custom_type_handlers = {})
51   @protocol_version = protocol_version
52   @connection = connection
53   @scheduler = scheduler
54   @compressor = compressor
55   @connection.on_data(&method(:receive_data))
56   @connection.on_closed(&method(:socket_closed))
57 
58   @streams = Array.new(requests_per_connection) {|i| i}
59 
60   @promises = {}
61 
62   if protocol_version > 3
63     @frame_encoder = V4::Encoder.new(@compressor, protocol_version)
64     @frame_decoder = V4::Decoder.new(self, @compressor, custom_type_handlers)
65   elsif protocol_version > 2
66     @frame_encoder = V3::Encoder.new(@compressor, protocol_version)
67     @frame_decoder = V3::Decoder.new(self, @compressor)
68   else
69     @frame_encoder = V1::Encoder.new(@compressor, protocol_version)
70     @frame_decoder = V1::Decoder.new(self, @compressor)
71   end
72 
73   @request_queue_in = []
74   @request_queue_out = []
75   @event_listeners = []
76   @data = {}
77   @lock = Mutex.new
78   @closed_promise = Ione::Promise.new
79   @keyspace = nil
80   @heartbeat = nil
81   @terminate = nil
82   @heartbeat_interval = heartbeat_interval
83   @idle_timeout = idle_timeout
84   @error = nil
85 end

Public Instance Methods

[](key) click to toggle source

@see {#[]=} @return the value associated with the key

    # File lib/cassandra/protocol/cql_protocol_handler.rb
114 def [](key)
115   @lock.lock
116   @data[key]
117 ensure
118   @lock.unlock
119 end
[]=(key, value) click to toggle source

Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.

    # File lib/cassandra/protocol/cql_protocol_handler.rb
105 def []=(key, value)
106   @lock.lock
107   @data[key] = value
108 ensure
109   @lock.unlock
110 end
close(cause = nil) click to toggle source

Closes the underlying connection.

@return [Ione::Future] a future that completes when the connection has closed

    # File lib/cassandra/protocol/cql_protocol_handler.rb
199 def close(cause = nil)
200   if @heartbeat
201     @scheduler.cancel_timer(@heartbeat)
202     @heartbeat = nil
203   end
204 
205   if @terminate
206     @scheduler.cancel_timer(@terminate)
207     @terminate = nil
208   end
209 
210   @scheduler.schedule_timer(0).on_value do
211     @connection.close(cause)
212   end
213 
214   @closed_promise.future
215 end
closed?() click to toggle source

@return [true, false] true if the underlying connection is closed

    # File lib/cassandra/protocol/cql_protocol_handler.rb
127 def closed?
128   @connection.closed?
129 end
complete_request(id, response) click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
231 def complete_request(id, response)
232   promise = nil
233   @lock.lock
234   begin
235     promise = @promises.delete(id)
236     @streams.unshift(id)
237   ensure
238     @lock.unlock
239   end
240   @keyspace = response.keyspace if response.is_a?(Protocol::SetKeyspaceResultResponse)
241   if response.is_a?(Protocol::SchemaChangeResultResponse) &&
242      response.change == 'DROPPED' &&
243      response.keyspace == @keyspace &&
244      response.target == Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
245     @keyspace = nil
246   end
247   flush_request_queue
248   promise.fulfill(response) unless promise.timed_out?
249 end
connected?() click to toggle source

@return [true, false] true if the underlying connection is connected

    # File lib/cassandra/protocol/cql_protocol_handler.rb
122 def connected?
123   @connection.connected?
124 end
host() click to toggle source

Returns the hostname of the underlying connection

@return [String] the hostname

   # File lib/cassandra/protocol/cql_protocol_handler.rb
90 def host
91   @connection.host
92 end
notify_event_listeners(event_response) click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
217 def notify_event_listeners(event_response)
218   event_listeners = nil
219   @lock.lock
220   begin
221     event_listeners = @event_listeners
222     return if event_listeners.empty?
223   ensure
224     @lock.unlock
225   end
226   event_listeners.each do |listener|
227     listener.call(event_response)
228   end
229 end
on_closed(&listener) click to toggle source

Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.

@yieldparam error [nil, Error] the error that caused the connection to

close, if any
    # File lib/cassandra/protocol/cql_protocol_handler.rb
137 def on_closed(&listener)
138   @closed_promise.future.on_value(&listener)
139   @closed_promise.future.on_failure(&listener)
140 end
on_event(&listener) click to toggle source

Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.

@yieldparam event [Cassandra::Protocol::EventResponse] an event sent by the server

    # File lib/cassandra/protocol/cql_protocol_handler.rb
147 def on_event(&listener)
148   @lock.lock
149   @event_listeners += [listener]
150 ensure
151   @lock.unlock
152 end
port() click to toggle source

Returns the port of the underlying connection

@return [Integer] the port

   # File lib/cassandra/protocol/cql_protocol_handler.rb
97 def port
98   @connection.port
99 end
send_request(request, timeout = nil, with_heartbeat = true) click to toggle source

Serializes and send a request over the underlying connection.

Returns a future that will resolve to the response. When the connection closes the futures of all active requests will be failed with the error that caused the connection to close, or nil.

When `timeout` is specified the future will fail with {Cassandra::Errors::TimeoutError} after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request the channel occupied by the request will not be reused.

@param [Cassandra::Protocol::Request] request @param [Float] timeout an optional number of seconds to wait until

failing the request

@return [Ione::Future<Cassandra::Protocol::Response>] a future that resolves to

the response
    # File lib/cassandra/protocol/cql_protocol_handler.rb
170 def send_request(request, timeout = nil, with_heartbeat = true)
171   return Ione::Future.failed(Errors::IOError.new('Connection closed')) if closed?
172   schedule_heartbeat if with_heartbeat
173   promise = RequestPromise.new(request, timeout, @scheduler)
174   id = nil
175   @lock.lock
176   begin
177     if (id = next_stream_id)
178       @promises[id] = promise
179     end
180   ensure
181     @lock.unlock
182   end
183   if id
184     write_request(id, promise)
185   else
186     @lock.lock
187     begin
188       @request_queue_in << promise
189     ensure
190       @lock.unlock
191     end
192   end
193   promise.future
194 end

Private Instance Methods

flush_request_queue() click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
334 def flush_request_queue
335   @lock.lock
336   begin
337     if @request_queue_out.empty? && !@request_queue_in.empty?
338       @request_queue_out = @request_queue_in
339       @request_queue_in = []
340     end
341   ensure
342     @lock.unlock
343   end
344   loop do
345     id = nil
346     promise = nil
347     @lock.lock
348     begin
349       if @request_queue_out.any? && (id = next_stream_id)
350         promise = @request_queue_out.shift
351         next if promise.timed_out?
352         @promises[id] = promise
353       end
354     ensure
355       @lock.unlock
356     end
357 
358     break unless id
359     write_request(id, promise)
360   end
361 end
next_stream_id() click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
440 def next_stream_id
441   @streams.shift
442 end
receive_data(data) click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
329 def receive_data(data)
330   reschedule_termination
331   @frame_decoder << data
332 end
reschedule_termination() click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
423 def reschedule_termination
424   return unless @idle_timeout
425 
426   timer = nil
427 
428   @lock.synchronize do
429     @scheduler.cancel_timer(@terminate) if @terminate
430 
431     @terminate = timer = @scheduler.schedule_timer(@idle_timeout)
432   end
433 
434   timer.on_value do
435     @terminate = nil
436     @connection.close(TERMINATED)
437   end
438 end
schedule_heartbeat() click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
405 def schedule_heartbeat
406   return unless @heartbeat_interval
407 
408   timer = nil
409 
410   @lock.synchronize do
411     @scheduler.cancel_timer(@heartbeat) if @heartbeat && !@heartbeat.resolved?
412 
413     @heartbeat = timer = @scheduler.schedule_timer(@heartbeat_interval)
414   end
415 
416   timer.on_value do
417     send_request(HEARTBEAT, nil, false).on_value do
418       schedule_heartbeat
419     end
420   end
421 end
socket_closed(cause) click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
370 def socket_closed(cause)
371   if cause
372     e = Errors::IOError.new(cause.message)
373     e.set_backtrace(cause.backtrace)
374 
375     cause = e
376   end
377   @error = cause
378 
379   request_failure_cause = cause || Errors::IOError.new('Connection closed')
380   promises_to_fail = nil
381   @lock.synchronize do
382     @scheduler.cancel_timer(@heartbeat) if @heartbeat
383     @scheduler.cancel_timer(@terminate) if @terminate
384 
385     @heartbeat = nil
386     @terminate = nil
387 
388     promises_to_fail = @promises.values
389     promises_to_fail.concat(@request_queue_in)
390     promises_to_fail.concat(@request_queue_out)
391     @promises.clear
392     @request_queue_in.clear
393     @request_queue_out.clear
394   end
395   promises_to_fail.each do |promise|
396     promise.fail(request_failure_cause) unless promise.timed_out?
397   end
398   if cause
399     @closed_promise.fail(cause)
400   else
401     @closed_promise.fulfill
402   end
403 end
write_request(id, request_promise) click to toggle source
    # File lib/cassandra/protocol/cql_protocol_handler.rb
363 def write_request(id, request_promise)
364   @connection.write do |buffer|
365     @frame_encoder.encode(buffer, request_promise.request, id)
366   end
367   request_promise.maybe_start_timer
368 end