class Dynflow::PersistenceAdapters::Sequel
Constants
- MAX_RETRIES
- META_DATA
- RETRY_DELAY
- SERIALIZABLE_COLUMNS
- TABLES
Attributes
db[R]
Public Class Methods
new(config)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 51
# Builds the adapter: connects to the database described by +config+ and,
# unless disabled, runs the schema migrations.
#
# When +config+ is a Hash, two keys are read from a copy of it:
# +:additional_responsibilities+ is removed from the copy and merged over the
# defaults (+coordinator: true, connector: true+); +:migrate+ (default true)
# decides whether +migrate_db+ runs and stays in the copy handed to
# +initialize_db+.
def initialize(config)
  settings = config.dup
  run_migrations = true
  @additional_responsibilities = { coordinator: true, connector: true }

  if settings.is_a?(Hash)
    if settings.key?(:additional_responsibilities)
      @additional_responsibilities.merge!(settings.delete(:additional_responsibilities))
    end
    run_migrations = settings.fetch(:migrate, true)
  end

  @db = initialize_db settings
  migrate_db if run_migrations
end
Private Class Methods
migrations_path()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 358
# Absolute path of the +sequel_migrations+ directory located next to this file.
def self.migrations_path
  File.expand_path('sequel_migrations', File.dirname(__FILE__))
end
Public Instance Methods
abort_if_pending_migrations!()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 334
# Asks Sequel's migrator to verify that the schema tracked in the
# +dynflow_schema_info+ table is current with the migrations directory.
def abort_if_pending_migrations!
  ::Sequel::Migrator.check_current(
    db,
    self.class.migrations_path,
    table: 'dynflow_schema_info'
  )
end
chain_execution_plan(first, second)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 198
# Records that execution plan +second+ is blocked by execution plan +first+
# by saving a row into the execution plan dependency table.
def chain_execution_plan(first, second)
  dependency = { execution_plan_uuid: second, blocked_by_uuid: first }
  save(:execution_plan_dependency, {}, dependency, with_data: false)
end
connector_feature!()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 249
# Guard used by the connector-related methods.
#
# Returns nil when the +:connector+ responsibility is enabled.
# Raises RuntimeError otherwise. The message names the
# +additional_responsibilities+ setting, which is the configuration key the
# constructor actually reads (the previous text pointed at a non-existent
# "additional_features" option).
def connector_feature!
  return if @additional_responsibilities[:connector]

  raise "The sequel persistence adapter connector feature used but not enabled in additional_responsibilities"
end
coordinator_feature!()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 288
# Guard used by the coordinator-related methods.
#
# Returns nil when the +:coordinator+ responsibility is enabled.
# Raises RuntimeError otherwise. The message names the
# +additional_responsibilities+ setting, which is the configuration key the
# constructor actually reads (the previous text pointed at a non-existent
# "additional_features" option).
def coordinator_feature!
  return if @additional_responsibilities[:coordinator]

  raise "The sequel persistence adapter coordinator feature used but not enabled in additional_responsibilities"
