# Source code for x2go.backends.info.plain

# -*- coding: utf-8 -*-

# Copyright (C) 2010-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
X2GoServerSessionList and X2GoServerSessionInfo classes - data handling for
X2Go server sessions.

This backend handles X2Go server implementations that respond with session infos
via server-side PLAIN text output.

"""

from __future__ import print_function
# Module-level identifier string.
# NOTE(review): presumably used as a component/logger tag elsewhere in the
# package -- nothing in this file reads it; confirm against callers.
__NAME__ = 'x2goserversessioninfo-pylib'

# Explicitly override the module's package and dotted module name globals.
# NOTE(review): the reason for pinning these is not visible in this file --
# confirm before changing or removing.
__package__ = 'x2go.backends.info'
__name__    = 'x2go.backends.info.plain'

# modules
import types
import re

class X2GoServerSessionInfo(object):
    """\
    :class:`x2go.backends.info.plain.X2GoServerSessionInfo` is used to store all information
    that is retrieved from the connected X2Go server on
    ``X2GoTerminalSession.start()`` resp. ``X2GoTerminalSession.resume()``.

    """
    def __init__(self):
        """\
        Class constructor, identical to :func:`clear()` method.

        """
        self.clear()

    def __str__(self):
        return self.name

    def __repr__(self):
        # Only instance attributes are listed (sorted by name); anything
        # containing a double underscore is skipped.
        props = [
            p + '=' + str(self.__dict__[p])
            for p in sorted(self.__dict__)
            if '__' not in p
        ]
        return 'X2GoServerSessionInfo(' + ','.join(props) + ')'

    @staticmethod
    def _parse_optional_port(fields, index):
        """\
        Read an optional integer port from a list of string fields.

        :param fields: list of string values
        :type fields: ``list``
        :param index: position of the port value in ``fields``
        :type index: ``int``

        :returns: the port number, or ``-1`` if the field is missing or not an integer
        :rtype: ``int``

        """
        try:
            return int(fields[index])
        except (IndexError, ValueError):
            return -1

    def _parse_x2golistsessions_line(self, x2go_output):
        """\
        Parse a single line of X2Go's listsessions output.

        The line is split at the Unix pipe symbol ('|'); the mandatory fields
        are read by position. The Telekinesis ports (fields 14 and 15) are
        optional and default to ``-1``.

        :param x2go_output: one line of output from the ,,x2golistsessions'' command
        :type x2go_output: ``str``

        :raises IndexError: if a mandatory field is missing
        :raises ValueError: if a numeric mandatory field is not an integer

        """
        fields = x2go_output.split("|")

        self.agent_pid = int(fields[0])
        self.name = fields[1]
        # the session type is derived from the name, so drop any cached value
        self.session_type_cached = None
        self.display = int(fields[2])
        self.hostname = fields[3]
        self.status = fields[4]
        # TODO: turn into datetime object
        self.date_created = fields[5]
        self.cookie = fields[6]
        self.graphics_port = int(fields[8])
        self.snd_port = int(fields[9])
        # TODO: turn into datetime object
        self.date_suspended = fields[10]
        self.username = fields[11]
        self.sshfs_port = int(fields[13])
        self.local_container = ''
        self.initialized = True

        # retrieve Telekinesis ports from list of sessions...
        self.tekictrl_port = self._parse_optional_port(fields, 14)
        self.tekidata_port = self._parse_optional_port(fields, 15)

    def is_published_applications_provider(self):
        """\
        Detect from session info if this session is a published applications provider.

        :returns: returns ``True`` if this session is a published applications provider
        :rtype: ``bool`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None
        return '_stRPUBLISHED_' in self.name

    def is_running(self):
        """\
        Is this session running?

        :returns: ``True`` if the session is running, ``False`` otherwise
        :rtype: ``bool`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None
        return self.status == 'R'

    def get_session_type(self):
        """\
        Get the session type (i.e. 'D', 'K', 'R', 'S' or 'P').

        The type is the third character of the second underscore-separated
        part of the session name; 'R' is reported as 'P' for published
        applications providers. The result is cached on the instance.

        :returns: session type
        :rtype: ``str`` or ``None`` (if not initialized)

        """
        if self.session_type_cached is not None:
            return self.session_type_cached
        if not self.initialized:
            return None

        cmd = self.name.split('_')[1]
        session_type = cmd[2]
        if session_type == 'R' and self.is_published_applications_provider():
            session_type = 'P'
        self.session_type_cached = session_type
        return session_type

    def get_share_mode(self):
        """\
        Get the share mode of a shadow session.

        :returns: share mode (0: view-only, 1: full access), ``None`` when used for
            non-desktop-sharing sessions
        :rtype: ``str`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None

        cmd = self.name.split('_')[1]
        # the share mode directly follows the session type character
        if cmd[2] == 'S':
            return cmd[3]
        return None

    def is_suspended(self):
        """\
        Is this session suspended?

        :returns: ``True`` if the session is suspended, ``False`` otherwise
        :rtype: ``bool`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None
        return self.status == 'S'

    def is_desktop_session(self):
        """\
        Is this session a desktop session?

        :returns: ``True`` if this session is a desktop session, ``False`` otherwise
        :rtype: ``bool`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None
        return self.get_session_type() in ('D', 'K')

    def is_kdrive_session(self):
        """\
        Is this session a KDrive based desktop session?

        :returns: ``True`` if this session is a KDrive based desktop session, ``False`` otherwise
        :rtype: ``bool``

        """
        return self.get_session_type() == 'K'

    def is_nx3_session(self):
        """\
        Is this session an NXv3 based desktop/rootless session?

        :returns: ``True`` if this session is a NXv3 based desktop/rootless session, ``False`` otherwise
        :rtype: ``bool``

        """
        # NOTE(review): a duplicate definition of this method used to precede
        # this one in the class (it was shadowed and therefore never in
        # effect); that variant also counted shadow sessions ('S') as NXv3
        # based -- confirm whether 'S' should be included here.
        return self.get_session_type() in ('D', 'R', 'P')

    def _parse_x2gostartagent_output(self, x2go_output):
        """\
        Parse x2gostartagent output.

        The output is split at newline characters; the mandatory values are
        read by line position. The Telekinesis ports (lines 7 and 8) are
        optional and default to ``-1``.

        :param x2go_output: output from ,,x2gostartagent'' command, one value per line
        :type x2go_output: ``str``

        :raises IndexError: if a mandatory line is missing
        :raises ValueError: if a numeric mandatory value is not an integer

        """
        lines = x2go_output.split("\n")

        self.name = lines[3]
        # the session type is derived from the name, so drop any cached value
        self.session_type_cached = None
        self.cookie = lines[1]
        self.agent_pid = int(lines[2])
        self.display = int(lines[0])
        self.graphics_port = int(lines[4])
        self.snd_port = int(lines[5])
        self.sshfs_port = int(lines[6])
        self.username = ''
        self.hostname = ''
        # TODO: we have to see how we fill these fields here...
        self.date_created = ''
        self.date_suspended = ''
        # TODO: presume session is running after x2gostartagent, this could be better
        self.status = 'R'
        self.local_container = ''
        self.remote_container = ''
        self.initialized = True

        # retrieve Telekinesis ports from x2gostartagent output
        self.tekictrl_port = self._parse_optional_port(lines, 7)
        self.tekidata_port = self._parse_optional_port(lines, 8)

    def initialize(self, x2go_output, username='', hostname='', local_container='', remote_container=''):
        """\
        Setup a session info data block, includes parsing of X2Go server's
        ``x2gostartagent`` stdout values.

        :param x2go_output: X2Go server's ``x2gostartagent`` command output, each value
            separated by a newline character.
        :type x2go_output: str
        :param username: session user name (Default value = '')
        :type username: str
        :param hostname: hostname of X2Go server (Default value = '')
        :type hostname: str
        :param local_container: X2Go client session directory for config files, cache and session logs (Default value = '')
        :type local_container: str
        :param remote_container: X2Go server session directory for config files, cache and session logs (Default value = '')
        :type remote_container: str

        """
        self.protect()
        self._parse_x2gostartagent_output(x2go_output)
        # the parser blanks these fields, so fill them in afterwards
        self.username = username
        self.hostname = hostname
        self.local_container = local_container
        self.remote_container = remote_container
        self.initialized = True

    def protect(self):
        """\
        Write-protect this session info data structure.

        """
        self.protected = True

    def unprotect(self):
        """\
        Remove write-protection from this session info data structure.

        """
        self.protected = False

    def is_protected(self):
        """\
        Check if this session info data structure is Write-protected.

        """
        return self.protected

    def is_initialized(self):
        """\
        Check if this session info data structure has been initialized.

        """
        return self.initialized

    def get_status(self):
        """\
        Retrieve the session's status from this session info data structure.

        :returns: session status
        :rtype: ``str`` or ``None`` (if not initialized)

        """
        if not self.initialized:
            return None
        return self.status

    def clear(self):
        """\
        Clear all properties of a :class:`x2go.backends.info.plain.X2GoServerSessionInfo` object.

        """
        self.name = ''
        self.cookie = ''
        self.agent_pid = ''
        self.display = ''
        self.graphics_port = ''
        self.snd_port = ''
        self.sshfs_port = ''
        self.tekictrl_port = ''
        self.tekidata_port = ''
        self.username = ''
        self.hostname = ''
        self.date_created = ''
        self.date_suspended = ''
        self.status = ''
        self.local_container = ''
        self.remote_container = ''
        self.protected = False
        self.initialized = False
        self.session_type_cached = None

    def update(self, session_info):
        """\
        Update all properties of a :class:`x2go.backends.info.plain.X2GoServerSessionInfo` object.

        Only the port, ``date_suspended`` and ``status`` properties are
        copied over; objects of a different type are ignored.

        :param session_info: a provided session info data structure
        :type session_info: ``X2GoServerSessionInfo*``

        """
        if type(session_info) is not type(self):
            return

        updatable = (
            'graphics_port', 'snd_port', 'sshfs_port',
            'tekictrl_port', 'tekidata_port',
            'date_suspended', 'status',
        )
        for prop in updatable:
            if not hasattr(session_info, prop):
                continue
            new_value = getattr(session_info, prop)
            if new_value != getattr(self, prop):
                setattr(self, prop, new_value)
