class FSEvent

framework.rb — fail safe event driven framework

Copyright © 2014 National Institute of Advanced Industrial Science and Technology (AIST)

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <www.gnu.org/licenses/>.

Attributes

clock_proc[RW]
current_time[R]

Public Class Methods

new(initial_time=Time.now) click to toggle source
# File lib/fsevent/framework.rb, line 21
# Builds an empty framework: no devices, no watches, an empty event queue
# and status tables that contain only the built-in "_fsevent" entry.
#
# initial_time:: starting value of the framework clock (defaults to the
#                wall-clock time at construction).
def initialize(initial_time=Time.now)
  # Framework clock and the counter that start() bumps for every event.
  @current_time = initial_time
  @current_count = 0

  # Optional callable; start() invokes it whenever the clock is about to
  # move to a different time.
  @clock_proc = nil

  # Per-device bookkeeping.
  @devices = {}               # device_name -> device
  @device_last_run_count = {} # device_name -> count

  # Status tables, keyed by device name and then by status name.
  #
  # Special statuses maintained by the framework itself:
  #   _fsevent : _device_registered_DEVICE_NAME       => time
  #   _fsevent : _device_unregistered_DEVICE_NAME     => time
  #   DEVICE_NAME : _status_defined_STATUS_NAME       => time
  #   DEVICE_NAME : _status_undefined_STATUS_NAME     => time
  #
  @status_value = { "_fsevent" => {} } # device_name -> status_name -> value
  @status_time  = { "_fsevent" => {} } # device_name -> status_name -> time
  @status_count = { "_fsevent" => {} } # device_name -> status_name -> count

  @watchset = FSEvent::WatchSet.new

  # Priority queue of pending events, plus the queue locator of each device.
  @q = Depq.new
  @schedule_locator = {} # device_name -> locator
end

Public Instance Methods

add_watch(watchee_device_name_pat, status_name_pat, reaction = :immediate) click to toggle source

Called from a device. (mainly from registered().)

# File lib/fsevent/framework.rb, line 94
# Records a request to start watching the statuses matched by the given
# device-name and status-name patterns. The request is only appended to the
# current thread's :fsevent_buffer; at_run_end applies it later.
#
# Raises ArgumentError when either pattern fails validation.
def add_watch(watchee_device_name_pat, status_name_pat, reaction = :immediate)
  unless valid_device_name_pat_for_read?(watchee_device_name_pat)
    raise ArgumentError, "invalid device name pattern: #{watchee_device_name_pat.inspect}"
  end
  unless valid_status_name_pat_for_read?(status_name_pat)
    raise ArgumentError, "invalid status name pattern: #{status_name_pat.inspect}"
  end
  request = [:add_watch, watchee_device_name_pat, status_name_pat, reaction]
  Thread.current[:fsevent_buffer] << request
end
define_status(status_name, value) click to toggle source

Called from a device to define the status.

# File lib/fsevent/framework.rb, line 116
# Records a request to define status_name with the given value. The request
# is only appended to the current thread's :fsevent_buffer; at_run_end
# applies it later.
#
# Raises ArgumentError when status_name fails validation.
def define_status(status_name, value)
  unless valid_status_name_for_write?(status_name)
    raise ArgumentError, "invalid status name: #{status_name.inspect}"
  end
  request = [:define_status, status_name, value]
  Thread.current[:fsevent_buffer] << request
end
del_watch(watchee_device_name_pat, status_name_pat) click to toggle source

Called from a device. (mainly from registered().)

# File lib/fsevent/framework.rb, line 105
# Records a request to stop watching the statuses matched by the given
# patterns. The request is only appended to the current thread's
# :fsevent_buffer; at_run_end applies it later.
#
# Raises ArgumentError when either pattern fails validation.
def del_watch(watchee_device_name_pat, status_name_pat)
  unless valid_device_name_pat_for_read?(watchee_device_name_pat)
    raise ArgumentError, "invalid device name pattern: #{watchee_device_name_pat.inspect}"
  end
  unless valid_status_name_pat_for_read?(status_name_pat)
    raise ArgumentError, "invalid status name pattern: #{status_name_pat.inspect}"
  end
  request = [:del_watch, watchee_device_name_pat, status_name_pat]
  Thread.current[:fsevent_buffer] << request
