class Fluent::Qi
Constants
- DEFAULT_FORMAT_TYPE
Attributes
delayed[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 55
# Creates the plugin instance.
#
# Runs the superclass constructor first, then initialises the +delayed+
# attribute to +false+ (the value later returned by #prefer_delayed_commit).
def initialize
  super
  @delayed = false
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 60
# Reads the plugin settings out of +conf+ and stores them in instance
# variables.
#
# If +output_type+ is given and +format+ is not, +output_type+ is copied
# into +format+ before the superclass +configure+ runs.
#
# Settings read: tenantId, namespaceId, appId, appKey, typeID, streamID,
# valuetag. A missing setting is stored as nil, except streamID (see below).
#
# When tenantId/namespaceId or appId/appKey are missing, a message is
# written to $log and the method returns early; it does not raise.
#
# @param conf [#[], #key?] configuration element (hash-like, string keys)
# @return [nil]
def configure(conf)
  conf['format'] = conf['output_type'] if conf['output_type'] && !conf['format']
  super

  @tenantId = conf['tenantId']
  # Always assign @namespaceId itself, so the "missing" case is an explicit
  # nil on the variable the check below (and the URLs) actually read.
  @namespaceId = conf['namespaceId']
  @appId = conf['appId']
  @appKey = conf['appKey']
  @typeID = conf['typeID']
  # NOTE(review): the fallback is the literal string 'nil', not nil, so an
  # unconfigured stream ends up with the id "nil". Kept as-is because it
  # affects which stream is addressed — confirm whether this is intended.
  @streamID = conf.key?('streamID') ? conf['streamID'] : 'nil'
  @valuetag = conf['valuetag']

  if @tenantId.nil? || @namespaceId.nil?
    # could change requirement and create a namespace as it is needed,
    # but currently it must be already created
    $log.write("Please specify a tenantID and namespaceID. Unable to send to QI.\n")
    return
  end

  if @appId.nil? || @appKey.nil?
    $log.write("Please specify a appId and appKey. Unable to send to QI.\n")
    return
  end
end
createTypeAndStream()
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 159
# Posts the type definition to the Qi "Types" endpoint and, once that is
# accepted, posts the stream definition to the "Streams" endpoint.
#
# Accepted response codes: 200/201/302 for the type, 201/302 for the stream
# (201 = created, 302 = already existed). Any other code is logged to $log
# and raised as a RuntimeError ("Type creation failed" /
# "Stream creation failed").
#
# NOTE(review): both clients are built with certificate verification
# disabled (VERIFY_NONE) while sending the bearer token — confirm this is
# still required.
def createTypeAndStream
  types_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Types"
  # could open this up to let configuration dictate how the object is created.
  # need to figure out if this should be guided or open (either ask for
  # information and it creates JSON, or just import the JSON and use directly).
  type_payload = %({"Id":"#{@typeID}","Name":"FluentD","Description":"This is a type for fluentd events","QiTypeCode":0,"Properties":[{"Id":"Timestamp","Name":null,"Description":null,"QiType":{"Id":"string","Name":null,"Description":null,"QiTypeCode":18,"Properties":null},"IsKey":true},{"Id":"Value","Name":null,"Description":null,"QiType":{"Id":"string","Name":null,"Description":null,"QiTypeCode":18,"Properties":null},"IsKey":false}]})
  type_client = RestClient::Resource.new(types_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)

  type_client.post(type_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |type_response, _type_request, _type_result|
    # 201 when the create works, 302 when the type already existed
    unless [200, 201, 302].include?(type_response.code)
      $log.write("Failed to get the type created or found \n")
      raise "Type creation failed"
    end

    streams_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams"
    stream_payload = %({"Id":"#{@streamID}","Name":"WaveData_SampleStream","Description":null,"TypeId":"#{@typeID}","BehaviorId":null})
    stream_client = RestClient::Resource.new(streams_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)

    stream_client.post(stream_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |stream_response, _stream_request, _stream_result|
      # 201 when the create works, 302 when the stream already existed
      next if [201, 302].include?(stream_response.code)

      $log.write("Failed to get the stream created or found ")
      raise "Stream creation failed"
    end
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 194
# Serialises one event as a JSON object followed by a trailing comma:
#
#   {"Timestamp":"<local time>","Value":"<value>"},
#
# The value is read from +record+ under the key stored in @valuetag and
# converted with +to_s+ (a missing key becomes the empty string). It is
# JSON-encoded, so quotes, backslashes and control characters in the value
# cannot break the payload; bytes that are not valid UTF-8 are replaced.
#
# @param tag [String] event tag (unused)
# @param time [Numeric] event time; assumed to be an epoch value in the
#   FluentD pattern, not a PI Time
# @param record [Hash] event record
# @return [String] JSON fragment ending in ","
def format(tag, time, record)
  require 'json' # no-op after the first load; keeps this method self-contained

  # dup before force_encoding so the caller's (possibly frozen) string is
  # never mutated; scrub drops byte sequences JSON cannot represent.
  value = record[@valuetag].to_s.dup.force_encoding(Encoding::UTF_8).scrub
  "{\"Timestamp\":\"#{Time.at(time).localtime}\",\"Value\":#{value.to_json}},"
end
gettoken(getstream)
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 133
# Requests a client-credential token through ADAL using @appId / @appKey.
#
# On an ADAL::SuccessResponse the full response, access token, refresh
# token and the "bearer ..." authorization header are stored in instance
# variables; when +getstream+ is truthy, #createTypeAndStream is then run.
# On an ADAL::ErrorResponse a message is written to $log and a RuntimeError
# ("ADAL token failed") is raised.
#
# @param getstream [Boolean] whether to (re)create the type and stream
#   after the token has been obtained
def gettoken(getstream)
  context = ADAL::AuthenticationContext.new("login.windows.net", "365cc376-d621-41b0-b32f-7a982b22a63c")
  credential = ADAL::ClientCredential.new(@appId, @appKey)
  token_response = context.acquire_token_for_client("https://qihomeprod.onmicrosoft.com/historian", credential)

  if ADAL::SuccessResponse === token_response
    # The response carries more than this, but only the tokens are used.
    @full_token = token_response
    @access_token = token_response.access_token
    @refresh_token = token_response.refresh_token
    @authHeader = "bearer " + @access_token
    createTypeAndStream if getstream
  elsif ADAL::ErrorResponse === token_response
    # ADAL could not obtain an access_token.
    $log.write("Failed to get an access_token")
    raise "ADAL token failed"
  end
end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 45
# Always answers +false+.
#
# @return [false]
def prefer_buffered_processing
  false
end
prefer_delayed_commit()
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 49
# Answers the current value of the +delayed+ attribute (@delayed).
#
# @return [Object] whatever @delayed currently holds
def prefer_delayed_commit
  @delayed
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qi.rb, line 89
# Starts the plugin: runs the superclass +start+, obtains a token (and, via
# gettoken(true), creates the type and stream), then clears the two flags
# that #write uses to track its retry state.
def start
  super
  gettoken(true)
  @secondWriteAttempt = @three = false
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_qi.rb, line 96
# Sends one buffered chunk to the stream's Data/UpdateValues endpoint.
#
# The chunk content is expected to be the comma-terminated JSON fragments
# produced by #format; the trailing comma is removed and the whole thing is
# wrapped in "[...]" to form a JSON array.
#
# A new token is requested when the current one expires within 30 seconds.
#
# Failure handling: on the first non-200/201/302 response the type and
# stream are re-created and the write is attempted once more. If that
# second attempt also fails, @three is set — every later call then returns
# immediately — and a RuntimeError ("Writing Values failed") is raised.
# A successful response clears the retry flag so that a later, unrelated
# failure gets its own retry.
#
# @param chunk [#read] buffer chunk
# @raise [RuntimeError] when the retried write fails as well
def write(chunk)
  return if @three

  data = chunk.read
  json_body = "[" + data.chop + "]" # removes trailing ',' and makes a valid json object

  if Time.at(@full_token.expires_on).utc < (Time.now.utc + 30)
    # the refresh token isn't working so when we are within 30 seconds get a new token
    gettoken(false)
  end

  send_data_url = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams/#{@streamID}/Data/UpdateValues"
  client = RestClient::Resource.new(send_data_url, :verify_ssl => OpenSSL::SSL::VERIFY_NONE)
  client.put(json_body, :authorization => @authHeader, :content_type => "application/json", :accept => "text/plain") { |response, _request, _result|
    case response.code
    when 200, 201, 302 # 201 means it is created, 302 means it was created and it was just retrieved.
      # Must be the instance variable: a bare local here would leave the
      # retry flag set forever after the first recovered failure.
      @secondWriteAttempt = false
    else
      $log.write(response.to_s)
      $log.write("\nFailed to write the value. Regetting stream and type to see if this helps.\n")
      if !@secondWriteAttempt
        @secondWriteAttempt = true
        createTypeAndStream()
        write(chunk)
      else
        $log.write("Failed to write the value. Regetting stream and type did not help.\n")
        @three = true
        raise "Writing Values failed"
      end
    end
  }
  $log.write("QI successful writes\n")
end