class PowerTrack::Stream

A PowerTrack stream to be used for both updating the rules and collecting new messages.

Constants

DEFAULT_CONNECTION_TIMEOUT

The default timeout on a connection to PowerTrack. Can be overridden per call.

DEFAULT_INACTIVITY_TIMEOUT

The default timeout for inactivity on a connection to PowerTrack. Can be overridden per call.

DEFAULT_LIST_RULES_OPTIONS
DEFAULT_OK_RESPONSE_STATUS
DEFAULT_RULES_REQUEST_OPTIONS
DEFAULT_STREAM_OPTIONS

The default options for using the stream.

DEFAULT_TRACK_OPTIONS
FEATURE_URL_FORMAT

The format of the URL to connect to the stream service

Placeholders, in order: hostname, domain, feature, stream type, account, source, label, sub-feature.
HEARTBEAT_MESSAGE_PATTERN

The patterns used to identify the various types of messages received from GNIP; everything else is an activity.

REPLAY_TIMESTAMP_FORMAT

The format used to send UTC timestamps in Replay mode

SYSTEM_MESSAGE_PATTERN

Attributes

account_name[R]
data_source[R]
label[R]
username[R]

Public Class Methods

new(username, password, account_name, data_source, label, options=nil) click to toggle source
# File lib/powertrack/streaming/stream.rb, line 54
# Builds a stream bound to one PowerTrack account, data source and label.
#
# The given options, if any, are merged over DEFAULT_STREAM_OPTIONS. The
# :replay option is coerced to a strict boolean.
def initialize(username, password, account_name, data_source, label, options=nil)
  @username, @password = username, password
  @account_name, @data_source, @label = account_name, data_source, label

  overrides = options || {}
  @options = DEFAULT_STREAM_OPTIONS.merge(overrides)
  @replay = @options[:replay] ? true : false
end

Public Instance Methods

add_rules(*rules) click to toggle source

Adds many rules to your PowerTrack stream’s ruleset.

POST /rules

See support.gnip.com/apis/powertrack/api_reference.html#AddRules

# File lib/powertrack/streaming/stream.rb, line 69
# Adds one or several rules to the ruleset of the stream (POST /rules).
#
# Rules may be passed as separate arguments, as arrays, or as a mix of both:
# they are flattened into a single list before being encoded. The request is
# considered successful on a 201 status.
def add_rules(*rules)
  payload = MultiJson.encode('rules' => rules.flatten)
  make_rules_request(:post, body: payload, ok: 201)
end
delete_rules(*rules) click to toggle source

Removes the specified rules from the stream.

DELETE /rules

See support.gnip.com/apis/powertrack/api_reference.html#DeleteRules

# File lib/powertrack/streaming/stream.rb, line 81
# Removes the given rules from the ruleset of the stream.
#
# The request is sent as a POST carrying a '_method=delete' query parameter
# since v2 of the API does not use the DELETE verb anymore. Rules may be
# passed as separate arguments or as arrays: they are flattened first.
def delete_rules(*rules)
  payload = MultiJson.encode('rules' => rules.flatten)
  make_rules_request(:post, body: payload, query: { '_method' => 'delete' })
end
list_rules(options=nil) click to toggle source

Retrieves all existing rules for a stream.

Returns an array of PowerTrack::Rule objects when the response permits so.

GET /rules

See support.gnip.com/apis/powertrack/api_reference.html#ListRules

# File lib/powertrack/streaming/stream.rb, line 104
# Retrieves all the rules currently defined on the stream (GET /rules).
#
# The given options are merged over DEFAULT_LIST_RULES_OPTIONS. When the
# :objectify option is set and the response is a Hash whose 'rules' entry is
# an Array of Hashes all having a 'value' key, an Array of PowerTrack::Rule
# is returned. In every other case the response is returned untouched.
def list_rules(options=nil)
  opts = DEFAULT_LIST_RULES_OPTIONS.merge(options || {})
  response = make_rules_request(:get, headers: gzip_compressed_header(opts[:compressed]))

  # hand the response back as is unless objects are wanted and it is a Hash
  return response unless opts[:objectify] && response.is_a?(Hash)

  raw_rules = response['rules']
  return response unless raw_rules.is_a?(Array)
  return response unless raw_rules.all? { |raw| raw.is_a?(Hash) && raw.key?('value') }

  raw_rules.map do |raw|
    PowerTrack::Rule.new(raw['value'], tag: raw['tag'], id: raw['id'])
  end
