# -*- coding: utf-8 -*-
# Copyright (C) 2010-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
"""\
:class:`x2go.backends.profiles.httpbroker.X2GoSessionProfiles` class - managing X2Go Client session profiles obtain from an HTTP based X2Go Session Broker.
:class:`x2go.backends.profiles.httpbroker.X2GoSessionProfiles` is a public API class. Use this class in your Python X2Go based
applications.
"""
from __future__ import print_function
__NAME__ = 'x2gosessionprofiles-pylib'
__package__ = 'x2go.backends.profiles'
__name__ = 'x2go.backends.profiles.httpbroker'
import re
import requests
import urllib3.exceptions
import copy
import types
import time
try: import simplejson as json
except ImportError: import json
# Python X2Go modules
from x2go.defaults import X2GO_SESSIONPROFILE_DEFAULTS as _X2GO_SESSIONPROFILE_DEFAULTS
from x2go.defaults import CURRENT_LOCAL_USER as _CURRENT_LOCAL_USER
import x2go.backends.profiles.base as base
import x2go.log as log
from x2go.utils import genkeypair
import x2go.x2go_exceptions
[docs]class X2GoSessionProfiles(base.X2GoSessionProfiles):
defaultSessionProfile = copy.deepcopy(_X2GO_SESSIONPROFILE_DEFAULTS)
def __init__(self, session_profile_defaults=None,
broker_url="http://localhost:8080/json/",
broker_username=None,
broker_password=None,
logger=None, loglevel=log.loglevel_DEFAULT,
**kwargs):
"""\
Retrieve X2Go session profiles from a HTTP(S) session broker.
:param session_profile_defaults: a default session profile
:type session_profile_defaults: ``dict``
:param broker_url: URL for accessing the X2Go Session Broker
:type broker_url: ``str``
:param broker_password: use this password for authentication against the X2Go Session Broker (avoid
password string in the ``broker_URL`` parameter is highly recommended)
:type broker_password: ``str``
:param logger: you can pass an :class:`x2go.log.X2GoLogger` object to the
:class:`x2go.backends.profiles.httpbroker.X2GoSessionProfiles` constructor
:type logger: :class:`x2go.log.X2GoLogger` instance
:param loglevel: if no :class:`x2go.log.X2GoLogger` object has been supplied a new one will be
constructed with the given loglevel
:type loglevel: ``int``
"""
if broker_url.upper() != "HTTP":
match = re.match('^(?P<protocol>(http(|s)))://(|(?P<user>[a-zA-Z0-9_\.-]+)(|:(?P<password>.*))@)(?P<hostname>[a-zA-Z0-9\.-]+)(|:(?P<port>[0-9]+))($|/(?P<path>.*)$)', broker_url)
p = match.groupdict()
if p['user']:
self.broker_username = p['user']
else:
self.broker_username = broker_username
if p['password']:
self.broker_password = p['password']
elif broker_password:
self.broker_password = broker_password
else:
self.broker_password = None
# fine-tune the URL
p['path'] = "/{path}".format(**p)
if p['port'] is not None:
p['port'] = ":{port}".format(**p)
else:
p['port'] = ''
self.broker_url = "{protocol}://{hostname}{port}{path}".format(**p)
else:
self.broker_username = broker_username
self.broker_password = broker_password
self.broker_url = broker_url
self.broker_noauth = False
self.broker_authid = None
self._broker_profile_cache = {}
self._mutable_profile_ids = None
self._broker_auth_successful = None
self._broker_type = "http"
base.X2GoSessionProfiles.__init__(self, session_profile_defaults=session_profile_defaults, logger=logger, loglevel=loglevel)
if self.broker_url != "HTTP":
self.logger("Using session broker at URL: %s" % self.broker_url, log.loglevel_NOTICE)
# for broker based autologin, we have to be able to provide public/private key pair
self.broker_my_pubkey, self.broker_my_privkey = genkeypair(local_username=_CURRENT_LOCAL_USER, client_address='127.0.0.1')
[docs] def get_broker_noauth(self):
"""\
Accessor for the class's ``broker_noauth`` property.
:returns: ``True`` if the broker probably does not expect authentication.
:rtype: ``bool``
"""
return self.broker_noauth
[docs] def get_broker_username(self):
"""\
Accessor for the class's ``broker_username`` property.
:returns: the username used for authentication against the session broker URL
:rtype: ``str``
"""
return self.broker_username
[docs] def get_broker_url(self):
"""\
Accessor for the class's ``broker_url`` property.
:returns: the session broker URL that was used at broker session instantiation
:rtype: ``str``
"""
return self.broker_url
[docs] def set_broker_url(self, broker_url):
"""\
Mutator for the class's ``broker_url`` property.
:param broker_url: A new broker URL to use with this instance. Format is
``<protocol>://<hostname>:<port>/<path>`` (where protocol has to be ``http``
or ``https``.
:type broker_url: ``str``
:returns: the session broker URL that was used at broker session instantiation
:rtype: ``str``
"""
self.broker_url = broker_url
[docs] def get_broker_type(self):
"""\
Accessor of the class's {_broker_type} property.
:returns: either ``http`` or ``https``.
:rtype: ``str``
"""
return self._broker_type
[docs] def broker_simpleauth(self, broker_username, broker_password):
"""\
Attempt a username / password authentication against the instance's
broker URL.
:param broker_username: username to use for authentication
:type broker_username: ``str``
:param broker_password: password to use for authentication
:type broker_password: ``str``
:returns: ``True`` if authentication has been successful
:rtype: ``bool``
:raises X2GoBrokerConnectionException: Raised on any kind of connection /
authentication failure.
"""
if self.broker_url is not None:
request_data = {
'user': broker_username or '',
}
if self.broker_authid is not None:
request_data['authid'] = self.broker_authid
self.logger("Sending request to broker: user: {user}, authid: {authid}".format(**request_data), log.loglevel_DEBUG)
else:
if broker_password:
request_data['password'] = "<hidden>"
else:
request_data['password'] = "<EMPTY>"
self.logger("Sending request to broker: user: {user}, password: {password}".format(**request_data), log.loglevel_DEBUG)
request_data['password'] = broker_password or ''
try:
r = requests.post(self.broker_url, data=request_data)
except (requests.exceptions.ConnectionError, requests.exceptions.MissingSchema, urllib3.exceptions.LocationParseError):
raise x2go.x2go_exceptions.X2GoBrokerConnectionException('Failed to connect to URL %s' % self.broker_url)
if r.status_code == 200:
payload = json.loads(r.text)
if not self.broker_authid and not self.broker_password:
self.broker_noauth = True
elif 'next-authid' in payload:
self.broker_authid = payload['next-authid']
self.broker_username = broker_username or ''
self.broker_password = broker_password or ''
self._broker_auth_successful = True
self.populate_session_profiles()
return True
self._broker_auth_successful = False
self.broker_authid = None
return False
[docs] def broker_disconnect(self):
"""\
Disconnect from an (already) authenticated broker session.
All authentication parameters will be dropped (forgotten) and
this instance has to re-authenticate against / re-connect to the
session broker before any new interaction with the broker is possible.
"""
_profile_ids = copy.deepcopy(self.profile_ids)
# forget nearly everything...
for profile_id in _profile_ids:
self.init_profile_cache(profile_id)
try: del self._profile_metatypes[profile_id]
except KeyError: pass
try: self._profiles_need_profile_id_renewal.remove(profile_id)
except ValueError: pass
try: del self._cached_profile_ids[profile_id]
except KeyError: pass
del self.session_profiles[profile_id]
self._mutable_profile_ids = None
self._broker_auth_successful = False
self.broker_authid = None
self.broker_password = None
self.broker_noauth = False
[docs] def is_broker_authenticated(self):
"""\
Detect if an authenticated broker session has already been
initiated. Todo so, a simple re-authentication (username, password)
will be attempted. If that fails, user credentials are not provided /
valid.
:returns: ``True`` if the broker session has already been authenticated
and user credentials are known / valid
:rtype: ``bool``
"""
if self._broker_auth_successful is None:
# do a test auth against the given broker URL
try:
self.broker_simpleauth(self.broker_username, self.broker_password)
except x2go.x2go_exceptions.X2GoBrokerConnectionException:
self._broker_auth_successful = False
return self._broker_auth_successful
[docs] def broker_listprofiles(self):
"""\
Obtain a session profile list from the X2Go Session Broker.
:returns: session profiles as a Python dictionary.
:rtype: ``dict``
"""
if self.broker_url is not None:
request_data = {
'task': 'listprofiles',
'user': self.broker_username,
}
if self.broker_authid is not None:
request_data['authid'] = self.broker_authid
self.logger("Sending request to broker: user: {user}, authid: {authid}, task: {task}".format(**request_data), log.loglevel_DEBUG)
else:
if self.broker_password:
request_data['password'] = "<hidden>"
else:
request_data['password'] = "<EMPTY>"
self.logger("Sending request to broker: user: {user}, password: {password}, task: {task}".format(**request_data), log.loglevel_DEBUG)
request_data['password'] = self.broker_password or ''
try:
r = requests.post(self.broker_url, data=request_data)
except requests.exceptions.ConnectionError:
raise x2go.x2go_exceptions.X2GoBrokerConnectionException('Failed to connect to URL %s' % self.broker_url)
if r.status_code == 200 and r.headers['content-type'].startswith("text/json"):
payload = json.loads(r.text)
if 'next-authid' in payload:
self.broker_authid = payload['next-authid']
if 'mutable_profile_ids' in payload:
self._mutable_profile_ids = payload['mutable_profile_ids']
self._broker_auth_successful = True
return payload['profiles'] if payload['task'] == 'listprofiles' else {}
self._broker_auth_successful = False
self.broker_authid = None
return {}
[docs] def broker_selectsession(self, profile_id):
"""\
Select a session from the list of available session profiles (presented by
:func:`broker_listprofiles()`). This method requests a session information dictionary
(server, port, SSH keys, already running / suspended sessions, etc.) from the
session broker for the provided ``profile_id``.
:param profile_id: profile ID of the selected session profile
:type profile_id: ``str``
:returns: session information (server, port, SSH keys, etc.) for a selected
session profile (i.e. ``profile_id``)
:rtype: ``dict``
"""
if self.broker_url is not None:
if profile_id not in self._broker_profile_cache or not self._broker_profile_cache[profile_id]:
request_data = {
'task': 'selectsession',
'profile-id': profile_id,
'user': self.broker_username,
'pubkey': self.broker_my_pubkey,
}
if self.broker_authid is not None:
request_data['authid'] = self.broker_authid
self.logger("Sending request to broker: user: {user}, authid: {authid}, task: {task}".format(**request_data), log.loglevel_DEBUG)
else:
if self.broker_password:
request_data['password'] = "<hidden>"
else:
request_data['password'] = "<EMPTY>"
self.logger("Sending request to broker: user: {user}, password: {password}, task: {task}".format(**request_data), log.loglevel_DEBUG)
request_data['password'] = self.broker_password or ''
try:
r = requests.post(self.broker_url, data=request_data)
except requests.exceptions.ConnectionError:
raise x2go.x2go_exceptions.X2GoBrokerConnectionException('Failed to connect to URL %s' % self.broker_url)
if r.status_code == 200 and r.headers['content-type'].startswith("text/json"):
payload = json.loads(r.text)
if 'next-authid' in payload:
self.broker_authid = payload['next-authid']
self._broker_profile_cache[profile_id] = payload['selected_session'] if payload['task'] == 'selectsession' else {}
self._broker_auth_successful = True
else:
self.broker_authid = None
self._broker_auth_successful = False
self._broker_profile_cache[profile_id]
return self._broker_profile_cache[profile_id]
return {}
def _init_profile_cache(self, profile_id):
if str(profile_id) in self._broker_profile_cache:
del self._broker_profile_cache[str(profile_id)]
def _populate_session_profiles(self):
"""\
Populate the set of session profiles by loading the session
profile configuration from a file in INI format.
:returns: a set of session profiles
:rtype: ``dict``
"""
if self.is_broker_authenticated() and \
self.broker_noauth or \
self.broker_username and self.broker_password:
session_profiles = self.broker_listprofiles()
_session_profiles = copy.deepcopy(session_profiles)
for session_profile in _session_profiles:
session_profile = str(session_profile)
for key, default_value in list(self.defaultSessionProfile.items()):
key = str(key)
if type(default_value) is bytes:
default_value = str(default_value)
if key not in session_profiles[session_profile]:
session_profiles[session_profile][key] = default_value
else:
session_profiles = {}
return session_profiles
def _is_mutable(self, profile_id):
if type(self._mutable_profile_ids) is list and profile_id in self._mutable_profile_ids:
return True
return False
def _supports_mutable_profiles(self):
if type(self._mutable_profile_ids) is list:
return True
return False
def _write(self):
print("not suported, yet")
def _delete_profile(self, profile_id):
del self.session_profiles[str(profile_id)]
def _update_value(self, profile_id, option, value):
if type(value) is bytes:
value = str(value)
self.session_profiles[str(profile_id)][str(option)] = value
def _get_profile_parameter(self, profile_id, option, key_type):
return key_type(self.session_profiles[str(profile_id)][str(option)])
def _get_profile_options(self, profile_id):
return list(self.session_profiles[str(profile_id)].keys())
def _get_profile_ids(self):
list(self.session_profiles.keys())
return list(self.session_profiles.keys())
def _get_server_hostname(self, profile_id):
selected_session = self.broker_selectsession(profile_id)
return selected_session['server']
def _get_server_port(self, profile_id):
selected_session = self.broker_selectsession(profile_id)
return int(selected_session['port'])
def _get_pkey_object(self, profile_id):
selected_session = self.broker_selectsession(profile_id)
if 'authentication_pubkey' in selected_session and selected_session['authentication_pubkey'] == 'ACCEPTED':
time.sleep(2)
return self.broker_my_privkey
return None