class Google::Cloud::Bigtable::ChunkProcessor

@private # ChunkProcessor

Reads chunks of data one at a time, merges them according to the current row state, and builds rows and cells.

Constants

CELL_IN_PROGRESS
NEW_ROW

Row states

ROW_IN_PROGRESS

Attributes

chunk[RW]

Chunk currently being processed

cur_family[RW]

Current cell values

cur_labels[RW]

Current cell values

cur_qaul[RW]

Current cell values

cur_ts[RW]

Current cell values

cur_val[RW]

Current cell values

last_key[RW]

Row key of the most recently committed row

row[RW]

Current cached row data

state[RW]

Current state

Public Class Methods

new() click to toggle source

@private

Create chunk reader object and set row state to new

# File lib/google/cloud/bigtable/chunk_processor.rb, line 45
# Builds a processor that is ready for the first chunk of a row.
#
# All setup is delegated to reset_to_new_row, which installs an empty
# row, sets the state to NEW_ROW and clears the cached cell values.
def initialize
  reset_to_new_row
end

Public Instance Methods

next_state!() click to toggle source

Set next state of row.

@return [Google::Cloud::Bigtable::Row]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 173
# Folds the current chunk's value into the cell being built and advances
# the state machine.
#
# The chunk value is appended to any value cached so far. When the chunk
# reports a zero `value_size` the cell is persisted and the state becomes
# ROW_IN_PROGRESS; otherwise the state becomes CELL_IN_PROGRESS.
# NOTE(review): a non-zero value_size presumably means more of the value
# follows in later chunks - confirm against the Bigtable API.
#
# @return [Row, nil] the finished row when the chunk commits the row,
#   otherwise nil
def next_state!
  accumulated = cur_val
  # `+` builds a new string, so the chunk's own value is never mutated.
  self.cur_val = accumulated ? accumulated + chunk.value : chunk.value

  cell_done = chunk.value_size.zero?
  persist_cell if cell_done
  self.state = cell_done ? ROW_IN_PROGRESS : CELL_IN_PROGRESS

  return unless chunk.commit_row

  # Hold on to the finished row before the reset swaps in a fresh one.
  finished_row = row
  self.last_key = finished_row.key
  reset_to_new_row
  finished_row
end
persist_cell() click to toggle source

Build cell and append to row.

# File lib/google/cloud/bigtable/chunk_processor.rb, line 196
# Builds a cell from the cached family, qualifier, timestamp, value and
# labels, and appends it to the current row under the cached family.
#
# The cached value, timestamp and labels are cleared afterwards; the
# cached family and qualifier are left in place.
#
# @return [nil]
def persist_cell
  family = cur_family
  row.cells[family] << Row::Cell.new(family, cur_qaul, cur_ts, cur_val, cur_labels)

  # Drop the per-cell values that were just persisted.
  self.cur_val = self.cur_ts = self.cur_labels = nil
end
process(chunk) click to toggle source

Process chunk and build full row with cells

@param chunk [Google::Cloud::Bigtable::V2::ReadRowsResponse::CellChunk]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 54
# Stores the chunk as the current one and hands it to the handler that
# matches the current state.
#
# @param chunk [Google::Cloud::Bigtable::V2::ReadRowsResponse::CellChunk]
# @return [Row, nil] whatever the state handler returns; nil when the
#   state matches none of the known states
# @raise [Google::Cloud::Bigtable::InvalidRowStateError] if a committing
#   chunk reports a positive value_size
def process chunk
  self.chunk = chunk

  if chunk.commit_row
    raise_if chunk.value_size.positive?, "Commit rows cannot have a non-zero value_size."
  end

  case state
  when NEW_ROW then process_new_row
  when CELL_IN_PROGRESS then process_cell_in_progress
  when ROW_IN_PROGRESS then process_row_in_progress
  end
end
process_cell_in_progress() click to toggle source

Process chunk if row cell state is in progress

@return [Google::Cloud::Bigtable::Row]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 160
# Handles a chunk that arrives while a cell value is still being
# accumulated.
#
# The chunk is checked with validate_reset_row first. A reset chunk
# discards the row under construction; any other chunk is passed to
# next_state!.
#
# @return [Row, nil] the result of next_state!, or the result of
#   reset_to_new_row for a reset chunk
def process_cell_in_progress
  validate_reset_row

  if chunk.reset_row
    reset_to_new_row
  else
    next_state!
  end
end
process_new_row() click to toggle source

Process new row by setting values from current chunk.

@return [Google::Cloud::Bigtable::Row]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 124
# Starts a new row from the current chunk.
#
# After validate_new_row passes, the row key and the cell's family,
# qualifier, timestamp and labels are copied from the chunk, and the
# chunk is then passed to next_state!.
#
# @return [Row, nil] the result of next_state!; nil when the chunk has
#   no family name or no qualifier
def process_new_row
  validate_new_row

  family = chunk.family_name
  qualifier = chunk.qualifier
  # Defensive guard: validate_new_row already raises for a missing family
  # or qualifier, so this only matters if that validation is bypassed.
  return if family.nil? || qualifier.nil?

  row.key = chunk.row_key
  self.cur_family = family.value
  self.cur_qaul = qualifier.value
  self.cur_ts = chunk.timestamp_micros
  self.cur_labels = chunk.labels

  next_state!
end
process_row_in_progress() click to toggle source

Process chunk if row state is in progress

