module MessagePack::Rpc::Server

Module that implements the server side of the MessagePack-RPC protocol.

@abstract

Include this module from the class that implements the RPC server. You can expose
the methods defined in that class as RPC procedures.
To handle protocol-level errors, override the on_error method.

Public Class Methods

included(klass) click to toggle source

@!visibility protected

# File lib/msgpack/rpc/server.rb, line 91
# Hook run when this module is included into +klass+: builds an anonymous
# module holding the class-level RPC configuration API and extends +klass+
# with it.
#
# While the anonymous module body is evaluated it also
# * stores two anonymous helper classes in the class variables +@@error+
#   and +@@deferred+, and
# * seeds +@remote_public+, +@remote_async+ and +@msgpack_options+ on
#   +klass+ with empty containers.
#
# @param [Class] klass
#
# the class that includes this module
def included(klass)
  m = Module.new {
    # Exception used to wrap a non-Exception rejection value; the original
    # value stays available through #data.
    @@error = Class.new(StandardError) {
      def initialize(label, data)
        super("error ocurred on precedure \"#{label}\"")
        @data = data
      end

      attr_reader :data
    }

    # One-shot completion handle: remembers a request id and the object
    # whose send_data delivers the response packet.
    @@deferred = Class.new {
      def initialize(id, klass)
        @id    = id
        @klass = klass
      end

      # Sends the packet [1, id, nil, result] through @klass.
      def resolve(result)
        packet = [1, @id, nil, result].to_msgpack
        # instance_eval makes @klass the receiver of send_data.
        @klass.instance_eval {send_data(packet)}

        # Remove both completion methods from this object only, so the
        # handle cannot be resolved or rejected a second time.
        class << self
          undef_method :resolve, :reject
        end
      end

      # Sends the packet [1, id, error, nil] through @klass and then hands
      # the error to @klass's error_occured.
      def reject(error)
        # The packet carries the value exactly as given; the wrapping below
        # only affects what is passed to error_occured.
        packet = [1, @id, error, nil].to_msgpack
        @klass.instance_eval {
          send_data(packet)

          if not error.kind_of?(Exception)
            # Label of the frame three levels up from this block --
            # presumably the procedure that called reject; this relies on
            # the exact nesting here, so keep the block structure intact.
            label = caller_locations(3..3)[0].base_label
            error = @@error.new(label, error)
          end

          # Backtrace starts at the same frame the label was taken from.
          error.set_backtrace(caller(3..-1))
          error_occured(error)
        }

        # Same one-shot guard as in resolve.
        class << self
          undef_method :resolve, :reject
        end
      end
    }

    # Per-class state read and written by the methods defined below.
    klass.instance_variable_set(:@remote_public, [])
    klass.instance_variable_set(:@remote_async, [])
    klass.instance_variable_set(:@msgpack_options, {})

    # Appends +meth+ to the @remote_public list when given; always returns
    # the list.
    def remote_public(meth = nil)
      @remote_public << meth if meth
      return @remote_public
    end

    # Appends +meth+ to the @remote_async list when given; always returns
    # the list.
    def remote_async(meth = nil)
      @remote_async << meth if meth
      return @remote_async
    end

    # Stores +opts+ when it is nil or a Hash and returns the stored value.
    # The :none default matches neither case, so a call without arguments
    # only reads.
    def msgpack_options(opts = :none)
      if opts.nil? || opts.kind_of?(Hash)
        @msgpack_options = opts
      end

      return @msgpack_options
    end

    # Creates a MessagePack::Unpacker from the stored options, using an
    # empty Hash when they are nil.
    def new_unpacker
      return MessagePack::Unpacker.new(@msgpack_options || {})
    end
  }

  klass.extend(m)
end
initialize(label, data) click to toggle source
Calls superclass method
# File lib/msgpack/rpc/server.rb, line 94
# Builds the exception message from the procedure label, passes it to the
# superclass constructor and keeps the raw error payload.
#
# @param [String] label
#
# label of the procedure the error is attributed to
#
# @param [Object] data
#
# original error value, stored in @data
def initialize(label, data)
  message = "error ocurred on precedure \"#{label}\""
  super(message)
  @data = data
end
msgpack_options(opts = :none) click to toggle source
# File lib/msgpack/rpc/server.rb, line 151
# Combined reader/writer for the stored MessagePack options.
#
# @param [Hash, nil] opts
#
# new options; any other value (including the :none default used when the
# argument is omitted) leaves the stored value untouched
#
# @return the value currently held in @msgpack_options
def msgpack_options(opts = :none)
  @msgpack_options = opts if opts.nil? || opts.is_a?(Hash)
  @msgpack_options
end
new_unpacker() click to toggle source
# File lib/msgpack/rpc/server.rb, line 159
# Creates a fresh MessagePack::Unpacker configured with the stored options;
# an empty Hash is used when @msgpack_options is nil.
def new_unpacker
  options = @msgpack_options || {}
  MessagePack::Unpacker.new(options)
