class Farcall::Endpoint

The protocol endpoint. Takes a transport and implements the Farcall protocol over it. You can use it directly or with the Farcall::RemoteInterface and Farcall::LocalProvider helper classes.

Note that returned data is converted to SmartHash, primarily to avoid the :key vs. 'key' ambiguity that might otherwise appear depending on the transport encoding protocol. Anyway, it is better than a plain Ruby hash ;)
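The ambiguity mentioned above is easy to reproduce with the standard library alone. The following is a minimal sketch, assuming a JSON-based transport encoding (the actual encoding depends on the transport in use). Symbol keys do not survive a round trip through JSON, so a plain Hash built on one side cannot be read with the same keys on the other. The `fetch_either` helper is a hypothetical name used for illustration and is not part of Farcall.

```ruby
require 'json'

# A packet as it might be built locally, with symbol keys.
sent = { cmd: 'ping', args: [1, 2] }

# After a JSON round trip the keys come back as strings.
received = JSON.parse(JSON.generate(sent))

received['cmd']  # the string key works
received[:cmd]   # the symbol key finds nothing in a plain Hash

# Illustrative helper (an assumption, not Farcall API): look a key up
# either way, the kind of access a SmartHash-like wrapper makes uniform.
def fetch_either(hash, key)
  hash.fetch(key.to_s) { hash[key.to_sym] }
end

fetch_either(received, :cmd)
```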

Endpoint class is thread-safe.

Attributes

provider[RW]

Set or get the provider instance. When a provider is set, its public methods are called by the remote party, and any exceptions are passed back to the caller. You can use an instance of any Ruby class; everything will work, including operators, indexes ([]) and the like.
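Why operators and indexes work for a provider can be shown with plain Ruby: they are ordinary public methods and can be invoked by name. The sketch below is an illustration only. The `Shelf` class and the `invoke` helper are hypothetical, and `public_send` is used here as a stand-in for whatever mechanism the library actually uses to reach the provider.

```ruby
# Hypothetical provider: any object with public methods would do.
class Shelf
  def initialize
    @items = %w[a b c]
  end

  def [](index)
    @items[index]
  end

  def +(other)
    @items.size + other
  end

  def greet(name, punctuation: '!')
    "hello, #{name}#{punctuation}"
  end

  private

  def secret
    'hidden'
  end
end

# Illustrative dispatcher (an assumption, not Farcall's code): keyword
# names arrive as strings from the wire, so they are turned into symbols.
def invoke(provider, cmd, args = [], kwargs = {})
  provider.public_send(cmd, *args, **kwargs.transform_keys(&:to_sym))
end

shelf = Shelf.new
invoke(shelf, '[]', [1])
invoke(shelf, '+', [2])
invoke(shelf, 'greet', ['bob'], { 'punctuation' => '?' })
```

Because `public_send` refuses private methods, a call such as `invoke(shelf, 'secret')` raises NoMethodError instead of exposing the method.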

Public Class Methods

new(transport, init_proc=nil)

Create an endpoint connected to some transport.

@param [Farcall::Transport] transport

# File lib/farcall/endpoint.rb, line 23
def initialize(transport, init_proc=nil)
  @transport                  = transport
  @in_serial                  = @out_serial = 0
  @send_lock    = Mutex.new
  @receive_lock = Mutex.new
  @handlers     = {}
  @waiting      = {}

  init_proc.call(self) if init_proc

  # @!visibility private
  def push_input data
    @in_buffer << data
    drain
  end

  @transport.on_data_received = -> (data) {
    begin
      _received(data)
    rescue
      abort :format_error, $!
    end
  }
end
open(transport, &block)
# File lib/farcall/endpoint.rb, line 48
def self.open(transport, &block)
  Endpoint.new(transport, block)
end

Public Instance Methods

abort(reason, exception = nil)

@!visibility private

# File lib/farcall/endpoint.rb, line 64
def abort reason, exception = nil
  puts "*** Abort: reason #{reason || exception.to_s}"
  @abort_hadnler and @abort_hadnler.call reason, exception
  if exception
    raise exception
  end
  close
end
call(name, *args, **kwargs, &block)

Call the remote party without blocking. Returns a {Farcall::Promise} instance for handling the remote return value asynchronously (the recommended way).

Optionally, a block can be provided that takes |error, result| parameters. The error must be nil or

SmartHash.new({'class' =>, 'text' => text [, data: {some_data}] })

If an error is present, the result is always nil.

Usually, using {#remote}, which returns a {Farcall::Interface}, is more effective than this low-level method.

The returned {Farcall::Promise} instance lets you add any number of callbacks on command execution, success or failure.

@param [String] name of the remote command @return [Farcall::Promise] instance

# File lib/farcall/endpoint.rb, line 98
def call(name, *args, **kwargs, &block)
  promise = Farcall::Promise.new
  @send_lock.synchronize {
      @waiting[@out_serial] = -> (error, result) {
        block.call(error, result) if block
        if error
          promise.set_fail error
        else
          promise.set_success result
        end
      }
      _send(cmd: name.to_s, args: args, kwargs: kwargs)
  }
  promise
end
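The bookkeeping in #call can be sketched without a real transport. In this minimal, illustrative model (the `MiniCaller` class and its `deliver` method are hypothetical names, not part of Farcall), each outgoing packet carries the current serial, the callback is stored under that serial, and a reply whose `ref` matches removes and invokes it.

```ruby
# Hypothetical stand-in for the sending half of an endpoint.
class MiniCaller
  attr_reader :sent

  def initialize
    @out_serial = 0
    @waiting    = {}
    @sent       = []        # packets that would be handed to the transport
    @lock       = Mutex.new
  end

  # Register the callback under the serial of the packet being sent.
  def call(name, *args, **kwargs, &block)
    @lock.synchronize do
      @waiting[@out_serial] = block
      @sent << { cmd: name.to_s, args: args, kwargs: kwargs, serial: @out_serial }
      @out_serial += 1
    end
  end

  # Simulate an incoming reply: { ref:, result: } or { ref:, error: }.
  def deliver(reply)
    callback = @lock.synchronize { @waiting.delete(reply[:ref]) }
    callback&.call(reply[:error], reply[:result])
  end
end

endpoint = MiniCaller.new
outcomes = []
endpoint.call(:sum, 1, 2) { |error, result| outcomes << [error, result] }
endpoint.call(:boom)      { |error, result| outcomes << [error, result] }

# Replies may arrive in any order; `ref` ties each one to its request.
endpoint.deliver({ ref: 1, error: { 'class' => 'RuntimeError', 'text' => 'boom' } })
endpoint.deliver({ ref: 0, result: 3 })
```

Each callback fires exactly once, because the entry is deleted from the table before it is invoked.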
close()

Close the endpoint and its connected transport.

# File lib/farcall/endpoint.rb, line 74
def close
  @transport.close
  @transport = nil
  @close_handler and @close_handler.call
end
on(name, &block)

Set a handler for the named command. The block will be called when the remote party invokes the command, with the parameters passed from the remote. The block's return value will be passed back to the caller.

If a provider is set, it is called instead.

If the block raises an exception, it will be reported to the caller as an error (depending on its platform, the remote party will either raise an exception on its end or report an error).

# File lib/farcall/endpoint.rb, line 173
def on(name, &block)
  @handlers[name.to_s] = block
end
on_abort(&proc)

The provided block will be called if the endpoint is aborted. The block should take |reason, exception| parameters; the latter may be nil.

# File lib/farcall/endpoint.rb, line 54
def on_abort &proc
  @abort_hadnler = proc
end
on_close(&block)

Set the close handler. The specified block will be called when the endpoint is closed.

# File lib/farcall/endpoint.rb, line 59
def on_close &block
  @close_handler = block
end
on_command(&block)
Alias for: on_remote_call
on_remote_call(&block)

Process remote commands. The provided block will be executed on every remote command, taking parameters |name, args, kwargs|. Whatever the block returns will be passed to the calling party. Likewise, any exception that the block raises will be sent back to the caller.

This block will be called only if no `provider` is specified and no #on handler is set for the command being executed.

# File lib/farcall/endpoint.rb, line 159
def on_remote_call &block
  @on_remote_call = block
end
Also aliased as: on_command
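The precedence described above (provider first, then a per-command handler set with #on, then the catch-all block) can be summarised in a few lines. This is an illustrative sketch only: the `dispatch` helper is hypothetical, and the provider is modelled as anything that responds to `call`, which simplifies how a real provider is invoked.

```ruby
# Hypothetical helper mirroring the lookup order described above.
def dispatch(cmd, args, kwargs, provider: nil, handlers: {}, fallback: nil)
  if provider
    provider.call(cmd, args, kwargs)
  elsif (handler = handlers[cmd.to_s])
    handler.call(args, kwargs)          # per-command handler, as set with #on
  elsif fallback
    fallback.call(cmd, args, kwargs)    # catch-all, as set with #on_remote_call
  end
end

handlers = { 'sum' => ->(args, _kwargs) { args.sum } }
fallback = ->(name, args, _kwargs) { "unhandled #{name}(#{args.join(', ')})" }

dispatch('sum', [1, 2], {}, handlers: handlers, fallback: fallback)
dispatch('mul', [3, 4], {}, handlers: handlers, fallback: fallback)
```

In this model a command with no provider, no handler and no catch-all block yields nil.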
push_input(data)

@!visibility private

# File lib/farcall/endpoint.rb, line 34
def push_input data
  @in_buffer << data
  drain
end
remote()

Get the {Farcall::Interface} connected to this endpoint. Any subsequent calls will return the same instance. @return [Farcall::Interface] the remote interface instance

# File lib/farcall/endpoint.rb, line 181
def remote
  @remote ||= Farcall::Interface.new endpoint: self
end
sync_call(name, *args, **kwargs)

Call the remote party and wait for the return.

It is preferable to use Farcall::Endpoint#remote or Farcall::RemoteInterface rather than this low-level method.

@param [String] name of the remote command @return [Object] any data that the remote party returns. If it is a hash, it is a SmartHash instance.

@raise [Farcall::RemoteError]

# File lib/farcall/endpoint.rb, line 124
def sync_call(name, *args, **kwargs)
  mutex          = Mutex.new
  resource       = ConditionVariable.new
  error          = nil
  result         = nil
  calling_thread = Thread.current

  mutex.synchronize {
    same_thread = false
    call(name, *args, **kwargs) { |e, r|
      error, result = e, r
      # Absolutely stupid wait for self situation
      # When single thread is used to send and receive
      # - often happens in test environments
      if calling_thread == Thread.current
        same_thread = true
      else
        resource.signal
      end
    }
    same_thread or resource.wait(mutex)
  }
  if error
    raise Farcall::RemoteError.new(error['class'], error['text'], error['data'])
  end
  result
end
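The waiting pattern behind a blocking call can be sketched with the standard Mutex and ConditionVariable classes. This is a generic, self-contained illustration rather than the library's code: `blocking_call` and the `async_sum` lambda are hypothetical, and the sketch uses a `done` flag checked in a loop, a common way to guard against spurious or early wake-ups.

```ruby
# Hypothetical helper: turn a callback-style operation into a blocking one.
def blocking_call(async, *args)
  mutex = Mutex.new
  ready = ConditionVariable.new
  done  = false
  error = result = nil

  async.call(*args) do |e, r|
    mutex.synchronize do
      error, result, done = e, r, true
      ready.signal
    end
  end

  # Skipped entirely if the callback already ran on this thread.
  mutex.synchronize { ready.wait(mutex) until done }
  raise error['text'] if error
  result
end

# Stand-in for a remote call: replies from another thread after a delay.
async_sum = lambda do |a, b, &callback|
  Thread.new do
    sleep 0.01
    callback.call(nil, a + b)
  end
end

blocking_call(async_sum, 2, 3)
```

Because the flag is set under the mutex before the waiter checks it, the same helper also covers the single-threaded case, where the callback fires before the caller starts waiting.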

Private Instance Methods

_received(data)
# File lib/farcall/endpoint.rb, line 197
def _received(data)
  # p [:r, data]
  data = SmartHash.new data

  cmd, serial, args, kwargs, ref, result, error =
      %w{cmd serial args kwargs ref result error}.map { |k| data[k] || data[k.to_sym] }
  !serial || serial < 0 and abort 'missing or bad serial'

  @receive_lock.synchronize {
    serial == @in_serial or abort "framing error (wrong serial)"
    @in_serial += 1
  }

  case
    when cmd

      begin
        result = if @provider
                   args ||= []
                   if kwargs && !kwargs.empty?
                     # ruby thing: keyword args must be symbols, not strings:
                     fixed = {}
                     kwargs.each { |k, v| fixed[k.to_sym] = v }
                     args << fixed
                   end
                   @provider.send :remote_call, cmd.to_sym, args
                 elsif (h = @handlers[cmd.to_s])
                   h.call args, kwargs
                 elsif @on_remote_call
                   @on_remote_call.call cmd, args, kwargs
                 end
        _send ref: serial, result: result
      rescue Exception => e
        # puts e
        # puts e.backtrace.join("\n")
        error_data = { 'class' => e.class.name, 'text' => e.to_s }
        e.respond_to?(:data) and error_data[:data] = e.data
        _send ref: serial, error: error_data
      end

    when ref

      ref or abort 'no reference in return'
      (proc = @waiting.delete ref) != nil and proc.call(error, result)

    else
      abort 'unknown command'
  end
end
_send(**kwargs)
# File lib/farcall/endpoint.rb, line 187
def _send(**kwargs)
  if @send_lock.locked?
    kwargs[:serial] = @out_serial
    @transport.send_data kwargs
    @out_serial += 1
  else
    @send_lock.synchronize { _send(**kwargs) }
  end
end