class Dynflow::Coordinator
Attributes
adapter[R]
Public Class Methods
new(coordinator_adapter)
click to toggle source
# File lib/dynflow/coordinator.rb, line 324
# Builds a coordinator on top of the given adapter.
#
# @param coordinator_adapter the object stored in @adapter; the other
#   methods of this class call create_record, update_record, delete_record
#   and find_records on it
def initialize(coordinator_adapter)
  @adapter = coordinator_adapter
end
Public Instance Methods
acquire(lock, &block)
click to toggle source
# File lib/dynflow/coordinator.rb, line 328
# Creates the lock record through the adapter and, when a block is given,
# runs the block and releases the lock afterwards.
#
# @param lock the lock to acquire; it is passed to `Type!` together with
#   `Lock` and must respond to `validate!` and `unlock_on_shutdown?`
# @yield optional section executed after the lock record has been created
# @return the block's value, or nil when no block is given
# @raise [LockError] when a DuplicateRecordError is raised while creating the
#   record or while running the block
def acquire(lock, &block)
  Type! lock, Lock
  lock.validate!
  adapter.create_record(lock)
  return unless block

  begin
    block.call
  # We are looking for ::Sidekiq::Shutdown, but that may not be defined. We rely on it being a subclass of Interrupt
  # We don't really want to rescue it, but we need to bind it somehow so that we can check it in ensure
  rescue Interrupt => interrupt
    raise interrupt
  ensure
    # Guard on the Shutdown constant itself: checking only ::Sidekiq would
    # raise NameError here (and skip the release) whenever Sidekiq is defined
    # without Shutdown. `interrupt` is nil when the block finished normally.
    sidekiq_shutdown = defined?(::Sidekiq::Shutdown) && interrupt.is_a?(::Sidekiq::Shutdown)
    release(lock) if !sidekiq_shutdown || lock.unlock_on_shutdown?
  end
rescue DuplicateRecordError => e
  raise LockError.new(e.record)
end
clean_orphaned_locks()
click to toggle source
# File lib/dynflow/coordinator.rb, line 410
# Releases locks of the LockByWorld and SingletonActionLock families whose
# owner id is not among the ids reported by `valid_owner_ids`.
#
# @return [Array] the locks that were released
def clean_orphaned_locks
  [LockByWorld, SingletonActionLock].each_with_object([]) do |lock_family, released|
    known_owner_ids = lock_family.valid_owner_ids(self)
    class_names = lock_family.valid_classes.map(&:name)
    candidates = find_locks(class: class_names, exclude_owner_id: known_owner_ids)
    # reloading the valid owner ids to avoid race conditions
    known_owner_ids = lock_family.valid_owner_ids(self)
    candidates.each do |lock|
      next if known_owner_ids.include?(lock.owner_id)

      release(lock)
      released << lock
    end
  end
end
create_record(record)
click to toggle source
# File lib/dynflow/coordinator.rb, line 362
# Hands the record to the adapter's create_record.
#
# @param record the object to create; passed to `Type!` together with
#   `Record` before the adapter is called
# @return whatever the adapter's create_record returns
def create_record(record)
  Type!(record, Record)
  adapter.create_record(record)
end
deactivate_world(world)
click to toggle source
# File lib/dynflow/coordinator.rb, line 404
# Sets the world's `active` attribute to false and passes the world to
# update_record.
#
# @param world the world to deactivate; passed to `Type!` together with
#   Coordinator::ExecutorWorld
# @return whatever update_record returns
def deactivate_world(world)
  Type!(world, Coordinator::ExecutorWorld)
  world.active = false
  update_record(world)
end
delete_record(record)
click to toggle source
# File lib/dynflow/coordinator.rb, line 372
# Hands the record to the adapter's delete_record.
#
# @param record the object to delete; passed to `Type!` together with
#   `Record` before the adapter is called
# @return whatever the adapter's delete_record returns
def delete_record(record)
  Type!(record, Record)
  adapter.delete_record(record)
