class DatalackeyIO
Attributes
syntax[R]
version[R]
Public Class Methods
internal_generic_map()
click to toggle source
# File lib/datalackeylib.rb, line 308
# Returns a deep copy of the class-level generic map. The copy is produced
# by a Marshal dump/load round trip, so the returned object shares no
# structure with the class variable.
def self.internal_generic_map
  serialized = Marshal.dump(@@internal_generic_map)
  Marshal.load(serialized)
end
internal_notification_map()
click to toggle source
# File lib/datalackeylib.rb, line 304
# Returns a deep copy of the class-level notification map. The copy is
# produced by a Marshal dump/load round trip, so the returned object shares
# no structure with the class variable.
def self.internal_notification_map
  serialized = Marshal.dump(@@internal_notification_map)
  Marshal.load(serialized)
end
new(to_datalackey, from_datalackey, notification_callable = nil, to_datalackey_echo_callable = nil, from_datalackey_echo_callable = nil)
click to toggle source
# File lib/datalackeylib.rb, line 314
# Sets up book-keeping, starts the thread that reads and dispatches
# newline-separated JSON messages from datalackey, and finally sends the
# 'version' command to fill in @syntax and @version.
#
# to_datalackey:: stream commands are written to (write, flush, putc,
#                 close and closed? are called on it).
# from_datalackey:: stream replies are read from with readpartial; it is
#                   closed when the reader thread stops.
# notification_callable:: if not nil, called with (action, message, vars)
#                         for each notification that results in an action.
# to_datalackey_echo_callable:: if not nil, stored and called by dump with
#                               each string written to datalackey.
# from_datalackey_echo_callable:: if not nil, called with each non-empty
#                                 line read from datalackey before parsing.
def initialize(to_datalackey, from_datalackey, notification_callable = nil,
               to_datalackey_echo_callable = nil,
               from_datalackey_echo_callable = nil)
  @to_datalackey_mutex = Mutex.new
  @to_datalackey = to_datalackey
  @to_datalackey_echo = to_datalackey_echo_callable
  @from_datalackey = from_datalackey
  @identifier = 0
  @tracked_mutex = Mutex.new
  # Handling of notifications.
  @notify_tracker = PatternAction.new([ @@internal_notification_map ])
  @notify_tracker.set_identifier(nil)
  @internal = PatternAction.new([ @@internal_generic_map ])
  @tracked = Hash.new(nil)
  @waiting = nil
  @return_mutex = Mutex.new
  @return_condition = ConditionVariable.new
  @dataprocess_mutex = Mutex.new
  @data = Hash.new(0)
  @process = { }
  @children = { }
  @version = { }
  @read_datalackey = Thread.new do
    accum = []
    begin
      loop do
        begin
          raw = @from_datalackey.readpartial(32768)
        rescue IOError
          # EOFError is a subclass of IOError, so this covers end of input too.
          break
        end
        loc = raw.index("\n")
        until loc.nil?
          accum.push(raw[0, loc]) if loc.positive? # Newline at start ends line.
          raw = raw[loc + 1, raw.size - loc - 1]
          loc = raw.index("\n")
          joined = accum.join
          accum.clear
          next if joined.empty?
          from_datalackey_echo_callable.call(joined) unless from_datalackey_echo_callable.nil?
          msg = JSON.parse joined
          # See if we are interested in it.
          if msg.first.nil?
            act, vars = @notify_tracker.best_match(msg)
            next if act.nil?
            # We know there is only one action that matches.
            act = act.first
            actionable = nil
            name = vars.first
            id = vars.last
            # Messages from different threads may arrive out of order so
            # new data/process may be in book-keeping when previous should
            # be removed. With data these imply over-writing immediately,
            # with processes re-use of identifier and running back to back.
            case act.first
            when :stored
              @dataprocess_mutex.synchronize do
                if @data[name] < id
                  @data[name] = id
                  actionable = act
                end
              end
            when :deleted
              @dataprocess_mutex.synchronize do
                if @data.key?(name) && @data[name] <= id
                  @data.delete name
                  actionable = act
                end
              end
            when :data_error
              @dataprocess_mutex.synchronize do
                @data.delete(name) if @data[name] == id
              end
              actionable = act
            when :started
              @dataprocess_mutex.synchronize { @process[name] = id }
              actionable = act
            when :ended
              @dataprocess_mutex.synchronize do
                if @process[name] == id
                  @process.delete(name)
                  @children.delete(name)
                end
              end
              actionable = act
            when :error
              case act[1]
              when :format
                @to_datalackey_mutex.synchronize { @to_datalackey.putc 0 }
              when :user_id
                # String identifiers are never treated as invalid here.
                # Skipping them up front also keeps an integer-like string
                # such as "5" from reaching the subtraction below, where it
                # would raise an unrescued NoMethodError.
                unless @waiting.nil? || @waiting.is_a?(String)
                  # Does the waited command have invalid id?
                  begin
                    int = Integer(@waiting)
                    fract = @waiting - int
                    raise ArgumentError, '' unless fract.zero?
                  rescue ArgumentError, TypeError
                    @tracked_mutex.synchronize do
                      trackers = @tracked[@waiting]
                      trackers.first.message = msg
                      trackers.first.exit = [ act ]
                      @tracked.delete(@waiting)
                      @waiting = nil
                    end
                    @return_mutex.synchronize { @return_condition.signal }
                  end
                end
              end
              actionable = act
            end
            next if notification_callable.nil? || actionable.nil?
            notification_callable.call(actionable, msg, vars)
            next
          end
          # Not a notification.
          trackers = @tracked_mutex.synchronize { @tracked[msg[0]] }
          next if trackers.nil?
          finish = false
          last = nil
          # Deal with user-provided PatternAction (or NoPatternNoAction).
          tracker = trackers.first
          act, vars = tracker.best_match(msg)
          unless act.nil?
            act.each do |item|
              # Offer the item to each generator until one reports handling it.
              tracker.generators.each do |generator|
                break if generator.call(item, msg, vars)
              end
              next unless msg.first == @waiting
              case item.first
              when :return, 'return'
                finish = true
                last = act if last.nil?
              when :error, 'error'
                finish = true
                last = act
              end
            end
          end
          # Check internal PatternAction.
          internal = trackers.last
          act, vars = internal.best_match(msg)
          unless act.nil?
            act = act.first # We know patterns are all unique in mapping.
            if act.first == :child
              @dataprocess_mutex.synchronize { @children[msg[0]] = vars.first }
            elsif msg.first == @waiting
              finish = true
              if act.first == :done
                @tracked_mutex.synchronize { @tracked.delete(msg[0]) }
              elsif act.first == :error
                last = [ act ]
              end
            end
          end
          if finish
            tracker.message = msg
            tracker.exit = last
            @tracked_mutex.synchronize { @waiting = nil }
            @return_mutex.synchronize { @return_condition.signal }
          end
        end
        # Keep the trailing partial line until the rest of it arrives.
        accum.push(raw) unless raw.empty?
      end
    ensure
      # Runs also when the thread stops because of an exception (for example
      # unparseable JSON), so a sender blocked in send is always woken up.
      @from_datalackey.close
      @return_mutex.synchronize { @return_condition.signal }
    end
  end
  # Outside thread block.
  send(PatternAction.new([{ version: [ 'version', '', '?' ] }], [
    proc do |action, _message, vars|
      if action.first == :version
        @syntax = vars.first['commands']
        @version = { }
        vars.first.each_pair do |key, value|
          @version[key] = value if value.is_a? Integer
        end
        true
      else
        false
      end
    end
  ]), ['version'])
