class RFlow::Components::HTTP::Server

Implements an HTTP server based on eventmachine_httpserver. It accepts incoming HTTP connections, marshals each HTTP request into an RFlow message, annotates the message with a bit of provenance (see below), and sends the message out its {request_port}. When an HTTP response message is received on {response_port}, it checks the provenance for a matching underlying TCP/HTTP connection and, if one is found, creates an actual HTTP response from the incoming message and sends it to the client.

The HTTP request message sent by the HTTP server component uses the RFlow::Message provenance feature to carry a bit of metadata that allows subsequent response messages to be matched to their underlying TCP/HTTP connections. Consequently, any component that processes HTTP request messages to generate response messages must copy the provenance from the request message to the response message; a response without it cannot be matched to a connection and is never sent to a client.
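
As a minimal sketch of that contract, the following uses hypothetical stand-in structs (`ProvenanceEvent`, `SketchMessage`) instead of the real RFlow classes, so it runs without RFlow installed. The UUID and context values are made up for illustration. It shows a downstream handler copying the request's provenance onto its response, after which the same filter used by `process_message` can recover the connection context.

```ruby
# Hypothetical stand-ins for RFlow's processing-event and message objects;
# the real classes carry more fields than shown here.
ProvenanceEvent = Struct.new(:component_instance_uuid, :context)
SketchMessage = Struct.new(:data_type_name, :provenance)

SERVER_UUID = 'server-uuid-1' # assumed value, for illustration only

# The request as the HTTP server would emit it: the provenance holds the
# server's UUID plus a context identifying the underlying connection.
request = SketchMessage.new('RFlow::Message::Data::HTTP::Request',
                            [ProvenanceEvent.new(SERVER_UUID, 'connection-42')])

# A downstream component builds the response and copies the provenance over.
def build_response(request)
  SketchMessage.new('RFlow::Message::Data::HTTP::Response', request.provenance.dup)
end

response = build_response(request)

# Same filter as in process_message: keep only events stamped by this server.
my_events = response.provenance.find_all { |e| e.component_instance_uuid == SERVER_UUID }
contexts = my_events.map { |e| e.context.to_s }
puts contexts.inspect
```

If `build_response` left the provenance empty, `my_events` would be empty too, and the server would have nothing to match against its open connections.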

Attributes

closed_connections[RW]

@!visibility private

connections[RW]

@!visibility private

listen[RW]

@!visibility private

port[RW]

@!visibility private

proxy_real_client_ip_header[RW]

@!visibility private

proxy_real_client_port_header[RW]

@!visibility private

proxy_real_server_ip_header[RW]

@!visibility private

proxy_real_server_port_header[RW]

@!visibility private

server_signature[RW]

@!visibility private

Public Instance Methods

configure!(config)

RFlow-called method at startup. @return [void]

# File lib/rflow/components/http/server.rb, line 44
def configure!(config)
  @listen = config['listen'] ? config['listen'] : '127.0.0.1'
  @port = config['port'] ? config['port'].to_i : 8000
  @proxy_real_client_ip_header = config.has_key?('proxy-real-client-ip-header') ? config['proxy-real-client-ip-header'] : 'X-Real-IP'
  @proxy_real_client_port_header = config.has_key?('proxy-real-client-port-header') ? config['proxy-real-client-port-header'] : 'X-Real-Port'
  @proxy_real_server_ip_header = config.has_key?('proxy-real-server-ip-header') ? config['proxy-real-server-ip-header'] : 'X-Server-IP'
  @proxy_real_server_port_header = config.has_key?('proxy-real-server-port-header') ? config['proxy-real-server-port-header'] : 'X-Server-Port'
  @connections = {}
  @closed_connections = ActiveSupport::Cache::MemoryStore.new(expires_in: 5.minutes)
end
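
The default handling above can be illustrated with a small stand-alone sketch. `resolve_config` is a hypothetical helper, not part of RFlow, that mirrors the logic of `configure!` for three of the options. It surfaces one subtlety in the source: `listen` and `port` fall back to their defaults whenever the configured value is nil or false, whereas the proxy header options test `has_key?`, so an explicitly supplied nil is kept rather than replaced.

```ruby
# Hypothetical helper mirroring the default handling in configure!.
def resolve_config(config)
  {
    # Truthiness check: a nil or false value falls back to the default.
    listen: config['listen'] ? config['listen'] : '127.0.0.1',
    port: config['port'] ? config['port'].to_i : 8000,
    # Hash#fetch behaves like the has_key? ternary in the source:
    # the default applies only when the key is absent.
    client_ip_header: config.fetch('proxy-real-client-ip-header', 'X-Real-IP')
  }
end

defaults = resolve_config({})
custom = resolve_config({ 'listen' => '0.0.0.0',
                          'port' => '8080',
                          'proxy-real-client-ip-header' => nil })

puts defaults.inspect
puts custom.inspect
```

Here `custom[:port]` is the integer 8080 because of the `to_i` conversion, and `custom[:client_ip_header]` stays nil because the key was present.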

process_message(input_port, input_port_key, connection, message)

RFlow-called method upon message arrival.

Filters for messages that pertain to this component and have active connections by inspecting the provenance, specifically the context attribute that we stored originally.

@return [void]

# File lib/rflow/components/http/server.rb, line 72
def process_message(input_port, input_port_key, connection, message)
  return unless message.data_type_name == 'RFlow::Message::Data::HTTP::Response'
  my_events = message.provenance.find_all {|processing_event| processing_event.component_instance_uuid == uuid}

  my_events.each do |processing_event|
    connection_signature_string = processing_event.context.to_s
    if connections[connection_signature_string]
      connections[connection_signature_string].send_http_response message
    else
      conn = closed_connections.read(connection_signature_string)
      if conn
        RFlow.logger.info "#{name}: Could not send HTTP response to #{conn.client_details}: connection is already closed"
      else
        RFlow.logger.info "#{name}: Could not send HTTP response to <client details expired>: connection is already closed"
      end
    end
  end
end
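
The three outcomes of that lookup (response sent, connection recently closed, connection details expired) can be sketched in isolation. Everything below is a hypothetical stand-in: `FakeConnection` replaces the server's Connection class, a plain Hash replaces the `ActiveSupport::Cache::MemoryStore` used for closed connections, and `dispatch` returns symbols where the real method logs.

```ruby
# Hypothetical stand-in for the server's Connection class; it only records
# the responses it is asked to send.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_http_response(message)
    @sent << message
  end
end

# Hypothetical stand-in for an RFlow processing event.
DispatchEvent = Struct.new(:component_instance_uuid, :context)

# Mirrors the branching in process_message, returning one symbol per event
# that belongs to this component.
def dispatch(events, uuid, connections, closed, message)
  events.select { |e| e.component_instance_uuid == uuid }.map do |e|
    key = e.context.to_s
    if connections[key]
      connections[key].send_http_response(message)
      :sent
    elsif closed.key?(key)
      :closed_recently   # details still cached, so the client can be named in the log
    else
      :closed_expired    # cache entry gone, only a generic log line is possible
    end
  end
end

open_conn = FakeConnection.new
connections = { '1' => open_conn }
closed = { '2' => :details_of_closed_connection }
events = [
  DispatchEvent.new('me', 1),
  DispatchEvent.new('me', 2),
  DispatchEvent.new('me', 3),
  DispatchEvent.new('someone-else', 1) # ignored: stamped by another component
]

outcomes = dispatch(events, 'me', connections, closed, :response)
puts outcomes.inspect
```

Note that the context is converted with `to_s` before the lookup, matching how `run!` keys the connections hash by `conn.signature.to_s`.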

run!()

RFlow-called method at startup. @return [void]

# File lib/rflow/components/http/server.rb, line 57
def run!
  @server_signature = EM.start_server(@listen, @port, Connection) do |conn|
    conn.server = self
    self.connections[conn.signature.to_s] = conn
    RFlow.logger.debug { "#{name}: Connection from #{conn.client_details} to #{conn.server_details}" }
  end
end