end
internal_undefine_status(device_name, run_end_time, status_name) click to toggle source
# File lib/fsevent/framework.rb, line 330
# Removes the value of status_name for device_name, stamps the change with
# the current time and event count, wakes up the other devices watching it
# with an "immediate at subsequent" reaction, and finally records the
# "_status_undefined_STATUS_NAME" special status.
#
# Raises ArgumentError when the device or the status is not defined.
def internal_undefine_status(device_name, run_end_time, status_name)
  values = @status_value.fetch(device_name) {
    raise ArgumentError, "device not defined: #{device_name}"
  }
  unless values.has_key? status_name
    raise ArgumentError, "device status not defined: #{device_name} #{status_name}"
  end

  values.delete status_name
  # The time and count entries stay behind so the change remains visible.
  @status_time[device_name][status_name] = @current_time
  @status_count[device_name][status_name] = @current_count

  lookup_watchers(device_name, status_name).each {|watcher_device_name, reaction|
    next if watcher_device_name == device_name # a device never wakes itself here
    next unless reaction_immediate_at_subsequent? reaction
    set_wakeup_if_possible(watcher_device_name, run_end_time)
  }

  marker = "_status_undefined_#{status_name}"
  internal_update_status(device_name, run_end_time, marker, run_end_time)
end
modify_status(status_name, value) click to toggle source

Called from a device to modify the value of an already-defined status.

# File lib/fsevent/framework.rb, line 124
# Records a request to change the value of status_name. The request is only
# appended to the current thread's :fsevent_buffer; at_run_end applies it
# later.
#
# Raises ArgumentError when status_name fails validation.
def modify_status(status_name, value)
  unless valid_status_name_for_write?(status_name)
    raise ArgumentError, "invalid status name: #{status_name.inspect}"
  end
  request = [:modify_status, status_name, value]
  Thread.current[:fsevent_buffer] << request
end
register_device(device) click to toggle source
# File lib/fsevent/framework.rb, line 48
# Registers a device under device.name.
#
# When the current thread has an :fsevent_buffer (set while a device action
# runs under wrap_device_action) the registration is appended to that
# buffer; otherwise it is scheduled straight away through
# internal_register_device.
#
# Raises ArgumentError when the device name fails validation.
def register_device(device)
  device_name = device.name
  unless valid_device_name_for_write?(device_name)
    raise ArgumentError, "invalid device name: #{device_name.inspect}"
  end

  pending = Thread.current[:fsevent_buffer]
  if pending
    pending << [:register_device, device_name, device]
  else
    internal_register_device(device_name, device)
  end
end
set_elapsed_time(t) click to toggle source

Called from a device to set the elapsed time.

# File lib/fsevent/framework.rb, line 148
# Stores t in the current thread's :fsevent_device_elapsed_time slot, where
# wrap_device_action picks it up in place of the measured duration.
#
# Raises ArgumentError when t is negative.
def set_elapsed_time(t)
  if t < 0
    raise ArgumentError, "negative elapsed time given: #{t}"
  end
  Thread.current[:fsevent_device_elapsed_time] = t
end
start() click to toggle source
# File lib/fsevent/framework.rb, line 61
# Runs the event loop until the queue is empty.
#
# Each iteration removes the event with the smallest priority (its time),
# calls @clock_proc with the old and new time when the clock is about to
# change, advances the clock and the event counter, and dispatches on the
# event type.
#
# Raises FSEvent::FSEventError for an unknown event type.
def start
  until @q.empty?
    loc = @q.delete_min_locator
    event_type, *args = loc.value
    event_time = loc.priority

    if @clock_proc && @current_time != event_time
      @clock_proc.call(@current_time, event_time)
    end
    @current_time = event_time
    @current_count += 1

    case event_type
    when :register_start
      at_register_start(loc, *args)
    when :register_end
      at_register_end(loc, *args)
    when :run_start
      at_run_start(loc, *args)
    when :run_end
      at_run_end(loc, *args)
    else
      raise FSEvent::FSEventError, "unexpected event type: #{event_type}"
    end
  end
