class DatalackeyIO

Attributes

syntax[R]
version[R]

Public Class Methods

internal_generic_map() click to toggle source
# File lib/datalackeylib.rb, line 308
# Returns a deep copy of the class-level generic pattern map. The copy is
# made with a Marshal round trip, so changes the caller makes to the result
# do not reach the shared original.
def self.internal_generic_map
  serialized = Marshal.dump(@@internal_generic_map)
  Marshal.load(serialized)
end
internal_notification_map() click to toggle source
# File lib/datalackeylib.rb, line 304
# Returns a deep copy of the class-level notification pattern map. The copy
# is made with a Marshal round trip, so changes the caller makes to the
# result do not reach the shared original.
def self.internal_notification_map
  serialized = Marshal.dump(@@internal_notification_map)
  Marshal.load(serialized)
end
new(to_datalackey, from_datalackey, notification_callable = nil, to_datalackey_echo_callable = nil, from_datalackey_echo_callable = nil) click to toggle source
# File lib/datalackeylib.rb, line 314
# Sets up the book-keeping, starts the thread that reads and dispatches
# datalackey output, and finally sends the "version" command.
#
# to_datalackey:: stream that commands are written to.
# from_datalackey:: stream that output is read from (readpartial is used).
# notification_callable:: if not nil, called with (action, message, vars)
#   for each notification that the book-keeping below decides to act on.
# to_datalackey_echo_callable:: stored in @to_datalackey_echo.
# from_datalackey_echo_callable:: if not nil, called with every non-empty
#   output line before the line is parsed as JSON.
def initialize(to_datalackey, from_datalackey, notification_callable = nil,
    to_datalackey_echo_callable = nil, from_datalackey_echo_callable = nil)
  @to_datalackey_mutex = Mutex.new
  @to_datalackey = to_datalackey
  @to_datalackey_echo = to_datalackey_echo_callable
  @from_datalackey = from_datalackey
  # Starting value for command identifiers.
  @identifier = 0
  @tracked_mutex = Mutex.new
  # Handling of notifications.
  @notify_tracker = PatternAction.new([ @@internal_notification_map ])
  @notify_tracker.set_identifier(nil)
  @internal = PatternAction.new([ @@internal_generic_map ])
  # Looked up by message identifier (msg[0]) in the reader thread; the
  # first element of a value is matched first and the last element is used
  # as the internal matcher.
  @tracked = Hash.new(nil)
  # Identifier of the command whose completion is being waited for, or nil.
  @waiting = nil
  @return_mutex = Mutex.new
  @return_condition = ConditionVariable.new
  # Guards @data, @process and @children.
  @dataprocess_mutex = Mutex.new
  # name => id from :stored notifications. Default 0 lets the first id
  # of an unknown name be compared with < below.
  @data = Hash.new(0)
  # name => id from :started notifications.
  @process = { }
  # Message identifier => first variable of an internal :child match.
  @children = { }
  @version = { }
  @read_datalackey = Thread.new do
    # Pieces of the current line that has not been ended by a newline yet.
    accum = []
    loop do
      begin
        raw = @from_datalackey.readpartial(32768)
      rescue IOError
        break
      # NOTE(review): EOFError is a subclass of IOError, so the clause below
      # looks unreachable; the behavior (break) is the same either way.
      rescue EOFError
        break
      end
      # Handle every complete line in what was read.
      loc = raw.index("\n")
      until loc.nil?
        accum.push(raw[0, loc]) if loc.positive? # Newline at start ends line.
        raw = raw[loc + 1, raw.size - loc - 1]
        loc = raw.index("\n")
        joined = accum.join
        accum.clear
        next if joined.empty?
        from_datalackey_echo_callable.call(joined) unless from_datalackey_echo_callable.nil?
        # NOTE(review): JSON::ParserError is not rescued anywhere in this
        # thread body; confirm what should happen on malformed output.
        msg = JSON.parse joined
        # See if we are interested in it.
        # A message with nil as the first element is treated as a notification.
        if msg.first.nil?
          act, vars = @notify_tracker.best_match(msg)
          next if act.nil?
          # We know there is only one action that matches.
          act = act.first
          # Set to act when the notification is to be passed to
          # notification_callable.
          actionable = nil
          name = vars.first
          id = vars.last
          # Messages from different threads may arrive out of order so
          # new data/process may be in book-keeping when previous should
          # be removed. With data these imply over-writing immediately,
          # with processes re-use of identifier and running back to back.
          case act.first
          when :stored
            # Recorded and passed on only if id is greater than the known one.
            @dataprocess_mutex.synchronize do
              if @data[name] < id
                @data[name] = id
                actionable = act
              end
            end
          when :deleted
            # Removed and passed on only if the name is known and the known
            # id is not greater than the id in the notification.
            @dataprocess_mutex.synchronize do
              if @data.key?(name) && @data[name] <= id
                @data.delete name
                actionable = act
              end
            end
          when :data_error
            # Book-keeping changes only for a matching id but the
            # notification is always passed on.
            @dataprocess_mutex.synchronize do
              @data.delete(name) if @data[name] == id
            end
            actionable = act
          when :started
            @dataprocess_mutex.synchronize { @process[name] = id }
            actionable = act
          when :ended
            # Only the run with the recorded id removes the process and its
            # child entry; the notification is always passed on.
            @dataprocess_mutex.synchronize do
              if @process[name] == id
                @process.delete(name)
                @children.delete(name)
              end
            end
            actionable = act
          when :error
            case act[1]
            when :format
              # Writes a zero byte to datalackey.
              # NOTE(review): presumably this resets input parsing - confirm.
              @to_datalackey_mutex.synchronize { @to_datalackey.putc 0 }
            when :user_id
              unless @waiting.nil?
                # Does the waited command have invalid id?
                begin
                  # NOTE(review): for a String that Integer() accepts, the
                  # subtraction below raises NoMethodError, which is not
                  # rescued here - confirm whether that can occur.
                  int = Integer(@waiting)
                  fract = @waiting - int
                  raise ArgumentError, '' unless fract.zero?
                rescue ArgumentError, TypeError
                  # The identifier is not an integer value. Unless it is a
                  # String, end the waited command with this error.
                  unless @waiting.is_a? String
                    @tracked_mutex.synchronize do
                      # NOTE(review): assumes @tracked has an entry for
                      # @waiting; a nil here would raise NoMethodError.
                      trackers = @tracked[@waiting]
                      trackers.first.message = msg
                      trackers.first.exit = [ act ]
                      @tracked.delete(@waiting)
                      @waiting = nil
                    end
                    # Wake up the thread waiting on @return_condition.
                    @return_mutex.synchronize { @return_condition.signal }
                  end
                end
              end
            end
            actionable = act
          end
          next if notification_callable.nil? || actionable.nil?
          notification_callable.call(actionable, msg, vars)
          next
        end
        # Not a notification.
        # Messages with an identifier that is not tracked are ignored.
        trackers = @tracked_mutex.synchronize { @tracked[msg[0]] }
        next if trackers.nil?
        # finish becomes true when the waited command is considered done;
        # last becomes the exit value stored to the tracker.
        finish = false
        last = nil
        # Deal with user-provided PatternAction (or NoPatternNoAction).
        tracker = trackers.first
        act, vars = tracker.best_match(msg)
        unless act.nil?
          act.each do |item|
            # Offer the item to generators until one returns a true value.
            tracker.generators.each do |p|
              break if p.call(item, msg, vars)
            end
            # Only the waited command can be finished by its actions.
            next unless msg.first == @waiting
            case item.first
            when :return, 'return'
              finish = true
              last = act if last.nil?
            when :error, 'error'
              # Unlike return, error replaces an already set exit value.
              finish = true
              last = act
            end
          end
        end
        # Check internal PatternAction.
        internal = trackers.last
        act, vars = internal.best_match(msg)
        unless act.nil?
          act = act.first # We know patterns are all unique in mapping.
          if act.first == :child
            @dataprocess_mutex.synchronize { @children[msg[0]] = vars.first }
          elsif msg.first == @waiting
            # Any other internal match for the waited command finishes it.
            finish = true
            if act.first == :done
              @tracked_mutex.synchronize { @tracked.delete(msg[0]) }
            elsif act.first == :error
              last = [ act ]
            end
          end
        end
        if finish
          # Store the result before clearing @waiting and waking the waiter.
          tracker.message = msg
          tracker.exit = last
          @tracked_mutex.synchronize { @waiting = nil }
          @return_mutex.synchronize { @return_condition.signal }
        end
      end
      # Keep the unterminated remainder for the next read.
      accum.push(raw) unless raw.empty?
    end
    # Reading has ended. Close the stream and wake up a possible waiter.
    @from_datalackey.close
    @return_mutex.synchronize { @return_condition.signal }
  end
  # Outside thread block.
  # Send the version command. The generator stores the 'commands' value
  # into @syntax and the Integer values of the reply into @version.
  send(PatternAction.new([{ version: [ 'version', '', '?' ] }], [
    proc do |action, message, vars|
      if action.first == :version
        @syntax = vars.first['commands']
        @version = { }
        vars.first.each_pair do |key, value|
          @version[key] = value if value.is_a? Integer
        end
        true
      else false
      end
    end
  ]), ['version'])
