class EventHub::ActorHeartbeat

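Heartbeat class: publishes a "started" beat when it starts, a heartbeat message after every configured heartbeat cycle, and a "stopped" beat on cleanup.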

Public Class Methods

new(processor_instance) click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 9
# Stores the processor this heartbeat reports on and kicks off #start.
#
# @param processor_instance the processor whose version, start time and
#   statistics are read when a heartbeat message is built
def initialize(processor_instance)
  # Must be assigned before #start runs, since heartbeats read from it.
  @processor_instance = processor_instance
  # NOTE(review): `async` is presumably Celluloid's asynchronous proxy, so
  # #start (which loops forever) does not block the constructor - confirm.
  async.start
end

Public Instance Methods

cleanup() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 27
# Announces shutdown: logs, publishes one heartbeat with action "stopped",
# then logs that the beat went out.
def cleanup
  EventHub.logger.info("Heartbeat is cleaning up...")

  stopped_beat = heartbeat(action: "stopped")
  publish(stopped_beat)

  EventHub.logger.info("Heartbeat has sent a [stopped] beat")
end
start() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 14
# Main heartbeat routine.
#
# Logs the start, registers a block with `every` (interval: one day in
# seconds) that logs the number and classes of all Celluloid actors,
# publishes a "started" beat, and then loops: sleep for the configured
# heartbeat cycle, publish a regular beat.
def start
  EventHub.logger.info("Heartbeat is starting...")

  one_day_in_s = 24 * 60 * 60
  every(one_day_in_s) do
    actor_count = Celluloid::Actor.all.size
    actor_classes = Celluloid::Actor.all.map(&:class).join(", ")
    EventHub.logger.info("Actual actors: #{actor_count}: #{actor_classes}")
  end

  started_beat = heartbeat(action: "started")
  publish(started_beat)
  EventHub.logger.info("Heartbeat has sent [started] beat")

  loop do
    # The cycle length is re-read from the configuration on every pass.
    sleep(Configuration.processor[:heartbeat_cycle_in_s])
    publish(heartbeat)
  end
end

Private Instance Methods

addresses() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 91
# Lists the non-loopback IP addresses of this host.
#
# Interfaces that report no address at all (Socket::Ifaddr#addr is nil) and
# interfaces whose address is not an IP address (e.g. link-layer entries)
# are skipped up front instead of raising or relying on a rescue.
#
# @return [Array<Hash>] one hash per usable interface address with the keys
#   :interface, :host_name and :ip_address
def addresses
  Socket.getifaddrs.each_with_object([]) do |interface, result|
    addr = interface.addr
    # addr is nil when the interface has no address; ip? is false for
    # non-IPv4/IPv6 families, for which ip_address would raise.
    next unless addr&.ip?
    next if addr.ipv4_loopback? || addr.ipv6_loopback?

    begin
      result << {
        interface: interface.name,
        host_name: Socket.gethostname,
        ip_address: addr.ip_address
      }
    rescue StandardError
      # Best effort: an entry that cannot be described is left out.
      next
    end
  end
end
heartbeat(args = {action: "running"}) click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 52
# Builds one heartbeat message and returns the result of its #to_json.
#
# The body carries the processor version, the requested action, the process
# id and a :heartbeat section with timing, queue, host, address and message
# statistics details.
#
# @param args [Hash] only args[:action] is read; defaults to "running"
# @return whatever EventHub::Message#to_json returns
def heartbeat(args = {action: "running"})
  message = EventHub::Message.new.tap do |msg|
    msg.origin_module_id = EventHub::Configuration.name
    msg.origin_type = "processor"
    msg.origin_site_id = "global"
    msg.process_name = "event_hub.heartbeat"
  end

  now = Time.now

  # message structure needs more changes
  body = {
    version: @processor_instance.send(:version),
    action: args[:action],
    pid: Process.pid,
    process_name: "event_hub.heartbeat"
  }
  body[:heartbeat] = {
    started: now_stamp(started_at),
    stamp_last_beat: now_stamp(now),
    # Time difference is in seconds; scaled to milliseconds.
    uptime_in_ms: (now - started_at) * 1000,
    heartbeat_cycle_in_ms: Configuration.processor[:heartbeat_cycle_in_s] * 1000,
    queues_consuming_from: EventHub::Configuration.processor[:listener_queues],
    queues_publishing_to: [EventHub::EH_X_INBOUND], # needs more dynamic in the future
    host: Socket.gethostname,
    addresses: addresses,
    messages: messages_statistics
  }

  message.body = body
  message.to_json
end
messages_statistics() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 109
# Summarises the processor's message statistics for the heartbeat body.
#
# Counters and the average size are passed through unchanged; the two
# process-time figures are multiplied by 1000 for the *_in_ms keys.
#
# @return [Hash] with the keys :total, :successful, :unsuccessful,
#   :average_size, :average_process_time_in_ms, :total_process_time_in_ms
def messages_statistics
  stats = statistics
  to_ms = ->(value) { value * 1000 }

  summary = {}
  summary[:total] = stats.messages_total
  summary[:successful] = stats.messages_successful
  summary[:unsuccessful] = stats.messages_unsuccessful
  summary[:average_size] = stats.messages_average_size
  summary[:average_process_time_in_ms] = to_ms.call(stats.messages_average_process_time)
  summary[:total_process_time_in_ms] = to_ms.call(stats.messages_total_process_time)
  summary
end
publish(message) click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 35
# Publishes one message to the EventHub::EH_X_INBOUND direct exchange and
# waits for the confirmation.
#
# A connection from create_bunny_connection is started for this call and
# closed again afterwards, whether or not publishing succeeded.
#
# @param message the payload handed to the exchange's publish
# @raise [RuntimeError] when wait_for_confirms reports a falsy result
def publish(message)
  bunny = create_bunny_connection
  bunny.start

  confirm_channel = bunny.create_channel
  confirm_channel.confirm_select

  inbound = confirm_channel.direct(EventHub::EH_X_INBOUND, durable: true)
  inbound.publish(message, persistent: true)

  confirmed = confirm_channel.wait_for_confirms
  raise "Published heartbeat message has not been confirmed by the server" unless confirmed
ensure
  # bunny is nil if create_bunny_connection itself raised.
  bunny&.close
end
started_at() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 83
# Start time as reported by the wrapped processor instance.
#
# @return whatever the processor's #started_at returns
def started_at
  processor = @processor_instance
  processor.started_at
end
statistics() click to toggle source
# File lib/eventhub/actor_heartbeat.rb, line 87
# Statistics object as exposed by the wrapped processor instance.
#
# @return whatever the processor's #statistics returns
def statistics
  processor = @processor_instance
  processor.statistics
end