As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.cloud.spanner_v1.snapshot

# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Model a set of read-only queries to a database as a snapshot."""

import functools

from google.protobuf.struct_pb2 import Struct
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import ReadRequest
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1 import TransactionSelector
from google.cloud.spanner_v1 import PartitionOptions
from google.cloud.spanner_v1 import PartitionQueryRequest
from google.cloud.spanner_v1 import PartitionReadRequest

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core import gapic_v1
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1.streamed import StreamedResultSet
from google.cloud.spanner_v1 import RequestOptions

_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
    "RST_STREAM",
    "Received unexpected EOS on DATA frame from server",
)


def _restart_on_unavailable(
    method, request, trace_name=None, session=None, attributes=None
):
    """Restart iteration after :exc:`.ServiceUnavailable`.

    :type method: callable
    :param method: function returning iterator

    :type request: proto
    :param request: request proto to call the method with
    """
    resume_token = b""
    item_buffer = []
    with trace_call(trace_name, session, attributes):
        iterator = method(request=request)
    while True:
        try:
            for item in iterator:
                item_buffer.append(item)
                if item.resume_token:
                    resume_token = item.resume_token
                    break
        except ServiceUnavailable:
            del item_buffer[:]
            with trace_call(trace_name, session, attributes):
                request.resume_token = resume_token
                iterator = method(request=request)
            continue
        except InternalServerError as exc:
            resumable_error = any(
                resumable_message in exc.message
                for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
            )
            if not resumable_error:
                raise
            del item_buffer[:]
            with trace_call(trace_name, session, attributes):
                request.resume_token = resume_token
                iterator = method(request=request)
            continue

        if len(item_buffer) == 0:
            break

        for item in item_buffer:
            yield item

        del item_buffer[:]


