class WendelinClient
Class representing a Wendelin client.
Public Class Methods
new(streamtool_uri, credentials, log)
click to toggle source
`streamtool_uri` - URI pointing to the portal_input_data_stream “mountpoint”; `credentials` - hash of the form {'user' => _, 'password' => _} (TODO: change to certificate); `log` - logger to use.
# File lib/embulk/output/wendelin_client.rb, line 29
# Remember the settings used by later calls; no connection is made here.
#
# streamtool_uri - URI pointing to the portal_input_data_stream "mountpoint"
# credentials    - {'user' => _, 'password' => _}  (TODO change to certificate)
# log            - logger to use
def initialize(streamtool_uri, credentials, log)
  @streamtool_uri, @credentials, @log = streamtool_uri, credentials, log
end
Public Instance Methods
ingest(reference, data_chunk)
click to toggle source
Ingests `data_chunk` into the stream referenced as `reference`.
# File lib/embulk/output/wendelin_client.rb, line 37
# Ingest `data_chunk` into the stream referenced as `reference`.
#
# Sends one HTTP POST to "<streamtool_uri>/ingest?reference=<reference>" with
# `data_chunk` as the form field 'data_chunk'. HTTP basic auth is added when
# @credentials contains a 'user' key.
#
# reference  - stream reference. It is form-encoded here, so pass it raw
#              (not already percent-encoded).
# data_chunk - payload to ingest.
#
# Returns nil when the response is 2XX.
# Raises any connection/SSL error from Net::HTTP unchanged, and the error
# raised by Net::HTTPResponse#value when the response is not 2XX.
def ingest(reference, data_chunk)
  # Encode the reference so that spaces, '&', '=', '#', ... can neither make
  # the URI invalid nor alter the query string.
  query = URI.encode_www_form('reference' => reference.to_s)
  uri = URI("#{@streamtool_uri}/ingest?#{query}")

  req = Net::HTTP::Post.new(uri)
  if @credentials.key?('user')
    req.basic_auth @credentials['user'], @credentials['password']
  end

  # TODO ensure content-type is 'raw', e.g. this way
  # (but then querystring ?reference=... is lost)
  # req.body = data_chunk
  # req.content_type = 'application/octet-stream'
  req.set_form_data('data_chunk' => data_chunk)

  #@log.on_trace do
  #  @log.trace '>>> REQUEST'
  #  @log.trace "method\t=> #{req.method}"
  #  @log.trace "path\t=> #{req.path}"
  #  @log.trace "uri\t=> #{req.uri}"
  #  @log.trace "body\t=> #{req.body}"
  #  @log.trace "body_stream\t=> #{req.body_stream}"
  #  req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
  #  @log.trace
  #end

  # TODO keep connection open (so that every new ingest does not do
  # full connect again)
  # Connection/SSL errors are not rescued: they propagate to the caller.
  res = Net::HTTP.start(uri.hostname, uri.port,
    :use_ssl => (uri.scheme == 'https'),
    # NOTE = "do not check server cert"
    # TODO move this out to conf parameters
    :verify_mode => OpenSSL::SSL::VERIFY_NONE,

    # Net::HTTP default open timeout is infinity, which results
    # in thread hang forever if other side does not fully
    # establish connection. Default read_timeout is 60 seconds.
    # We go safe way and make sure all timeouts are defined.
    :ssl_timeout  => 60,
    :open_timeout => 60,
    :read_timeout => 60
  ) do |http|
    http.request(req)
  end

  #@log.on_trace do
  #  @log.trace '>>> RESPONSE'
  #  res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
  #  @log.trace "code\t=> #{res.code}"
  #  @log.trace "msg\t=> #{res.message}"
  #  @log.trace "class\t=> #{res.class}"
  #  @log.trace "body:", res.body
  #end

  if res.is_a?(Net::HTTPSuccess)   # res.code is 2XX
    #@log.info "ingested ok"
  else
    #@log.warn "FAIL:"
    res.value   # raises for a non-2XX response
  end
end