end
reject(error) click to toggle source
# File lib/msgpack/rpc/server.rb, line 117
# Completes the request with a failure: sends the packet
# [1, id, error, nil] through @klass and then reports the error to @klass's
# error_occured.
#
# @param [Object] error
#
# error value to send; anything that is not an Exception is wrapped in
# @@error before being reported
def reject(error)
  # The packet carries the value exactly as given; the wrapping below only
  # affects what is passed to error_occured.
  packet = [1, @id, error, nil].to_msgpack
  # instance_eval makes @klass the receiver of send_data / error_occured.
  @klass.instance_eval {
    send_data(packet)

    if not error.kind_of?(Exception)
      # Label of the frame three levels up from this block -- presumably
      # the procedure that called reject; this relies on the exact nesting
      # here, so keep the block structure intact.
      label = caller_locations(3..3)[0].base_label
      error = @@error.new(label, error)
    end

    # Backtrace starts at the same frame the label was taken from.
    error.set_backtrace(caller(3..-1))
    error_occured(error)
  }

  # Remove both completion methods from this object only, so it cannot be
  # resolved or rejected a second time.
  class << self
    undef_method :resolve, :reject
  end
end
remote_async(meth = nil) click to toggle source
# File lib/msgpack/rpc/server.rb, line 146
# Reader/registrar for the @remote_async list.
#
# @param [Symbol] meth
#
# name to append to the list; omit (or pass a falsy value) to only read
#
# @return the @remote_async list
def remote_async(meth = nil)
  return @remote_async unless meth

  # << hands back the receiver, so the list is returned here as well.
  @remote_async << meth
end
remote_public(meth = nil) click to toggle source
# File lib/msgpack/rpc/server.rb, line 141
# Reader/registrar for the @remote_public list.
#
# @param [Symbol] meth
#
# name to append to the list; omit (or pass a falsy value) to only read
#
# @return the @remote_public list
def remote_public(meth = nil)
  return @remote_public unless meth

  # << hands back the receiver, so the list is returned here as well.
  @remote_public << meth
end
resolve(result) click to toggle source
# File lib/msgpack/rpc/server.rb, line 108
# Completes the request successfully: sends the packet
# [1, id, nil, result] through @klass, then disables this handle.
#
# @param [Object] result
#
# value placed in the result slot of the packet
def resolve(result)
  response = [1, @id, nil, result].to_msgpack

  # Evaluate in @klass's context so send_data is called on it.
  @klass.instance_eval do
    send_data(response)
  end

  # One-shot: drop both completion methods from this object's singleton
  # class so a second resolve/reject is impossible.
  singleton_class.send(:undef_method, :resolve, :reject)
end

Public Instance Methods

notify(meth, *args) click to toggle source

Sends a notification to the peer RPC client.

@param [Symbol] meth

notify name

@param [Array] args

argument for notification
# File lib/msgpack/rpc/server.rb, line 283
# Packs [2, meth, args] with to_msgpack and passes it to send_data.
#
# @param [Symbol] meth
#
# notification name
#
# @param [Array] args
#
# arguments for the notification
def notify(meth, *args)
  packet = [2, meth, args].to_msgpack
  send_data(packet)
end
receive_dgram(data) click to toggle source

Enqueues the received datagram to the communication buffer.

@param [Blob] data

Received data from the peer RPC client.

@note

Use this method for datagram communication.
Use it when it is guaranteed that data is exchanged
in packets (it works a bit faster).
# File lib/msgpack/rpc/server.rb, line 298
# Decodes one complete datagram with MessagePack.unpack (using the class's
# msgpack_options) and dispatches the decoded message via eval_message.
#
# A decoded value that is not an Array is reported through error_occured
# and is not dispatched.
#
# @param [Blob] data
#
# received data from the peer RPC client
def receive_dgram(data)
  msg = MessagePack.unpack(data, self.class.msgpack_options)

  unless msg.kind_of?(Array)
    error_occured(RuntimeError.new("not array message is received"))
    # Nothing sensible can be dispatched from a non-array payload.
    return
  end

  eval_message(msg)
end
receive_stream(data) click to toggle source

Enqueues the received data to the communication buffer.

@param [Blob] data

Received data from the peer RPC client.
# File lib/msgpack/rpc/server.rb, line 314
# Feeds a chunk of stream data to the unpacker and dispatches every message
# it yields via eval_message.
#
# An unpack error resets the unpacker before being reported; any other
# StandardError is reported as-is. Both go through error_occured.
#
# @param [Blob] data
#
# received data from the peer RPC client
def receive_stream(data)
  unpacker.feed_each(data) { |message| eval_message(message) }
rescue MessagePack::UnpackError => unpack_error
  unpacker.reset
  error_occured(unpack_error)
rescue => other_error
  error_occured(other_error)
end
reset_unpacker() click to toggle source
# File lib/msgpack/rpc/server.rb, line 172
# Discards the cached unpacker by clearing @unpacker.
def reset_unpacker
  @unpacker = nil