class _SnapshotBase(_SessionWrapper):
    """Base class for Snapshot.

    Allows reuse of API request methods with different transaction selector.

    :type session: :class:`~google.cloud.spanner_v1.session.Session`
    :param session: the session used to perform the commit
    """

    _multi_use = False
    _read_only = True
    _transaction_id = None
    _read_request_count = 0
    _execute_sql_count = 0

    def _make_txn_selector(self):
        """Helper for :meth:`read` / :meth:`execute_sql`.

        Subclasses must override, returning an instance of
        :class:`transaction_pb2.TransactionSelector`
        appropriate for making ``read`` / ``execute_sql`` requests

        :raises: NotImplementedError, always
        """
        raise NotImplementedError

    def read(
        self,
        table,
        columns,
        keyset,
        index="",
        limit=0,
        partition=None,
        request_options=None,
        *,
        retry=gapic_v1.method.DEFAULT,
        timeout=gapic_v1.method.DEFAULT,
    ):
        """Perform a ``StreamingRead`` API request for rows in a table.

        :type table: str
        :param table: name of the table from which to fetch data

        :type columns: list of str
        :param columns: names of columns to be retrieved

        :type keyset: :class:`~google.cloud.spanner_v1.keyset.KeySet`
        :param keyset: keys / ranges identifying rows to be retrieved

        :type index: str
        :param index: (Optional) name of index to use, rather than the
                      table's primary key

        :type limit: int
        :param limit: (Optional) maximum number of rows to return.
                      Incompatible with ``partition``.

        :type partition: bytes
        :param partition: (Optional) one of the partition tokens returned
                          from :meth:`partition_read`.  Incompatible with
                          ``limit``.

        :type request_options:
            :class:`google.cloud.spanner_v1.types.RequestOptions`
        :param request_options:
                (Optional) Common options for this request.
                If a dict is provided, it must be of the same form as the protobuf
                message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
                Please note, the `transactionTag` setting will be ignored for
                snapshot as it's not supported for read-only transactions.

        :type retry: :class:`~google.api_core.retry.Retry`
        :param retry: (Optional) The retry settings for this request.

        :type timeout: float
        :param timeout: (Optional) The timeout for this request.

        :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
        :returns: a result set instance which can be used to consume rows.

        :raises ValueError:
            for reuse of single-use snapshots, or if a transaction ID is
            already pending for multiple-use snapshots.
        """
        if self._read_request_count > 0:
            if not self._multi_use:
                raise ValueError("Cannot re-use single-use snapshot.")
            if self._transaction_id is None:
                raise ValueError("Transaction ID pending.")

        database = self._session._database
        api = database.spanner_api
        metadata = _metadata_with_prefix(database.name)
        transaction = self._make_txn_selector()

        if request_options is None:
            request_options = RequestOptions()
        elif type(request_options) == dict:
            request_options = RequestOptions(request_options)

        if self._read_only:
            # Transaction tags are not supported for read only transactions.
            request_options.transaction_tag = None
        else:
            request_options.transaction_tag = self.transaction_tag

        request = ReadRequest(
            session=self._session.name,
            table=table,
            columns=columns,
            key_set=keyset._to_pb(),
            transaction=transaction,
            index=index,
            limit=limit,
            partition_token=partition,
            request_options=request_options,
        )
        restart = functools.partial(
            api.streaming_read,
            request=request,
            metadata=metadata,
            retry=retry,
            timeout=timeout,
        )

        trace_attributes = {"table_id": table, "columns": columns}
        iterator = _restart_on_unavailable(
            restart,
            request,
            "CloudSpanner.ReadOnlyTransaction",
            self._session,
            trace_attributes,
        )

        self._read_request_count += 1

        if self._multi_use:
            return StreamedResultSet(iterator, source=self)
        else:
            return StreamedResultSet(iterator)

    def execute_sql(
        self,
        sql,
        params=None,
        param_types=None,
        query_mode=None,
        query_options=None,
        request_options=None,
        partition=None,
        retry=gapic_v1.method.DEFAULT,
        timeout=gapic_v1.method.DEFAULT,
    ):
        """Perform an ``ExecuteStreamingSql`` API request.

        :type sql: str
        :param sql: SQL query statement

        :type params: dict, {str -> column value}
        :param params: values for parameter replacement.  Keys must match
                       the names used in ``sql``.

        :type param_types: dict[str -> Union[dict, .types.Type]]
        :param param_types:
            (Optional) maps explicit types for one or more param values;
            required if parameters are passed.

        :type query_mode:
            :class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryMode`
        :param query_mode: Mode governing return of results / query plan.
            See:
            `QueryMode <https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode>`_.

        :type query_options:
            :class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryOptions`
                or :class:`dict`
        :param query_options:
                (Optional) Query optimizer configuration to use for the given query.
                If a dict is provided, it must be of the same form as the protobuf
                message :class:`~google.cloud.spanner_v1.types.QueryOptions`

        :type request_options:
            :class:`google.cloud.spanner_v1.types.RequestOptions`
        :param request_options:
                (Optional) Common options for this request.
                If a dict is provided, it must be of the same form as the protobuf
                message :class:`~google.cloud.spanner_v1.types.RequestOptions`.

        :type partition: bytes
        :param partition: (Optional) one of the partition tokens returned
                          from :meth:`partition_query`.

        :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
        :returns: a result set instance which can be used to consume rows.

        :type retry: :class:`~google.api_core.retry.Retry`
        :param retry: (Optional) The retry settings for this request.

        :type timeout: float
        :param timeout: (Optional) The timeout for this request.

        :raises ValueError:
            for reuse of single-use snapshots, or if a transaction ID is
            already pending for multiple-use snapshots.
        """
        if self._read_request_count > 0:
            if not self._multi_use:
                raise ValueError("Cannot re-use single-use snapshot.")
            if self._transaction_id is None:
                raise ValueError("Transaction ID pending.")

        if params is not None:
            if param_types is None:
                raise ValueError("Specify 'param_types' when passing 'params'.")
            params_pb = Struct(
                fields={key: _make_value_pb(value) for key, value in params.items()}
            )
        else:
            params_pb = {}

        database = self._session._database
        metadata = _metadata_with_prefix(database.name)
        transaction = self._make_txn_selector()
        api = database.spanner_api

        # Query-level options have higher precedence than client-level and
        # environment-level options
        default_query_options = database._instance._client._query_options
        query_options = _merge_query_options(default_query_options, query_options)

        if request_options is None:
            request_options = RequestOptions()
        elif type(request_options) == dict:
            request_options = RequestOptions(request_options)
        if self._read_only:
            # Transaction tags are not supported for read only transactions.
            request_options.transaction_tag = None
        else:
            request_options.transaction_tag = self.transaction_tag

        request = ExecuteSqlRequest(
            session=self._session.name,
            sql=sql,
            transaction=transaction,
            params=params_pb,
            param_types=param_types,
            query_mode=query_mode,
            partition_token=partition,
            seqno=self._execute_sql_count,
            query_options=query_options,
            request_options=request_options,
        )
        restart = functools.partial(
            api.execute_streaming_sql,
            request=request,
            metadata=metadata,
            retry=retry,
            timeout=timeout,
        )

        trace_attributes = {"db.statement": sql}
        iterator = _restart_on_unavailable(
            restart,
            request,
            "CloudSpanner.ReadWriteTransaction",
            self._session,
            trace_attributes,
        )

        self._read_request_count += 1
        self._execute_sql_count += 1

        if self._multi_use:
            return StreamedResultSet(iterator, source=self)
        else:
            return StreamedResultSet(iterator)

    def partition_read(
        self,
        table,
        columns,
        keyset,
        index="",
        partition_size_bytes=None,
        max_partitions=None,
        *,
        retry=gapic_v1.method.DEFAULT,
        timeout=gapic_v1.method.DEFAULT,
    ):
        """Perform a ``PartitionRead`` API request for rows in a table.

        :type table: str
        :param table: name of the table from which to fetch data

        :type columns: list of str
        :param columns: names of columns to be retrieved

        :type keyset: :class:`~google.cloud.spanner_v1.keyset.KeySet`
        :param keyset: keys / ranges identifying rows to be retrieved

        :type index: str
        :param index: (Optional) name of index to use, rather than the
                      table's primary key

        :type partition_size_bytes: int
        :param partition_size_bytes:
            (Optional) desired size for each partition generated.  The service
            uses this as a hint, the actual partition size may differ.

        :type max_partitions: int
        :param max_partitions:
            (Optional) desired maximum number of partitions generated. The
            service uses this as a hint, the actual number of partitions may
            differ.

        :type retry: :class:`~google.api_core.retry.Retry`
        :param retry: (Optional) The retry settings for this request.

        :type timeout: float
        :param timeout: (Optional) The timeout for this request.

        :rtype: iterable of bytes
        :returns: a sequence of partition tokens

        :raises ValueError:
            for single-use snapshots, or if a transaction ID is
            already associated with the snapshot.
        """
        if not self._multi_use:
            raise ValueError("Cannot use single-use snapshot.")

        if self._transaction_id is None:
            raise ValueError("Transaction not started.")

        database = self._session._database
        api = database.spanner_api
        metadata = _metadata_with_prefix(database.name)
        transaction = self._make_txn_selector()
        partition_options = PartitionOptions(
            partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
        )
        request = PartitionReadRequest(
            session=self._session.name,
            table=table,
            columns=columns,
            key_set=keyset._to_pb(),
            transaction=transaction,
            index=index,
            partition_options=partition_options,
        )

        trace_attributes = {"table_id": table, "columns": columns}
        with trace_call(
            "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
        ):
            response = api.partition_read(
                request=request, metadata=metadata, retry=retry, timeout=timeout,
            )

        return [partition.partition_token for partition in response.partitions]

    def partition_query(
        self,
        sql,
        params=None,
        param_types=None,
        partition_size_bytes=None,
        max_partitions=None,
        *,
        retry=gapic_v1.method.DEFAULT,
        timeout=gapic_v1.method.DEFAULT,
    ):
        """Perform a ``PartitionQuery`` API request.

        :type sql: str
        :param sql: SQL query statement

        :type params: dict, {str -> column value}
        :param params: values for parameter replacement.  Keys must match
                       the names used in ``sql``.

        :type param_types: dict[str -> Union[dict, .types.Type]]
        :param param_types:
            (Optional) maps explicit types for one or more param values;
            required if parameters are passed.

        :type partition_size_bytes: int
        :param partition_size_bytes:
            (Optional) desired size for each partition generated.  The service
            uses this as a hint, the actual partition size may differ.

        :type max_partitions: int
        :param max_partitions:
            (Optional) desired maximum number of partitions generated. The
            service uses this as a hint, the actual number of partitions may
            differ.

        :type retry: :class:`~google.api_core.retry.Retry`
        :param retry: (Optional) The retry settings for this request.

        :type timeout: float
        :param timeout: (Optional) The timeout for this request.

        :rtype: iterable of bytes
        :returns: a sequence of partition tokens

        :raises ValueError:
            for single-use snapshots, or if a transaction ID is
            already associated with the snapshot.
        """
        if not self._multi_use:
            raise ValueError("Cannot use single-use snapshot.")

        if self._transaction_id is None:
            raise ValueError("Transaction not started.")

        if params is not None:
            if param_types is None:
                raise ValueError("Specify 'param_types' when passing 'params'.")
            params_pb = Struct(
                fields={key: _make_value_pb(value) for (key, value) in params.items()}
            )
        else:
            params_pb = Struct()

        database = self._session._database
        api = database.spanner_api
        metadata = _metadata_with_prefix(database.name)
        transaction = self._make_txn_selector()
        partition_options = PartitionOptions(
            partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
        )
        request = PartitionQueryRequest(
            session=self._session.name,
            sql=sql,
            transaction=transaction,
            params=params_pb,
            param_types=param_types,
            partition_options=partition_options,
        )

        trace_attributes = {"db.statement": sql}
        with trace_call(
            "CloudSpanner.PartitionReadWriteTransaction",
            self._session,
            trace_attributes,
        ):
            response = api.partition_query(
                request=request, metadata=metadata, retry=retry, timeout=timeout,
            )

        return [partition.partition_token for partition in response.partitions]