@return [Google::Cloud::Bigtable::Row]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 143
# Handles a chunk that starts another cell of the row being built.
#
# After validate_row_in_progress passes, a reset chunk discards the row.
# Otherwise the family, qualifier and labels are updated only when the
# chunk supplies them, the timestamp is always taken from the chunk, and
# the chunk is passed to next_state!.
#
# @return [Row, nil] the result of next_state!, or the result of
#   reset_to_new_row for a reset chunk
def process_row_in_progress
  validate_row_in_progress

  if chunk.reset_row
    reset_to_new_row
  else
    family = chunk.family_name
    self.cur_family = family.value if family

    qualifier = chunk.qualifier
    self.cur_qaul = qualifier.value if qualifier

    self.cur_ts = chunk.timestamp_micros

    labels = chunk.labels
    self.cur_labels = labels if labels

    next_state!
  end
end
reset_to_new_row() click to toggle source

Reset read state and cached data

# File lib/google/cloud/bigtable/chunk_processor.rb, line 207
# Discards the row under construction and returns the processor to the
# NEW_ROW state with an empty row and no cached cell values.
#
# last_key and the current chunk are left as they are.
#
# @return [nil]
def reset_to_new_row
  self.row = Row.new
  self.state = NEW_ROW

  # A chained assignment evaluates to nil, which callers pass along.
  self.cur_family = self.cur_qaul = self.cur_ts = self.cur_val = self.cur_labels = nil
end
validate_last_row_complete() click to toggle source

Validate last row is completed

@raise [Google::Cloud::Bigtable::InvalidRowStateError]

If the read rows response ends before the last row has been committed.
# File lib/google/cloud/bigtable/chunk_processor.rb, line 223
# Checks that no row is left half-built.
#
# Nothing is checked while the current row has no key. Otherwise the
# current chunk must have its commit_row flag set.
#
# @raise [Google::Cloud::Bigtable::InvalidRowStateError] if a keyed row
#   is pending and the current chunk did not commit it
def validate_last_row_complete
  unless row.key.nil?
    uncommitted = !chunk.commit_row
    raise_if uncommitted, "Response ended with pending row without commit"
  end
end
validate_new_row() click to toggle source

Validate chunk has new row state

@raise [Google::Cloud::Bigtable::InvalidRowStateError]

If row already has a set key, chunk has an empty row key, chunk
state is reset, new row key is the same as the last-read key,
or if family name or column qualifier are empty
# File lib/google/cloud/bigtable/chunk_processor.rb, line 96
# Checks that the current chunk can legally open a new row.
#
# The checks run in order and the first failing one raises.
#
# @raise [Google::Cloud::Bigtable::InvalidRowStateError] if the row
#   already has a key, the chunk's row key is empty, the chunk is a
#   reset, the chunk's row key equals last_key, or the chunk has no
#   family name or no qualifier
def validate_new_row
  raise_if(row.key, "A new row cannot have existing state")

  incoming_key = chunk.row_key
  raise_if(incoming_key.empty?, "A row key must be set")
  raise_if(chunk.reset_row, "A new row cannot be reset")
  raise_if(last_key == incoming_key, "A commit happened but the same key followed")

  raise_if(chunk.family_name.nil?, "A family must be set")
  raise_if(chunk.qualifier.nil?, "A column qualifier must be set")
end
validate_reset_row() click to toggle source

Validate row status commit or reset

@raise [Google::Cloud::Bigtable::InvalidRowStateError]

If the chunk carries any data while its reset_row flag is set.
# File lib/google/cloud/bigtable/chunk_processor.rb, line 76
# Checks that a reset chunk carries no data.
#
# Chunks without the reset_row flag are not checked. A reset chunk must
# have an empty row key, no family name, no qualifier, an empty value and
# a timestamp that is not positive.
#
# @raise [Google::Cloud::Bigtable::InvalidRowStateError] if a reset chunk
#   carries any of those fields
def validate_reset_row
  if chunk.reset_row
    # Fields are tested in the same order as the checks they replace, so
    # short-circuiting stops at the same field.
    carries_nothing = chunk.row_key.empty? &&
                      !chunk.family_name &&
                      !chunk.qualifier &&
                      chunk.value.empty? &&
                      !chunk.timestamp_micros.positive?

    raise_if !carries_nothing, "A reset should have no data"
  end
end
validate_row_in_progress() click to toggle source

Validate chunk merge is in progress to build new row

@raise [Google::Cloud::Bigtable::InvalidRowStateError]

If the chunk has a non-empty row key that differs from the current row's key, or if the chunk has a family name but no column qualifier.
# File lib/google/cloud/bigtable/chunk_processor.rb, line 111
# Checks that the current chunk may continue the row being built.
#
# A chunk may omit its row key, but a non-empty key must equal the
# current row's key. A chunk that names a family must also name a
# qualifier. The reset rules are then checked by validate_reset_row.
#
# @raise [Google::Cloud::Bigtable::InvalidRowStateError] if the chunk's
#   key differs from the row's key, or a family comes without a qualifier
def validate_row_in_progress
  chunk_key = chunk.row_key
  key_changed = !chunk_key.empty? && chunk_key != row.key
  raise_if key_changed, "A commit is required between row keys"

  qualifier_missing = chunk.family_name && chunk.qualifier.nil?
  raise_if qualifier_missing, "A qualifier must be specified"

  validate_reset_row
end

Private Instance Methods

raise_if(condition, message) click to toggle source

Raise error on condition failure

@raise [Google::Cloud::Bigtable::InvalidRowStateError]

# File lib/google/cloud/bigtable/chunk_processor.rb, line 236
# Raises an InvalidRowStateError when the condition is truthy.
#
# The error is built from the message and the current chunk converted
# with to_h.
#
# @param condition [Object] any truthy value triggers the error
# @param message [String] error message
# @return [nil] when the condition is falsy
# @raise [Google::Cloud::Bigtable::InvalidRowStateError]
def raise_if condition, message
  return unless condition

  raise InvalidRowStateError.new(message, chunk.to_h)
end