end
track(options=nil) click to toggle source

Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.

Manages reconnections when being disconnected.

GET /track/:stream

See support.gnip.com/apis/powertrack/api_reference.html#Stream

# File lib/powertrack/streaming/stream.rb, line 159
# Tracks the stream: the given options are merged over DEFAULT_TRACK_OPTIONS,
# then tracking sessions are run through a PowerTrack::Retrier built with the
# :max_retries option. What the retrier finally returns is passed to
# handle_api_response, whose result is returned.
def track(options=nil)
  settings = DEFAULT_TRACK_OPTIONS.merge(options || {})
  retrier = PowerTrack::Retrier.new(settings[:max_retries])

  outcome = retrier.retry do
    track_once(settings, retrier)
  end

  handle_api_response(*outcome)
end

Private Instance Methods

auth_header() click to toggle source

Returns the authorization header to join to the HTTP request.

# File lib/powertrack/streaming/stream.rb, line 193
# Returns the authorization header to join to the HTTP request, as a new
# hash holding the [ username, password ] pair under the 'authorization' key.
def auth_header
  credentials = [ @username, @password ]
  { 'authorization' => credentials }
end
common_req_headers() click to toggle source

Returns the HTTP headers common to each valid PowerTrack request. Each call returns a new hash which can be safely modified by the caller.

# File lib/powertrack/streaming/stream.rb, line 213
# Returns the HTTP headers common to each valid PowerTrack request: JSON
# content negotiation, a :redirects entry set to 3 and the authorization
# header. Each call builds a new hash the caller can safely modify.
def common_req_headers
  base = {
    'accept' => 'application/json',
    'content-type' => 'application/json; charset=utf-8',
    :redirects => 3
  }

  base.merge(auth_header)
end
connect(hostname, feature=nil, sub_feature=nil) click to toggle source

Opens a new connection to GNIP PowerTrack.

# File lib/powertrack/streaming/stream.rb, line 205
# Opens a new connection to GNIP PowerTrack.
#
# The target URL is computed by feature_url from the hostname, feature and
# sub-feature, logged at debug level, then used to build and return a new
# EventMachine::HttpRequest configured with the connection headers.
def connect(hostname, feature=nil, sub_feature=nil)
  target = feature_url(hostname, feature, sub_feature)

  logger.debug("Connecting to '#{target}' with headers #{connection_headers}...")

  EventMachine::HttpRequest.new(target, connection_headers)
end
connection_headers() click to toggle source

Returns the HTTP headers common to each valid PowerTrack connection. Each call returns a new hash which can be safely modified by the caller.

# File lib/powertrack/streaming/stream.rb, line 199
# Returns the settings common to each PowerTrack connection, i.e. the
# :connect_timeout and :inactivity_timeout entries of the stream options.
# Each call builds a new hash the caller can safely modify.
def connection_headers
  %i[connect_timeout inactivity_timeout].each_with_object({}) do |key, settings|
    settings[key] = @options[key]
  end
end
feature_url(hostname, feature=nil, sub_feature=nil) click to toggle source

Returns the URL of the stream for a given feature.

# File lib/powertrack/streaming/stream.rb, line 168
# Returns the URL of the stream for a given feature.
#
# When no feature is given, it defaults to 'replay' in replay mode and to the
# hostname otherwise. The sub-feature, if any, is appended as an extra path
# segment.
def feature_url(hostname, feature=nil, sub_feature=nil)
  feature ||= (@replay ? 'replay' : hostname)

  replaying_rules = @replay && feature == 'rules'
  replaying_stream = @replay && feature == 'replay'

  parts = [
    hostname,
    # replay streaming is on gnip.com while replay rules are on twitter.com...
    replaying_stream ? 'gnip' : 'twitter',
    feature,
    replaying_rules ? 'powertrack-replay' : 'powertrack',
    @account_name,
    @data_source,
    @label,
    sub_feature ? "/#{sub_feature}" : ''
  ]

  FEATURE_URL_FORMAT % parts
