class Google::Cloud::Bigtable::ChunkProcessor
@private # ChunkProcessor
Read a chunk of data and merge it based on states and build rows and cells
Constants
- CELL_IN_PROGRESS
- NEW_ROW
Row
states- ROW_IN_PROGRESS
Attributes
Current cached row data
Current cell values
Current cell values
Current cell values
Current cell values
Current cell values
Current state
Current cached row data
Current state
Public Class Methods
@private
Create chunk reader object and set row state to new
# File lib/google/cloud/bigtable/chunk_processor.rb, line 45 def initialize reset_to_new_row end
Public Instance Methods
Set next state of row.
@return [Google::Cloud::Bigtable::Row]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 173 def next_state! if cur_val self.cur_val += chunk.value else self.cur_val = chunk.value end if chunk.value_size.zero? persist_cell self.state = ROW_IN_PROGRESS else self.state = CELL_IN_PROGRESS end return unless chunk.commit_row self.last_key = row.key completed_row = row reset_to_new_row completed_row end
Build cell and append to row.
# File lib/google/cloud/bigtable/chunk_processor.rb, line 196 def persist_cell cell = Row::Cell.new cur_family, cur_qaul, cur_ts, cur_val, cur_labels row.cells[cur_family] << cell # Clear cached cell values self.cur_val = nil self.cur_ts = nil self.cur_labels = nil end
Process chunk and build full row with cells
@param chunk [Google::Cloud::Bigtable::V2::ReadRowsResponse::CellChunk]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 54 def process chunk self.chunk = chunk raise_if chunk.value_size.positive?, "Commit rows cannot have a non-zero value_size." if chunk.commit_row case state when NEW_ROW process_new_row when CELL_IN_PROGRESS process_cell_in_progress when ROW_IN_PROGRESS process_row_in_progress end end
Process chunk if row cell state is in progress
@return [Google::Cloud::Bigtable::Row]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 160 def process_cell_in_progress validate_reset_row return reset_to_new_row if chunk.reset_row next_state! end
Process new row by setting values from current chunk.
@return [Google::Cloud::Bigtable::Row]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 124 def process_new_row validate_new_row return if chunk.family_name.nil? || chunk.qualifier.nil? row.key = chunk.row_key self.cur_family = chunk.family_name.value self.cur_qaul = chunk.qualifier.value self.cur_ts = chunk.timestamp_micros self.cur_labels = chunk.labels next_state! end
Process chunk if row state is in progress
@return [Google::Cloud::Bigtable::Row]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 143 def process_row_in_progress validate_row_in_progress return reset_to_new_row if chunk.reset_row self.cur_family = chunk.family_name.value if chunk.family_name self.cur_qaul = chunk.qualifier.value if chunk.qualifier self.cur_ts = chunk.timestamp_micros self.cur_labels = chunk.labels if chunk.labels next_state! end
Reset read state and cached data
# File lib/google/cloud/bigtable/chunk_processor.rb, line 207 def reset_to_new_row self.row = Row.new self.state = NEW_ROW self.cur_family = nil self.cur_qaul = nil self.cur_ts = nil self.cur_val = nil self.cur_labels = nil end
Validate last row is completed
@raise [Google::Cloud::Bigtable::InvalidRowStateError]
If read rows response end without last row completed
# File lib/google/cloud/bigtable/chunk_processor.rb, line 223 def validate_last_row_complete return if row.key.nil? raise_if !chunk.commit_row, "Response ended with pending row without commit" end
Validate chunk has new row state
@raise [Google::Cloud::Bigtable::InvalidRowStateError]
If row already has a set key, chunk has an empty row key, chunk state is reset, new row key is the same as the last-read key, or if family name or column qualifier are empty
# File lib/google/cloud/bigtable/chunk_processor.rb, line 96 def validate_new_row raise_if row.key, "A new row cannot have existing state" raise_if chunk.row_key.empty?, "A row key must be set" raise_if chunk.reset_row, "A new row cannot be reset" raise_if last_key == chunk.row_key, "A commit happened but the same key followed" raise_if chunk.family_name.nil?, "A family must be set" raise_if chunk.qualifier.nil?, "A column qualifier must be set" end
Validate row status commit or reset
@raise [Google::Cloud::Bigtable::InvalidRowStateError]
if chunk has data on reset row state
# File lib/google/cloud/bigtable/chunk_processor.rb, line 76 def validate_reset_row return unless chunk.reset_row value = (!chunk.row_key.empty? || chunk.family_name || chunk.qualifier || !chunk.value.empty? || chunk.timestamp_micros.positive?) raise_if value, "A reset should have no data" end
Validate chunk merge is in progress to build new row
@raise [Google::Cloud::Bigtable::InvalidRowStateError]
If row and chunk row key are not same or chunk row key is empty.
# File lib/google/cloud/bigtable/chunk_processor.rb, line 111 def validate_row_in_progress raise_if !chunk.row_key.empty? && chunk.row_key != row.key, "A commit is required between row keys" raise_if chunk.family_name && chunk.qualifier.nil?, "A qualifier must be specified" validate_reset_row end
Private Instance Methods
Raise error on condition failure
@raise [Google::Cloud::Bigtable::InvalidRowStateError]
# File lib/google/cloud/bigtable/chunk_processor.rb, line 236 def raise_if condition, message raise InvalidRowStateError.new(message, chunk.to_h) if condition end