Source code for x2go.checkhosts

# -*- coding: utf-8 -*-

# Copyright (C) 2010-2020 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
Providing mechanisms to ``X2GoControlSession*`` backends for checking host validity.

"""
__NAME__ = 'x2gocheckhosts-pylib'

__package__ = 'x2go'
__name__    = 'x2go.checkhosts'

# modules
import paramiko
import binascii
import sys

# Python X2Go modules
from . import log
from . import x2go_exceptions
import random
import string

[docs]class X2GoMissingHostKeyPolicy(paramiko.MissingHostKeyPolicy): """\ Skeleton class for Python X2Go's missing host key policies. """ def __init__(self, caller=None, session_instance=None, fake_hostname=None): """\ :param caller: calling instance :type caller: ``class`` :param session_instance: an X2Go session instance :type session_instance: :class:`x2go.session.X2GoSession` instance """ self.caller = caller self.session_instance = session_instance self.fake_hostname = fake_hostname
[docs] def get_client(self): """\ Retrieve the Paramiko SSH/Client. :returns: the associated X2Go control session instance. :rtype: ``X2GoControlSession*`` instance """ return self.client
[docs] def get_hostname(self): """\ Retrieve the server hostname:port expression of the server to be validated. :returns: hostname:port :rtype: ``str`` """ return self.fake_hostname or self.hostname
[docs] def get_hostname_name(self): """\ Retrieve the server hostname string of the server to be validated. :returns: hostname :rtype: ``str`` """ if ":" in self.get_hostname(): return self.get_hostname().split(':')[0].lstrip('[').rstrip(']') else: return self.get_hostname().lstrip('[').rstrip(']')
[docs] def get_hostname_port(self): """\ Retrieve the server port of the server to be validated. :returns: port :rtype: ``str`` """ if ":" in self.get_hostname(): return int(self.get_hostname().split(':')[1]) else: return 22
[docs] def get_key(self): """\ Retrieve the host key of the server to be validated. :returns: host key :rtype: Paramiko/SSH key instance """ return self.key
[docs] def get_key_name(self): """\ Retrieve the host key name of the server to be validated. :returns: host key name (RSA, DSA, ECDSA...) :rtype: ``str`` """ return self.key.get_name().upper()
[docs] def get_key_fingerprint(self): """\ Retrieve the host key fingerprint of the server to be validated. :returns: host key fingerprint :rtype: ``str`` """ if sys.version_info[0] >= 3: return binascii.hexlify(self.key.get_fingerprint()).decode() else: return binascii.hexlify(self.key.get_fingerprint())
[docs] def get_key_fingerprint_with_colons(self): """\ Retrieve the (colonized) host key fingerprint of the server to be validated. :returns: host key fingerprint (with colons) :rtype: ``str`` """ _fingerprint = self.get_key_fingerprint() _colon_fingerprint = '' idx = 0 for char in _fingerprint: idx += 1 _colon_fingerprint += char if idx % 2 == 0: _colon_fingerprint += ':' return _colon_fingerprint.rstrip(':')
[docs]class X2GoAutoAddPolicy(X2GoMissingHostKeyPolicy):
[docs] def missing_host_key(self, client, hostname, key): self.client = client self.hostname = hostname self.key = key if self.session_instance and self.session_instance.control_session.unique_hostkey_aliases: self.client._host_keys.add(self.session_instance.get_profile_id(), self.key.get_name(), self.key) else: self.client._host_keys.add(self.get_hostname(), self.key.get_name(), self.key) if self.client._host_keys_filename is not None: self.client.save_host_keys(self.client._host_keys_filename) self.client._log(paramiko.common.DEBUG, 'Adding %s host key for %s: %s' % (self.key.get_name(), self.get_hostname(), binascii.hexlify(self.key.get_fingerprint())))
[docs]class X2GoInteractiveAddPolicy(X2GoMissingHostKeyPolicy): """\ Policy for making host key information available to Python X2Go after a Paramiko/SSH connect has been attempted. This class needs information about the associated :class:`x2go.session.X2GoSession` instance. Once called, the :func:`missing_host_key()` method of this class will try to call :func:`X2GoSession.HOOK_check_host_dialog() <x2go.session.X2GoSession.HOOK_check_host_dialog()>`. This hook method---if not re-defined in your application---will then try to call the :func:`X2GoClient.HOOK_check_host_dialog() <x2go.client.X2GoClient.HOOK_check_host_dialog()>`, which then will return ``True`` by default if not customized in your application. To accept host key checks, make sure to either customize the :func:`X2GoClient.HOOK_check_host_dialog() <x2go.client.X2GoClient.HOOK_check_host_dialog()>` method or the :func:`X2GoSession.HOOK_check_host_dialog() <x2go.session.X2GoSession.HOOK_check_host_dialog()>` method and hook some interactive user dialog to either of them. """
[docs] def missing_host_key(self, client, hostname, key): """\ Handle a missing host key situation. This method calls Once called, the :func:`missing_host_key()` method will try to call :func:`X2GoSession.HOOK_check_host_dialog() <x2go.session.X2GoSession.HOOK_check_host_dialog()>`. This hook method---if not re-defined in your application---will then try to call the :func:`X2GoClient.HOOK_check_host_dialog() <x2go.client.X2GoClient.HOOK_check_host_dialog()>`, which then will return ``True`` by default if not customized in your application. To accept host key checks, make sure to either customize the :func:`X2GoClient.HOOK_check_host_dialog() <x2go.client.X2GoClient.HOOK_check_host_dialog()>` method or the :func:`X2GoSession.HOOK_check_host_dialog() <x2go.session.X2GoSession.HOOK_check_host_dialog()>` method and hook some interactive user dialog to either of them. :param client: SSH client (``X2GoControlSession*``) instance :type client: ``X2GoControlSession*`` instance :param hostname: remote hostname :type hostname: ``str`` :param key: host key to validate :type key: Paramiko/SSH key instance :raises X2GoHostKeyException: if the X2Go server host key is not in the ``known_hosts`` file :raises X2GoSSHProxyHostKeyException: if the SSH proxy host key is not in the ``known_hosts`` file :raises SSHException: if this instance does not know its {self.session_instance} """ self.client = client self.hostname = hostname if (self.hostname.find(']') == -1) and (self.hostname.find(':') == -1): # if hostname is an IPv4 quadruple with standard SSH port... self.hostname = '[%s]:22' % self.hostname self.key = key self.client._log(paramiko.common.DEBUG, 'Interactively Checking %s host key for %s: %s' % (self.key.get_name(), self.get_hostname(), binascii.hexlify(self.key.get_fingerprint()))) if self.session_instance: if self.fake_hostname is not None: server_key = client.get_transport().get_remote_server_key() keytype = server_key.get_name() our_server_key = client._system_host_keys.get(self.fake_hostname, {}).get(keytype, None) if our_server_key is None: if self.session_instance.control_session.unique_hostkey_aliases: our_server_key = client._host_keys.get(self.session_instance.get_profile_id(), {}).get(keytype, None) if our_server_key is not None: self.session_instance.logger('SSH host key verification for SSH-proxied host %s with %s fingerprint ,,%s\'\' succeeded. This host is known by the X2Go session profile ID of profile ,,%s\'\'.' % (self.fake_hostname, self.get_key_name(), self.get_key_fingerprint_with_colons(), self.session_instance.profile_name), loglevel=log.loglevel_NOTICE) return else: our_server_key = client._host_keys.get(self.fake_hostname, {}).get(keytype, None) if our_server_key is not None: self.session_instance.logger('SSH host key verification for SSH-proxied host %s with %s fingerprint ,,%s\'\' succeeded. This host is known by the address it has behind the SSH proxy host.' % (self.fake_hostname, self.get_key_name(), self.get_key_fingerprint_with_colons()), loglevel=log.loglevel_NOTICE) return self.session_instance.logger('SSH host key verification for host %s with %s fingerprint ,,%s\'\' initiated. We are seeing this X2Go server for the first time.' % (self.get_hostname(), self.get_key_name(), self.get_key_fingerprint_with_colons()), loglevel=log.loglevel_NOTICE) _valid = self.session_instance.HOOK_check_host_dialog(self.get_hostname_name(), port=self.get_hostname_port(), fingerprint=self.get_key_fingerprint_with_colons(), fingerprint_type=self.get_key_name(), ) if _valid: if self.session_instance.control_session.unique_hostkey_aliases and type(self.caller) not in (sshproxy.X2GoSSHProxy, ): paramiko.AutoAddPolicy().missing_host_key(client, self.session_instance.get_profile_id(), key) else: paramiko.AutoAddPolicy().missing_host_key(client, self.get_hostname(), key) else: if type(self.caller) in (sshproxy.X2GoSSHProxy, ): raise x2go_exceptions.X2GoSSHProxyHostKeyException('Invalid host %s is not authorized for access. Add the host to Paramiko/SSH\'s known_hosts file.' % self.get_hostname()) else: raise x2go_exceptions.X2GoHostKeyException('Invalid host %s is not authorized for access. Add the host to Paramiko/SSH\'s known_hosts file.' % self.get_hostname()) else: raise x2go_exceptions.SSHException('Policy has collected host key information on %s for further introspection' % self.get_hostname())
[docs]def check_ssh_host_key(x2go_sshclient_instance, hostname, port=22): """\ Perform a Paramiko/SSH host key check by connecting to the host and validating the results (i.e. by validating raised exceptions during the connect process). :param x2go_sshclient_instance: a Paramiko/SSH client instance to be used for testing host key validity. :type x2go_sshclient_instance: ``X2GoControlSession*`` instance :param hostname: hostname of server to validate :type hostname: ``str`` :param port: port of server to validate (Default value = 22) :type port: ``int`` :returns: returns a tuple with the following components (<host_ok>, <hostname>, <port>, <fingerprint>, <fingerprint_type>) :rtype: ``tuple`` :raises SSHException: if an SSH exception occurred, that we did not provocate in :func:`X2GoInteractiveAddPolicy.missing_host_key() <x2go.checkhosts.X2GoInteractiveAddPolicy.missing_host_key()` """ _hostname = hostname _port = port _fingerprint = 'NO-FINGERPRINT' _fingerprint_type = 'SOME-KEY-TYPE' _check_policy = X2GoInteractiveAddPolicy() x2go_sshclient_instance.set_missing_host_key_policy(_check_policy) host_ok = False try: paramiko.SSHClient.connect(x2go_sshclient_instance, hostname=hostname, port=port, username='foo', password="".join([random.choice(string.letters+string.digits) for x in range(1, 20)])) except x2go_exceptions.AuthenticationException: host_ok = True x2go_sshclient_instance.logger('SSH host key verification for host [%s]:%s succeeded. Host is already known to the client\'s Paramiko/SSH sub-system.' % (_hostname, _port), loglevel=log.loglevel_NOTICE) except x2go_exceptions.SSHException as e: msg = str(e) if msg.startswith('Policy has collected host key information on '): _hostname = _check_policy.get_hostname().split(':')[0].lstrip('[').rstrip(']') _port = _check_policy.get_hostname().split(':')[1] _fingerprint = _check_policy.get_key_fingerprint_with_colons() _fingerprint_type = _check_policy.get_key_name() else: raise(e) x2go_sshclient_instance.set_missing_host_key_policy(paramiko.RejectPolicy()) except: # let any other error be handled by subsequent algorithms pass return (host_ok, _hostname, _port, _fingerprint, _fingerprint_type)