end
delete_world(world, on_termination = false)
click to toggle source
# File lib/dynflow/coordinator.rb, line 398
# Releases the locks owned by the world and then deletes the world's record.
#
# @param world the world to remove; passed to `Type!` together with
#   Coordinator::ClientWorld and Coordinator::ExecutorWorld
# @param on_termination forwarded unchanged to release_by_owner
# @return whatever delete_record returns
def delete_world(world, on_termination = false)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  # Locks held by a world are looked up under this owner id.
  owner_id = "world:#{world.id}"
  release_by_owner(owner_id, on_termination)
  delete_record(world)
end
find_locks(filter_options)
click to toggle source
# File lib/dynflow/coordinator.rb, line 356
# Queries the adapter with the given filter and builds a lock from each
# result via Lock.from_hash.
#
# @param filter_options forwarded unchanged to the adapter's find_records
# @return the mapped collection of Lock.from_hash results
def find_locks(filter_options)
  matches = adapter.find_records(filter_options)
  matches.map { |attributes| Lock.from_hash(attributes) }
end
find_records(filter)
click to toggle source
# File lib/dynflow/coordinator.rb, line 377
# Queries the adapter with the given filter and builds a record from each
# result via Record.from_hash.
#
# @param filter forwarded unchanged to the adapter's find_records
# @return the mapped collection of Record.from_hash results
def find_records(filter)
  matches = adapter.find_records(filter)
  matches.map { |attributes| Record.from_hash(attributes) }
end
find_worlds(active_executor_only = false, filters = {})
click to toggle source
# File lib/dynflow/coordinator.rb, line 383
# Looks up world records, always restricting the lookup by class name.
#
# @param active_executor_only when truthy, only executor worlds answering
#   true to `active?` are returned; otherwise executor worlds are followed by
#   client worlds
# @param filters extra filter entries merged with the class restriction
# @return [Array] the matching world records
def find_worlds(active_executor_only = false, filters = {})
  executor_filter = filters.merge(class: Coordinator::ExecutorWorld.name)
  executors = find_records(executor_filter)
  return executors.select(&:active?) if active_executor_only

  client_filter = filters.merge(class: Coordinator::ClientWorld.name)
  # concat appends in place and returns the executors array itself.
  executors.concat(find_records(client_filter))
end
register_world(world)
click to toggle source
# File lib/dynflow/coordinator.rb, line 393
# Registers a world by passing it to create_record.
#
# @param world the world to register; passed to `Type!` together with
#   Coordinator::ClientWorld and Coordinator::ExecutorWorld
# @return whatever create_record returns
def register_world(world)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  create_record(world)
end
release(lock)
click to toggle source
# File lib/dynflow/coordinator.rb, line 347
# Releases a lock by handing it to the adapter's delete_record.
#
# @param lock the lock to release; passed to `Type!` together with `Lock`
#   before the adapter is called
# @return whatever the adapter's delete_record returns
def release(lock)
  Type!(lock, Lock)
  adapter.delete_record(lock)
end
release_by_owner(owner_id, on_termination = false)
click to toggle source
# File lib/dynflow/coordinator.rb, line 352
# Releases the locks found for the given owner id.
#
# @param owner_id used as the `owner_id` filter for find_locks
# @param on_termination when truthy, only locks answering true to
#   `unlock_on_shutdown?` are released
# @return the per-lock results: the value of `release` for released locks,
#   nil for skipped ones
def release_by_owner(owner_id, on_termination = false)
  find_locks(owner_id: owner_id).map do |lock|
    # Skipped locks still contribute a nil entry to the result.
    next if on_termination && !lock.unlock_on_shutdown?

    release(lock)
  end
end
update_record(record)
click to toggle source
# File lib/dynflow/coordinator.rb, line 367
# Hands the record to the adapter's update_record.
#
# @param record the object to update; passed to `Type!` together with
#   `Record` before the adapter is called
# @return whatever the adapter's update_record returns
def update_record(record)
  Type!(record, Record)
  adapter.update_record(record)
end