class Lusnoc::Mutex

Attributes

key[R]
owner[R]
value[R]

Public Class Methods

new(key, value: Socket.gethostname, ttl: 20) click to toggle source
# File lib/lusnoc/mutex.rb, line 10
# Builds a mutex description; nothing is acquired here.
#
# @param key   the key this mutex is bound to (used in the KV URLs built elsewhere in the class)
# @param value payload stored alongside the key on acquisition; defaults to this host's name
# @param ttl   TTL handed to the session created by #synchronize
def initialize(key, value: Socket.gethostname, ttl: 20)
  @key, @value = key, value
  @ttl = ttl
end

Public Instance Methods

locked?() click to toggle source
# File lib/lusnoc/mutex.rb, line 16
# Tells whether any thread currently holds the mutex.
#
# @return [Boolean] true when +owner+ is set, false otherwise
def locked?
  owner ? true : false
end
on_mutex_lost(&block) click to toggle source
# File lib/lusnoc/mutex.rb, line 32
# Registers the callback invoked (with this mutex as argument) when the
# lock is lost; a later registration replaces the earlier one.
#
# @return [Proc, nil] the stored callback (nil when called without a block)
def on_mutex_lost(&handler)
  @on_mutex_lost = handler
end
owned?() click to toggle source
# File lib/lusnoc/mutex.rb, line 20
# Tells whether the calling thread is the one recorded as the mutex owner.
#
# @return [Boolean]
def owned?
  current_thread = Thread.current
  owner == current_thread
end
session_id() click to toggle source
# File lib/lusnoc/mutex.rb, line 24
# Id of the session currently backing this mutex.
#
# @return the session's id, or nil when no session is attached
def session_id
  return nil if @session.nil?

  @session.id
end
synchronize(timeout: 0, &block) click to toggle source
# File lib/lusnoc/mutex.rb, line 36
# Runs the given block while holding the mutex for +key+.
#
# A Session named "mutex_session/<key>" is created with the configured TTL,
# the lock is acquired through #acquisition_loop!, and the block's value is
# returned. Whatever the outcome (normal return, exception, timeout), the
# key is released and the owner/session state is reset.
#
# @param timeout passed to Timeouter::Timer as the acquisition time budget
# @yield [self] executed once the lock has been acquired
# @return the value returned by #acquisition_loop! (the block's result on success)
# @raise [TimeoutError] the Timer is configured with this class and the
#   message 'mutex acquisition expired' — presumably raised on expiry
def synchronize(timeout: 0, &block)
  t = Timeouter::Timer.new(timeout, eclass: TimeoutError, message: 'mutex acquisition expired')

  Session.new("mutex_session/#{key}", ttl: @ttl) do |session|
    @session = session
    # If the session dies we can no longer claim ownership.
    session.on_session_die do
      @owner = nil
      @on_mutex_lost&.call(self)
    end

    return acquisition_loop! key, session, value, t, &block
  ensure
    # #release takes the session object itself (it reads session.id); the
    # previous call passed the id plus an unsupported keyword, which raised
    # ArgumentError that was silently swallowed, so the key was never
    # explicitly released. Release stays best-effort, but failures are logged.
    begin
      release(key, session)
    rescue StandardError => e
      logger.warn("Mutex[#{key}] release failed for Session[#{session.name}:#{session.id}]: #{e.message}")
    else
      logger.info("Mutex[#{key}] released for Session[#{session.name}:#{session.id}]")
    end
    @owner = nil
    @session = nil
  end
end

Private Instance Methods

acquire(key, session, value) click to toggle source
# File lib/lusnoc/mutex.rb, line 57
# Makes a single attempt to take the lock by PUT-ing +value+ to the KV key
# with the session id as the +acquire+ parameter.
#
# @param key     key to lock
# @param session session object (its +id+ and +name+ are read)
# @param value   payload written to the key
# @return [Boolean] true when the response body (chomped) is exactly 'true';
#   in that case the current thread is recorded as owner and +renew+ is called
def acquire(key, session, value)
  acquire_url = build_url("/v1/kv/#{key}?acquire=#{session.id}")
  response = Lusnoc.http_put(acquire_url, value, timeout: 1)
  return false unless response.body.chomp == 'true'

  @owner = Thread.current
  logger.info("Mutex[#{key}] acquired for Session[#{session.name}:#{session.id}]")
  renew
  true