end
undefine_status(status_name) click to toggle source

Called from a device to undefine the status.

# File lib/fsevent/framework.rb, line 132
# Records a request to undefine status_name. The request is only appended
# to the current thread's :fsevent_buffer; at_run_end applies it later.
#
# Raises ArgumentError when status_name fails validation.
def undefine_status(status_name)
  unless valid_status_name_for_write?(status_name)
    raise ArgumentError, "invalid status name: #{status_name.inspect}"
  end
  request = [:undefine_status, status_name]
  Thread.current[:fsevent_buffer] << request
end
unregister_device(device_name) click to toggle source

Called from a device.

# File lib/fsevent/framework.rb, line 140
# Records a request to unregister the device called device_name. The
# request is only appended to the current thread's :fsevent_buffer;
# at_run_end applies it later.
#
# Raises ArgumentError when device_name fails validation.
def unregister_device(device_name)
  unless valid_device_name_for_write?(device_name)
    raise ArgumentError, "invalid device name: #{device_name.inspect}"
  end
  request = [:unregister_device, device_name]
  Thread.current[:fsevent_buffer] << request
end

Private Instance Methods

at_register_end(loc, device_name, device, register_start_count, buffer) click to toggle source
# File lib/fsevent/framework.rb, line 169
# Handles a :register_end event: installs the device, creates its empty
# status tables, records "_device_registered_DEVICE_NAME" on the "_fsevent"
# pseudo device, then processes the requests buffered during registration
# exactly like the end of a run.
#
# Raises ArgumentError when a device with the same name is already
# registered.
def at_register_end(loc, device_name, device, register_start_count, buffer)
  raise ArgumentError, "Device already registered: #{device_name}" if @devices.has_key? device_name

  @devices[device_name] = device
  @device_last_run_count[device_name] = register_start_count
  [@status_value, @status_time, @status_count].each {|table|
    table[device_name] = {}
  }

  registered_marker = "_device_registered_#{device_name}"
  internal_update_status("_fsevent", @current_time, registered_marker, @current_time)

  at_run_end(loc, device_name, register_start_count, buffer)
end
at_register_start(loc, device_name, device) click to toggle source
# File lib/fsevent/framework.rb, line 153
# Handles a :register_start event: attaches the framework to the device,
# calls device.registered under wrap_device_action, and re-queues the same
# locator as a :register_end event at current time + elapsed time.
#
# Raises ArgumentError when a device with the same name is already
# registered.
def at_register_start(loc, device_name, device)
  raise ArgumentError, "Device already registered: #{device_name}" if @devices.has_key? device_name

  buffer, elapsed = wrap_device_action do
    device.framework = self
    device.registered
  end

  register_end_time = @current_time + elapsed
  loc.update [:register_end, device_name, device, @current_count, buffer], register_end_time
  @q.insert_locator loc
end
at_run_end(loc, device_name, run_start_count, buffer) click to toggle source
# File lib/fsevent/framework.rb, line 224
# Handles a :run_end event: replays, in order, every request the device
# buffered while it ran, then schedules the device's next run unless the
# device unregistered itself.
#
# Raises RuntimeError when the buffer contains an unknown tag.
def at_run_end(loc, device_name, run_start_count, buffer)
  @device_last_run_count[device_name] = run_start_count
  run_end_time = @current_time

  wakeup_immediate = false
  unregister_self = false

  buffer.each {|tag, *rest|
    case tag
    when :define_status   then internal_define_status(device_name, run_end_time, *rest)
    when :modify_status   then internal_modify_status(device_name, run_end_time, *rest)
    when :undefine_status then internal_undefine_status(device_name, run_end_time, *rest)
    when :del_watch       then internal_del_watch(device_name, *rest)
    when :register_device then internal_register_device(*rest)
    when :add_watch
      # A new watch that already matches something may demand an immediate run.
      wakeup_immediate = true if internal_add_watch(device_name, *rest)
    when :unregister_device
      unregister_self = true if internal_unregister_device(device_name, *rest)
    else
      raise "unexpected tag: #{tag}"
    end
  }

  # A device that removed itself must not be scheduled again.
  return if unregister_self

  wakeup_immediate ||= immediate_wakeup_self?(device_name, run_start_count)
  setup_next_schedule(device_name, loc, run_end_time, wakeup_immediate)