end
gzip_compressed_header(compressed) click to toggle source

Returns the HTTP header that turns on GZip-based compression if required. Each call returns a new hash which can be safely modified by the caller.

# File lib/powertrack/streaming/stream.rb, line 188
# Returns the HTTP header that turns on GZip-based compression when the
# given flag is truthy, an empty hash otherwise. Each call builds a new hash
# the caller can safely modify.
def gzip_compressed_header(compressed)
  return {} unless compressed

  { 'accept-encoding' => 'gzip, compressed' }
end
handle_api_response(status, error, body, ok=DEFAULT_OK_RESPONSE_STATUS) click to toggle source

Returns the appropriate value, or raises the appropriate exception, according to the response obtained on an API request.

# File lib/powertrack/streaming/stream.rb, line 249
# Turns the outcome of an API request into a return value or an exception.
#
# - no status at all: raises PowerTrack::ConnectionError built from the error
# - status matching ok: returns nil when there is no body, the decoded JSON
#   body otherwise; raises PowerTrack::InvalidResponseError when the body
#   cannot be decoded
# - any other status: raises the error built by
#   PowerTrack::WithStatusPowerTrackError.build
def handle_api_response(status, error, body, ok=DEFAULT_OK_RESPONSE_STATUS)
  # connection issue
  raise PowerTrack::ConnectionError.new(error) if status.nil?

  case status
  when ok
    # successful call: decode the body if there is one
    unless body.nil?
      parse_json_body(body) do |failure|
        # invalid JSON response
        raise PowerTrack::InvalidResponseError.new(ok, failure.message, body)
      end
    end
  else
    # specified response status
    raise PowerTrack::WithStatusPowerTrackError.build(status, error, parse_json_body(body))
  end
end
make_rules_request(verb, options=nil) click to toggle source

Makes a rules-related request with a specific HTTP verb and a few options. Returns the response if successful, raises an exception if the request failed.

# File lib/powertrack/streaming/stream.rb, line 277
# Makes a rules-related request with a specific HTTP verb and a few options
# merged over DEFAULT_RULES_REQUEST_OPTIONS (:headers, :query, :body, :ok).
#
# The request is run inside its own EM.run block, which is stopped as soon
# as the request succeeds or fails. The status, error and body collected are
# then passed to handle_api_response along with the :ok option; its result
# is returned.
def make_rules_request(verb, options=nil)
  opts = DEFAULT_RULES_REQUEST_OPTIONS.merge(options || {})
  status = error = body = nil

  EM.run do
    request = connect('api', 'rules').setup_request(
      verb,
      head: rules_req_headers.merge(opts[:headers]),
      query: opts[:query],
      body: opts[:body]
    )

    # response received: keep everything about it
    request.callback do
      status = request.response_header.status
      error = request.error
      body = request.response
      EM.stop
    end

    # request failed: only the error is available
    request.errback do
      error = request.error
      EM.stop
    end
  end

  handle_api_response(status, error, body, opts[:ok])
end
message_type(message) click to toggle source

Returns the type of message received on the stream, together with a level indicator in case of a system message, nil otherwise.

# File lib/powertrack/streaming/stream.rb, line 308
# Returns a [ type, level ] pair describing a message received on the stream.
#
# The type is :heartbeat, :system or :activity. The level is the first
# capture of SYSTEM_MESSAGE_PATTERN, downcased and turned into a symbol, for
# a system message; nil otherwise.
def message_type(message)
  case message
  when HEARTBEAT_MESSAGE_PATTERN
    [ :heartbeat, nil ]
  when SYSTEM_MESSAGE_PATTERN
    level = Regexp.last_match(1)
    [ :system, level.downcase.to_sym ]
  else
    # anything else is an activity
    [ :activity, nil ]
  end
end
parse_json_body(body) { |$!| ... } click to toggle source