class X2GoServerSessionList(object):
    """\
    :class:`x2go.backends.info.plain.X2GoServerSessionList` is used to store all information
    that is retrieved from a connected X2Go server on a
    ``X2GoControlSession.list_sessions()`` call.

    """
    def __init__(self, x2go_output=None, info_backend=X2GoServerSessionInfo):
        """\
        :param x2go_output: X2Go server's ``x2golistsessions`` command output, each
            session separated by a newline character. Session values are separated
            by Unix Pipe Symbols ('|')
        :type x2go_output: str
        :param info_backend: the session info backend to use
        :type info_backend: ``X2GoServerSessionInfo*``

        """
        # maps session name -> session info object
        self.sessions = {}
        if x2go_output is not None:
            for line in x2go_output.split("\n"):
                # skip blank lines (e.g. the one after a trailing newline)
                if not line:
                    continue
                s_info = info_backend()
                s_info._parse_x2golistsessions_line(line)
                self.sessions[s_info.name] = s_info

    def __call__(self):
        return self.sessions

    def get_sessions(self):
        """\
        Get the complete sessions property.

        :return: get this instance's dictionary of sessions
        :rtype: ``dict``

        """
        return self.sessions

    def set_sessions(self, sessions):
        """\
        Set the sessions property directly by parsing a complete data structure.

        :param sessions: set this instance's list of sessions directly
        :type sessions: ``dict``

        """
        self.sessions = sessions

    def get_session_info(self, session_name):
        """\
        Retrieve the session information for ``<session_name>``.

        :param session_name: the queried session name
        :type session_name: ``str``

        :returns: the session info of ``<session_name>``
        :rtype: ``X2GoServerSessionInfo*`` or ``None``

        """
        return self.sessions.get(session_name)

    def get_session_with(self, property_name, value, hostname=None):
        """\
        Find the first session whose property ``<property_name>`` matches ``<value>``,
        optionally restricted to a given host.

        Values are compared as strings. For the ``display`` property a leading
        colon and a trailing ``.<screen>`` part are removed from ``<value>``
        before comparing (so ``:50.0`` matches display ``50``).

        :param property_name: match a session based on this property name
        :type property_name: ``str``
        :param value: the resulting session has to match this value for ``<property_name>``
        :type value: ``str``
        :param hostname: the result has to match this hostname (Default value = None)
        :type hostname: ``str``

        :returns: the first matching session info, or ``None`` if there is no match
        :rtype: ``X2GoServerSessionInfo*`` or ``None``

        """
        if property_name == 'display':
            # coerce first, so that integer display numbers are accepted, too
            value = str(value).lstrip(':').split('.')[0]

        wanted = str(value)
        for session in self.sessions.values():
            try:
                if str(getattr(session, property_name)) != wanted:
                    continue
                if hostname is None or session.hostname == hostname:
                    return session
            except AttributeError:
                # session info objects lacking the property cannot match
                pass
        return None