end

Public Instance Methods

close() click to toggle source
# File lib/datalackeylib.rb, line 514
# Closes the stream used for writing to datalackey. The write mutex is held
# for the duration so the close does not interleave with a write in progress.
def close
  @to_datalackey_mutex.synchronize do
    @to_datalackey.close
  end
end
closed?() click to toggle source
# File lib/datalackeylib.rb, line 510
# Tells whether the stream that datalackey output is read from is closed.
def closed?
  output = @from_datalackey
  output.closed?
end
data() click to toggle source
# File lib/datalackeylib.rb, line 498
# Returns a copy of the data book-keeping hash, taken while holding the
# data/process mutex. Changing the copy does not affect the book-keeping.
def data
  @dataprocess_mutex.synchronize do
    @data.clone
  end
end
dump(json_as_string) click to toggle source
# File lib/datalackeylib.rb, line 553
# Writes the given string to datalackey as is and flushes the stream, then
# passes the string to the echo callable if one was given. All of this is
# done while holding the write mutex.
#
# json_as_string:: the text to write; it is not checked in any way here.
def dump(json_as_string)
  @to_datalackey_mutex.synchronize do
    begin
      @to_datalackey.write json_as_string
      @to_datalackey.flush
      @to_datalackey_echo&.call(json_as_string)
    rescue Errno::EPIPE
      # Should do something in this case. Child process died?
      nil
    end
  end