end
at_run_start(loc, device_name) click to toggle source
# File lib/fsevent/framework.rb, line 186
# Handles a :run_start event: collects the watched and changed statuses for
# the device, calls device.run under wrap_device_action, and re-queues the
# same locator as a :run_end event at start time + elapsed time.
def at_run_start(loc, device_name)
  run_start_time = @current_time
  device = @devices[device_name]
  last_run_count = @device_last_run_count[device_name]

  watched_status, changed_status = notifications(device_name, last_run_count)

  buffer, elapsed = wrap_device_action do
    device.run(watched_status, changed_status)
  end

  loc.update [:run_end, device_name, @current_count, buffer], run_start_time + elapsed
  @q.insert_locator loc
end
has_status?(device_name, status_name) click to toggle source
# File lib/fsevent/framework.rb, line 280
# Returns true when @status_value holds an entry for device_name that in
# turn has the key status_name; false otherwise.
def has_status?(device_name, status_name)
  statuses = @status_value.fetch(device_name) { return false }
  statuses.has_key?(status_name)
end
immediate_wakeup_self?(watcher_device_name, run_start_count) click to toggle source
# File lib/fsevent/framework.rb, line 459
# Returns true when some status watched by watcher_device_name with an
# "immediate at subsequent" reaction has a change count greater than or
# equal to run_start_count; returns false otherwise.
def immediate_wakeup_self?(watcher_device_name, run_start_count)
  @watchset.watcher_each(watcher_device_name) {|watchee_device_name_pat, status_name_pat, reaction|
    next unless reaction_immediate_at_subsequent?(reaction)
    matched_status_each(watchee_device_name_pat, status_name_pat) {|watchee_device_name, status_name|
      next unless @status_count.has_key?(watchee_device_name)
      counts = @status_count[watchee_device_name]
      next unless counts.has_key?(status_name)
      # One qualifying change is enough; leave the method straight away.
      return true if run_start_count <= counts[status_name]
    }
  }
  false
end
internal_add_watch(watcher_device_name, watchee_device_name_pat, status_name_pat, reaction) click to toggle source
# File lib/fsevent/framework.rb, line 356
# Adds the watch to @watchset and returns true when at least one existing
# status matches the patterns and the reaction is "immediate at beginning";
# returns false otherwise.
def internal_add_watch(watcher_device_name, watchee_device_name_pat, status_name_pat, reaction)
  @watchset.add(watchee_device_name_pat, status_name_pat, watcher_device_name, reaction)
  matched_status_each(watchee_device_name_pat, status_name_pat) {|_watchee_device_name, _status_name|
    # The reaction is only consulted once a matching status has been found.
    return true if reaction_immediate_at_beginning? reaction
  }
  false
end
internal_define_status(device_name, run_end_time, status_name, value) click to toggle source
# File lib/fsevent/framework.rb, line 265
# Defines status_name for device_name and then records the
# "_status_defined_STATUS_NAME" special status with run_end_time as its
# value.
def internal_define_status(device_name, run_end_time, status_name, value)
  internal_define_status2(device_name, run_end_time, status_name, value)
  defined_marker = "_status_defined_#{status_name}"
  internal_update_status(device_name, run_end_time, defined_marker, run_end_time)
end
internal_define_status2(device_name, run_end_time, status_name, value) click to toggle source
# File lib/fsevent/framework.rb, line 285
# Stores a new status value for device_name, stamps it with the current
# time and event count, and wakes up the other devices watching it with an
# "immediate at beginning" reaction.
#
# Raises ArgumentError when the device is not defined or the status is
# already defined.
def internal_define_status2(device_name, run_end_time, status_name, value)
  values = @status_value.fetch(device_name) {
    raise ArgumentError, "device not defined: #{device_name}"
  }
  if values.has_key? status_name
    raise ArgumentError, "device status already defined: #{device_name} #{status_name}"
  end

  values[status_name] = value
  @status_time[device_name][status_name] = @current_time
  @status_count[device_name][status_name] = @current_count

  lookup_watchers(device_name, status_name).each {|watcher_device_name, reaction|
    next unless reaction_immediate_at_beginning? reaction
    next if watcher_device_name == device_name # a device never wakes itself here
    set_wakeup_if_possible(watcher_device_name, run_end_time)
  }