Parses a JSON-formatted body received as the response of a PowerTrack API request.

Returns nil when the body is empty, the Ruby object decoded from the JSON-formatted body otherwise.

If the parsing fails, returns the value returned by the given block, which is called with the exception raised as a single argument. If no block is given, returns the textual body initially received (stripped of surrounding whitespace).

# File lib/powertrack/streaming/stream.rb, line 234
# Parses a JSON-formatted body received as the response of a PowerTrack API
# request.
#
# Returns nil when the body is nil or blank, the Ruby object decoded from
# the JSON text otherwise. When the decoding fails, returns the value of the
# given block, which is called with the exception raised, or the stripped
# body when no block is given.
def parse_json_body(body, &block)
  text = (body || '').strip
  return nil if text == ''

  begin
    MultiJson.load(text)
  rescue StandardError => failure
    block_given? ? yield(failure) : text
  end
end
rules_req_headers() click to toggle source

Returns the HTTP headers common to each valid /rules request. Each call returns a new hash which can be safely modified by the caller.

# File lib/powertrack/streaming/stream.rb, line 221
# Returns the HTTP headers for each valid /rules request, which are simply
# the headers common to every request.
def rules_req_headers
  headers = common_req_headers
  headers
end
track_once(options, retrier) click to toggle source

Connects to the /track endpoint.

# File lib/powertrack/streaming/stream.rb, line 325
# Runs a single tracking session against the /track endpoint inside an
# EM.run block.
#
# Options read here: :backfill_minutes, :stop_timeout, :on_heartbeat,
# :on_message, :on_activity, :on_system, :close_now, :compressed, :raw,
# :fake_disconnections and, in replay mode only, :from and :to.
#
# The retrier is reset when a non-error message is received outside replay
# mode, and told to stop when the session is closed on purpose or when a
# replay ends with the default OK status.
#
# Returns the [ status, error, body ] triple collected for the session.
def track_once(options, retrier)
  logger.info "Starting tracker for retry ##{retrier.retries}..."
  backfill_minutes = options[:backfill_minutes]
  stop_timeout = options[:stop_timeout]
  on_heartbeat = options[:on_heartbeat]
  on_message = options[:on_message]
  on_activity = options[:on_activity]
  on_system = options[:on_system]
  # never ask for a close when no :close_now callback is provided
  close_now = options[:close_now] || lambda { false }

  buffer = PowerTrack::DataBuffer.new
  # closed: the close_now callback asked for the session to end
  closed = false
  # disconnected: the request already completed, successfully or not
  disconnected = false
  resp_status = DEFAULT_OK_RESPONSE_STATUS
  resp_error = nil
  resp_body = nil

  EM.run do
    logger.info "Starting the reactor..."
    con = connect('stream')
    get_opts = {
      head: track_req_headers(options[:compressed]),
      query: {}
    }

    # add a timeframe in replay mode
    if @replay
      now = Time.now
      # start 1 hour ago by default
      from = options[:from] || (now - 60*60)
      # stop 30 minutes ago by default
      to = options[:to] || (now - 30*60)

      get_opts[:query].merge!({
        'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT),
        'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT)
      })

      logger.info "Replay mode enabled from '#{from}' to '#{to}'"
    end

    # only ask for a backfill when a number of minutes is provided
    if backfill_minutes
      get_opts[:query]['backfillMinutes'] = backfill_minutes
    end

    http = con.get(get_opts)

    # polls every second to see if the connection should be closed
    close_watcher = EM.add_periodic_timer(1) do
      # exit if required
      if close_now.call
        logger.info "Time to close the tracker"
        closed = true
        close_watcher.cancel
        con.close
      end
    end

    # simulate periodic disconnections: the connection is closed after a
    # random delay drawn from the :fake_disconnections option
    if options[:fake_disconnections]
       EM.add_timer(rand(options[:fake_disconnections])) do
         con.close
       end
    end

    http.stream do |chunk|
      # ignore data if already disconnected, thus avoiding synchronizing the
      # buffer. Nevertheless, this should never happen, hence the warning
      # logged when it does.

      if disconnected
        logger.warn "Message received while already disconnected"
        next
      end

      # process the chunk: the block is called with each raw message
      buffer.process(chunk) do |raw|
        logger.debug "New message received"

        # get the message type and its (optional) level
        m_type, m_level = message_type(raw)

        # reset retries when some (valid) data are received but not in replay
        # mode where we don't want to retry on the same timeframe again and
        # again when GNIP periodically fails
        if !@replay && retrier.retrying? && m_level != :error
          logger.info "Resetting retries..."
          retrier.reset!
        end

        # the handlers are run through EM.defer rather than inline
        EM.defer do
          # select the right message handler(s) according to the message type
          if m_type == :heartbeat
            on_heartbeat.call if on_heartbeat
          else
            # JSON decoding if required
            message = options[:raw] ? raw : MultiJson.decode(raw)

            # on_message gets every non-heartbeat message, whatever its type
            on_message.call(message) if on_message

            case m_type
            when :system then on_system.call(message) if on_system
            when :activity then on_activity.call(message) if on_activity
            end
          end

          # TODO: manage exceptions at this level
        end
      end
    end

    # reconnection on error: this callback is registered below for both the
    # success and the failure of the request
    reconnect_cb = lambda do |http_client|
      logger.info "Disconnected after #{retrier.retries} retries"
      disconnected = true

      resp_status = http_client.response_header.status

      # stop the stream if required so or the replay is simply over
      if closed || (@replay && resp_status == DEFAULT_OK_RESPONSE_STATUS)
        # close immediately if required
        wait_til_defers_finish_and_stop(stop_timeout)
        # tell the retrier the tracking is over
        retrier.stop
      else
        # cancel the periodic close watcher
        close_watcher.cancel

        # fall back to the default OK status when none was read
        resp_status ||= DEFAULT_OK_RESPONSE_STATUS
        resp_error = http_client.error
        resp_body = http_client.response

        wait_til_defers_finish_and_stop(stop_timeout)
      end
    end

    http.callback(&reconnect_cb)
    http.errback(&reconnect_cb)
  end

  [ resp_status, resp_error, resp_body ]