end
Public Instance Methods
close()
click to toggle source
# File lib/datalackeylib.rb, line 514
# Closes the stream used for writing to datalackey while holding the
# write mutex.
def close
  @to_datalackey_mutex.synchronize do
    @to_datalackey.close
  end
end
closed?()
click to toggle source
# File lib/datalackeylib.rb, line 510
# Tells whether the stream that is read from datalackey has been closed.
def closed?
  reader = @from_datalackey
  reader.closed?
end
data()
click to toggle source
# File lib/datalackeylib.rb, line 498
# Returns a shallow copy of the data label book-keeping hash, taken while
# holding the data/process mutex. Changes to the returned hash do not
# affect the internal one.
def data
  # synchronize returns the value of its block, so no explicit return.
  @dataprocess_mutex.synchronize { @data.clone }
end
dump(json_as_string)
click to toggle source
# File lib/datalackeylib.rb, line 553
# Writes the given string to datalackey and flushes, all under the write
# mutex. If an echo callable was given, it is then called with the same
# string. A broken pipe (Errno::EPIPE) is silently ignored.
def dump(json_as_string)
  @to_datalackey_mutex.synchronize do
    begin
      @to_datalackey.write json_as_string
      @to_datalackey.flush
      @to_datalackey_echo&.call(json_as_string)
    rescue Errno::EPIPE
      # Should do something in this case. Child process died?
    end
  end
