class Lusnoc::Mutex
Attributes
key[R]
owner[R]
value[R]
Public Class Methods
new(key, value: Socket.gethostname, ttl: 20)
click to toggle source
# File lib/lusnoc/mutex.rb, line 10 def initialize(key, value: Socket.gethostname, ttl: 20) @key = key @value = value @ttl = ttl end
Public Instance Methods
locked?()
click to toggle source
# File lib/lusnoc/mutex.rb, line 16 def locked? !!owner end
on_mutex_lost(&block)
click to toggle source
# File lib/lusnoc/mutex.rb, line 32 def on_mutex_lost(&block) @on_mutex_lost = block end
owned?()
click to toggle source
# File lib/lusnoc/mutex.rb, line 20 def owned? owner == Thread.current end
session_id()
click to toggle source
# File lib/lusnoc/mutex.rb, line 24 def session_id @session&.id end
synchronize(timeout: 0, &block)
click to toggle source
# File lib/lusnoc/mutex.rb, line 36 def synchronize(timeout: 0, &block) t = Timeouter::Timer.new(timeout, eclass: TimeoutError, message: 'mutex acquisition expired') Session.new("mutex_session/#{key}", ttl: @ttl) do |session| @session = session session.on_session_die do @owner = nil @on_mutex_lost&.call(self) end return acquisition_loop! key, session, value, t, &block ensure release(key, session.id, timeout: 2) rescue nil logger.info("Mutex[#{key}] released for Session[#{session.name}:#{session.id}]") @owner = nil @session = nil end end
Private Instance Methods
acquire(key, session, value)
click to toggle source
# File lib/lusnoc/mutex.rb, line 57 def acquire(key, session, value) resp = Lusnoc.http_put(build_url("/v1/kv/#{key}?acquire=#{session.id}"), value, timeout: 1) return false if resp.body.chomp != 'true' @owner = Thread.current logger.info("Mutex[#{key}] acquired for Session[#{session.name}:#{session.id}]") renew true end
acquisition_loop!(key, session, value, t) { |self| ... }
click to toggle source
# File lib/lusnoc/mutex.rb, line 71 def acquisition_loop!(key, session, value, t) if acquire(key, session, value) prepare_guard(session, key).run do return yield(self) end end logger.debug("Mutex[#{key}] run acquisition loop for Session[#{session.name}:#{session.id}]") t.loop! do session.alive!(TimeoutError) wait_for_key_released(key, t.left) if acquire(key, session, value) prepare_guard(session, key).run do return yield(self) end end logger.debug("Mutex[#{key}] acquisition failed for Session[#{session.name}:#{session.id}]") sleep 0.4 end end
prepare_guard(session, key)
click to toggle source
# File lib/lusnoc/mutex.rb, line 94 def prepare_guard(session, key) Lusnoc::Guard.new(build_url("/v1/kv/#{key}")) do |guard| guard.condition do |body| JSON.parse(body).first['Session'] == session.id rescue false end guard.then do @owner = nil logger.info("Mutex[#{key}] LOST for Session[#{session.name}:#{session.id}]") @on_mutex_lost&.call(self) end end end
release(key, session)
click to toggle source
# File lib/lusnoc/mutex.rb, line 67 def release(key, session) Lusnoc.http_put(build_url("/v1/kv/#{key}?release=#{session.id}"), timeout: 1) end
wait_for_key_released(key, timeout = nil)
click to toggle source
# File lib/lusnoc/mutex.rb, line 108 def wait_for_key_released(key, timeout = nil) logger.debug("Mutex[#{key}] start waiting of key releasing...") Lusnoc::Watcher.new(build_url("/v1/kv/#{key}"), timeout: timeout, eclass: TimeoutError, emessage: 'mutex acquisition expired').run do |body| result = JSON.parse(body.empty? ? '[{}]' : body) return true if result.first['Session'].nil? end end