[docs]class Snapshot(_SnapshotBase): """Allow a set of reads / SQL statements with shared staleness. See https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly If no options are passed, reads will use the ``strong`` model, reading at a timestamp where all previously committed transactions are visible. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: The session used to perform the commit. :type read_timestamp: :class:`datetime.datetime` :param read_timestamp: Execute all reads at the given timestamp. :type min_read_timestamp: :class:`datetime.datetime` :param min_read_timestamp: Execute all reads at a timestamp >= ``min_read_timestamp``. :type max_staleness: :class:`datetime.timedelta` :param max_staleness: Read data at a timestamp >= NOW - ``max_staleness`` seconds. :type exact_staleness: :class:`datetime.timedelta` :param exact_staleness: Execute all reads at a timestamp that is ``exact_staleness`` old. :type multi_use: :class:`bool` :param multi_use: If true, multiple :meth:`read` / :meth:`execute_sql` calls can be performed with the snapshot in the context of a read-only transaction, used to ensure isolation / consistency. Incompatible with ``max_staleness`` and ``min_read_timestamp``. """ def __init__( self, session, read_timestamp=None, min_read_timestamp=None, max_staleness=None, exact_staleness=None, multi_use=False, ): super(Snapshot, self).__init__(session) opts = [read_timestamp, min_read_timestamp, max_staleness, exact_staleness] flagged = [opt for opt in opts if opt is not None] if len(flagged) > 1: raise ValueError("Supply zero or one options.") if multi_use: if min_read_timestamp is not None or max_staleness is not None: raise ValueError( "'multi_use' is incompatible with " "'min_read_timestamp' / 'max_staleness'" ) self._strong = len(flagged) == 0 self._read_timestamp = read_timestamp self._min_read_timestamp = min_read_timestamp self._max_staleness = max_staleness self._exact_staleness = exact_staleness self._multi_use = multi_use def _make_txn_selector(self): """Helper for :meth:`read`.""" if self._transaction_id is not None: return TransactionSelector(id=self._transaction_id) if self._read_timestamp: key = "read_timestamp" value = self._read_timestamp elif self._min_read_timestamp: key = "min_read_timestamp" value = self._min_read_timestamp elif self._max_staleness: key = "max_staleness" value = self._max_staleness elif self._exact_staleness: key = "exact_staleness" value = self._exact_staleness else: key = "strong" value = True options = TransactionOptions( read_only=TransactionOptions.ReadOnly(**{key: value}) ) if self._multi_use: return TransactionSelector(begin=options) else: return TransactionSelector(single_use=options)
[docs] def begin(self): """Begin a read-only transaction on the database. :rtype: bytes :returns: the ID for the newly-begun transaction. :raises ValueError: if the transaction is already begun, committed, or rolled back. """ if not self._multi_use: raise ValueError("Cannot call 'begin' on single-use snapshots") if self._transaction_id is not None: raise ValueError("Read-only transaction already begun") if self._read_request_count > 0: raise ValueError("Read-only transaction already pending") database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_selector = self._make_txn_selector() with trace_call("CloudSpanner.BeginTransaction", self._session): response = api.begin_transaction( session=self._session.name, options=txn_selector.begin, metadata=metadata, ) self._transaction_id = response.id return self._transaction_id