class Sequel::Impala::Database
Constants
- DisconnectExceptions
- ImpalaExceptions
Exception classes used by Impala.
- RECORD_QUERY_PROFILE
Public Instance Methods
connect(server)
click to toggle source
Connect to the Impala server. Currently, only the :host and :port options are respected, and they default to 'localhost' and 21000, respectively.
# File lib/sequel/adapters/impala.rb, line 31
#
# Open a connection for +server+. The host falls back to 'localhost' and
# the port to 21000 (coerced with +to_i+); the whole option hash returned
# by +server_opts+ is also handed to ::Impala.connect. The new connection
# is passed to +force_database+ together with opts[:database], and the
# result of that call is returned.
def connect(server)
  opts = server_opts(server)
  host = opts[:host] || 'localhost'
  port = (opts[:port] || 21000).to_i
  connection = ::Impala.connect(host, port, opts)
  force_database(connection, opts[:database])
end
database_error_classes()
click to toggle source
# File lib/sequel/adapters/impala.rb, line 36
#
# Returns the ImpalaExceptions constant, i.e. the exception classes this
# adapter reports as database errors.
def database_error_classes
  ImpalaExceptions
end
disconnect_connection(c)
click to toggle source
# File lib/sequel/adapters/impala.rb, line 40
#
# Log that connection +c+ is being closed, then close it. Any exception
# listed in DisconnectExceptions that is raised while doing so is
# swallowed, in which case the method evaluates to nil.
def disconnect_connection(c)
  begin
    log_info("Closing connection: #{c}")
    c.close
  rescue *DisconnectExceptions
    # Deliberately ignored: closing is best effort.
    nil
  end
end
execute(sql, opts=OPTS) { |cursor| ... }
click to toggle source
# File lib/sequel/adapters/impala.rb, line 46
#
# Run +sql+ on a connection obtained through synchronize(opts[:server]).
#
# The :query_id_name and :profile_name entries of the database-level
# options (+self.opts+) are merged underneath the per-call +opts+, so
# per-call values win. The statement is executed inside +record_query_id+
# and +log_connection_yield+; the resulting cursor is yielded to the
# caller's block when one is given. The synchronized block itself
# evaluates to nil.
#
# Exceptions listed in ImpalaExceptions are handed to +raise_error+.
#
# Cleanup always attempts to record the runtime profile, log the cursor
# and its query URL, and finally close the cursor if it is still open.
# The close sits in its own +ensure+ so that a failure while recording the
# profile or logging can no longer leave the cursor open.
def execute(sql, opts=OPTS)
  synchronize(opts[:server]) do |c|
    # here's my super-hack to get DDL calls to record their profiles and query_ids
    opts = self.opts.select { |k, _| [:query_id_name, :profile_name].include?(k) }.merge(opts)
    cursor = nil
    begin
      cursor = record_query_id(opts) do
        log_connection_yield(sql, c) do
          c.execute(sql){}
        end
      end
      yield cursor if block_given?
      nil
    rescue *ImpalaExceptions => e
      raise_error(e)
    ensure
      begin
        record_profile(cursor, opts)
        log_info("Closing cursor: #{cursor.inspect}")
        log_query_url(cursor.handle) if cursor && cursor.handle
      ensure
        # Must run even when the bookkeeping above raises, otherwise the
        # cursor leaks.
        cursor.close if cursor && cursor.open?
      end
    end
  end
end
log_query_url(handle)
click to toggle source
# File lib/sequel/adapters/impala.rb, line 69
#
# When the SEQUEL_IMPALA_QUERY_URL environment variable is set, treat its
# value as a format string, fill in the +query_id+ reference with
# handle.id, and send the result to +log_info+. Does nothing (and
# evaluates to nil) when the variable is unset.
def log_query_url(handle)
  url_template = ENV['SEQUEL_IMPALA_QUERY_URL']
  return unless url_template

  log_info(sprintf(url_template, query_id: handle.id))
end
profile_for(profile_name=:default)
click to toggle source
# File lib/sequel/adapters/impala.rb, line 93
#
# Look up the entry stored in @runtime_profiles under +profile_name+
# (:default unless given). The read happens inside Sequel.synchronize.
def profile_for(profile_name=:default)
  Sequel.synchronize do
    @runtime_profiles[profile_name]
  end
end
query_id_and_profile(query_id_name=:default, profile_name=:default) { || ... }
click to toggle source
# File lib/sequel/adapters/impala.rb, line 73
#
# For the duration of the given block, store +query_id_name+ in @query_ids
# and +profile_name+ in @runtime_profiles under the RECORD_QUERY_PROFILE
# key. Whatever was stored under that key beforehand is put back once the
# block finishes, whether it returns normally or raises. Both the swap and
# the restore run inside Sequel.synchronize.
def query_id_and_profile(query_id_name=:default, profile_name=:default)
  slot = RECORD_QUERY_PROFILE
  saved_query_id_name = nil
  saved_profile_name = nil
  begin
    Sequel.synchronize do
      # Remember the current values, then install the requested names.
      saved_query_id_name, saved_profile_name = @query_ids[slot], @runtime_profiles[slot]
      @query_ids[slot] = query_id_name
      @runtime_profiles[slot] = profile_name
    end

    yield
  ensure
    Sequel.synchronize do
      @query_ids[slot] = saved_query_id_name
      @runtime_profiles[slot] = saved_profile_name
    end
  end