end
finish()
click to toggle source
# File lib/datalackeylib.rb, line 518
# Waits for the thread that reads from datalackey to end.
def finish
  reader_thread = @read_datalackey
  reader_thread.join
end
launched()
click to toggle source
# File lib/datalackeylib.rb, line 506
# Returns a shallow copy of the child book-keeping hash (@children), taken
# while holding the data/process mutex. Changes to the returned hash do
# not affect the internal one.
def launched
  # synchronize returns the value of its block, so no explicit return.
  @dataprocess_mutex.synchronize { @children.clone }
end
process()
click to toggle source
# File lib/datalackeylib.rb, line 502
# Returns a shallow copy of the process book-keeping hash (@process), taken
# while holding the data/process mutex. Changes to the returned hash do
# not affect the internal one.
def process
  # synchronize returns the value of its block, so no explicit return.
  @dataprocess_mutex.synchronize { @process.clone }
end
send(pattern_action, command, user_id = false)
click to toggle source
Pass nil as pattern_action if you do not need to act on the replies to the command.
# File lib/datalackeylib.rb, line 523
# Sends a command to datalackey and waits until the reader thread reports
# that the command has finished or the input from datalackey is closed.
#
# pattern_action:: object that is cloned and used to track the replies.
#                  When nil, a NoPatternNoAction is used instead.
# command:: array to send as JSON. Unless user_id is true, a running
#           identifier is prepended to this array in place.
# user_id:: when true, the first element of command is used as identifier.
#
# Returns nil if the stream to datalackey is already closed. Otherwise
# returns the tracker; when the identifier is nil it is returned right
# after writing, as no replies are expected. Tracker status is set to
# false if any exit item starts with :error or 'error', otherwise true.
def send(pattern_action, command, user_id = false)
  return nil if @to_datalackey_mutex.synchronize { @to_datalackey.closed? }
  if user_id
    id = command[0]
  else
    id = @identifier
    @identifier += 1
    command.prepend id
  end
  tracker = pattern_action.nil? ? NoPatternNoAction.new : pattern_action.clone
  tracker.set_identifier(id)
  tracker.command = JSON.generate(command)
  internal = @internal.clone
  internal.set_identifier(id)
  @tracked_mutex.synchronize do
    @tracked[id] = [ tracker, internal ] unless id.nil?
    @waiting = id
  end
  dump(tracker.command)
  return tracker if id.nil? # There will be no responses.
  @return_mutex.synchronize do
    # The reader thread clears @waiting (or closes its input) before it
    # signals. Re-checking that state avoids blocking forever when the reply
    # was handled before the wait started, and tolerates spurious wakeups.
    @return_condition.wait(@return_mutex) while @waiting == id && !@from_datalackey.closed?
  end
  tracker.status = true
  unless tracker.exit.nil?
    tracker.exit.each do |item|
      tracker.status = false if item.first == :error || item.first == 'error'
    end
  end
  tracker
end
verify(command)
click to toggle source
# File lib/datalackeylib.rb, line 563
# Returns nil when no syntax has been stored in @syntax, otherwise true.
# The command argument is not examined.
def verify(command)
  return nil if @syntax.nil?

  true
end