Source code for snapm.manager.plugins._plugin
# Copyright (C) 2023 Red Hat, Inc., Bryn M. Reeves <bmr@redhat.com>
#
# snapm/manager/plugins/__init__.py - Snapshot Manager plugins
#
# This file is part of the snapm project.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
Snapshot manager plugin helpers.
"""
import os
import logging
from os.path import sep as path_sep
from snapm import SNAPM_DEBUG_PLUGINS
_log = logging.getLogger(__name__)
_log.set_debug_mask(SNAPM_DEBUG_PLUGINS)
_log_debug = _log.debug
_log_debug_plugins = _log.debug_masked
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
MOUNT_SEPARATOR = "-"
ESCAPED_MOUNT_SEPARATOR = "--"
PROC_MOUNTS = "/proc/self/mounts"
# Fields: origin_name-snapset_snapset-name_timestamp_mount-point
SNAPSHOT_NAME_FORMAT = "%s-snapset_%s_%d_%s"
[docs]
def encode_mount_point(mount_point):
"""
Encode mount point paths.
Encode a mount point for use in a snapshot name by replacing the path
separator with '-'.
"""
if mount_point == "/":
return MOUNT_SEPARATOR
parts = [
part.replace(MOUNT_SEPARATOR, ESCAPED_MOUNT_SEPARATOR)
for part in mount_point.split(path_sep)
]
return MOUNT_SEPARATOR.join(parts)
def _split_mount_separators(mount_str):
"""
Split string by mount separator handling escaped separators.
Split the mount string ``mount_str`` at ``MOUNT_SEPARATOR`` boundaries,
into path components, ignoring ``ESCAPED_MOUNT_SEPARATOR``.
"""
result = []
current = ""
i = 0
while i < len(mount_str):
if mount_str[i] == MOUNT_SEPARATOR:
if i < len(mount_str) - 1 and mount_str[i + 1] == MOUNT_SEPARATOR:
current += ESCAPED_MOUNT_SEPARATOR
i += 1
else:
result.append(current)
current = ""
else:
current += mount_str[i]
i += 1
result.append(current) # Append the remaining segment
return result
[docs]
def decode_mount_point(mount_str):
"""
Parse the mount point component of a snapshot name.
"""
mount_parts = _split_mount_separators(mount_str)
unescaped_parts = [
part.replace(ESCAPED_MOUNT_SEPARATOR, MOUNT_SEPARATOR) for part in mount_parts
]
return path_sep.join(unescaped_parts)
[docs]
def parse_snapshot_name(full_name, origin):
"""
Attempt to parse a snapshot set snapshot name.
Returns a tuple of (snapset_name, timestamp, mount_point) if ``full_name``
is a valid snapset snapshot name, or ``None`` otherwise.
"""
if not full_name.startswith(origin):
return None
base = full_name.removeprefix(origin + "-")
if "_" not in base:
return None
fields = base.split("_", maxsplit=3)
if len(fields) != 4:
return None
if fields[0] != "snapset":
return None
snapset_name = fields[1]
timestamp = int(fields[2])
mount_point = decode_mount_point(fields[3])
# (snapset_name, timestamp, mount)
return (snapset_name, timestamp, mount_point)
def _parse_proc_mounts_line(mount_line):
"""
Parse ``/proc/mounts`` line.
Parse a line of ``/proc/mounts`` data and return a tuple
containing the device and mount point.
"""
(device, mount_point, _) = mount_line.split(maxsplit=2)
return (device, mount_point)
[docs]
def device_from_mount_point(mount_point):
"""
Convert a mount point path to a corresponding device path.
Return the device corresponding to the file system mount point
``mount_point`` according to /proc/self/mounts.
"""
# Ignore any trailing '/' in mount point path
if mount_point != path_sep:
mount_point = mount_point.rstrip(path_sep)
with open(PROC_MOUNTS, "r", encoding="utf8") as mounts:
for line in mounts.readlines():
(device, mount_path) = _parse_proc_mounts_line(line)
if mount_path == mount_point:
return device
raise ValueError(f"Mount point {mount_point} not found")
[docs]
def mount_point_space_used(mount_point):
"""
Retrieve space usage for a mount point path.
Return the space used on the file system mounted at ``mount_point``
as a value in bytes.
"""
stat = os.statvfs(mount_point)
return (stat.f_blocks - stat.f_bfree) * stat.f_frsize
__all__ = [
"encode_mount_point",
"decode_mount_point",
"parse_snapshot_name",
"device_from_mount_point",
"mount_point_space_used",
"format_snapshot_name",
]