end
query_id_for(query_id_name=:default)
click to toggle source
# File lib/sequel/adapters/impala.rb, line 97
#
# Look up the entry stored in @query_ids under +query_id_name+ (:default
# unless given). The read happens inside Sequel.synchronize.
def query_id_for(query_id_name=:default)
  Sequel.synchronize do
    @query_ids[query_id_name]
  end
end
Private Instance Methods
adapter_initialize()
click to toggle source
# File lib/sequel/adapters/impala.rb, line 128
#
# Start with empty hashes for the two pieces of per-database bookkeeping:
# recorded runtime profiles and recorded query ids.
def adapter_initialize
  # Runtime profiles, keyed by profile name.
  @runtime_profiles = {}
  # Query id records, keyed by query id name.
  @query_ids = {}
end
connection_execute_method()
click to toggle source
# File lib/sequel/adapters/impala.rb, line 133
#
# Returns the symbol :query.
# NOTE(review): presumably the name of the connection method Sequel should
# call to run SQL -- confirm against the Sequel::Database base class.
def connection_execute_method
  :query
end
dataset_class_default()
click to toggle source
# File lib/sequel/adapters/impala.rb, line 103
#
# Returns the Dataset constant as resolved from this class.
# NOTE(review): presumably Sequel::Impala::Dataset -- confirm where Dataset
# is defined.
def dataset_class_default
  Dataset
end
disconnect_error?(exception, opts)
click to toggle source
Impala raises IOError if it detects a problem on the connection, and in most cases that results in an unusable connection, so treat it as a disconnect error so Sequel will reconnect.
Calls superclass method
# File lib/sequel/adapters/impala.rb, line 140
#
# Returns true when +exception+ matches (via ===) any entry of
# DisconnectExceptions; otherwise defers to the superclass implementation
# with the same arguments.
def disconnect_error?(exception, opts)
  return true if DisconnectExceptions.any? { |matcher| matcher === exception }

  super
end
record_profile(cursor, opts)
click to toggle source
# File lib/sequel/adapters/impala.rb, line 107
#
# Store the runtime profile of +cursor+ in @runtime_profiles.
#
# The name to store it under is opts[:profile_name] or, failing that, the
# value currently held in @runtime_profiles under RECORD_QUERY_PROFILE.
# Nothing is recorded (and nil is returned) when +cursor+ is nil/false or
# when no name can be determined.
def record_profile(cursor, opts)
  return unless cursor

  profile_name = opts[:profile_name] ||
    Sequel.synchronize { @runtime_profiles[RECORD_QUERY_PROFILE] }
  return unless profile_name

  # Fetch the profile outside of the synchronized section, then store it.
  profile = cursor.runtime_profile
  Sequel.synchronize { @runtime_profiles[profile_name] = profile }
end
record_query_id(opts = OPTS) { || ... }
click to toggle source
# File lib/sequel/adapters/impala.rb, line 114
#
# Yield to the block, expecting it to produce a cursor, and return that
# cursor.
#
# If a query id name is available -- opts[:query_id_name] or else the
# value held in @query_ids under RECORD_QUERY_PROFILE -- and the block
# produced a cursor, a hash with the cursor's handle id (:query_id) and
# the time taken just before yielding (:start_time) is stored in
# @query_ids under that name.
def record_query_id(opts = OPTS)
  query_id_name = opts[:query_id_name] ||
    Sequel.synchronize { @query_ids[RECORD_QUERY_PROFILE] }
  # The timestamp is only taken when something will be recorded.
  started_at = query_id_name ? Time.now : nil

  cursor = yield
  return cursor unless cursor && query_id_name

  entry = { query_id: cursor.handle.id, start_time: started_at }
  Sequel.synchronize { @query_ids[query_id_name] = entry }

  cursor
end
schema_parse_table(table_name, opts)
click to toggle source
Use DESCRIBE to get the column names and types for the table.
# File lib/sequel/adapters/impala.rb, line 150
#
# Build the schema description for +table_name+ from the rows returned by
# +describe+.
#
# The table is qualified with opts[:schema] when that option is present.
# Each row hash is modified in place: its original :type is moved to
# :db_type, :type is replaced by the result of +schema_column_type+,
# :default is set to nil, :primary_key to false, and :name is removed.
# Returns an array of [column, row] pairs, where column is the removed
# :name passed through the output identifier method for opts[:dataset].
def schema_parse_table(table_name, opts)
  to_output_identifier = output_identifier_meth(opts[:dataset])

  schema = opts[:schema]
  table = schema ? Sequel.qualify(schema, table_name) : Sequel.identifier(table_name)

  describe(table, opts).map do |row|
    db_type = row[:type]
    row[:db_type] = db_type
    row[:type] = schema_column_type(db_type)
    row[:default] = nil
    row[:primary_key] = false
    column_name = row.delete(:name)
    [to_output_identifier.call(column_name), row]
  end
end