class DaemonRunner::Semaphore

Manage semaphore locks with Consul

Attributes

limit[R]

The number of nodes that can obtain a semaphore lock

lock_content[R]

The lock content

lock_modify_index[R]

The current lock modify index

members[R]

The current semaphore members

prefix[R]

The Consul key prefix

session[R]

The Consul session

state[R]

The current state of the semaphore

Public Class Methods

lock(name, limit = 3, **options) { || ... }

Acquire a lock with the current session

@param name [String] The name of the session
@param limit [Integer] The number of nodes that can request the lock
@see initialize for extra `options`
@return [DaemonRunner::Semaphore] instance of the semaphore class

# File lib/daemon_runner/semaphore.rb, line 17
def lock(name, limit = 3, **options)
  options.merge!(name: name)
  options.merge!(limit: limit)
  semaphore = Semaphore.new(options)
  semaphore.lock
  if block_given?
    begin
      until semaphore.locked?
        semaphore.try_lock
        sleep 0.1
      end
      lock_thr = semaphore.renew
      yield
    ensure
      lock_thr.kill unless lock_thr.nil?
      semaphore.release
    end
  end
  semaphore
rescue Exception => e
  logger.error e
  logger.debug e.backtrace.join("\n")
  raise
end
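
The block form above polls until the lock is held, keeps a renew thread running while the block executes, and always releases afterwards. The following is a minimal sketch of that control flow only, assuming an in-memory stand-in: `FakeSemaphore` and `with_fake_lock` are hypothetical names that are not part of the library, and nothing here talks to Consul.

```ruby
# Hypothetical in-memory stand-in for the semaphore; no Consul involved.
class FakeSemaphore
  attr_reader :attempts

  def initialize(succeed_after)
    @succeed_after = succeed_after # attempts needed before the lock is granted
    @attempts = 0
    @locked = false
  end

  def try_lock
    @attempts += 1
    @locked = true if @attempts >= @succeed_after
    @locked
  end

  def locked?
    @locked
  end

  # Background thread standing in for the renew loop.
  def renew
    Thread.new { loop { sleep 0.01 } }
  end

  def release
    @locked = false
    true
  end
end

# Poll until locked, run the block, then always stop renewing and release.
def with_fake_lock(semaphore)
  lock_thr = nil
  until semaphore.locked?
    semaphore.try_lock
    sleep 0.01
  end
  lock_thr = semaphore.renew
  yield
ensure
  lock_thr.kill unless lock_thr.nil?
  semaphore.release
end
```

With the real class, the equivalent call would presumably look like `DaemonRunner::Semaphore.lock('myservice', 2) { ... }`, which needs a reachable Consul agent.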

new(name:, prefix: nil, lock: nil, limit: 3)

@param name [String] The name of the session; also used in the default `prefix`
@param prefix [String|NilClass] The Consul KV prefix
@param lock [String|NilClass] The path to the lock file
@param limit [Integer] The number of nodes that can obtain a semaphore lock

# File lib/daemon_runner/semaphore.rb, line 67
def initialize(name:, prefix: nil, lock: nil, limit: 3)
  create_session(name)
  @prefix = prefix.nil? ? "service/#{name}/lock/" : prefix
  @prefix += '/' unless @prefix.end_with?('/')
  @lock = lock.nil? ? "#{@prefix}.lock" : lock
  @lock_modify_index = nil
  @lock_content = nil
  @limit = set_limit(limit)
  @reset = false
end
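
The defaults for `prefix` and the lock path can be checked in isolation. This sketch copies only the string handling from `initialize`; the helper name `semaphore_paths` and the sample values are hypothetical.

```ruby
# Mirror of the prefix and lock-path defaults in `initialize`.
def semaphore_paths(name, prefix: nil, lock: nil)
  prefix = "service/#{name}/lock/" if prefix.nil?
  prefix += '/' unless prefix.end_with?('/') # the prefix always ends in a slash
  lock = "#{prefix}.lock" if lock.nil?
  { prefix: prefix, lock: lock }
end

p semaphore_paths('myservice')
p semaphore_paths('myservice', prefix: 'locks/custom')
```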

Public Instance Methods

contender_key(value = 'none')

Create a contender key

# File lib/daemon_runner/semaphore.rb, line 139
def contender_key(value = 'none')
  if value.nil? || value.empty?
    raise ArgumentError, 'Value cannot be empty or nil'
  end
  key = "#{prefix}/#{session.id}"
  ::DaemonRunner::RetryErrors.retry do
    @contender_key = Diplomat::Lock.acquire(key, session.id, value)
  end
  @contender_key
end

create_session(name)

# File lib/daemon_runner/semaphore.rb, line 121
def create_session(name)
  ::DaemonRunner::RetryErrors.retry(exceptions: [DaemonRunner::Session::CreateSessionError]) do
    @session = Session.start(name, behavior: 'delete')
  end
end

lock()

Obtain a lock with the current session

@return [Boolean] `true` if the lock was obtained

# File lib/daemon_runner/semaphore.rb, line 82
def lock
  contender_key
  semaphore_state
  try_lock
end

locked?()

Check if the semaphore holds the lock

@return [Boolean] `true` if the lock is held, `false` otherwise

# File lib/daemon_runner/semaphore.rb, line 91
def locked?
  semaphore_state
  lock_exists? && (lock_content['Holders'] || []).include?(session.id)
end
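
Because the lockfile stores `Holders` as a hash keyed by session id (see `format_holders`), the `include?` call in `locked?` is a key lookup. A small sketch of that check, assuming a hypothetical lockfile body and a hypothetical helper name `holder?`:

```ruby
require 'json'

# Hypothetical lockfile content, shaped like the output of generate_lockfile.
LOCK_JSON = '{"Limit":2,"Holders":{"session-a":true,"session-b":true}}'.freeze

# True when session_id appears among the holders; a missing 'Holders'
# entry is treated as an empty list, as in `locked?`.
def holder?(lock_content, session_id)
  (lock_content['Holders'] || []).include?(session_id)
end

content = JSON.parse(LOCK_JSON)
puts holder?(content, 'session-a')
puts holder?(content, 'session-c')
```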

release()

Release a lock with the current session

@return [Boolean] `true` if the lock was released

# File lib/daemon_runner/semaphore.rb, line 115
def release
  semaphore_state
  try_release
end

renew()

Renew the lock, watching for changes

@return [Thread] Thread running a blocking call maintaining the lock state

# File lib/daemon_runner/semaphore.rb, line 99
def renew
  thr = Thread.new do
    loop do
      if renew?
        semaphore_state
        try_lock
      end
    end
  end
  thr
end

renew?()

Start a blocking query on the prefix, if there are changes we need to try to obtain the lock again.

@return [Boolean] `true` if there are changes, `false` if the request has timed out

# File lib/daemon_runner/semaphore.rb, line 195
def renew?
  logger.debug("Watching Consul #{prefix} for changes")
  options = { recurse: true }
  changes = Diplomat::Kv.get(prefix, options, :wait, :wait)
  logger.info("Changes on #{prefix} detected") if changes
  changes
rescue StandardError => e
  logger.error(e)
end

semaphore_state()

Get the current semaphore state by fetching all contender keys and the lock key

# File lib/daemon_runner/semaphore.rb, line 152
def semaphore_state
  options = { decode_values: true, recurse: true }
  @state = Diplomat::Kv.get(prefix, options, :return)
  decode_semaphore_state unless state.empty?
  state
end

set_limit(new_limit)

# File lib/daemon_runner/semaphore.rb, line 127
def set_limit(new_limit)
  if lock_exists?
    if new_limit.to_i != @limit.to_i
      logger.warn 'Limit in lockfile and @limit do not match using limit from lockfile'
    end
    @limit = lock_content['Limit']
  else
    @limit = new_limit
  end
end

try_lock()

# File lib/daemon_runner/semaphore.rb, line 159
def try_lock
  prune_members
  do_update = add_self_to_holders
  @reset = false
  if do_update
    format_holders
    @locked = write_lock
  end
  log_lock_state
end

try_release()

# File lib/daemon_runner/semaphore.rb, line 170
def try_release
  do_update = remove_self_from_holders
  if do_update
    format_holders
    @locked = !write_lock
  end
  DaemonRunner::Session.release(prefix)
  session.destroy!
  log_release_state
end

write_lock()

Write a new lock file if the number of contenders is less than `limit`

@return [Boolean] `true` if the lock was written successfully

# File lib/daemon_runner/semaphore.rb, line 183
def write_lock
  index = lock_modify_index.nil? ? 0 : lock_modify_index
  value = generate_lockfile
  return true if value == true
  Diplomat::Kv.put(@lock, value, cas: index)
end
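
The `cas:` option makes the write conditional on the key's modify index, which is what keeps two nodes from overwriting each other's lockfile. The sketch below imitates that check-and-set rule with a hypothetical in-memory `FakeKv` class; it assumes the usual Consul semantics, where an index of 0 only succeeds if the key does not exist yet, and it is not the Diplomat client.

```ruby
# Hypothetical in-memory store imitating check-and-set writes.
class FakeKv
  Entry = Struct.new(:value, :modify_index)

  def initialize
    @data = {}
    @index = 0
  end

  # Write only if `cas` matches the key's current modify index.
  # A `cas` of 0 succeeds only when the key does not exist yet.
  def put(key, value, cas:)
    current = @data[key]
    current_index = current.nil? ? 0 : current.modify_index
    return false unless cas == current_index

    @index += 1
    @data[key] = Entry.new(value, @index)
    true
  end

  # Current modify index of the key, or nil if the key is absent.
  def modify_index(key)
    entry = @data[key]
    entry && entry.modify_index
  end
end
```

A writer that read the lockfile before another node updated it holds a stale index, so its `put` returns `false` and it has to re-read the state and try again.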

Private Instance Methods

add_self_to_holders()

Add our session.id to the holders list if the number of holders is less than `limit`

# File lib/daemon_runner/semaphore.rb, line 243
def add_self_to_holders
  @holders.uniq!
  @reset = true if @holders.length == 0
  return true if @holders.include? session.id
  if @holders.length < limit
    @holders << session.id
  end
end
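
The return value of this method drives `try_lock`: anything truthy triggers a lockfile write. A standalone sketch of the three outcomes, using a hypothetical function `add_holder` that takes the list, session id, and limit as plain arguments:

```ruby
# Returns true if session_id is already a holder, the updated list if a
# slot was free, and nil when the semaphore is full.
def add_holder(holders, session_id, limit)
  holders = holders.uniq
  return true if holders.include?(session_id)

  holders << session_id if holders.length < limit
end

p add_holder(%w[a], 'b', 2)
p add_holder(%w[a b], 'c', 2)
p add_holder(%w[a b], 'a', 2)
```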

decode_semaphore_state()

Decode the raw response from Consul. Sets `@lock_modify_index`, `@lock_content`, and `@members`

@return [Array] List of members

# File lib/daemon_runner/semaphore.rb, line 210
def decode_semaphore_state
  lock_key = state.find { |k| k['Key'] == @lock }
  member_keys = state.delete_if { |k| k['Key'] == @lock }
  member_keys.map! { |k| k['Key'] }

  unless lock_key.nil?
    @lock_modify_index = lock_key['ModifyIndex']
    @lock_content = JSON.parse(lock_key['Value'])
  end
  @members = member_keys.map { |k| k.split('/')[-1] }
end
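
The decoding step separates the lock key from the contender keys and takes the last path segment of each contender key as the member id. The sketch below applies the same idea to a hypothetical response; the key names, index values, and the helper `decode_state` are illustrative assumptions, and it returns a hash instead of setting instance variables.

```ruby
require 'json'

# Hypothetical recursive KV response: one lock key plus two contender keys.
SAMPLE_STATE = [
  { 'Key' => 'service/app/lock/.lock', 'ModifyIndex' => 42,
    'Value' => '{"Limit":2,"Holders":{"id-1":true}}' },
  { 'Key' => 'service/app/lock/id-1', 'ModifyIndex' => 40, 'Value' => 'none' },
  { 'Key' => 'service/app/lock/id-2', 'ModifyIndex' => 41, 'Value' => 'none' }
].freeze

# Split the lock entry from the contender entries and decode both.
def decode_state(state, lock_path)
  lock_entries, member_entries = state.partition { |k| k['Key'] == lock_path }
  lock_key = lock_entries.first
  result = { members: member_entries.map { |k| k['Key'].split('/')[-1] } }
  unless lock_key.nil?
    result[:lock_modify_index] = lock_key['ModifyIndex']
    result[:lock_content] = JSON.parse(lock_key['Value'])
  end
  result
end

p decode_state(SAMPLE_STATE, 'service/app/lock/.lock')
```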

format_holders()

Format the list of holders for the lock file

# File lib/daemon_runner/semaphore.rb, line 262
def format_holders
  @holders.uniq!
  @holders.sort!
  holders = {}
  logger.debug "Holders are: #{@holders.join(',')}"
  @holders.map { |m| holders[m] = true }
  @holders = holders
end
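
The conversion turns a list of session ids into a hash whose keys are the sorted, de-duplicated ids. A non-mutating sketch of the same transformation, with a hypothetical helper name `holders_to_hash`:

```ruby
# Build the { id => true } hash stored under 'Holders' in the lockfile.
def holders_to_hash(holders)
  holders.uniq.sort.each_with_object({}) { |id, hash| hash[id] = true }
end

p holders_to_hash(%w[id-2 id-1 id-2])
```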

generate_lockfile()

Generate JSON formatted lockfile content, only if the content has changed

# File lib/daemon_runner/semaphore.rb, line 272
def generate_lockfile
  if lock_exists? && lock_content['Holders'] == @holders
    logger.info 'Holders are unchanged, not updating'
    return true
  end
  lockfile_format = {
    'Limit' => limit,
    'Holders' => @holders
  }
  JSON.generate(lockfile_format)
end
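
The lockfile body is a small JSON document with `Limit` and `Holders`, and the method returns `true` rather than JSON when the holders have not changed. A sketch of that behavior, assuming a hypothetical helper `lockfile_json` that receives the current lockfile content as an optional argument:

```ruby
require 'json'

# Return true when the holders are unchanged, otherwise the JSON body.
def lockfile_json(limit, holders, current = nil)
  return true if current && current['Holders'] == holders

  JSON.generate('Limit' => limit, 'Holders' => holders)
end

puts lockfile_json(3, { 'id-1' => true, 'id-2' => true })
```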

lock_exists?()

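Check whether a lockfile is known to exist: the modify index and content have both been fetched and the holders list has not been reset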

# File lib/daemon_runner/semaphore.rb, line 223
def lock_exists?
  (!lock_modify_index.nil? && !lock_content.nil?) && !@reset
end

log_lock(locked: true, end_string: 'obtained')

# File lib/daemon_runner/semaphore.rb, line 292
def log_lock(locked: true, end_string: 'obtained')
  msg = 'Lock %{text} %{end_string}'
  text = locked == true ? 'succesfully' : 'could not be'
  msg = msg % { text: text, end_string: end_string }
  logger.info msg
end

log_lock_state()

# File lib/daemon_runner/semaphore.rb, line 284
def log_lock_state
  log_lock(locked: @locked)
end

log_release_state()

# File lib/daemon_runner/semaphore.rb, line 288
def log_release_state
  log_lock(locked: !@locked, end_string: 'released')
end

prune_members()

Get the active members from the lock file, removing any dead members. The contender keys (`@members`) give the list of all live members, so any holder that does not appear in that list can be dropped.

# File lib/daemon_runner/semaphore.rb, line 231
def prune_members
  @holders = if lock_exists?
    holders = lock_content['Holders']
    return @holders = [] if holders.nil?
    holders = holders.keys
    holders & members
  else
    []
  end
end
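
Pruning comes down to a set intersection between the holders recorded in the lockfile and the sessions that still have a contender key. A standalone sketch with a hypothetical helper `prune` and made-up session ids:

```ruby
# Keep only the holders that still appear among the live members.
def prune(lock_content, members)
  holders = lock_content && lock_content['Holders']
  return [] if holders.nil?

  holders.keys & members
end

lock_content = { 'Limit' => 2, 'Holders' => { 'id-1' => true, 'id-2' => true } }
p prune(lock_content, %w[id-2 id-3])
```

Here `id-1` is dropped because its contender key is gone, which would happen once its session has been invalidated.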

remove_self_from_holders()

Remove our session.id from the holders list

# File lib/daemon_runner/semaphore.rb, line 253
def remove_self_from_holders
  return unless lock_exists?
  @holders = lock_content['Holders']
  @holders = @holders.keys
  @holders.delete(session.id)
  @holders
end