end
finish() click to toggle source
# File lib/datalackeylib.rb, line 518
# Blocks until the thread that reads datalackey output has ended.
def finish
  reader = @read_datalackey
  reader.join
end
launched() click to toggle source
# File lib/datalackeylib.rb, line 506
# Returns a copy of the child book-keeping hash, taken while holding the
# data/process mutex. Changing the copy does not affect the book-keeping.
def launched
  @dataprocess_mutex.synchronize do
    @children.clone
  end
end
process() click to toggle source
# File lib/datalackeylib.rb, line 502
# Returns a copy of the process book-keeping hash, taken while holding the
# data/process mutex. Changing the copy does not affect the book-keeping.
def process
  @dataprocess_mutex.synchronize do
    @process.clone
  end
end
send(pattern_action, command, user_id = false) click to toggle source

Pass nil as pattern_action if you are not interested in acting on the responses.

# File lib/datalackeylib.rb, line 523
# Sends a command to datalackey and waits until the reader thread marks the
# command finished or the output stream gets closed.
#
# pattern_action:: object used to match the responses; it is cloned, so the
#   given object is not modified. nil means a NoPatternNoAction is used.
# command:: the command as an Array. Unless user_id is true, an identifier
#   is prepended to this very array (the caller's object is modified).
# user_id:: when true, command[0] is used as the identifier as is.
#
# Returns nil if the stream to datalackey is already closed. Otherwise
# returns the tracker. Its status is true unless one of the items in its
# exit value starts with :error or 'error'. With a nil identifier the
# tracker is returned right after writing, with no status set.
#
# NOTE(review): @waiting holds a single identifier, so calling this from
# several threads at the same time appears to be unsupported.
def send(pattern_action, command, user_id = false)
  return nil if @to_datalackey_mutex.synchronize { @to_datalackey.closed? }
  if user_id
    id = command[0]
  else
    id = @identifier
    @identifier += 1
    command.prepend id
  end
  tracker = pattern_action.nil? ? NoPatternNoAction.new : pattern_action.clone
  tracker.set_identifier(id)
  tracker.command = JSON.generate(command)
  internal = @internal.clone
  internal.set_identifier(id)
  @tracked_mutex.synchronize do
    @tracked[id] = [ tracker, internal ] unless id.nil?
    @waiting = id
  end
  dump(tracker.command)
  return tracker if id.nil? # There will be no responses.
  @return_mutex.synchronize do
    # The reader thread clears @waiting (or closes the output stream) before
    # it signals. Checking that state here, instead of waiting once without
    # a condition, keeps a signal that was sent before this point from
    # being lost and makes a spurious wake-up harmless.
    while @waiting == id && !@from_datalackey.closed?
      @return_condition.wait(@return_mutex)
    end
  end
  tracker.status = true
  exits = tracker.exit
  failed = !exits.nil? &&
    exits.any? { |item| item.first == :error || item.first == 'error' }
  tracker.status = false if failed
  tracker
end
verify(command) click to toggle source
# File lib/datalackeylib.rb, line 563
# Returns nil when no syntax information has been stored and true otherwise.
#
# command:: accepted but not inspected by the current implementation.
def verify(command)
  return nil if @syntax.nil?

  true
end