end
acquisition_loop!(key, session, value, t) { |self| ... } click to toggle source
# File lib/lusnoc/mutex.rb, line 71
# Tries to acquire the lock and, once acquired, yields this mutex to the
# caller's block under a guard built by #prepare_guard; the block's value
# is returned straight from this method.
#
# One immediate attempt is made first. If that fails, attempts are repeated
# inside +t.loop!+: check the session via +alive!+, wait for the key to be
# released (bounded by +t.left+), retry, and pause 0.4s after a failure.
#
# @param key     key to lock
# @param session session used for the lock
# @param value   payload written on acquisition
# @param t       timer object responding to +loop!+ and +left+
# @yield [self]  invoked after a successful acquisition
# @return the block's value, or whatever +t.loop!+ returns if it ends on its own
def acquisition_loop!(key, session, value, t)
  # Fast path: the key may be free right away.
  prepare_guard(session, key).run { return yield(self) } if acquire(key, session, value)

  logger.debug("Mutex[#{key}] run acquisition loop for Session[#{session.name}:#{session.id}]")
  t.loop! do
    session.alive!(TimeoutError)
    wait_for_key_released(key, t.left)

    prepare_guard(session, key).run { return yield(self) } if acquire(key, session, value)

    logger.debug("Mutex[#{key}] acquisition failed for Session[#{session.name}:#{session.id}]")
    sleep 0.4
  end
end
prepare_guard(session, key) click to toggle source
# File lib/lusnoc/mutex.rb, line 94
# Builds a Lusnoc::Guard on the key's KV URL.
#
# The guard's condition parses the body as JSON and checks that the first
# entry's 'Session' equals this session's id; any error while doing so
# counts as false. The +then+ handler clears the owner, logs the loss and
# calls the +on_mutex_lost+ callback, if one is registered.
#
# @param session session whose id must keep holding the key
# @param key     guarded key
# @return the Lusnoc::Guard instance
def prepare_guard(session, key)
  Lusnoc::Guard.new(build_url("/v1/kv/#{key}")) do |guard|
    guard.condition do |body|
      holder = JSON.parse(body).first['Session']
      holder == session.id
    rescue StandardError
      false
    end

    guard.then do
      @owner = nil
      logger.info("Mutex[#{key}] LOST for Session[#{session.name}:#{session.id}]")
      @on_mutex_lost&.call(self)
    end
  end
end
release(key, session) click to toggle source
# File lib/lusnoc/mutex.rb, line 67
# Releases +key+ by PUT-ing to the KV endpoint with the session id as the
# +release+ parameter.
#
# @param key     key to release
# @param session either a session object (anything responding to +id+) or
#   a raw session id; #synchronize passes the id directly
# @param timeout request timeout forwarded to Lusnoc.http_put (default 1,
#   the value that was previously hard-coded)
# @return whatever Lusnoc.http_put returns
def release(key, session, timeout: 1)
  session_id = session.respond_to?(:id) ? session.id : session
  Lusnoc.http_put(build_url("/v1/kv/#{key}?release=#{session_id}"), timeout: timeout)
end
wait_for_key_released(key, timeout = nil) click to toggle source
# File lib/lusnoc/mutex.rb, line 108
# Watches the key's KV URL and returns as soon as a delivered body shows
# no 'Session' on its first entry. An empty body is treated as '[{}]',
# i.e. as a key with no holder.
#
# @param key     key to watch
# @param timeout passed to the watcher together with TimeoutError and the
#   message 'mutex acquisition expired'
# @return [true] once the key is seen without a session; otherwise the
#   value returned by the watcher's +run+
def wait_for_key_released(key, timeout = nil)
  logger.debug("Mutex[#{key}] start waiting of key releasing...")

  watcher = Lusnoc::Watcher.new(build_url("/v1/kv/#{key}"),
                                timeout:  timeout,
                                eclass:   TimeoutError,
                                emessage: 'mutex acquisition expired')
  watcher.run do |body|
    payload = body.empty? ? '[{}]' : body
    entries = JSON.parse(payload)
    return true if entries.first['Session'].nil?
  end
end