As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.cloud.spanner_v1.batch

# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Context manager for Cloud Spanner batched writes."""

from google.cloud.spanner_v1 import CommitRequest
from google.cloud.spanner_v1 import Mutation
from google.cloud.spanner_v1 import TransactionOptions

from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1 import RequestOptions


class _BatchBase(_SessionWrapper):
    """Accumulate mutations for transmission during :meth:`commit`.

    :type session: :class:`~google.cloud.spanner_v1.session.Session`
    :param session: the session used to perform the commit
    """

    transaction_tag = None
    _read_only = False

    def __init__(self, session):
        super(_BatchBase, self).__init__(session)
        self._mutations = []

    def _check_state(self):
        """Helper for :meth:`commit` et al.

        Subclasses must override

        :raises: :exc:`ValueError` if the object's state is invalid for making
                 API requests.
        """
        raise NotImplementedError

    def insert(self, table, columns, values):
        """Insert one or more new table rows.

        :type table: str
        :param table: Name of the table to be modified.

        :type columns: list of str
        :param columns: Name of the table columns to be modified.

        :type values: list of lists
        :param values: Values to be modified.
        """
        self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

    def update(self, table, columns, values):
        """Update one or more existing table rows.

        :type table: str
        :param table: Name of the table to be modified.

        :type columns: list of str
        :param columns: Name of the table columns to be modified.

        :type values: list of lists
        :param values: Values to be modified.
        """
        self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))

    def insert_or_update(self, table, columns, values):
        """Insert/update one or more table rows.

        :type table: str
        :param table: Name of the table to be modified.

        :type columns: list of str
        :param columns: Name of the table columns to be modified.

        :type values: list of lists
        :param values: Values to be modified.
        """
        self._mutations.append(
            Mutation(insert_or_update=_make_write_pb(table, columns, values))
        )

    def replace(self, table, columns, values):
        """Replace one or more table rows.

        :type table: str
        :param table: Name of the table to be modified.

        :type columns: list of str
        :param columns: Name of the table columns to be modified.

        :type values: list of lists
        :param values: Values to be modified.
        """
        self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))

    def delete(self, table, keyset):
        """Delete one or more table rows.

        :type table: str
        :param table: Name of the table to be modified.

        :type keyset: :class:`~google.cloud.spanner_v1.keyset.Keyset`
        :param keyset: Keys/ranges identifying rows to delete.
        """
        delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
        self._mutations.append(Mutation(delete=delete))


[docs]class Batch(_BatchBase): """Accumulate mutations for transmission during :meth:`commit`.""" committed = None commit_stats = None """Timestamp at which the batch was successfully committed.""" def _check_state(self): """Helper for :meth:`commit` et al. Subclasses must override :raises: :exc:`ValueError` if the object's state is invalid for making API requests. """ if self.committed is not None: raise ValueError("Batch already committed")
[docs] def commit(self, return_commit_stats=False, request_options=None): """Commit mutations to the database. :type return_commit_stats: bool :param return_commit_stats: If true, the response will return commit stats which can be accessed though commit_stats. :type request_options: :class:`google.cloud.spanner_v1.types.RequestOptions` :param request_options: (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. :rtype: datetime :returns: timestamp of the committed changes. """ self._check_state() database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) trace_attributes = {"num_mutations": len(self._mutations)} if request_options is None: request_options = RequestOptions() elif type(request_options) == dict: request_options = RequestOptions(request_options) request_options.transaction_tag = self.transaction_tag # Request tags are not supported for commit requests. request_options.request_tag = None request = CommitRequest( session=self._session.name, mutations=self._mutations, single_use_transaction=txn_options, return_commit_stats=return_commit_stats, request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): response = api.commit(request=request, metadata=metadata,) self.committed = response.commit_timestamp self.commit_stats = response.commit_stats return self.committed
[docs] def __enter__(self): """Begin ``with`` block.""" self._check_state() return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """End ``with`` block.""" if exc_type is None: self.commit()
def _make_write_pb(table, columns, values): """Helper for :meth:`Batch.insert` et al. :type table: str :param table: Name of the table to be modified. :type columns: list of str :param columns: Name of the table columns to be modified. :type values: list of lists :param values: Values to be modified. :rtype: :class:`google.cloud.spanner_v1.types.Mutation.Write` :returns: Write protobuf """ return Mutation.Write( table=table, columns=columns, values=_make_list_value_pbs(values) )