end
delete_coordinator_record(class_name, record_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 304
# Deletes the coordinator record identified by +class_name+ and +record_id+.
# Requires the coordinator responsibility to be enabled.
def delete_coordinator_record(class_name, record_id)
  coordinator_feature!
  with_retry do
    table(:coordinator_record).where(class: class_name, id: record_id).delete
  end
end
delete_delayed_plans(filters, batch_size = 1000)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 134
# Deletes delayed plans matching +filters+ in slices of +batch_size+, each
# slice inside its own transaction. Returns the accumulated delete count.
def delete_delayed_plans(filters, batch_size = 1000)
  deleted = 0
  with_retry do
    matching = filter(:delayed, table(:delayed), filters)
    matching.each_slice(batch_size) do |batch|
      plan_uuids = batch.map { |plan| plan.fetch(:execution_plan_uuid) }
      @db.transaction do
        deleted += table(:delayed).where(execution_plan_uuid: plan_uuids).delete
      end
    end
  end
  deleted
end
delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 99
# Deletes execution plans matching +filters+ together with their delayed
# records, steps, output chunks and actions. Work is done in slices of
# +batch_size+, each slice inside its own transaction. When +backup_dir+ is
# given, steps, actions and execution plans are appended to CSV files in that
# directory before being deleted. Returns the number of deleted execution
# plan rows.
def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
  deleted = 0
  with_retry do
    matching = filter(:execution_plan, table(:execution_plan), filters)
    matching.each_slice(batch_size) do |batch|
      plan_uuids = batch.map { |plan| plan.fetch(:uuid) }
      @db.transaction do
        # Dependent rows are removed before the plans themselves.
        table(:delayed).where(execution_plan_uuid: plan_uuids).delete

        step_rows = table(:step).where(execution_plan_uuid: plan_uuids)
        backup_to_csv(:step, step_rows, backup_dir, 'steps.csv') if backup_dir
        step_rows.delete

        table(:output_chunk).where(execution_plan_uuid: plan_uuids).delete

        action_rows = table(:action).where(execution_plan_uuid: plan_uuids)
        backup_to_csv(:action, action_rows, backup_dir, 'actions.csv') if backup_dir
        action_rows.delete

        plan_rows = table(:execution_plan).where(uuid: plan_uuids)
        backup_to_csv(:execution_plan, plan_rows, backup_dir, 'execution_plans.csv') if backup_dir
        deleted += plan_rows.delete
      end
    end
  end
  deleted
end
delete_output_chunks(execution_plan_id, action_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 243
# Deletes the output chunks stored for the given execution plan and action.
def delete_output_chunks(execution_plan_id, action_id)
  conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  with_retry { filter(:output_chunk, table(:output_chunk), conditions).delete }
end
filtering_by()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 27
# Returns the META_DATA entry for execution plans, i.e. the names that may be
# used for filtering.
def filtering_by
  META_DATA.fetch(:execution_plan)
end
find_blocked_execution_plans(execution_plan_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 163
# Returns the uuids of execution plans that are blocked by +execution_plan_id+.
def find_blocked_execution_plans(execution_plan_id)
  blocked = table(:execution_plan_dependency).where(blocked_by_uuid: execution_plan_id)
  blocked.select_map(:execution_plan_uuid)
end
find_coordinator_records(options)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 309
# Loads coordinator records matching +options[:filters]+. The special filter
# +:exclude_owner_id+ is taken out of the filters and applied as an exclusion
# on the +owner_id+ column. Requires the coordinator responsibility.
def find_coordinator_records(options)
  coordinator_feature!
  options = options.dup
  criteria = (options[:filters] || {}).dup
  excluded_owner = criteria.delete(:exclude_owner_id)

  records = filter(:coordinator_record, table(:coordinator_record), criteria)
  records = records.exclude(:owner_id => excluded_owner) if excluded_owner

  with_retry { records.all.map { |row| load_data(row) } }
end
find_execution_plan_counts(options = {})
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 79
# Counts execution plans matching +options[:filters]+.
def find_execution_plan_counts(options = {})
  with_retry do
    plans = filter(:execution_plan, table(:execution_plan), options[:filters])
    plans.count
  end
end
find_execution_plan_counts_after(timestamp, options = {})
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 83
# Counts execution plans matching +options[:filters]+ whose +ended_at+ is at
# or after +timestamp+.
def find_execution_plan_counts_after(timestamp, options = {})
  with_retry do
    plans = filter(:execution_plan, table(:execution_plan), options[:filters])
    plans.filter(::Sequel.lit('ended_at >= ?', timestamp)).count
  end
end
find_execution_plan_dependencies(execution_plan_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 157
# Returns the uuids of the execution plans that block +execution_plan_id+.
def find_execution_plan_dependencies(execution_plan_id)
  blockers = table(:execution_plan_dependency).where(execution_plan_uuid: execution_plan_id)
  blockers.select_map(:blocked_by_uuid)
end
find_execution_plan_statuses(options)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 87
# Returns a Hash keyed by execution plan uuid whose values are the remaining
# selected columns (+:state+ and +:result+) of each plan matching
# +options[:filters]+.
#
# The rows are materialised with +all+ inside +with_retry+. Previously only
# the (lazy) dataset was built inside the block and the query itself ran
# later, outside the retry/error-wrapping logic.
def find_execution_plan_statuses(options)
  rows = with_retry do
    filter(:execution_plan, table(:execution_plan), options[:filters])
      .select(:uuid, :state, :result)
      .all
  end

  rows.each_with_object({}) do |row, statuses|
    uuid = row.delete(:uuid)
    statuses[uuid] = row
  end
end
find_execution_plans(options = {})
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 67
# Finds execution plans, applying pagination, ordering and filtering taken
# from +options+ (+:page+, +:per_page+, +:order_by+, +:desc+, +:filters+).
# Ordering defaults to +:started_at+ when +:order_by+ is not supplied.
#
# The default is applied to a copy of +options+, so the caller's hash is left
# untouched (the previous version wrote +:order_by+ into it, which also
# failed on frozen hashes).
#
# Returns an Array of loaded plan records with +:id+ mapped from +:uuid+.
def find_execution_plans(options = {})
  table_name = :execution_plan
  options = options.merge(order_by: options[:order_by] || :started_at)

  data_set = filter(table_name,
                    order(table_name,
                          paginate(table(table_name), options),
                          options),
                    options[:filters])
  records = with_retry { data_set.all }
  records.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end
find_old_execution_plans(age)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 147
# Loads execution plans in state 'stopped' whose +ended_at+ is at or before
# +age+. Each returned record has +:id+ mapped from +:uuid+.
def find_old_execution_plans(age)
  table_name = :execution_plan
  rows = with_retry do
    stopped = table(table_name).where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
    stopped.all
  end
  rows.map { |row| execution_plan_column_map(load_data(row, table_name)) }
end
find_ready_delayed_plans(time)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 169
# Loads delayed plans that are ready to run at +time+: not frozen, with no
# +start_at+ or with +start_at+/+start_before+ at or before +time+, and not
# blocked by a dependency that is still in a state other than 'stopped'.
# Results are ordered by +start_at+.
def find_ready_delayed_plans(time)
  table_name = :delayed

  # Subquery: delayed plans having at least one non-stopped dependency.
  still_blocked = table(:execution_plan_dependency)
                  .join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
                  .where(::Sequel.~(state: 'stopped'))
                  .select(:execution_plan_uuid)

  rows = with_retry do
    due = ::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time)
    table(table_name)
      .where(due)
      .where(:frozen => false)
      .exclude(execution_plan_uuid: still_blocked)
      .order_by(:start_at)
      .all
  end

  rows.map { |row| load_data(row, table_name) }
end
insert_coordinator_record(value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 294
# Saves +value+ as a coordinator record using an empty lookup condition.
# Requires the coordinator responsibility.
def insert_coordinator_record(value)
  coordinator_feature!
  save(:coordinator_record, {}, value)
end
load_action(execution_plan_id, action_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 215
# Loads a single action of the given execution plan.
def load_action(execution_plan_id, action_id)
  lookup = { execution_plan_uuid: execution_plan_id, id: action_id }
  load(:action, lookup)
end
load_actions(execution_plan_id, action_ids)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 219
# Loads the actions of the given execution plan whose ids are in +action_ids+.
def load_actions(execution_plan_id, action_ids)
  lookup = { execution_plan_uuid: execution_plan_id, id: action_ids }
  load_records(:action, lookup)
end
load_actions_attributes(execution_plan_id, attributes)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 223
# Loads only the requested +attributes+ of every action of the given
# execution plan.
def load_actions_attributes(execution_plan_id, attributes)
  lookup = { execution_plan_uuid: execution_plan_id }
  load_records(:action, lookup, attributes)
end
load_delayed_plan(execution_plan_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 188
# Loads the delayed plan record for +execution_plan_id+; returns nil when the
# lookup raises KeyError.
def load_delayed_plan(execution_plan_id)
  load(:delayed, { execution_plan_uuid: execution_plan_id })
rescue KeyError
  nil
end
load_execution_plan(execution_plan_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 126
# Loads the execution plan with the given uuid and maps +:uuid+ to +:id+.
def load_execution_plan(execution_plan_id)
  plan = load(:execution_plan, { uuid: execution_plan_id })
  execution_plan_column_map(plan)
end
load_output_chunks(execution_plan_id, action_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 239
# Loads the +:timestamp+, +:kind+ and +:chunk+ values of the output chunks
# stored for the given execution plan and action.
def load_output_chunks(execution_plan_id, action_id)
  lookup = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  load_records(:output_chunk, lookup, [:timestamp, :kind, :chunk])
end
load_step(execution_plan_id, step_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 202
# Loads a single step of the given execution plan.
def load_step(execution_plan_id, step_id)
  lookup = { execution_plan_uuid: execution_plan_id, id: step_id }
  load(:step, lookup)
end
load_steps(execution_plan_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 206
# Loads every step of the given execution plan.
def load_steps(execution_plan_id)
  load_records(:step, { execution_plan_uuid: execution_plan_id })
end
migrate_db()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 330
# Runs the migrations from the class' migrations directory, tracking the
# schema version in the +dynflow_schema_info+ table.
def migrate_db
  ::Sequel::Migrator.run(
    db,
    self.class.migrations_path,
    table: 'dynflow_schema_info'
  )
end
ordering_by()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 31
# Returns the META_DATA entry for execution plans, i.e. the names that may be
# used for ordering.
def ordering_by
  META_DATA.fetch(:execution_plan)
end
pagination?()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 23
# This adapter supports pagination; always returns true.
def pagination?
  true
end
prune_envelopes(receiver_ids)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 278
# Deletes the envelopes addressed to any of +receiver_ids+.
# Requires the connector responsibility.
def prune_envelopes(receiver_ids)
  connector_feature!
  with_retry do
    table(:envelope).where(receiver_id: receiver_ids).delete
  end
end
prune_undeliverable_envelopes()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 283
# Deletes envelopes whose +receiver_id+ does not match the id of any
# coordinator record. Requires the connector responsibility.
def prune_undeliverable_envelopes
  connector_feature!
  with_retry do
    known_ids = table(:coordinator_record).select(:id)
    table(:envelope).where(receiver_id: known_ids).invert.delete
  end
end
pull_envelopes(receiver_id)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 260
# Within one transaction, reads every envelope addressed to +receiver_id+,
# deletes those rows and returns the loaded envelopes.
# Requires the connector responsibility.
def pull_envelopes(receiver_id)
  connector_feature!
  with_retry do
    db.transaction do
      rows = table(:envelope).where(receiver_id: receiver_id).all
      envelopes = rows.map { |row| load_data(row) }
      pulled_ids = rows.map { |row| row[:id] }
      table(:envelope).where(id: pulled_ids).delete
      return envelopes
    end
  end
end
push_envelope(envelope)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 273
# Inserts +envelope+ into the envelope table.
# Requires the connector responsibility.
def push_envelope(envelope)
  connector_feature!
  with_retry do
    table(:envelope).insert(prepare_record(:envelope, envelope))
  end
end
save_action(execution_plan_id, action_id, value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 227
# Saves +value+ as the action +action_id+ of the given execution plan.
def save_action(execution_plan_id, action_id, value)
  lookup = { execution_plan_uuid: execution_plan_id, id: action_id }
  save(:action, lookup, value, with_data: false)
end
save_delayed_plan(execution_plan_id, value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 194
# Saves +value+ as the delayed plan record of the given execution plan.
def save_delayed_plan(execution_plan_id, value)
  lookup = { execution_plan_uuid: execution_plan_id }
  save(:delayed, lookup, value, with_data: false)
end
save_envelope(data)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 255
# Saves +data+ as an envelope using an empty lookup condition.
# Requires the connector responsibility.
def save_envelope(data)
  connector_feature!
  save(:envelope, {}, data)
end
save_execution_plan(execution_plan_id, value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 130
# Saves +value+ as the execution plan with uuid +execution_plan_id+.
def save_execution_plan(execution_plan_id, value)
  lookup = { uuid: execution_plan_id }
  save(:execution_plan, lookup, value, with_data: false)
end
save_output_chunks(execution_plan_id, action_id, chunks)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 231
# Saves every chunk in +chunks+ as an output chunk row. Each chunk is tagged
# in place with the execution plan uuid and the action id before saving.
def save_output_chunks(execution_plan_id, action_id, chunks)
  chunks.each do |entry|
    entry[:execution_plan_uuid] = execution_plan_id
    entry[:action_id] = action_id
    save(:output_chunk, {}, entry, with_data: false)
  end
end
save_step(execution_plan_id, step_id, value, update_conditions = {})
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 210
# Saves +value+ as the step +step_id+ of the given execution plan.
# +update_conditions+ are forwarded to +save+, which merges them into the
# WHERE clause used when an existing row is updated.
def save_step(execution_plan_id, step_id, value, update_conditions = {})
  lookup = { execution_plan_uuid: execution_plan_id, id: step_id }
  save(:step, lookup, value, with_data: false, update_conditions: update_conditions)
end
to_hash()
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 323
# Dumps all rows of the execution plan, step, action and envelope tables
# into a Hash of Arrays.
def to_hash
  dump = {}
  dump[:execution_plans] = table(:execution_plan).all.to_a
  dump[:steps] = table(:step).all.to_a
  dump[:actions] = table(:action).all.to_a
  dump[:envelopes] = table(:envelope).all.to_a
  dump
end
transaction(&block)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 63
# Runs the given block inside a database transaction by delegating to
# +db.transaction+.
def transaction(&block)
  db.transaction(&block)
end
update_coordinator_record(class_name, record_id, value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 299
# Saves +value+ over the coordinator record identified by +class_name+ and
# +record_id+. Requires the coordinator responsibility.
def update_coordinator_record(class_name, record_id, value)
  coordinator_feature!
  lookup = { class: class_name, :id => record_id }
  save(:coordinator_record, lookup, value)
end
Private Instance Methods
backup_to_csv(table_name, dataset, backup_dir, file_name)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 470
# Appends every row of +dataset+ to the CSV file +file_name+ inside
# +backup_dir+ (created when missing). A header line with the column names is
# written only when the file did not exist yet. Non-nil values of columns
# listed in SERIALIZABLE_COLUMNS for +table_name+ are written hex-encoded.
# Returns +dataset+.
def backup_to_csv(table_name, dataset, backup_dir, file_name)
  ensure_backup_dir(backup_dir)
  target = File.join(backup_dir, file_name)
  write_header = !File.exist?(target)
  column_names = dataset.columns
  packed_columns = SERIALIZABLE_COLUMNS.fetch(table_name, [])

  File.open(target, 'a') do |out|
    out << column_names.to_csv if write_header
    dataset.each do |row|
      cells = column_names.map do |name|
        cell = row[name]
        next cell unless cell && packed_columns.include?(name.to_s)

        cell.unpack('H*').first
      end
      out << cells.to_csv
    end
  end
  dataset
end
delete(what, condition)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 489
# Deletes the rows of table +what+ matching +condition+ (keys symbolized).
def delete(what, condition)
  with_retry do
    table(what).where(Utils.symbolize_keys(condition)).delete
  end
end
dump_data(value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 499
# Serializes +value+ with MessagePack and wraps the result in a Sequel blob.
# Returns nil for a nil +value+. The value is passed through a +Type!+ check
# for Hash, Array, Integer or String first.
def dump_data(value)
  return if value.nil?

  checked = Type!(value, Hash, Array, Integer, String)
  ::Sequel.blob(MessagePack.pack(checked))
end
ensure_backup_dir(backup_dir)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 466
# Creates +backup_dir+ (including parents) unless it already is a directory.
def ensure_backup_dir(backup_dir)
  return if File.directory?(backup_dir)

  FileUtils.mkdir_p(backup_dir)
end
execution_plan_column_map(plan)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 578
# Copies a non-nil +:uuid+ of +plan+ into +:id+ (in place) and returns +plan+.
def execution_plan_column_map(plan)
  uuid = plan[:uuid]
  plan[:id] = uuid unless uuid.nil?
  plan
end
extract_metadata(what, value)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 493
# Builds a Hash with the META_DATA columns of +what+ (minus its
# SERIALIZABLE_COLUMNS) as symbol keys, taking the values from +value+.
def extract_metadata(what, value)
  columns = META_DATA.fetch(what) - SERIALIZABLE_COLUMNS.fetch(what, [])
  source = Utils.indifferent_hash(value)
  columns.each_with_object({}) do |column, meta|
    meta[column.to_sym] = source[column]
  end
end
filter(what, data_set, filters)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 527
# Narrows +data_set+ by +filters+ (nil or a Hash of column => value).
#
# Returns +data_set+ unchanged when +filters+ is nil. Filter keys must be
# listed in META_DATA for +what+. For +:execution_plan+ the extra keys
# +uuid+, +caller_execution_plan_id+, +caller_action_id+ and +delayed+ are
# accepted as well:
# * +caller_execution_plan_id+ inner-joins the action table,
# * +delayed+ inner-joins the delayed table and is dropped from the WHERE
#   conditions.
#
# Raises ArgumentError for unknown columns, or when +caller_action_id+ is
# given without +caller_execution_plan_id+. The unknown-column message used
# to be misspelled ("unkown"); it now reads "unknown columns: ...".
def filter(what, data_set, filters)
  Type! filters, NilClass, Hash
  return data_set if filters.nil?

  # Work on a string-keyed copy so the caller's hash is never modified.
  filters = filters.each_with_object({}) { |(key, value), copy| copy[key.to_s] = value }

  unknown = filters.keys - META_DATA.fetch(what)
  if what == :execution_plan
    unknown -= %w[uuid caller_execution_plan_id caller_action_id delayed]

    if filters.key?('caller_action_id') && !filters.key?('caller_execution_plan_id')
      raise ArgumentError, "caller_action_id given but caller_execution_plan_id missing"
    end

    if filters.key?('caller_execution_plan_id')
      data_set = data_set.join_table(:inner, TABLES[:action], :execution_plan_uuid => :uuid)
                         .select_all(TABLES[:execution_plan]).distinct
    end

    if filters.key?('delayed')
      filters.delete('delayed')
      data_set = data_set.join_table(:inner, TABLES[:delayed], :execution_plan_uuid => :uuid)
                         .select_all(TABLES[:execution_plan]).distinct
    end
  end

  raise ArgumentError, "unknown columns: #{unknown.inspect}" unless unknown.empty?

  data_set.where Utils.symbolize_keys(filters)
end
initialize_db(db_path)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 353
# Connects to the database described by +db_path+. When the
# DYNFLOW_SQL_LOG environment variable is set, a logger writing to $stderr is
# passed to the connection; otherwise the logger option is nil.
def initialize_db(db_path)
  sql_logger = ENV['DYNFLOW_SQL_LOG'] ? Logger.new($stderr) : nil
  ::Sequel.connect(db_path, logger: sql_logger)
end
load_data(record, what = nil)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 453
# Turns a database +record+ into an indifferent-access hash.
#
# When the record has a non-nil +:data+ value, that value is unpacked with
# MessagePack and returned. Otherwise the non-nil SERIALIZABLE_COLUMNS of
# +what+ are unpacked in place and the record itself is returned.
def load_data(record, what = nil)
  unless record[:data].nil?
    return Utils.indifferent_hash(MessagePack.unpack(record[:data].to_s))
  end

  SERIALIZABLE_COLUMNS.fetch(what, []).each do |column|
    column = column.to_sym
    record[column] = MessagePack.unpack(record[column].to_s) unless record[column].nil?
  end
  Utils.indifferent_hash(record)
end
load_record(what, condition)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 413
# Loads the first row of table +what+ matching +condition+.
# Raises KeyError when no such row exists.
def load_record(what, condition)
  dataset = table(what)
  found = with_retry { dataset.first(Utils.symbolize_keys(condition)) }
  raise KeyError, "searching: #{what} by: #{condition.inspect}" unless found

  load_data(found, what)
end
Also aliased as: load
load_records(what, condition, keys = nil)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 432
# Loads all rows of table +what+ matching +condition+.
#
# Without +keys+ the fully loaded records are returned. With +keys+ only
# those columns the table actually has (plus +:data+, when present) are
# selected, and each result is reduced to a Hash holding exactly +keys+.
def load_records(what, condition, keys = nil)
  dataset = table(what)
  rows = with_retry do
    scoped = dataset.filter(Utils.symbolize_keys(condition))
    unless keys.nil?
      # Requested columns the table lacks are skipped; :data is fetched too
      # because requested values may live inside it.
      wanted = dataset.columns & keys
      wanted |= [:data] if dataset.columns.include?(:data)
      scoped = scoped.select(*wanted)
    end
    scoped.all
  end

  loaded = rows.map { |row| load_data(row, what) }
  return loaded if keys.nil?

  loaded.map do |row|
    keys.each_with_object({}) { |key, picked| picked[key] = row[key] }
  end
end
order(what, data_set, options)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 517
# Orders +data_set+ by +options[:order_by]+, descending when +options[:desc]+
# is truthy. Returns +data_set+ untouched when no ordering column is given.
# Raises ArgumentError when the column is not listed in META_DATA for +what+.
def order(what, data_set, options)
  column = options[:order_by].to_s
  return data_set if column.empty?

  known = META_DATA.fetch(what).include?(column)
  raise ArgumentError, "unknown column #{column.inspect}" unless known

  column = column.to_sym
  ordering = options[:desc] ? ::Sequel.desc(column) : column
  data_set.order_by(ordering)
end
paginate(data_set, options)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 505
# Applies +options[:page]+ / +options[:per_page]+ to +data_set+ as a limit of
# +per_page+ rows at offset +per_page * page+. Without +:page+ the data set
# is returned as is. Raises ArgumentError when +:page+ is given without
# +:per_page+.
def paginate(data_set, options)
  page = options[:page] ? Integer(options[:page]) : nil
  per_page = options[:per_page] ? Integer(options[:per_page]) : nil
  return data_set unless page

  raise ArgumentError, "page specified without per_page attribute" unless per_page

  data_set.limit(per_page, per_page * page)
end
prepare_record(table_name, value, base = {}, with_data = true)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 362
# Builds the row Hash to be written to +table_name+ for +value+, starting
# from a copy of +base+.
#
# When +with_data+ is set and the table has a +:data+ column, the whole value
# is dumped into +:data+. Otherwise +:data+ is set to nil (or removed when
# the table has no such column) and the serializable columns are dumped
# individually. Metadata columns are merged in last and Symbol values are
# converted to Strings.
def prepare_record(table_name, value, base = {}, with_data = true)
  record = base.dup
  data_column = table(table_name).columns.include?(:data)

  if with_data && data_column
    record[:data] = dump_data(value)
  elsif data_column
    record[:data] = nil
    record.merge!(serialize_columns(table_name, value))
  else
    record.delete(:data)
    record.merge!(serialize_columns(table_name, value))
  end

  record.merge!(extract_metadata(table_name, value))
  record.each { |key, val| record[key] = val.to_s if val.is_a?(Symbol) }
  record
end
prune_unchanged(what, object, record)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 422
# Returns a copy of +record+ without the columns of table +what+ whose value
# equals the one in +object+.
def prune_unchanged(what, object, record)
  changes = record.dup
  table(what).columns.each do |column|
    next unless object[column] == changes[column]

    changes.delete(column)
  end
  changes
end
save(what, condition, value, with_data: true, update_conditions: {})
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 392
# Inserts, updates or deletes a row of table +what+.
#
# * +condition+ identifies an existing row (no lookup when it is empty).
# * A falsy +value+ deletes the existing row, if any, and returns +value+.
# * With a +value+ and no existing row, a new row is inserted and +value+
#   is returned.
# * With a +value+ and an existing row, only changed columns are updated,
#   using +condition+ merged over +update_conditions+ as the WHERE clause;
#   the result of the update call is returned. When nothing changed,
#   +value+ is returned without touching the database.
def save(what, condition, value, with_data: true, update_conditions: {})
  dataset = table(what)
  existing = condition.empty? ? nil : with_retry { dataset.first condition }

  unless value
    with_retry { dataset.where(condition).delete } if existing
    return value
  end

  record = prepare_record(what, value, (existing || condition), with_data)
  unless existing
    with_retry { dataset.insert record }
    return value
  end

  changes = prune_unchanged(what, existing, record)
  return value if changes.empty?

  scope = update_conditions.merge(condition)
  with_retry { dataset.where(scope).update(changes) }
end
serialize_columns(table_name, record)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 382
# Returns a Hash holding the dumped value of every key of +record+ that is
# listed in SERIALIZABLE_COLUMNS for +table_name+; keys become Symbols.
def serialize_columns(table_name, record)
  record.each_with_object({}) do |(key, value), dumped|
    next unless SERIALIZABLE_COLUMNS.fetch(table_name, []).include?(key.to_s)

    dumped[key.to_sym] = dump_data(value)
  end
end
table(which)
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 349
# Returns the dataset for the logical table +which+, resolved through TABLES.
def table(which)
  table_name = TABLES.fetch(which)
  db[table_name]
end
with_retry() { || ... }
click to toggle source
# File lib/dynflow/persistence_adapters/sequel.rb, line 558
# Runs the block, retrying it after Sequel connection/disconnect errors.
#
# Each such failure is logged and followed by a sleep of RETRY_DELAY before
# the retry. Once the failure count exceeds MAX_RETRIES,
# Errors::FatalPersistenceError is raised. Any other exception is re-raised
# wrapped in Errors::PersistenceError.
#
# NOTE(review): the catch-all clause rescues Exception, so errors outside
# StandardError are wrapped too. Kept as is because callers may rely on it.
def with_retry
  tries = 0
  begin
    yield
  rescue ::Sequel::DatabaseConnectionError, ::Sequel::DatabaseDisconnectError => error
    tries += 1
    log(:error, error)
    if tries > MAX_RETRIES
      log(:error, "The number of MAX_RETRIES exceeded")
      raise Errors::FatalPersistenceError.delegate(error)
    end
    log(:error, "Persistence retry no. #{tries}")
    sleep RETRY_DELAY
    retry
  rescue Exception => error
    raise Errors::PersistenceError.delegate(error)
  end
end