class Fluent::Qi

Constants

DEFAULT_FORMAT_TYPE

Attributes

delayed[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 55
# Builds the plugin instance: runs the superclass initializer, then defaults
# the `delayed` attribute to false.
def initialize
  super
  @delayed = false # value later returned by prefer_delayed_commit
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 60
# Reads the plugin settings from +conf+ into instance variables.
#
# If 'output_type' is set and 'format' is not, 'output_type' is copied into
# 'format' before the superclass sees the configuration.
#
# Keys read: tenantId, namespaceId, appId, appKey, typeID, streamID, valuetag.
# A missing key yields nil, except streamID which defaults to the string 'nil'.
#
# When tenantId/namespaceId or appId/appKey are missing, a message is written
# to $log and the method returns early; no exception is raised.
#
# @param conf [#key?, #[], #[]=] configuration element (hash-like)
# @return [nil]
def configure(conf)
  conf['format'] = conf['output_type'] if conf['output_type'] && !conf['format']

  super

  @tenantId = conf.key?('tenantId') ? conf['tenantId'] : nil
  # The missing-key branch used to assign @attpath, leaving @namespaceId
  # uninitialised and creating a stray instance variable.
  @namespaceId = conf.key?('namespaceId') ? conf['namespaceId'] : nil

  @appId = conf.key?('appId') ? conf['appId'] : nil
  @appKey = conf.key?('appKey') ? conf['appKey'] : nil

  @typeID = conf.key?('typeID') ? conf['typeID'] : nil
  # NOTE(review): the default here is the *string* 'nil', unlike every other
  # setting. Kept as-is because it ends up in request URLs and stream ids;
  # confirm whether a real nil (or a required setting) was intended.
  @streamID = conf.key?('streamID') ? conf['streamID'] : 'nil'
  @valuetag = conf.key?('valuetag') ? conf['valuetag'] : nil

  if @tenantId.nil? || @namespaceId.nil?
    # could change requirement and create a namespace as it is needed, but currently it must be already created
    $log.write("Please specify a tenantID and namespaceID.  Unable to send to QI.\n")
    return
  end

  if @appId.nil? || @appKey.nil?
    $log.write("Please specify a appId and appKey.  Unable to send to QI.\n")
    return
  end
end
createTypeAndStream() click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 159
# POSTs the type definition (@typeID) and then the stream definition
# (@streamID) to the Qi endpoints of the configured tenant/namespace.
#
# The type POST is accepted on 200, 201 or 302; the stream POST on 201 or 302
# (201 = created, 302 = already existed). Any other status is logged to $log
# and raised as a RuntimeError ("Type creation failed" /
# "Stream creation failed").
#
# NOTE(review): both clients are built with OpenSSL::SSL::VERIFY_NONE, so the
# server certificate is presumably not validated - confirm this is intended.
def createTypeAndStream
    types_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Types"

    # could open this up to let configuration dictate how the object is created.
    # need to figure out if this should be guided or open (either ask for information and it creates JSON, or just import the JSON and use directly).
    type_payload = "{\"Id\":\"#{@typeID}\",\"Name\":\"FluentD\",\"Description\":\"This is a type for fluentd events\",\"QiTypeCode\":0,\"Properties\":[{\"Id\":\"Timestamp\",\"Name\":null,\"Description\":null,\"QiType\":{\"Id\":\"string\",\"Name\":null,\"Description\":null,\"QiTypeCode\":18,\"Properties\":null},\"IsKey\":true},{\"Id\":\"Value\",\"Name\":null,\"Description\":null,\"QiType\":{\"Id\":\"string\",\"Name\":null,\"Description\":null,\"QiTypeCode\":18,\"Properties\":null},\"IsKey\":false}]}"

    type_client = RestClient::Resource.new(types_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)

    type_client.post(type_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |type_response, _request, _result, &_block|
        # 201 = type created, 302 = type already existed
        unless [200, 201, 302].include?(type_response.code)
            $log.write("Failed to get the type created or found ")
            raise "Type creation failed"
        end

        streams_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams"
        stream_payload = "{\"Id\":\"#{@streamID}\",\"Name\":\"WaveData_SampleStream\",\"Description\":null,\"TypeId\":\"#{@typeID}\",\"BehaviorId\":null}"

        stream_client = RestClient::Resource.new(streams_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)
        stream_client.post(stream_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |stream_response, _stream_request, _stream_result, &_stream_block|
            # 201 = stream created, 302 = stream already existed
            unless [201, 302].include?(stream_response.code)
                $log.write("Failed to get the stream created or found ")
                raise "Stream creation failed"
            end
        end
    end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 194
# Serialises one event as a JSON object followed by a trailing comma, e.g.
#   {"Timestamp":"2020-01-01 12:00:00 +0100","Value":"hello"},
# The trailing comma is relied on by #write, which chops the last one and
# wraps the concatenated chunk in "[...]".
#
# @param tag [Object] unused here
# @param time [Object] anything Time.at accepts (assumes the Fluentd event
#   time, not a PI time)
# @param record [#[]] event record; the value is read from record[@valuetag]
# @return [String] JSON fragment with the value properly JSON-escaped
def format(tag, time, record)
  # Needed for String#to_json; idempotent once the library is loaded.
  require 'json'

  # Treat the bytes as UTF-8 and replace undecodable ones so that encoding
  # the value can never raise on odd input (dup: to_s may return a frozen
  # string).
  value = record[@valuetag].to_s.dup.force_encoding(Encoding::UTF_8).scrub

  # to_json emits the surrounding quotes and escapes '"', '\' and control
  # characters; plain interpolation produced invalid JSON for such values.
  "{\"Timestamp\":\"#{Time.at(time).localtime}\",\"Value\":#{value.to_json}},"
end
gettoken(getstream) click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 133
# Requests a client-credential token through ADAL using @appId/@appKey and
# stores the outcome on the instance:
#   @full_token, @access_token, @refresh_token and @authHeader ("bearer <token>").
#
# @param getstream [Boolean] when truthy, createTypeAndStream is called after
#   the token has been stored
# @raise [RuntimeError] "ADAL token failed" when ADAL returns an ErrorResponse
def gettoken(getstream)
    context = ADAL::AuthenticationContext.new("login.windows.net", "365cc376-d621-41b0-b32f-7a982b22a63c")
    credential = ADAL::ClientCredential.new(@appId, @appKey)
    outcome = context.acquire_token_for_client("https://qihomeprod.onmicrosoft.com/historian", credential)

    case outcome
    when ADAL::SuccessResponse
        # Only the access token and the refresh token are of interest; the
        # complete response is kept as well so #write can check expires_on.
        @full_token = outcome
        @access_token = outcome.access_token
        @refresh_token = outcome.refresh_token
        @authHeader = ("bearer " + @access_token)

        createTypeAndStream if getstream
    when ADAL::ErrorResponse
        # No access token could be obtained.
        $log.write("Failed to get an access_token")
        raise "ADAL token failed"
    end
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 45
# Always returns false.
# NOTE(review): presumably the Fluentd output-plugin hook that selects
# non-buffered processing - confirm against the Fluent::Plugin::Output docs.
def prefer_buffered_processing
  false
end
prefer_delayed_commit() click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 49
# Returns the current value of @delayed (set to false in #initialize and
# exposed through the read/write `delayed` attribute).
# NOTE(review): presumably the Fluentd output-plugin hook for delayed commit -
# confirm against the Fluent::Plugin::Output docs.
def prefer_delayed_commit
  @delayed
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 89
# Starts the plugin: runs the superclass start, acquires a token (which, with
# the `true` argument, also calls createTypeAndStream) and resets the
# write-retry state used by #write.
def start
  super
  gettoken(true) 
  # @secondWriteAttempt: set by #write once a failed write has been retried.
  @secondWriteAttempt = false
  # @three: set by #write when the retry failed too; #write then returns
  # immediately for every later chunk.
  @three  = false
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 96
# Sends one chunk of formatted events to the stream's UpdateValues endpoint.
#
# The chunk content is expected to be the concatenation of #format outputs
# (each ending in ','); the final comma is chopped and the result wrapped in
# "[...]" to form a JSON array.
#
# On a non-success status the type/stream are re-created once and the write is
# retried. If the retry fails as well, @three is set, "Writing Values failed"
# is raised, and every later call returns immediately without sending.
#
# @param chunk [#read] buffer chunk whose #read returns the payload string
# @raise [RuntimeError] "Writing Values failed" when the retry also fails
def write(chunk)
  # A previous retry already failed: stop sending (see above).
  return if @three

  data = chunk.read

  jsonObj = "[" + data.chop + "]" # removes trailing ',' and makes a valid json object

  if Time.at(@full_token.expires_on).utc < (Time.now.utc + 30)
    gettoken(false) # the refresh token isn't working so when we are within 30 seconds get a new token
  end

  sendDataURL = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams/#{@streamID}/Data/UpdateValues"

  # NOTE(review): VERIFY_NONE presumably disables certificate validation -
  # confirm this is intended.
  clientGetStream = RestClient::Resource.new(sendDataURL, :verify_ssl => OpenSSL::SSL::VERIFY_NONE)
  clientGetStream.put(jsonObj, :authorization => @authHeader, :content_type => "application/json", :accept => "text/plain") { |response, request, result, &block|
      case response.code
      when 200,201,302
        #201 means it is created, 302 means it was created and it was just retrieved.
        # Reset the *instance* flag. The previous code assigned a local
        # variable, so after one recovered failure the next failure skipped
        # the retry and permanently disabled the output.
        @secondWriteAttempt = false
      else
        $log.write(response.to_s)
        $log.write("\nFailed to write the value.  Regetting stream and type to see if this helps.\n")
        if !@secondWriteAttempt
          @secondWriteAttempt = true
          createTypeAndStream
          write(chunk)
        else
          $log.write("Failed to write the value.  Regetting stream and type did not help.\n")
          @three = true
          raise "Writing Values failed"
        end
      end
      }
    $log.write("QI successful writes\n")
end