class EventHub::Processor
Attributes
exception_writer[R]
name[R]
pidfile[R]
statistics[R]
Public Class Methods
new(name=nil)
click to toggle source
# File lib/eventhub/processor.rb, line 11 def initialize(name=nil) @name = name || class_to_array(self.class)[1..-1].join('.') @pidfile = EventHub::Components::Pidfile.new(File.join(Dir.pwd, 'pids', "#{@name}.pid")) @exception_writer = EventHub::Components::ExceptionWriter.new @statistics = EventHub::Statistics.new @heartbeat = EventHub::Heartbeat.new(self) @message_processor = EventHub::MessageProcessor.new(self) @channel_receiver = nil @channel_sender = nil @restart = true end
Public Instance Methods
call_service(method, url)
click to toggle source
# File lib/eventhub/processor.rb, line 111 def call_service(method, url) if server_ssl? # ssl url = "https://" + url response = RestClient::Request.execute(method: method, url: url, ssl_ca_file: '/apps/sys_eventhub1/certs/cacert.pem', verify_ssl: OpenSSL::SSL::VERIFY_NONE, headers: { content_type: 'application/json', accept: 'application/json' } ) else # no ssl url = "http://" + url response = RestClient::Request.execute(method: method, url: url, headers: { content_type: 'application/json', accept: 'application/json' } ) end return response end
configuration()
click to toggle source
# File lib/eventhub/processor.rb, line 24 def configuration EventHub::Configuration.instance.data end
connection_settings()
click to toggle source
# File lib/eventhub/processor.rb, line 52 def connection_settings { user: server_user, password: server_password, host: server_host, vhost: server_vhost } end
handle_message(metadata, payload)
click to toggle source
# File lib/eventhub/processor.rb, line 107 def handle_message(metadata, payload) raise 'Please implement method in derived class' end
heartbeat_cycle_in_s()
click to toggle source
# File lib/eventhub/processor.rb, line 72 def heartbeat_cycle_in_s configuration.get('processor.heartbeat_cycle_in_s') || 300 end
listener_queues()
click to toggle source
# File lib/eventhub/processor.rb, line 56 def listener_queues Array( configuration.get('processor.listener_queue') || configuration.get('processor.listener_queues') || 'undefined_listener_queues' ) end
restart_in_s()
click to toggle source
# File lib/eventhub/processor.rb, line 68 def restart_in_s configuration.get('processor.restart_in_s') || 15 end
send_message(message, exchange_name = EventHub::EH_X_INBOUND)
click to toggle source
send message
# File lib/eventhub/processor.rb, line 165 def send_message(message, exchange_name = EventHub::EH_X_INBOUND) if @channel_sender.nil? || !@channel_sender.open? @channel_sender = AMQP::Channel.new(@connection, prefetch: 1) # use publisher confirm @channel_sender.confirm_select # @channel.on_error { |ch, channel_close| EventHub.logger.error "Oops! a channel-level exception: #{channel_close.reply_text}" } # @channel.on_ack { |basic_ack| EventHub.logger.info "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}" } end exchange = @channel_sender.direct(exchange_name, :durable => true, :auto_delete => false) exchange.publish(message.to_json, :persistent => true) end
server_host()
click to toggle source
# File lib/eventhub/processor.rb, line 28 def server_host configuration.get('server.host') || 'localhost' end
server_management_port()
click to toggle source
# File lib/eventhub/processor.rb, line 40 def server_management_port configuration.get('server.management_port') || 15672 end
server_password()
click to toggle source
# File lib/eventhub/processor.rb, line 36 def server_password configuration.get('server.password') || 'admin' end
server_ssl?()
click to toggle source
# File lib/eventhub/processor.rb, line 48 def server_ssl? configuration.get('server.ssl') || false end
server_user()
click to toggle source
# File lib/eventhub/processor.rb, line 32 def server_user configuration.get('server.user') || 'admin' end
server_vhost()
click to toggle source
# File lib/eventhub/processor.rb, line 44 def server_vhost configuration.get('server.vhost') || 'event_hub' end
sleep_break(seconds)
click to toggle source
# File lib/eventhub/processor.rb, line 181 def sleep_break(seconds) # breaks after n seconds or after interrupt while (seconds > 0) sleep(1) seconds -= 1 break unless @restart end end
start(detached = false)
click to toggle source
# File lib/eventhub/processor.rb, line 76 def start(detached = false) daemonize if detached EventHub.logger.info("Processor [#{@name}] base folder [#{Dir.pwd}]") # use timer here to have last heartbeat message working Signal.trap('TERM') { EventMachine.add_timer(0) { about_to_stop } } Signal.trap('INT') { EventMachine.add_timer(0) { about_to_stop } } while @restart begin handle_start_internal # custom post start method to be overwritten post_start rescue => e id = @exception_writer.write(e) EventHub.logger.error("Unexpected exception: #{e}, see => #{id}. Trying to restart in #{self.restart_in_s} seconds...") sleep_break self.restart_in_s end end # while # custon post stop method to be overwritten post_stop EventHub.logger.info("Processor [#{@name}] has been stopped") ensure pidfile.delete end
version()
click to toggle source
# File lib/eventhub/processor.rb, line 7 def version '1.0.0' end
watchdog()
click to toggle source
# File lib/eventhub/processor.rb, line 136 def watchdog self.listener_queues.each do |queue_name| begin url = "#{self.server_user}:#{CGI::escape(self.server_password)}@#{self.server_host}:#{self.server_management_port}/api/queues/#{self.server_vhost}/#{queue_name}/bindings" response = call_service(:get, url) data = JSON.parse(response.body) if response.code != 200 EventHub.logger.warn("Watchdog: Server did not answered properly. Trying to restart in #{self.restart_in_s} seconds...") EventMachine.add_timer(self.restart_in_s) { stop_processor(true) } elsif data.size == 0 EventHub.logger.warn("Watchdog: Something is wrong with the vhost, queue [#{queue_name}], and/or bindings. Trying to restart in #{self.restart_in_s} seconds...") EventMachine.add_timer(self.restart_in_s) { stop_processor(true) } # does it make sense ? Needs maybe more checks in future else # Watchdog is happy :-) # add timer for next check EventMachine.add_timer(self.watchdog_cycle_in_s) { watchdog } end rescue => e EventHub.logger.error("Watchdog: Unexpected exception: #{e}. Trying to restart in #{self.restart_in_s} seconds...") stop_processor end end end
watchdog_cycle_in_s()
click to toggle source
# File lib/eventhub/processor.rb, line 64 def watchdog_cycle_in_s configuration.get('processor.watchdog_cycle_is_s') || 15 end
Private Instance Methods
about_to_stop()
click to toggle source
# File lib/eventhub/processor.rb, line 269 def about_to_stop heartbeat('stopped') stop_processor end
daemonize()
click to toggle source
# File lib/eventhub/processor.rb, line 291 def daemonize EventHub.logger.info("Processor [#{@name}] is going to start as daemon") # daemonize Process.daemon @pidfile.write(Process.pid.to_s) end
handle_connection_loss()
click to toggle source
# File lib/eventhub/processor.rb, line 251 def handle_connection_loss @connection.on_tcp_connection_loss do |conn, settings| EventHub.logger.warn("Processor lost tcp connection. Trying to restart in #{self.restart_in_s} seconds...") conn.reconnect(false, self.restart_in_s) end end
handle_start_internal()
click to toggle source
# File lib/eventhub/processor.rb, line 191 def handle_start_internal AMQP.start(self.connection_settings) do |connection, open_ok| @connection = connection handle_connection_loss # create channel @channel_receiver = AMQP::Channel.new(@connection) @channel_receiver.prefetch(100) @channel_receiver.auto_recovery = true if @channel_receiver.auto_recovering? EventHub.logger.warn("Channel #{@channel_receiver.id} IS auto-recovering") end self.listener_queues.each do |queue_name| # connect to queue queue = @channel_receiver.queue(queue_name, durable: true, auto_delete: false) # subscribe to queue queue.subscribe(:ack => true) do |metadata, payload| begin statistics.measure(payload.size) do messages_to_send = @message_processor.process({ metadata: metadata, queue_name: queue_name}, payload) # ack message before publish metadata.ack # forward invalid or returned messages to dispatcher messages_to_send.each do |message| send_message(message) end if messages_to_send end rescue EventHub::NoDeadletterException => e @channel_receiver.reject(metadata.delivery_tag, true) EventHub.logger.error("Unexpected exception in handle_message method: #{e}. Message will be requeued.") @exception_writer.write(e) sleep_break self.restart_in_s rescue => e @channel_receiver.reject(metadata.delivery_tag, false) EventHub.logger.error("Unexpected exception in handle_message method: #{e}. Message dead lettered.") @exception_writer.write(e) end end end EventHub.logger.info("Processor [#{@name}] is listening to vhost [#{self.server_vhost}], queues [#{self.listener_queues.join(', ')}]") register_timers # send first heartbeat heartbeat end end
heartbeat(action = 'running')
click to toggle source
# File lib/eventhub/processor.rb, line 263 def heartbeat(action = 'running') message = @heartbeat.build_message(action) message.append_to_execution_history(@name) send_message(message) end
post_start()
click to toggle source
# File lib/eventhub/processor.rb, line 300 def post_start # method which can be overwritten to call a code sequence after reactor start end
post_stop()
click to toggle source
# File lib/eventhub/processor.rb, line 304 def post_stop # method which can be overwritten to call a code sequence after reactor stop end
register_timers()
click to toggle source
# File lib/eventhub/processor.rb, line 258 def register_timers EventMachine.add_timer(watchdog_cycle_in_s) { watchdog } EventMachine.add_periodic_timer(heartbeat_cycle_in_s) { heartbeat } end
stop_processor(restart=false)
click to toggle source
# File lib/eventhub/processor.rb, line 274 def stop_processor(restart=false) @restart = restart # close channels [@channel_receiver, @channel_sender].each do |channel| if channel channel.close if channel.open? end end # stop connection and event loop if @connection @connection.disconnect if @connection.connected? EventMachine.stop if EventMachine.reactor_running? end end