[Merge] ~ubuntu-support-team/software-properties:devel into software-properties:ubuntu/master
Julian Andres Klode
julian.klode at canonical.com
Wed May 20 08:24:28 UTC 2020
Diff comments:
> diff --git a/softwareproperties/shortcuthandler.py b/softwareproperties/shortcuthandler.py
> new file mode 100644
> index 0000000..070b879
> --- /dev/null
> +++ b/softwareproperties/shortcuthandler.py
> @@ -0,0 +1,633 @@
> +# Copyright (c) 2019 Canonical Ltd.
> +#
> +# This program is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU General Public License as
> +# published by the Free Software Foundation; either version 2 of the
> +# License, or (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
> +# USA
> +
> +import os
> +import re
> +import apt_pkg
> +import subprocess
> +import tempfile
> +
> +from aptsources.distro import get_distro
> +from aptsources.sourceslist import (SourceEntry, SourcesList,
> + CollapsedSourcesList)
> +
> +from contextlib import suppress
> +
> +from copy import copy
> +
> +from gettext import gettext as _
> +
> +from urllib.parse import urlparse
> +
> +
> +GPG_KEYRING_CMD = 'gpg -q --no-options --no-default-keyring --batch --keyring %s'
trusted.gpg.d files are not gpg keyrings, and hence should not be used like that. Files inside trusted.gpg.d should be considered read-only files that contain binary blobs (.gpg) or ASCII encoded blobs (.asc). Manipulating them goes against the very basic idea behind trusted.gpg.d itself.
> +
> +class ShortcutHandler(object):
> + '''Superclass for shortcut handler implementations.
> +
> + This provides a way to take a apt repository reference, in various forms,
> + and write the specific apt configuration to local files. This also can
> + remove previously written configuration from local files.
> +
> + This class and any subclasses should never modify any main apt configuration
> + files, only specifically named files in '.d' subdirs (e.g. sources.list.d, etc)
> + should be modified. The only exception to that rule is adding or removing
> + sourceslist lines or components of existing source entries.
> + '''
> + def __init__(self, shortcut, components=None, enable_source=False, codename=None, pocket=None, dry_run=False, **kwargs):
> + self.shortcut = shortcut
> + self.components = components or []
> + self.enable_source = enable_source
> + self.distro = get_distro()
> + self.codename = codename or self.distro.codename
> + self.pocket = pocket
> + self.dry_run = dry_run
> +
> + # Subclasses should not directly reference _source_entry,
> + # use _set_source_entry() and SourceEntry()
> + self._source_entry = None
> +
> + # Subclasses should directly set these fields, if appropriate
> + self._filebase = None
> + self._username = None
> + self._password = None
> +
> + @classmethod
> + def is_valid_uri(cls, uri):
> + '''Return if the uri is in valid uri format'''
> + parsed = urlparse(uri)
> + return parsed.scheme and parsed.netloc
> +
> + @classmethod
> + def uri_strip_auth(cls, uri):
> + '''Return the uri with the username and password stripped'''
> + parsed = urlparse(uri)
> + # urlparse doesn't have any great way to simply remove the auth data,
> + # so let's just strip everything to the left of '@'
> + return parsed._replace(netloc=parsed.netloc.rpartition('@')[2]).geturl()
> +
> + @classmethod
> + def uri_insert_auth(cls, uri, username, password):
> + '''Return the uri with the username and password included'''
> + parsed = urlparse(cls.uri_strip_auth(uri))
> + netloc='%s:%s@%s' % (username, password, parsed.netloc)
> + return parsed._replace(netloc=netloc).geturl()
> +
> + @classmethod
> + def fingerprints(cls, keys):
> + '''Return an array of fingerprint(s) for provided key(s).
> +
> + The 'keys' parameter should be in text (str) or binary (bytes) format;
> + it is converted to bytes if needed, and then passed to the 'gpg' program.
> + '''
> + cmd = 'gpg -q --no-options --no-keyring --batch --with-colons'
> + # yes, --with-fingerprint twice, to print subkey fingerprints
> + cmd += ' --with-fingerprint' * 2
> + try:
> + with tempfile.TemporaryDirectory() as homedir:
> + cmd += f' --homedir {homedir}'
> + if not isinstance(keys, bytes):
> + keys = keys.encode()
> + stdout = subprocess.run(cmd.split(), check=True, input=keys,
> + stdout=subprocess.PIPE).stdout.decode()
> + except subprocess.CalledProcessError as e:
> + print(_("Warning: gpg error while processing keys:\n%s") % e)
> + return []
> +
> + try:
> + # gpg --with-colons fpr field puts fingerprint into (1-based) field 10
> + return [l.split(':')[9] for l in stdout.splitlines() if l.startswith('fpr')]
> + except KeyError:
> + print(_("Warning: invalid gpg output:\n%s") % stdout)
> + return []
> +
> + @property
> + def description(self):
> + return (_("Archive for codename: %s components: %s" %
> + (self.SourceEntry().dist,
> + ','.join(self.SourceEntry().comps))))
> +
> + @property
> + def web_link(self):
> + return self.archive_link
> +
> + @property
> + def archive_link(self):
> + return self.SourceEntry().uri
> +
> + @property
> + def dist(self):
> + if self.pocket:
> + return '%s-%s' % (self.codename, self.pocket)
> + return self.codename
> +
> + @property
> + def binary_type(self):
> + '''Text indicating a binary-type SourceEntry.'''
> + return self.distro.binary_type
> +
> + @property
> + def source_type(self):
> + '''Text indicating a source-type SourceEntry.'''
> + return self.distro.source_type
> +
> + def SourceEntry(self, pkgtype=None):
> + '''Get the SourceEntry representing this archive/shortcut.
> +
> + This should never include any authentication data; if required,
> + the username and password should only be available from the
> + username and password properties, as well as from the
> + netrcparts_content property.
> +
> + If pkgtype is provided, it must be either binary_type or source_type,
> + in which case this returns a SourceEntry with the requested type.
> + If pkgtype is not specified, this returns a SourceEntry with an
> + implementation-dependent type (in most cases, implementations should
> + default to binary_type).
> +
> + Note that the default SourceEntry will be returned without modification,
> + and the implementation will determine if it is enabled or disabled;
> + while the source-type SourceEntry will be enabled or disabled based on
> + self.enable_source. The binary-type SourceEntry will always be enabled.
> +
> + The SourceEntry 'file' field should always be set to the value of
> + sourceparts_file.
> + '''
> + if not self._source_entry:
> + raise NotImplementedError('Implementation class did not set self._source_entry')
> + e = copy(self._source_entry)
> + if not pkgtype:
> + return e
> + if pkgtype == self.binary_type:
> + e.set_enabled(True)
> + e.type = self.binary_type
> + elif pkgtype == self.source_type:
> + e.set_enabled(self.enable_source)
> + e.type = self.source_type
> + else:
> + raise ValueError('Invalid pkgtype: %s' % pkgtype)
> + return SourceEntry(str(e), file=e.file)
> +
> + @property
> + def username(self):
> + '''Return the username used for authentication
> +
> + If authentication is used, return the username; otherwise return None.
> +
> + By default, this returns the private variable self._username, which
> + defaults to None. Subclasses should override this method and/or
> + set self._username if they have authentication data.
> + '''
> + return self._username
> +
> + @property
> + def password(self):
> + '''Return the password used for authentication
> +
> + If authentication is used, return the password; otherwise return None.
> +
> + By default, this returns the private variable self._password, which
> + defaults to None. Subclasses should override this method and/or
> + set self._password if they have authentication data.
> + '''
> + return self._password
> +
> + def add(self):
> + '''Save all data for this shortcut to file(s).
> +
> + This writes everything to the relevant files. By default, it
> + calls add_source(), add_key(), and add_login(). Subclasses
> + should override it if other actions are required.
> + '''
> + self.add_source()
> + self.add_key()
> + self.add_login()
> +
> + def remove(self):
> + '''Remove all data for this shortcut from file(s).
> +
> + This removes everything from the relevant files. By default, it
> + only calls remove_source() and remove_login(). Subclasses
> + should override it if other actions are required. Note that by
> + default is does not call remove_key().
> + '''
> + self.remove_source()
> + self.remove_login()
> +
> + def add_source(self):
> + '''Add the apt SourceEntries.
> +
> + This uses SourcesList to add the binary-type and source-type
> + SourceEntries.
> +
> + If the SourceEntry matches a known apt template, this will ignore
> + the sourceparts_file and instead place the SourceEntries into
> + the main/default sources.list file. Otherwise, this will add
> + the SourceEntries into the sourceparts_file.
> +
> + If either the binary-type or source-type entry exist in the current
> + SourcesList, the existing entries are updated instead of placing
> + the entries in the sourceparts_file.
> + '''
> + binentry = self.SourceEntry(self.binary_type)
> + srcentry = self.SourceEntry(self.source_type)
> + mode = self.sourceparts_mode
> +
> + sourceslist = SourcesList()
> + collapsedlist = CollapsedSourcesList(sourceslist)
> +
> + newentry = collapsedlist.get_entry(binentry)
> + if newentry:
> + print(_("Found existing %s entry in %s") % (newentry.type, newentry.file))
> + else:
> + newentry = collapsedlist.add_entry(binentry)
> +
> + if binentry.file != newentry.file:
> + # existing binentry, but not in file we were expecting, just update it
> + print(_("Updating existing entry instead of using %s") % binentry.file)
> + elif newentry.template:
> + # our SourceEntry matches a template; use default sources.list file
> + newentry.file = SourceEntry('').file
> + print(_("Archive has template, updating %s") % newentry.file)
> + elif binentry.disabled:
> + print(_("Adding disabled %s entry to %s") % (newentry.type, newentry.file))
> + else:
> + print(_("Adding %s entry to %s") % (newentry.type, newentry.file))
> +
> + binentry = newentry
> +
> + # Unless it already exists somewhere, add the srcentry right after the binentry
> + srcentry.file = binentry.file
> +
> + newentry = collapsedlist.get_entry(srcentry)
> + if newentry:
> + print(_("Found existing %s entry in %s") % (newentry.type, newentry.file))
> + else:
> + newentry = collapsedlist.add_entry(srcentry, after=binentry)
> +
> + if srcentry.file != newentry.file:
> + # existing srcentry, but not in file we were expecting, just update it
> + print(_("Updating existing entry instead of using %s") % srcentry.file)
> + elif srcentry.disabled:
> + print(_("Adding disabled %s entry to %s") % (newentry.type, newentry.file))
> + else:
> + print(_("Adding %s entry to %s") % (newentry.type, newentry.file))
> +
> + srcentry = newentry
> +
> + if not self.dry_run:
> + # If the file doesn't exist, create it so we can set the mode
> + for entryfile in set([binentry.file, srcentry.file]):
> + if not os.path.exists(entryfile):
> + with open(entryfile, 'w'):
> + os.chmod(entryfile, mode)
> + sourceslist.save()
> +
> + def remove_source(self):
> + '''Remove the apt SourceEntries.
> +
> + This uses SourcesList to remove the binary-type and source-type
> + SourceEntries.
> +
> + This must disable the corresponding SourceEntries, from whatever file(s)
> + they are located in. This must not disable more than matches, e.g.
> + if the existing SourceEntry line contains more components this must
> + edit the existing line to remove this SourceEntry's component(s).
> +
> + After disabling all matching SourceEntries, if the sourceparts_file is
> + empty or contains only invalid and/or disabled SourceEntries, this
> + may remove the sourceparts_file.
> + '''
> + sourceslist = SourcesList()
> + collapsedlist = CollapsedSourcesList(sourceslist)
> +
> + binentry = self.SourceEntry(self.binary_type)
> + srcentry = self.SourceEntry(self.source_type)
> +
> + # Disable the entries
> + binentry.set_enabled(True)
> + if collapsedlist.has_entry(binentry):
> + print(_("Disabling %s entry in %s") % (binentry.type, binentry.file))
> + collapsedlist.add_entry(binentry._replace(disabled=True))
> + srcentry.set_enabled(True)
> + if collapsedlist.has_entry(srcentry):
> + print(_("Disabling %s entry in %s") % (srcentry.type, srcentry.file))
> + collapsedlist.add_entry(srcentry._replace(disabled=True))
> +
> + file_entries = [s for s in sourceslist if s.file == self.sourceparts_file]
> + if not [e for e in file_entries if not e.invalid and not e.disabled]:
> + # no more valid/enabled entries in our file, remove them
> + for e in file_entries:
> + if not e.invalid:
> + print(_("Removing disabled %s entry from %s") % (e.type, e.file))
> + sourceslist.remove(e)
> +
> + if not self.dry_run:
> + sourceslist.save(remove=True)
> +
> + @property
> + def sourceparts_path(self):
> + '''Return result of apt_pkg.config.find_dir("Dir::Etc::sourceparts")'''
> + return apt_pkg.config.find_dir("Dir::Etc::sourceparts")
> +
> + @property
> + def sourceparts_filename(self):
> + '''Get the sources.list.d filename, without the leading path.
> +
> + By default, this combines the filebase with the codename, and uses a
> + extension of 'list'. This is different than the trustedparts or
> + netrcparts filenames, which use only the filebase plus extension.
> + '''
> + return self._filebase_to_filename('list', suffix=self.codename)
> +
> + @property
> + def sourceparts_file(self):
> + '''Get the sources.list.d absolute-path filename.
> +
> + Note that the add_source() function will not use this file if this shortcut's
> + SourceEntry matches a known apt template; instead the entries will be placed
> + in the main sources.list file. Also, if the SourceEntry already exists in
> + the SourcesList, it will be edited in place, instead of using this file.
> + See add_source() for more details.
> + '''
> + return self._filename_to_file(self.sourceparts_path, self.sourceparts_filename)
> +
> + @property
> + def sourceparts_mode(self):
> + '''Mode of sourceparts file.
> +
> + Note that add_source() will only use this mode if it creates a new file
> + for sourceparts_file; if the file already exists or if the SourceEntry is
> + saved in a different file, this mode is not used.
> + '''
> + return 0o644
> +
> + def add_key(self):
> + '''Add the GPG key(s) corresponding to this repo.
> +
> + By default, if self.trustedparts_content contains content,
> + and self.trustedparts_file points to a file, the key(s) will
> + be added to the file.
> +
> + If the file does not yet exist, and self.trustedparts_mode is set,
> + the file will be created with that mode.
> + '''
> + if not all((self.trustedparts_file, self.trustedparts_content)):
> + return
> +
> + dest = self.trustedparts_file
> + keys = self.trustedparts_content
> + if not isinstance(keys, bytes):
> + keys = keys.encode()
> + fp = self.fingerprints(keys)
> +
> + print(_("Adding key to %s with fingerprint %s") % (dest, ','.join(fp)))
> +
> + cmd = GPG_KEYRING_CMD % dest
> + action = "--import"
> + if not self.dry_run:
> + if not os.path.exists(dest) and self.trustedparts_mode:
> + with open(dest, 'wb'):
> + os.chmod(dest, self.trustedparts_mode)
> + try:
> + with tempfile.TemporaryDirectory() as homedir:
> + cmd += f" --homedir {homedir} {action}"
> + subprocess.run(cmd.split(), check=True, input=keys)
> + except subprocess.CalledProcessError as e:
> + raise ShortcutException(e)
> +
> + def remove_key(self):
> + '''Remove the GPG key(s) corresponding to this repo.
> +
> + By default, if self.trustedparts_content contains content,
> + and self.trustedparts_file points to a file, the key(s) will
> + be removed from the file.
> +
> + If the file contains no more keys after removal, the file will
> + be removed.
> +
> + This does not consider other files; multiple repositories may
> + use the same signing key. This only modifies/removes
> + self.trustedparts_file.
> + '''
> + if not all((self.trustedparts_file, self.trustedparts_content)):
> + return
> +
> + dest = self.trustedparts_file
> + fp = self.fingerprints(self.trustedparts_content)
> +
> + if not os.path.exists(dest):
> + return
> +
> + print(_("Removing key from %s with fingerprint %s") % (dest, ','.join(fp)))
> +
> + cmd = GPG_KEYRING_CMD % dest
> + action = "--delete-keys %s" % ' '.join(fp)
> + if not self.dry_run:
> + try:
> + with tempfile.TemporaryDirectory() as homedir:
> + cmd += f" --homedir {homedir} {action}"
> + subprocess.run(cmd.split(), check=True)
> + except subprocess.CalledProcessError as e:
> + raise ShortcutException(e)
> +
> + with open(dest, 'rb') as f:
> + empty = not self.fingerprints(f.read())
> + if empty:
> + os.remove(dest)
> +
> + @property
> + def trustedparts_path(self):
> + '''Return result of apt_pkg.config.find_dir("Dir::Etc::trustedparts")'''
> + return apt_pkg.config.find_dir("Dir::Etc::trustedparts")
> +
> + @property
> + def trustedparts_filename(self):
> + '''Get the trusted.gpg.d filename, without the leading path.'''
> + return self._filebase_to_filename('gpg')
> +
> + @property
> + def trustedparts_file(self):
> + '''Get the trusted.gpg.d absolute-path filename.'''
> + return self._filename_to_file(self.trustedparts_path, self.trustedparts_filename)
> +
> + @property
> + def trustedparts_content(self):
> + '''Content to put into trusted.gpg.d file'''
> + return None
> +
> + @property
> + def trustedparts_mode(self):
> + '''Mode of trustedparts file'''
> + return 0o644
> +
> + def add_login(self):
> + '''Add the login credentials corresponding to this repo.
> +
> + By default, if self.netrcparts_content contains content,
> + and self.netrcparts_file points to a file, the file will be
> + created and content placed into it.
> + '''
> + if not all((self.netrcparts_file, self.netrcparts_content)):
> + return
> +
> + dest = self.netrcparts_file
> + content = self.netrcparts_content
> +
> + newfile = not os.path.exists(dest)
> + finalchar = '\n'
> + if not newfile:
> + with open(dest, 'r') as f:
> + lines = [l.strip() for l in f.readlines()]
> + with suppress(KeyError):
> + finalchar = lines[-1][-1]
> + if all([l.strip() in lines for l in content.splitlines()]):
> + print(_("Authentication data already in %s") % dest)
> + return
> +
> + print(_("Adding authentication data to %s") % dest)
> + if not self.dry_run:
> + if newfile and self.netrcparts_mode:
> + with open(dest, 'w'):
> + os.chmod(dest, self.netrcparts_mode)
> + with open(dest, 'a') as f:
> + # we're appending; if the file doesn't end in \n, throw one in
> + if finalchar != '\n':
> + f.write('\n')
> + f.write(self.netrcparts_content)
> +
> + def remove_login(self):
> + '''Remove the login credentials corresponding to this repo.
> +
> + By default, if self.netrcparts_content contains content,
> + and self.netrcparts_file points to a file, the content will
> + be removed from the file.
> +
> + If the file is empty (other than whitespace) after removal, the file
> + will be removed.
> +
> + This does not consider other files; this only modifies/removes
> + self.netrcparts_file.
> + '''
> + if not all((self.netrcparts_file, self.netrcparts_content)):
> + return
> +
> + dest = self.netrcparts_file
> + content = set([l.strip() for l in self.netrcparts_content.splitlines()])
> +
> + if not os.path.exists(dest):
> + return
> +
> + with open(dest, 'r') as f:
> + filecontent = set([l.strip() for l in f.readlines()])
> + if not filecontent & content:
> + print(_("Authentication data not contained in %s") % dest)
> + else:
> + print(_("Removing authentication data from %s") % dest)
> + if not self.dry_run:
> + with open(dest, 'w') as f:
> + f.write('\n'.join(filecontent - content))
> +
> + if not self.dry_run:
> + with open(dest, 'r') as f:
> + empty = not f.read().strip()
> + if empty:
> + os.remove(dest)
> +
> + @property
> + def netrcparts_path(self):
> + '''Return result of apt_pkg.config.find_dir("Dir::Etc::netrcparts")'''
> + return apt_pkg.config.find_dir("Dir::Etc::netrcparts")
> +
> + @property
> + def netrcparts_filename(self):
> + '''Get the auth.conf.d filename, without the leading path.'''
> + return self._filebase_to_filename('conf')
> +
> + @property
> + def netrcparts_file(self):
> + '''Get the auth.conf.d absolute-path filename.'''
> + return self._filename_to_file(self.netrcparts_path, self.netrcparts_filename)
> +
> + @property
> + def netrcparts_content(self):
> + '''Content to put into auth.conf.d file
> +
> + By default, if both username and password are set, this will return a proper
> + netrc-formatted line with the authentication information, including the
> + hostname and path.
> + '''
> + if not all((self.username, self.password)):
> + return None
> +
> + hostname = urlparse(self.SourceEntry().uri).hostname
> + path = urlparse(self.SourceEntry().uri).path
> + return f'machine {hostname}{path} login {self.username} password {self.password}'
> +
> + @property
> + def netrcparts_mode(self):
> + '''Mode of netrcparts file'''
> + return 0o600
> +
> + def _set_source_entry(self, line):
> + '''Set the SourceEntry.
> +
> + This should be called from subclasses to set the SourceEntry.
> + The SourceEntry file will be set to the sourceparts_file value.
> +
> + The self.components, if any, will be added to the line's component(s).
> + '''
> + e = SourceEntry(line)
> + e.comps = list(set(e.comps) | set(self.components))
> + self._source_entry = SourceEntry(str(e), file=self.sourceparts_file)
> +
> + def _encode_filebase(self, suffix=None):
> + base = self._filebase
> + if not base:
> + return None
> + if suffix:
> + base += '-%s' % suffix
> + return re.sub("[^a-z0-9_-]+", "_", base.lower())
> +
> + def _filebase_to_filename(self, ext, suffix=None):
> + base = self._encode_filebase(suffix=suffix)
> + if not base:
> + return None
> + return '%s.%s' % (base, ext)
> +
> + def _filename_to_file(self, path, name):
> + if not name:
> + return None
> + return os.path.join(path, name)
> +
> +
> +class ShortcutException(Exception):
> + '''General Exception during shortcut processing.'''
> + pass
> +
> +
> +class InvalidShortcutException(ShortcutException):
> + '''Invalid shortcut.
> +
> + This should only be thrown from the constructor of a ShortcutHandler
> + subclass, and only to indicate that the provided shortcut is invalid
> + for that ShortcutHandler class.
> + '''
> + pass
> +
> +
> +# vi: ts=4 expandtab
--
https://code.launchpad.net/~ubuntu-support-team/software-properties/+git/software-properties/+merge/379928
Your team Ubuntu Core Development Team is requested to review the proposed merge of ~ubuntu-support-team/software-properties:devel into software-properties:ubuntu/master.
More information about the Ubuntu-reviews
mailing list