class Lucid::Shopify::BulkRequest::Operation

Public Instance Methods

call(delay: 1, http: Container[:http], &block) click to toggle source

Wait for the operation to complete, then download the JSONL result data which is yielded as an {Enumerator} to the block. The data is streamed and parsed line by line to limit memory usage.

@param delay [Integer] delay between polling requests in seconds @param http [HTTP::Client]

@yield [Enumerator<Hash>] yields each parsed line of JSONL

@raise CanceledOperationError @raise ExpiredOperationError @raise FailedOperationError @raise ObsoleteOperationError

# File lib/lucid/shopify/bulk_request.rb, line 42
def call(delay: 1, http: Container[:http], &block)
  url = loop do
    status, url = poll

    case status
    when 'CANCELED'
      raise CanceledOperationError
    when 'EXPIRED'
      raise ExpiredOperationError
    when 'FAILED'
      raise FailedOperationError
    when 'COMPLETED'
      break url
    else
      sleep(delay)
    end
  end

  return if url.nil?

  # TODO: Verify signature?

  begin
    file = Tempfile.new(mode: 0600)
    body = http.get(url).body
    until (chunk = body.readpartial).nil?
      file.write(chunk)
    end
    file.rewind
    block.(Enumerator.new do |y|
      file.each_line { |line| y << JSON.parse(line) }
    end)
  ensure
    file.close
    file.unlink
  end
end
cancel() click to toggle source

Cancel the bulk operation.

@raise ObsoleteOperationError @raise TimeoutError

# File lib/lucid/shopify/bulk_request.rb, line 84
        def cancel
          begin
            client.post_graphql(credentials, <<~QUERY)
              mutation {
                bulkOperationCancel(id: "#{id}") {
                  userErrors {
                    field
                    message
                  }
                }
              }
            QUERY
          rescue Response::GraphQLClientError => e
            return if e.response.error_message?([
              /cannot be canceled when it is completed/,
            ])

            raise e
          end

          poll_until(['CANCELED', 'COMPLETED'])
        end
poll_until(statuses, timeout: 60) click to toggle source

Poll until operation status is met.

@param statuses [Array<String>] to terminate polling on @param timeout [Integer] in seconds

@raise ObsoleteOperationError @raise TimeoutError

# File lib/lucid/shopify/bulk_request.rb, line 114
def poll_until(statuses, timeout: 60)
  Timeout.timeout(timeout) do
    loop do
      status, _ = poll

      break if statuses.any? { |expected_status| status == expected_status }
    end
  end
rescue Timeout::Error
  raise TimeoutError, 'exceeded %s seconds polling for status %s' % [
    timeout,
    statuses.join(', '),
  ]
end

Private Instance Methods

poll() click to toggle source

@return [Array(String, String | nil)] the operation status and the

download URL, or nil if the result data is empty
# File lib/lucid/shopify/bulk_request.rb, line 131
                def poll
          op = client.post_graphql(credentials, <<~QUERY)['data']['currentBulkOperation']
            {
              currentBulkOperation {
                id
                status
                url
              }
            }
          QUERY

          raise ObsoleteOperationError if op['id'] != id

          [
            op['status'],
            op['url'],
          ]
        end