class Sequel::Impala::Database

Constants

DisconnectExceptions
ImpalaExceptions

Exception classes used by Impala.

RECORD_QUERY_PROFILE

Public Instance Methods

connect(server) click to toggle source

Connect to the Impala server. Currently, only the :host and :port options are respected, and they default to 'localhost' and 21000, respectively.

   # File lib/sequel/adapters/impala.rb
31 def connect(server)
32   opts = server_opts(server)
33   force_database(::Impala.connect(opts[:host]||'localhost', (opts[:port]||21000).to_i, opts), opts[:database])
34 end
database_error_classes() click to toggle source
   # File lib/sequel/adapters/impala.rb
36 def database_error_classes
37   ImpalaExceptions
38 end
disconnect_connection(c) click to toggle source
   # File lib/sequel/adapters/impala.rb
40 def disconnect_connection(c)
41   log_info("Closing connection: #{c}")
42   c.close
43 rescue *DisconnectExceptions
44 end
execute(sql, opts=OPTS) { |cursor| ... } click to toggle source
   # File lib/sequel/adapters/impala.rb
46 def execute(sql, opts=OPTS)
47   synchronize(opts[:server]) do |c|
48     # here's my super-hack to get DDL calls to record their profiles and query_ids
49     opts = self.opts.select { |k, v| [:query_id_name, :profile_name].include?(k) }.merge(opts)
50     begin
51       cursor = record_query_id(opts) do
52         log_connection_yield(sql, c) do
53           c.execute(sql){}
54         end
55       end
56       yield cursor if block_given?
57       nil
58     rescue *ImpalaExceptions => e
59       raise_error(e)
60     ensure
61       record_profile(cursor, opts)
62       log_info("Closing cursor: #{cursor.inspect}")
63       log_query_url(cursor.handle) if cursor && cursor.handle
64       cursor.close if cursor && cursor.open?
65     end
66   end
67 end
log_query_url(handle) click to toggle source
   # File lib/sequel/adapters/impala.rb
69 def log_query_url(handle)
70   log_info(sprintf(ENV['SEQUEL_IMPALA_QUERY_URL'], query_id: handle.id)) if ENV['SEQUEL_IMPALA_QUERY_URL']
71 end
profile_for(profile_name=:default) click to toggle source
   # File lib/sequel/adapters/impala.rb
93 def profile_for(profile_name=:default)
94   Sequel.synchronize{@runtime_profiles[profile_name]}
95 end
query_id_and_profile(query_id_name=:default, profile_name=:default) { || ... } click to toggle source
   # File lib/sequel/adapters/impala.rb
73 def query_id_and_profile(query_id_name=:default, profile_name=:default)
74   key = RECORD_QUERY_PROFILE
75   prev_profile_name = prev_query_id_name = nil
76   begin
77     Sequel.synchronize do
78       prev_query_id_name = @query_ids[key]
79       prev_profile_name = @runtime_profiles[key]
80       @query_ids[key] = query_id_name
81       @runtime_profiles[key] = profile_name
82     end
83 
84     yield
85   ensure
86     Sequel.synchronize do
87       @query_ids[key] = prev_query_id_name
88       @runtime_profiles[key] = prev_profile_name
89     end
90   end
91 end
query_id_for(query_id_name=:default) click to toggle source
   # File lib/sequel/adapters/impala.rb
97 def query_id_for(query_id_name=:default)
98   Sequel.synchronize{@query_ids[query_id_name]}
99 end

Private Instance Methods

adapter_initialize() click to toggle source
    # File lib/sequel/adapters/impala.rb
128 def adapter_initialize
129   @runtime_profiles = {}
130   @query_ids = {}
131 end
connection_execute_method() click to toggle source
    # File lib/sequel/adapters/impala.rb
133 def connection_execute_method
134   :query
135 end
dataset_class_default() click to toggle source
    # File lib/sequel/adapters/impala.rb
103 def dataset_class_default
104   Dataset
105 end
disconnect_error?(exception, opts) click to toggle source

Impala raises IOError if it detects a problem on the connection, and in most cases that results in an unusable connection, so treat it as a disconnect error so Sequel will reconnect.

Calls superclass method
    # File lib/sequel/adapters/impala.rb
140 def disconnect_error?(exception, opts)
141   case exception
142   when *DisconnectExceptions
143     true
144   else
145     super
146   end
147 end
record_profile(cursor, opts) click to toggle source
    # File lib/sequel/adapters/impala.rb
107 def record_profile(cursor, opts)
108   if cursor && (profile_name = opts[:profile_name] || Sequel.synchronize{@runtime_profiles[RECORD_QUERY_PROFILE]})
109     profile = cursor.runtime_profile
110     Sequel.synchronize{@runtime_profiles[profile_name] = profile}
111   end
112 end
record_query_id(opts = OPTS) { || ... } click to toggle source
    # File lib/sequel/adapters/impala.rb
114 def record_query_id(opts = OPTS)
115   query_id_name = opts[:query_id_name] || Sequel.synchronize{@query_ids[RECORD_QUERY_PROFILE]}
116   start = Time.now if query_id_name
117 
118   cursor = yield
119 
120   if cursor && query_id_name
121     h = { query_id: cursor.handle.id, start_time: start }
122     Sequel.synchronize{ @query_ids[query_id_name] = h }
123   end
124 
125   cursor
126 end
schema_parse_table(table_name, opts) click to toggle source

Use DESCRIBE to get the column names and types for the table.

    # File lib/sequel/adapters/impala.rb
150 def schema_parse_table(table_name, opts)
151   m = output_identifier_meth(opts[:dataset])
152 
153   table = if opts[:schema]
154     Sequel.qualify(opts[:schema], table_name)
155   else
156     Sequel.identifier(table_name)
157   end
158 
159   describe(table, opts).map do |row|
160     row[:db_type] = row[:type]
161     row[:type] = schema_column_type(row[:db_type])
162     row[:default] = nil
163     row[:primary_key] = false
164     [m.call(row.delete(:name)), row]
165   end
166 end