class Lucid::Shopify::BulkRequest::Operation
Public Instance Methods
Wait for the operation to complete, then download the JSONL result data, which is yielded to the block as an {Enumerator}. The data is streamed and parsed line by line to limit memory usage.
@param delay [Integer] delay between polling requests in seconds @param http [HTTP::Client]
@yield [Enumerator<Hash>] yields each parsed line of JSONL
@raise CanceledOperationError @raise ExpiredOperationError @raise FailedOperationError @raise ObsoleteOperationError
# File lib/lucid/shopify/bulk_request.rb, line 42
# Poll until the operation reaches a terminal status, then download the
# JSONL result into a temporary file and yield an Enumerator of parsed lines.
#
# The Enumerator reads from the temporary file, which is removed when this
# method returns, so it must be consumed inside the block.
#
# @param delay [Integer] delay between polling requests in seconds
# @param http [HTTP::Client]
#
# @yield [Enumerator<Hash>] yields each parsed line of JSONL
#
# @return [nil] without yielding when the completed operation has no URL
#
# @raise [CanceledOperationError]
# @raise [ExpiredOperationError]
# @raise [FailedOperationError]
# @raise [ObsoleteOperationError]
def call(delay: 1, http: Container[:http], &block)
  url = loop do
    status, result_url = poll

    case status
    when 'CANCELED' then raise CanceledOperationError
    when 'EXPIRED' then raise ExpiredOperationError
    when 'FAILED' then raise FailedOperationError
    when 'COMPLETED' then break result_url
    else sleep(delay)
    end
  end

  # A completed operation without a URL produced no result data.
  return if url.nil?

  # TODO: Verify signature?

  # Tempfile always creates its file with 0600 permissions; its `mode:`
  # option is OR-ed into the open(2) flags, so no permission value is passed.
  # Created outside `begin` so a failure here is not masked by `ensure`.
  file = Tempfile.new

  begin
    body = http.get(url).body

    # Spool the response to disk chunk by chunk; readpartial returns nil
    # once the body is exhausted.
    until (chunk = body.readpartial).nil?
      file.write(chunk)
    end

    file.rewind

    block.(Enumerator.new do |y|
      file.each_line { |line| y << JSON.parse(line) }
    end)
  ensure
    # Close and unlink in one step.
    file.close!
  end
end
Cancel the bulk operation.
@raise ObsoleteOperationError @raise TimeoutError
# File lib/lucid/shopify/bulk_request.rb, line 84
# Send a bulkOperationCancel mutation for this operation, then poll until
# it reports either CANCELED or COMPLETED.
#
# Returns early, without polling, when the API rejects the mutation because
# the operation has already completed.
#
# @raise [ObsoleteOperationError]
# @raise [TimeoutError]
def cancel
  begin
    client.post_graphql(credentials, <<~QUERY)
      mutation {
        bulkOperationCancel(id: "#{id}") {
          userErrors {
            field
            message
          }
        }
      }
    QUERY
  rescue Response::GraphQLClientError => error
    already_completed = error.response.error_message?([
      /cannot be canceled when it is completed/,
    ])

    # Any other client error is unexpected: re-raise it unchanged.
    raise unless already_completed

    return
  end

  poll_until(%w[CANCELED COMPLETED])
end
Poll until the operation reports one of the given statuses.
@param statuses [Array<String>] to terminate polling on @param timeout [Integer] in seconds
@raise ObsoleteOperationError @raise TimeoutError
# File lib/lucid/shopify/bulk_request.rb, line 114
# Poll until the operation reports one of the given statuses.
#
# @param statuses [Array<String>] statuses to terminate polling on
# @param timeout [Integer] maximum total time to poll, in seconds
# @param delay [Numeric] delay between polling requests in seconds
#
# @raise [ObsoleteOperationError]
# @raise [TimeoutError]
def poll_until(statuses, timeout: 60, delay: 1)
  Timeout.timeout(timeout) do
    loop do
      status, = poll

      break if statuses.include?(status)

      # Pause between requests instead of polling back-to-back.
      sleep(delay)
    end
  end
rescue Timeout::Error
  raise TimeoutError, 'exceeded %s seconds polling for status %s' % [
    timeout,
    statuses.join(', '),
  ]
end
Private Instance Methods
@return [Array(String, String | nil)] the operation status and the
download URL, or nil if the result data is empty
# File lib/lucid/shopify/bulk_request.rb, line 131
# Query the shop's current bulk operation and report its status.
#
# @return [Array(String, String | nil)] the operation status and the
#   download URL, or nil if the result data is empty
#
# @raise [ObsoleteOperationError] if the current bulk operation is missing
#   or is not this operation
def poll
  op = client.post_graphql(credentials, <<~QUERY)['data']['currentBulkOperation']
    {
      currentBulkOperation {
        id
        status
        url
      }
    }
  QUERY

  # A missing current operation cannot be this one; report it as obsolete
  # rather than failing with NoMethodError on nil.
  raise ObsoleteOperationError if op.nil? || op['id'] != id

  [op['status'], op['url']]
end