end
internal_del_watch(watcher_device_name, watchee_device_name_pat, status_name_pat) click to toggle source
# File lib/fsevent/framework.rb, line 408
# Removes the watch from @watchset. Note the argument order: the watcher
# name goes last in the WatchSet#del call.
def internal_del_watch(watcher_device_name, watchee_device_name_pat, status_name_pat)
  @watchset.del watchee_device_name_pat, status_name_pat, watcher_device_name
end
internal_modify_status(device_name, run_end_time, status_name, value) click to toggle source
# File lib/fsevent/framework.rb, line 305
# Applies a buffered :modify_status request by delegating, with the same
# arguments, to internal_modify_status2.
def internal_modify_status(device_name, run_end_time, status_name, value)
  internal_modify_status2 device_name, run_end_time, status_name, value
end
internal_modify_status2(device_name, run_end_time, status_name, value) click to toggle source
# File lib/fsevent/framework.rb, line 310
# Overwrites an existing status value for device_name, stamps it with the
# current time and event count, and wakes up the other devices watching it
# with an "immediate at subsequent" reaction.
#
# Raises ArgumentError when the device or the status is not defined.
def internal_modify_status2(device_name, run_end_time, status_name, value)
  values = @status_value.fetch(device_name) {
    raise ArgumentError, "device not defined: #{device_name}"
  }
  unless values.has_key? status_name
    raise ArgumentError, "device status not defined: #{device_name} #{status_name}"
  end

  values[status_name] = value
  @status_time[device_name][status_name] = @current_time
  @status_count[device_name][status_name] = @current_count

  lookup_watchers(device_name, status_name).each {|watcher_device_name, reaction|
    next if watcher_device_name == device_name # a device never wakes itself here
    next unless reaction_immediate_at_subsequent? reaction
    set_wakeup_if_possible(watcher_device_name, run_end_time)
  }
end
internal_register_device(target_device_name, device) click to toggle source
# File lib/fsevent/framework.rb, line 259
# Queues a :register_start event for the device at the current time and
# remembers the resulting queue locator under the device name.
def internal_register_device(target_device_name, device)
  locator = @q.insert [:register_start, target_device_name, device], @current_time
  @schedule_locator[target_device_name] = locator
end
internal_unregister_device(self_device_name, target_device_name) click to toggle source
# File lib/fsevent/framework.rb, line 475
# Removes target_device_name from the framework: undefines its ordinary
# statuses (names starting with "_" are skipped), drops its entries in
# @devices, @status_value, @watchset and @schedule_locator, removes its
# pending event from the queue, calls device.unregistered, and records
# "_device_unregistered_DEVICE_NAME" on the "_fsevent" pseudo device.
#
# Returns true when the device unregistered itself (self_device_name ==
# target_device_name), false otherwise.
#
# Raises ArgumentError when target_device_name is not registered. The check
# runs before any state is touched, so a bad name can no longer leave the
# framework half torn down and then fail with NoMethodError on nil.
def internal_unregister_device(self_device_name, target_device_name)
  unless @devices.has_key? target_device_name
    raise ArgumentError, "device not registered: #{target_device_name}"
  end

  if @status_value.has_key? target_device_name
    # Iterate over a snapshot of the keys: internal_undefine_status changes
    # the hash while we walk it.
    @status_value[target_device_name].keys.each {|status_name|
      next if /\A_/ =~ status_name
      internal_undefine_status(target_device_name, @current_time, status_name)
    }
  end
  device = @devices.delete target_device_name
  @status_value.delete target_device_name
  @watchset.delete_watcher(target_device_name)
  loc = @schedule_locator.delete target_device_name
  if loc && loc.in_queue?
    @q.delete_locator loc
  end
  device.unregistered
  internal_update_status("_fsevent", @current_time, "_device_unregistered_#{target_device_name}", @current_time)
  self_device_name == target_device_name