end
unpacker() click to toggle source
# File lib/msgpack/rpc/server.rb, line 168
# Returns the unpacker cached in @unpacker, creating it through the class's
# new_unpacker on first use (or after @unpacker has been cleared).
def unpacker
  return @unpacker if @unpacker

  @unpacker = self.class.new_unpacker
end

Private Instance Methods

do_async_call(id, meth, para) click to toggle source
# File lib/msgpack/rpc/server.rb, line 176
# Invokes an asynchronous procedure: a new @@deferred handle for this
# request is passed as the first argument, followed by the request
# parameters.
#
# @param id    request id handed to the deferred handle
# @param meth  name of the method to invoke on self
# @param para  parameters: a falsy value means none, an Array is splatted,
#              anything else is passed as a single argument
def do_async_call(id, meth, para)
  deferred = @@deferred.new(id, self)

  arguments =
    if !para
      []
    elsif para.kind_of?(Array)
      para
    else
      [para]
    end

  self.__send__(meth, deferred, *arguments)
end
do_call(id, meth, para) click to toggle source
# File lib/msgpack/rpc/server.rb, line 191
# Invokes a synchronous procedure on self and returns its result.
#
# @param id    request id (not used by this method)
# @param meth  name of the method to invoke on self
# @param para  parameters: a falsy value means none, an Array is splatted,
#              anything else is passed as a single argument
# @return whatever the invoked method returns
def do_call(id, meth, para)
  arguments =
    if !para
      []
    elsif para.kind_of?(Array)
      para
    else
      [para]
    end

  self.__send__(meth, *arguments)
end
do_notify(meth, para) click to toggle source
# File lib/msgpack/rpc/server.rb, line 206
# Invokes a notification handler on self.
#
# @param meth  handler name; converted with to_sym before dispatch
# @param para  parameters: an Array is splatted, anything else is passed as
#              a single argument
#
# NOTE(review): unlike do_call, a nil +para+ is passed through as one nil
# argument rather than "no arguments" -- confirm whether that is intended.
def do_notify(meth, para)
  arguments = para.kind_of?(Array) ? para : [para]
  self.__send__(meth.to_sym, *arguments)
end
error_occured(e) click to toggle source
# File lib/msgpack/rpc/server.rb, line 216
# Reports an error: delegates to on_error when self responds to it (private
# methods included), otherwise prints the error message to STDERR.
#
# @param [Exception] e
#
# the error to report
def error_occured(e)
  return __send__(:on_error, e) if respond_to?(:on_error, true)

  STDERR.print("#{e.message}")
end
eval_message(msg) click to toggle source
# File lib/msgpack/rpc/server.rb, line 225
# Dispatches one decoded message.
#
# * type 0 (call, [0, id, method, params]): runs the method through
#   do_async_call when it is listed in the class's remote_async, or through
#   do_call when it is listed in remote_public, in which case the result is
#   sent back as [1, id, nil, result].
# * type 2 (notify, [2, method, params]): runs the method through do_notify
#   when it is listed in remote_public.
# * anything else raises inside this method.
#
# Every StandardError raised while dispatching is caught here: for a call
# an error response [1, id, error, nil] is sent, and the error is always
# passed to error_occured. Nothing is re-raised from the dispatch itself.
#
# @param [Array] msg
#
# decoded message; a non-Array value is reported through error_occured
# without sending any response
def eval_message(msg)
  # Guard first: Integer#[] and String#[] would otherwise be silently
  # misread as message fields.
  raise("not array message is received") unless msg.kind_of?(Array)

  case msg[0]
  when 0
    #
    # when call
    #
    id   = msg[1]
    # NOTE(review): the peer-supplied name is symbolized before the
    # allow-list check below.
    meth = msg[2].to_sym
    args = msg[3]

    if self.class.remote_async.include?(meth)
      # The response is sent later through the deferred handle.
      do_async_call(id, meth, args)

    elsif self.class.remote_public.include?(meth)
      result = do_call(id, meth, args)
      send_data([1, id, nil, result].to_msgpack)

    else
      raise("procedure `#{meth}` is not callable from remote")
    end

  when 2
    #
    # when notify
    #
    meth = msg[1].to_sym
    args = msg[2]

    if self.class.remote_public.include?(meth)
      do_notify(meth, args)

    else
      raise("notify `#{meth}` is unhandled")
    end

  else
    # ProtocolError is not defined in this method; presumably it is
    # resolved from the enclosing namespace.
    raise ProtocolError.new("unknown message type #{msg[0]} recived.")
  end

rescue => e
  # Only a call expects a response. The Array check keeps this handler from
  # raising again (or answering garbage) when msg itself was malformed.
  if msg.kind_of?(Array) && msg[0] == 0
    # Errors that carry a payload expose it via #data; everything else is
    # reported by its message text.
    error = e.respond_to?(:data) ? e.data : String.new(e.message, encoding: "UTF-8")
    send_data([1, msg[1], error, nil].to_msgpack)
  end

  error_occured(e)
end