class WendelinClient

class representing a Wendelin client

Public Class Methods

new(streamtool_uri, credentials, log) click to toggle source

`streamtool_uri` - URI pointing to portal_input_data_stream “mountpoint” `credentials` # {'user' => _, 'password' => _} TODO change to certificate `log` - logger to use

# File lib/embulk/output/wendelin_client.rb, line 29
def initialize(streamtool_uri, credentials, log)
  @streamtool_uri = streamtool_uri
  @credentials    = credentials
  @log            = log
end

Public Instance Methods

ingest(reference, data_chunk) click to toggle source

ingest `data_chunk` to a stream referenced as `reference`

# File lib/embulk/output/wendelin_client.rb, line 37
def ingest(reference, data_chunk)
    uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
    req = Net::HTTP::Post.new(uri)
    if @credentials.has_key?('user')
      req.basic_auth @credentials['user'], @credentials['password']
    end

    # TODO ensure content-type is 'raw', e.g. this way
    # (but then querystring ?reference=... is lost)
    # req.body = data_chunk
    # req.content_type = 'application/octet-stream'
    req.set_form_data('data_chunk' => data_chunk)

    #@log.on_trace do
    #    @log.trace '>>> REQUEST'
    #    @log.trace "method\t=> #{req.method}"
    #    @log.trace "path\t=> #{req.path}"
    #    @log.trace "uri\t=> #{req.uri}"
    #    @log.trace "body\t=> #{req.body}"
    #    @log.trace "body_stream\t=> #{req.body_stream}"
    #    req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
    #    @log.trace
    #end

    begin
        # TODO keep connection open (so that every new ingest does not do
        # full connect again)
        res = Net::HTTP.start(uri.hostname, uri.port,
                    :use_ssl      => (uri.scheme == 'https'),
                    # NOTE = "do not check server cert"
                    # TODO move this out to conf parameters
                    :verify_mode  => OpenSSL::SSL::VERIFY_NONE,

                    # Net::HTTP default open timeout is infinity, which results
                    # in thread hang forever if other side does not fully
                    # establish connection. Default read_timeout is 60 seconds.
                    # We go safe way and make sure all timeouts are defined.
                    :ssl_timeout  => 60,
                    :open_timeout => 60,
                    :read_timeout => 60,
              ) do |http|
            http.request(req)
        end

    rescue
        # some http/ssl/other connection error
        #@log.warn "HTTP ERROR:"
        raise

    else
        #@log.on_trace do
        #    @log.trace '>>> RESPONSE'
        #    res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
        #    @log.trace "code\t=> #{res.code}"
        #    @log.trace "msg\t=> #{res.message}"
        #    @log.trace "class\t=> #{res.class}"
        #    @log.trace "body:", res.body
        #end

        if res.kind_of?(Net::HTTPSuccess)       # res.code is 2XX
            #@log.info "ingested ok"
        else
            #@log.warn "FAIL:"
            res.value
        end
    end
end