end
internal_update_status(device_name, run_end_time, status_name, value) click to toggle source
# File lib/fsevent/framework.rb, line 271
# Sets a status regardless of whether it exists yet: modifies it when
# has_status? reports it as defined, defines it otherwise.
def internal_update_status(device_name, run_end_time, status_name, value)
  if has_status?(device_name, status_name)
    return internal_modify_status2(device_name, run_end_time, status_name, value)
  end
  internal_define_status2(device_name, run_end_time, status_name, value)
end
lookup_watchers(watchee_device_name, status_name) click to toggle source
# File lib/fsevent/framework.rb, line 351
# Returns whatever @watchset.lookup_watchers reports for the given device
# and status; callers in this class iterate it as
# (watcher_device_name, reaction) pairs.
def lookup_watchers(watchee_device_name, status_name)
  @watchset.lookup_watchers watchee_device_name, status_name
end
matched_device_name_each(device_name_pat) { |device_name| ... } click to toggle source
# File lib/fsevent/framework.rb, line 376
# Yields every device name selected by device_name_pat.
#
# A pattern ending in "*" is a prefix pattern: each key of @status_time
# that starts with the text before the "*" is yielded. Any other pattern is
# yielded as-is, whether or not such a device is known.
def matched_device_name_each(device_name_pat)
  wildcard = /\*\z/.match(device_name_pat)
  return yield(device_name_pat) unless wildcard

  prefix = wildcard.pre_match
  @status_time.each_key {|device_name|
    yield device_name if device_name.start_with? prefix
  }
end
matched_status_each(watchee_device_name_pat, status_name_pat) { |watchee_device_name, status_name| ... } click to toggle source
# File lib/fsevent/framework.rb, line 367
# Yields a (device_name, status_name) pair for every status selected by the
# combination of the device-name pattern and the status-name pattern.
def matched_status_each(watchee_device_name_pat, status_name_pat)
  matched_device_name_each(watchee_device_name_pat) do |watchee_device_name|
    matched_status_name_each(watchee_device_name, status_name_pat) do |status_name|
      yield watchee_device_name, status_name
    end
  end
end
matched_status_name_each(device_name, status_name_pat) { |status_name| ... } click to toggle source
# File lib/fsevent/framework.rb, line 390
# Yields every status name of device_name selected by status_name_pat,
# looking names up in @status_time. Nothing is yielded when the device has
# no entry there.
#
# A pattern ending in "*" is a prefix pattern; any other pattern is yielded
# only when a status with exactly that name is present.
def matched_status_name_each(device_name, status_name_pat)
  status_hash = @status_time.fetch(device_name) { return }
  wildcard = /\*\z/.match(status_name_pat)
  if wildcard
    prefix = wildcard.pre_match
    status_hash.each_key {|status_name|
      yield status_name if status_name.start_with? prefix
    }
  elsif status_hash.has_key? status_name_pat
    yield status_name_pat
  end
end
notifications(watcher_device_name, last_run_count) click to toggle source
# File lib/fsevent/framework.rb, line 200
# Builds the two hashes handed to device.run for watcher_device_name.
#
# Returns [watched_status, changed_status], both keyed by device name and
# then by status name:
# * watched_status holds the current value of every watched status that
#   still has a value;
# * changed_status holds the change time of every watched status whose
#   change count is greater than or equal to last_run_count.
#
# Every device name selected by a watch gets an entry in both hashes, even
# when that entry stays empty.
def notifications(watcher_device_name, last_run_count)
  watched_status = {}
  changed_status = {}
  @watchset.watcher_each(watcher_device_name) {|watchee_device_name_pat, status_name_pat, _reaction|
    matched_device_name_each(watchee_device_name_pat) {|watchee_device_name|
      watched = (watched_status[watchee_device_name] ||= {})
      changed = (changed_status[watchee_device_name] ||= {})
      matched_status_name_each(watchee_device_name, status_name_pat) {|status_name|
        if has_status?(watchee_device_name, status_name)
          watched[status_name] = @status_value[watchee_device_name][status_name]
        end

        next unless @status_time.has_key?(watchee_device_name)
        times = @status_time[watchee_device_name]
        next unless times.has_key?(status_name)
        if last_run_count <= @status_count[watchee_device_name][status_name]
          changed[status_name] = times[status_name]
        end
      }
    }
  }
  [watched_status, changed_status]
