class LogStash::Outputs::ADLS
Usage
This is an example of Logstash config:
- source,ruby
input {
  ...
}
filter {
  ...
}
output {
  adls {
    adls_fqdn => "XXXXXXXXXXX.azuredatalakestore.net"                                       # (required)
    adls_token_endpoint => "https://login.microsoftonline.com/XXXXXXXXXX/oauth2/token"      # (required)
    adls_client_id => "00000000-0000-0000-0000-000000000000"                                # (required)
    adls_client_key => "XXXXXXXXXXXXXXXXXXXXXX"                                             # (required)
    path => "/logstash/%{+YYYY}/%{+MM}/%{+dd}/logstash-%{+HH}-%{[@metadata][cid]}.log"      # (required)
    test_path => "testfile"                                                                 # (optional, default "testfile")
    line_separator => "\n"                                                                  # (optional, default: "\n")
    created_files_permission => 755                                                         # (optional, default: 755)
    adls_token_expire_security_margin => 300                                                # (optional, default: 300)
    single_file_per_thread => true                                                          # (optional, default: true)
    retry_interval => 0.5                                                                   # (optional, default: 0.5)
    max_retry_interval => 10                                                                # (optional, default: 10)
    retry_times => 3                                                                        # (optional, default: 3)
    exit_if_retries_exceeded => false                                                       # (optional, default: false)
    codec => "json"                                                                         # (optional, default: default codec defined by Logstash)
  }
}
Attributes
azureOauthTokenRefreshDate[RW]
client[RW]
timer[RW]
timerTaskClass[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/adls.rb, line 146
# Shuts the plugin down: logs the shutdown and stops the OAuth token refresh
# timer.
#
# The refresh timer (@timer) is a java.util.Timer. Its worker thread is not a
# daemon thread by default, so without an explicit cancel it keeps running
# (and keeps requesting new tokens) after the output has been closed.
def close
  @logger.info("Logstash ADLS output plugin is shutting down...")
  # @timer is nil when the plugin was closed before the timer was created.
  @timer.cancel if @timer
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/adls.rb, line 168
# Encodes a batch of events, concatenates the encoded events per destination
# path and writes each path's payload to ADLS with retries.
#
# events - collection of events; nil is ignored.
#
# Returns nil when events is nil, otherwise the result of the final debug log
# call. May terminate the process (exit 1) when retries are exhausted and
# @exit_if_retries_exceeded is set.
def multi_receive(events)
  return unless events

  timeElapsed = Time.now
  group_encoded_events_by_path(events).each do |path, output|
    write_with_retries(path, output)
  end
  @logger.debug("#{events.length.to_s} events written on ADLS in #{Time.now-timeElapsed} seconds.")
end

# Internal helper: builds a Hash of destination path => concatenated encoded
# events. Each encoded event is terminated with @line_separator.
def group_encoded_events_by_path(events)
  output_files = Hash.new { |hash, key| hash[key] = +"" }
  events.each do |event|
    if @single_file_per_thread
      # Per-instance random value plus the thread id, so that the path
      # (which interpolates [@metadata][cid]) differs per writer.
      event.set("[@metadata][cid]", "#{@randomValuePerInstance.to_s}#{Thread.current.object_id.to_s}")
    end
    path = event.sprintf(@path)
    event_as_string = @codec.encode(event)
    event_as_string += @line_separator unless event_as_string.end_with?(@line_separator)
    output_files[path] << event_as_string
  end
  output_files
end

# Internal helper: writes one payload through write_data, retrying up to
# @retry_times times (-1 means retry forever). This can solve problems like
# leases being held by another process.
def write_with_retries(path, output)
  write_tries = 0
  begin
    write_data(path, output)
  rescue Exception => e
    # The broad rescue is kept so every failure coming out of the ADLS client
    # is retried, but shutdown requests must never be retried or swallowed.
    raise if e.is_a?(SystemExit) || e.is_a?(SignalException)

    if write_tries < @retry_times || @retry_times == -1
      # Linear back-off capped at @max_retry_interval; the first retry is immediate.
      sleepTime = [@retry_interval * write_tries, @max_retry_interval].min
      @logger.warn("ADLS write caused an exception: #{e.message}. Maybe you should increase retry_interval or reduce number of workers. Attempt: #{write_tries.to_s}. Retrying in #{sleepTime.to_s} seconds...")
      sleep(sleepTime)
      write_tries += 1
      retry
    end

    if e.instance_of? com.microsoft.azure.datalake.store.ADLException
      @logger.error("Max write retries reached. Events discarded! ADLS_RemoteMessage: #{e.remoteExceptionMessage}; Exception: #{e.message}; ADLS_Path: #{path}; StackTrace:#{e.backtrace.join("\n\t")}")
    else
      @logger.error("Max write retries reached. Events discarded! Exception: #{e.message}; StackTrace:#{e.backtrace.join("\n\t")}")
    end
    exit 1 if @exit_if_retries_exceeded
  end
end
prepare_client(accountFQDN, clientId, authTokenEndpoint, clientKey, testPath)
click to toggle source
# File lib/logstash/outputs/adls.rb, line 150
# Obtains an OAuth token with client credentials, records when it has to be
# refreshed and returns a ready-to-use ADLS store client.
#
# accountFQDN       - ADLS account host name passed to createClient.
# clientId          - client id used to request the token.
# authTokenEndpoint - token endpoint used to request the token.
# clientKey         - client key used to request the token.
# testPath          - path probed with checkExists to verify the client.
#
# Side effect: sets @azureOauthTokenRefreshDate to the token expiry minus
# @adls_token_expire_security_margin seconds.
# Returns the created client.
def prepare_client(accountFQDN, clientId, authTokenEndpoint, clientKey, testPath)
  token = com.microsoft.azure.datalake.store.oauth2.AzureADAuthenticator.getTokenUsingClientCreds(authTokenEndpoint, clientId, clientKey)

  # Move the expiry back by the security margin to get the refresh time.
  refresh_calendar = java.util.Calendar.getInstance()
  refresh_calendar.setTime(token.expiry)
  expiry_second = refresh_calendar.get(java.util.Calendar::SECOND)
  refresh_calendar.set(java.util.Calendar::SECOND, (expiry_second - @adls_token_expire_security_margin))
  @azureOauthTokenRefreshDate = refresh_calendar.getTime()
  @logger.info("Got ADLS OAuth Token with expire date #{token.expiry.to_s}. Token will be refreshed at #{@azureOauthTokenRefreshDate.to_s}")

  store_client = com.microsoft.azure.datalake.store.ADLStoreClient.createClient(accountFQDN, token)
  store_options = com.microsoft.azure.datalake.store.ADLStoreOptions.new()
  store_options.setUserAgentSuffix("Logstash-ADLS-Output-Plugin")
  store_client.setOptions(store_options)

  # Test the client to make sure it works. The return value is irrelevant.
  store_client.checkExists(testPath)
  store_client
end
register()
click to toggle source
# File lib/logstash/outputs/adls.rb, line 100
# Plugin start-up: validates the path setting, logs in to ADLS, wires the
# codec callback and schedules the OAuth token refresh.
#
# Raises LogStash::ConfigurationError when single_file_per_thread is enabled
# but @path does not contain %{[@metadata][cid]}.
# Terminates the process (exit 1) when the ADLS login fails.
def register()
  # Validate the configuration first, so that a bad path is reported as a
  # configuration error without attempting a network login.
  # @path must contain the %{[@metadata][cid]} format value.
  if @single_file_per_thread && !@path.include?("%{[@metadata][cid]}")
    message = "Please set %{[@metadata][cid]} format value in @path to avoid file locks in ADLS."
    @logger.error(message)
    raise LogStash::ConfigurationError, message
  end

  begin
    @client = prepare_client(@adls_fqdn, @adls_client_id, @adls_token_endpoint, @adls_client_key, @test_path)
  rescue => e
    @logger.error("Cannot Login in ADLS. Aborting.... Exception: #{e.message}; Trace:#{e.backtrace.join("\n\t")}")
    exit 1
  end

  # The callback simply yields the encoded payload back.
  # NOTE(review): presumably this is what lets callers use the return value
  # of @codec.encode - confirm against the codec in use.
  @codec.on_event do |_event, encoded_event|
    encoded_event
  end

  # Timer task that refreshes the client/token and then re-arms itself.
  @timerTaskClass = Class.new(java.util.TimerTask) do
    def setContext(parent)
      @parent = parent
    end

    def run
      begin
        @parent.client = @parent.prepare_client(@parent.adls_fqdn, @parent.adls_client_id, @parent.adls_token_endpoint, @parent.adls_client_key, @parent.test_path)
      rescue => e
        sleepTime = [@parent.retry_interval, @parent.max_retry_interval].min
        @parent.logger.error("ADLS Refresh OAuth Token Failed! Retrying in #{sleepTime.to_s} seconds... Exception: #{e.message}; Trace:#{e.backtrace.join("\n\t")}")
        sleep(sleepTime)
      end
      timerTask = @parent.timerTaskClass.new
      timerTask.setContext(@parent)
      @parent.timer.schedule(timerTask, @parent.azureOauthTokenRefreshDate) # Rearm timer
    end
  end

  timerTask = @timerTaskClass.new
  timerTask.setContext(self)
  @timer = java.util.Timer.new
  @timer.schedule(timerTask, @azureOauthTokenRefreshDate)

  # To make sure different instances in different machines don't generate the same threadId.
  @randomValuePerInstance = rand(10..10000)
end
run()
click to toggle source
# File lib/logstash/outputs/adls.rb, line 123
# Timer task body: asks the parent plugin for a fresh ADLS client and then
# schedules a new task for the parent's current token refresh date.
#
# When the refresh raises, the error is logged, the task sleeps for the
# smaller of the parent's retry_interval and max_retry_interval, and a new
# task is scheduled all the same.
def run
  owner = @parent
  begin
    refreshed = owner.prepare_client(owner.adls_fqdn, owner.adls_client_id, owner.adls_token_endpoint, owner.adls_client_key, owner.test_path)
    owner.client = refreshed
  rescue => e
    delay = [owner.retry_interval, owner.max_retry_interval].min
    owner.logger.error("ADLS Refresh OAuth Token Failed! Retrying in #{delay.to_s} seconds... Exception: #{e.message}; Trace:#{e.backtrace.join("\n\t")}")
    sleep(delay)
  end

  # Rearm the timer with a brand-new task bound to the same parent.
  next_task = owner.timerTaskClass.new
  next_task.setContext(owner)
  owner.timer.schedule(next_task, owner.azureOauthTokenRefreshDate)
end
setContext(parent)
click to toggle source
# File lib/logstash/outputs/adls.rb, line 120
# Stores the object this timer task works on behalf of.
#
# parent - object kept in @parent for later use by the task.
#
# Returns the given parent.
def setContext(parent)
  @parent = parent
end
write_data(path, data)
click to toggle source
# File lib/logstash/outputs/adls.rb, line 213
# Writes data to the ADLS file at path: appends when the file exists and
# creates the file when the append attempt fails with HTTP 404.
#
# path - destination file path.
# data - String payload to write.
#
# Raises IOError when the stream reports a failed write, and re-raises any
# ADLException whose httpResponseCode is not 404.
def write_data(path, data)
  @logger.debug("Trying to write at #{path}")
  adlsClient = @client
  begin
    # Try to append to already existing file, which will work most of the times.
    write_and_close_stream(adlsClient.getAppendStream(path), data, path)
  rescue com.microsoft.azure.datalake.store.ADLException => e
    raise unless e.httpResponseCode == 404

    # File does not exist, so create it.
    createStream = adlsClient.createFile(path, com.microsoft.azure.datalake.store.IfExists::OVERWRITE, @created_files_permission.to_s, true)
    write_and_close_stream(createStream, data, path)
    @logger.debug("File #{path} created.")
  end
end

# Internal helper: prints data to stream through a java.io.PrintStream and
# always closes both streams.
#
# java.io.PrintStream never throws IOException; a failed write or flush only
# sets an internal flag. checkError flushes and reports that flag, so it is
# turned into an IOError here - otherwise a failed write would look like a
# success to the caller and the data would be lost without any retry.
def write_and_close_stream(stream, data, path)
  outStream = java.io.PrintStream.new(stream)
  outStream.print(data)
  raise IOError, "ADLS stream reported a write error for #{path}" if outStream.checkError
ensure
  outStream.close if outStream
  stream.close
end