end
track_req_headers(compressed) click to toggle source

Returns the HTTP headers for each valid /track request. Each call returns a new hash which can be safely modified by the caller.

# File lib/powertrack/streaming/stream.rb, line 319
# Returns the HTTP headers for each valid /track request: the common request
# headers, a keep-alive connection header and, when the given flag is truthy,
# the GZip compression header. Each call builds a new hash the caller can
# safely modify.
def track_req_headers(compressed)
  extras = { 'connection' => 'keep-alive' }
  common_req_headers.merge(extras.merge(gzip_compressed_header(compressed)))
end
wait_til_defers_finish_and_stop(timeout) click to toggle source

Waits for all the deferrable threads to complete, then stops the reactor.

# File lib/powertrack/streaming/stream.rb, line 469
# Waits for all the deferred operations to complete, then stops the reactor.
#
# The reactor is stopped right away when no deferred operation is pending or
# when the timeout is already over. Otherwise the pending operations are
# polled every 0.2 second and the reactor is stopped as soon as they are all
# finished or the timeout expires, whichever comes first.
#
# The stop used to sit in an ensure clause, which runs as soon as the method
# returns: the reactor was then stopped right after the polling timer was
# registered, so nothing was ever waited for.
#
# timeout - the maximum number of seconds to wait; nil is treated as 0.
def wait_til_defers_finish_and_stop(timeout)
  deadline = Time.now + timeout.to_f

  stop_reactor = lambda do
    logger.info "Stopping the reactor..."
    EM.stop
  end

  done_waiting = lambda { EM.defers_finished? || Time.now >= deadline }

  # nothing to wait for, or no time allowed to wait
  return stop_reactor.call if done_waiting.call

  # wait for defers to terminate but no more than timeout...
  defers_waiter = EM.add_periodic_timer(0.2) do
    logger.info "Waiting for defers..."
    if done_waiting.call
      defers_waiter.cancel
      stop_reactor.call
    end
  end
end