end
set_wakeup_if_possible(device_name, time) click to toggle source
# File lib/fsevent/framework.rb, line 413
# Arranges for device_name to run at +time+ if its current state allows it.
#
# * Locator not in the queue: queue a :run_start event at +time+.
# * Queued :run_start event: if +time+ is earlier, move the event to +time+
#   and merge the previous time into the device's schedule; if +time+ is
#   later, merge +time+ into the schedule; if equal, do nothing.
# * Queued :run_end event: do nothing.
#
# Raises FSEvent::FSEventError for any other queued event type.
def set_wakeup_if_possible(device_name, time)
  loc = @schedule_locator[device_name]
  unless loc.in_queue?
    loc.update [:run_start, device_name], time
    @q.insert_locator loc
    return
  end

  event_type = loc.value.first
  case event_type
  when :run_end # The device is working now.
    # Nothing to do. at_run_end itself checks arrived events at last.
  when :run_start # The device is sleeping now.
    scheduled_time = loc.priority
    if time < scheduled_time
      # Wake up earlier, but keep the old wake-up time in the schedule.
      @devices[device_name].schedule.merge_schedule([scheduled_time])
      loc.update_priority time
    elsif time != scheduled_time
      @devices[device_name].schedule.merge_schedule([time])
    end
  else
    raise FSEvent::FSEventError, "unexpected event type: #{event_type}"
  end
end
setup_next_schedule(device_name, loc, run_end_time, wakeup_immediate) click to toggle source
# File lib/fsevent/framework.rb, line 438
# Queues the next :run_start event for device_name after a run that ended
# at run_end_time.
#
# With wakeup_immediate the device restarts at run_end_time and its
# schedule is left untouched. Otherwise the next time is taken from the
# device's schedule; a time that is not later than run_end_time is replaced
# by run_end_time, and every further schedule entry that is not later than
# run_end_time is discarded. When the schedule is empty nothing is queued.
def setup_next_schedule(device_name, loc, run_end_time, wakeup_immediate)
  device = @devices[device_name]
  if wakeup_immediate
    run_start_time = run_end_time
  else
    run_start_time = device.schedule.shift
    if run_start_time
      run_start_time = run_end_time if run_start_time <= run_end_time
      # Drop the remaining entries that are already in the past.
      device.schedule.shift while device.schedule.first && device.schedule.first <= run_end_time
    end
  end
  return unless run_start_time

  loc.update [:run_start, device_name], run_start_time
  @q.insert_locator loc
end
wrap_device_action() { || ... } click to toggle source
# File lib/fsevent/framework.rb, line 79
# Runs the given block as a device action.
#
# While the block runs, the current thread's :fsevent_buffer holds a fresh
# array that collects the requests the device makes, and
# :fsevent_device_elapsed_time is reset so the device can override the
# duration through set_elapsed_time.
#
# Returns [buffer, elapsed]. elapsed is the value stored by the device, or
# the measured duration of the block in seconds when none was stored.
#
# The duration is measured with the monotonic clock, not Time.now, so a
# wall-clock adjustment during the action cannot produce a negative elapsed
# time and schedule the follow-up event in the past.
#
# Both thread-local slots are cleared afterwards, even when the block
# raises.
def wrap_device_action
  Thread.current[:fsevent_buffer] = buffer = []
  Thread.current[:fsevent_device_elapsed_time] = nil
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = Thread.current[:fsevent_device_elapsed_time] || finished - started
  return buffer, elapsed
ensure
  Thread.current[:fsevent_buffer] = nil
  Thread.current[:fsevent_device_elapsed_time] = nil
end