[Merge] ~ddstreet/ubuntu-dev-tools/+git/ubuntu-dev-tools:megamerge_request into ubuntu-dev-tools:master
Mattia Rizzolo
mattia at mapreri.org
Sun Nov 24 13:23:34 UTC 2019
Diff comments:
> diff --git a/ubuntutools/archive.py b/ubuntutools/archive.py
> index 3bc54b3..35b14bc 100644
> --- a/ubuntutools/archive.py
> +++ b/ubuntutools/archive.py
> @@ -517,39 +719,447 @@ class UbuntuSourcePackage(SourcePackage):
> distribution = 'ubuntu'
>
>
> -class UbuntuCloudArchiveSourcePackage(UbuntuSourcePackage):
> - "Download / unpack an Ubuntu Cloud Archive source package"
> - def __init__(self, uca_release, *args, **kwargs):
> - super(UbuntuCloudArchiveSourcePackage, self).__init__(*args, **kwargs)
> - self._uca_release = uca_release
> - self.masters = ["http://ubuntu-cloud.archive.canonical.com/ubuntu/"]
> +class PersonalPackageArchiveSourcePackage(UbuntuSourcePackage):
> + "Download / unpack an Ubuntu Personal Package Archive source package"
> + def __init__(self, *args, **kwargs):
> + super(PersonalPackageArchiveSourcePackage, self).__init__(*args, **kwargs)
> + assert 'ppa' in kwargs
> + ppa = kwargs['ppa'].split('/')
> + if len(ppa) != 2:
> + raise ValueError('Invalid PPA value "%s",'
> + 'must be "<USER>/<PPA>"' % kwargs['ppa'])
> + self._set_ppa(ppa[0], ppa[1])
> + self.masters = []
> +
> + def getArchive(self):
> + if not self._ppa:
> + try:
> + self._team = PersonTeam.fetch(self._ppateam)
> + except KeyError:
> + raise ValueError('No user/team "%s" found on Launchpad' % self._ppateam)
> + self._ppa = self._team.getPPAByName(self._ppaname)
> + Logger.debug('Using PPA %s' % self._ppa.web_link)
> + return self._ppa
> +
> + def _set_ppa(self, team, name):
> + self._ppateam = team
> + self._ppaname = name
> + self._team = None
> + self._ppa = None
>
> def _lp_url(self, filename):
> "Build a source package URL on Launchpad"
> - return os.path.join('https://launchpad.net', "~ubuntu-cloud-archive",
> - '+archive', ("%s-staging" % self._uca_release),
> + return os.path.join('https://launchpad.net', '~' + self._ppateam,
> + '+archive', self.distribution, self._ppaname,
> '+files', filename)
>
>
> -class FakeSPPH(object):
> - """Provide the same interface as
> - ubuntutools.lpapicache.SourcePackagePublishingHistory
> - """
> - def __init__(self, name, version, component, distribution):
> +class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
> + "Download / unpack an Ubuntu Cloud Archive source package"
> + _ppateam = 'ubuntu-cloud-archive'
> + _ppas = None
> +
> + def __init__(self, *args, **kwargs):
> + series = kwargs.pop('series', None)
> + check_all_series = series is None
> + if not series:
> + series = UbuntuCloudArchiveSourcePackage.getDevelSeries()
> + kwargs['ppa'] = ('%s/%s-staging' %
> + (UbuntuCloudArchiveSourcePackage._ppateam, series))
> + super(UbuntuCloudArchiveSourcePackage, self).__init__(*args, **kwargs)
> + self._uca_release = series
> + self._check_all_series = check_all_series
> + self.masters = ["http://ubuntu-cloud.archive.canonical.com/ubuntu/"]
> +
> + @classmethod
> + def getDevelSeries(cls):
> + return cls.ppas()[0]
> +
> + @classmethod
> + def ppas(cls):
> + if not cls._ppas:
> + ppas = PersonTeam.fetch(cls._ppateam).getPPAs().keys()
> + ppas = filter(lambda p: p.endswith('-staging'), ppas)
> + ppas = map(lambda p: p.rsplit('-', 1)[0], ppas)
> + ppas = sorted(ppas, reverse=True)
> + if not ppas:
> + raise SeriesNotFoundException('Internal Error: No UCA series found...?')
> + cls._ppas = ppas
> + return list(cls._ppas)
> +
> + @classmethod
> + def isValidRelease(cls, release):
> + return release in cls.ppas()
> +
> + @property
> + def lp_spph(self):
> + "Return the LP Source Package Publishing History entry"
> + while True:
> + try:
> + return super(UbuntuCloudArchiveSourcePackage, self).lp_spph
> + except PackageNotFoundException as pnfe:
> + if self._check_all_series and self._set_next_release():
> + continue
> + raise pnfe
> +
> + def _set_next_release(self):
> + ppas = UbuntuCloudArchiveSourcePackage.ppas()
> + try:
> + r = ppas[ppas.index(self._uca_release) + 1]
> + except IndexError:
> + return False
> + self._uca_release = r
> + self._set_ppa(UbuntuCloudArchiveSourcePackage._ppateam, '%s-staging' % r)
> + return True
> +
> + def getArchive(self):
> + try:
> + return super(UbuntuCloudArchiveSourcePackage, self).getArchive()
> + except ValueError:
> + raise SeriesNotFoundException('UCA release {} not found.'.format(self._uca_release))
> +
> +
> +class _WebJSON(object):
> + def getHostUrl(self):
> + raise Exception("Not implemented")
> +
> + def load(self, path=''):
> + reader = codecs.getreader('utf-8')
> + url = self.getHostUrl() + path
> + Logger.debug("Loading %s" % url)
> + with closing(urlopen(url)) as data:
> + return json.load(reader(data))
> +
> +
> +# DAKweb madison API
> +# https://github.com/Debian/dak/blob/master/dakweb/queries/madison.py
> +# This is really only useful to easily find the latest version of a
> +# package for a specific series (or unstable). This does not provide
> +# any details at all for older-than-latest package versions.
> +class Madison(_WebJSON):
> + urls = {
> + 'debian': 'https://api.ftp-master.debian.org/madison',
> + 'ubuntu': 'http://people.canonical.com/~ubuntu-archive/madison.cgi',
> + }
> +
> + def __init__(self, distro='debian'):
> + super(Madison, self).__init__()
> + self._distro = distro
> + # This currently will NOT work with ubuntu; it doesn't support f=json
> + if distro != 'debian':
> + raise InvalidDistroValueError("Madison currently only supports Debian")
> +
> + def getHostUrl(self):
> + return self.urls[self._distro]
> +
> + def getSourcePackage(self, name, series='unstable'):
> + url = "?f=json&package={name}&s={series}".format(name=name, series=series)
> + try:
> + result = self.load(url)
> + except HTTPError:
> + result = None
> + if not result:
> + msg = "Package {} not found in '{}'".format(name, series)
> + raise PackageNotFoundException(msg)
> + versions = list(result[0][name].values())[0]
> + latest = versions[sorted(versions.keys(), reverse=True)[0]]
> + return Snapshot.getSourcePackage(name=latest['source'],
> + version=latest['source_version'])
> +
> +
> +# Snapshot API
> +# https://anonscm.debian.org/cgit/mirror/snapshot.debian.org.git/plain/API
Please point this comment to a current URL; the anonscm.debian.org link above is outdated.
> +class _Snapshot(_WebJSON):
> + DEBIAN_COMPONENTS = ["main", "contrib", "non-free"]
> +
> + def getHostUrl(self):
> + return "http://snapshot.debian.org"
> +
> + def getComponent(self, name, version):
> + # unfortunately there is no (easy) way to find the component for older
> + # package versions (madison only lists the most recent versions).
> + # so we have to parse the file path to determine the component :(
> + url = "/mr/package/{}/{}/srcfiles".format(name, version)
> + try:
> + response = self.load("{}?fileinfo=1".format(url))
> + except HTTPError:
> + msg = "Package {} version {} not found"
> + raise PackageNotFoundException(msg.format(name, version))
> + result = response.get('result')
> + info = response.get('fileinfo')
> + if len(result) < 1:
> + msg = "No source files for package {} version {}"
> + raise PackageNotFoundException(msg.format(name, version))
> + path = info[result[0]['hash']][0]['path']
> + # this expects the 'component' to follow 'pool[-*]' in the path
> + found_pool = False
> + component = None
> + for s in path.split('/'):
> + if found_pool:
> + component = s
> + break
> + if s.startswith('pool'):
> + found_pool = True
> + if not component:
> + Logger.warning("could not determine component from path %s" % path)
> + return self.DEBIAN_COMPONENTS[0]
> + if component not in self.DEBIAN_COMPONENTS:
> + Logger.warning("unexpected component %s" % component)
> + return component
> +
> + def _get_package(self, name, url, pkginit, version, sort_key):
> + try:
> + results = self.load("/mr/{}/{}/".format(url, name))['result']
> + except HTTPError:
> + raise PackageNotFoundException("Package {} not found.".format(name))
> +
> + results = sorted(results, key=lambda r: r[sort_key], reverse=True)
> + results = [pkginit(r) for r in results if version == r['version']]
> + if not results:
> + msg = "Package {name} version {version} not found."
> + raise PackageNotFoundException(msg.format(name=name, version=version))
> + return results
> +
> + def getSourcePackages(self, name, version):
> + return self._get_package(name, "package",
> + lambda obj: SnapshotSourcePackage(obj, name),
> + version, "version")
> +
> + def getSourcePackage(self, name, version):
> + return self.getSourcePackages(name, version)[0]
> +
> + def getBinaryPackages(self, name, version):
> + return self._get_package(name, "binary",
> + lambda obj: SnapshotBinaryPackage(obj),
> + version, "binary_version")
> +
> + def getBinaryPackage(self, name, version):
> + return self.getBinaryPackages(name, version)[0]
> +
> +
> +Snapshot = _Snapshot()
> +
> +
> +class SnapshotPackage(object):
> + def __init__(self, obj):
> + self._obj = obj
> + self._files = None
> + self._component = None
> +
> + @property
> + def version(self):
> + return self._obj['version']
> +
> + @property
> + def component(self):
> + if not self._component:
> + self._component = Snapshot.getComponent(self.name, self.version)
> + return self._component
> +
> +
> +class SnapshotSourcePackage(SnapshotPackage):
> + def __init__(self, obj, name):
> + # obj required fields: 'version'
> + super(SnapshotSourcePackage, self).__init__(obj)
> self.name = name
> - self.version = version
> + self._binary_files = None
> + self._spph = None
> +
> + def getSPPH(self):
> + if not self._spph:
> + self._spph = SnapshotSPPH(self)
> + return self._spph
> +
> + def getAllFiles(self):
> + return self.getFiles() + self.getBinaryFiles()
> +
> + def getBinaryFiles(self, arch=None, name=None):
> + if not self._binary_files:
> + url = "/mr/package/{}/{}/allfiles".format(self.name, self.version)
> + response = Snapshot.load("{}?fileinfo=1".format(url))
> + info = response['fileinfo']
> + files = [SnapshotBinaryFile(b['name'], b['version'], self.component,
> + info[r['hash']][0], r['hash'],
> + r['architecture'], self.name)
> + for b in response['result']['binaries'] for r in b['files']]
> + self._binary_files = files
> + bins = list(self._binary_files)
> + if arch:
> + bins = filter(lambda b: b.isArch(arch), bins)
> + if name:
> + bins = filter(lambda b: re.match(name, b.name), bins)
> + return bins
> +
> + def getFiles(self):
> + if not self._files:
> + url = "/mr/package/{}/{}/srcfiles".format(self.name, self.version)
> + response = Snapshot.load("{}?fileinfo=1".format(url))
> + info = response['fileinfo']
> + self._files = [SnapshotSourceFile(self.name, self.version, self.component,
> + info[r['hash']][0], r['hash'])
> + for r in response['result']]
> + return list(self._files)
> +
> +
> +class SnapshotBinaryPackage(SnapshotPackage):
> + def __init__(self, obj):
> + # obj required fields: 'version', 'binary_version', 'name', 'source'
> + super(SnapshotBinaryPackage, self).__init__(obj)
> +
> + @property
> + def name(self):
> + return self._obj['name']
> +
> + @property
> + def binary_version(self):
> + return self._obj['binary_version']
> +
> + @property
> + def source(self):
> + return self._obj['source']
> +
> + def getBPPH(self, arch):
> + f = self.getFiles(arch)
> + if not f:
> + return None
> + if not arch:
> + raise RuntimeError("Must specify arch")
> + # Can only be 1 binary file for this pkg name/version/arch
> + return f[0].getBPPH()
> +
> + def getFiles(self, arch=None):
> + if not self._files:
> + url = "/mr/binary/{}/{}/binfiles".format(self.name, self.version)
> + response = Snapshot.load("{}?fileinfo=1".format(url))
> + info = response['fileinfo']
> + self._files = [SnapshotBinaryFile(self.name, self.version, self.component,
> + info[r['hash']][0], r['hash'],
> + r['architecture'], self.source)
> + for r in response['result']]
> + if not arch:
> + return list(self._files)
> + return filter(lambda f: f.isArch(arch), self._files)
> +
> +
> +class SnapshotFile(object):
> + def __init__(self, pkg_name, pkg_version, component, obj, h):
> + self.package_name = pkg_name
> + self.package_version = pkg_version
> self.component = component
> - self.distribution = distribution
> - self._changelog = None
> + self._obj = obj
> + self._hash = h
> +
> + @property
> + def getType(self):
> + return None
> +
> + @property
> + def archive_name(self):
> + return self._obj['archive_name']
> +
> + @property
> + def name(self):
> + return self._obj['name']
> +
> + @property
> + def path(self):
> + return self._obj['path']
> +
> + @property
> + def size(self):
> + return self._obj['size']
> +
> + @property
> + def date(self):
> + if 'run' in self._obj:
> + return self._obj['run']
> + elif 'first_seen' in self._obj:
> + return self._obj['first_seen']
> + else:
> + Logger.error('File {} has no date information', self.name)
> + return 'unknown'
> +
> + def getHash(self):
> + return self._hash
> +
> + def getUrl(self):
> + return "{}/file/{}".format(Snapshot.getHostUrl(), self.getHash())
> +
> + def __repr__(self):
> + return "{}/{} {} bytes {}".format(self.path, self.name, self.size, self.date)
> +
> +
> +class SnapshotSourceFile(SnapshotFile):
> + def __init__(self, name, version, component, obj, h):
> + super(SnapshotSourceFile, self).__init__(name, version, component, obj, h)
> +
> + def getType(self):
> + return 'source'
> +
> +
> +class SnapshotBinaryFile(SnapshotFile):
> + def __init__(self, name, version, component, obj, h, arch, source):
> + super(SnapshotBinaryFile, self).__init__(name, version, component, obj, h)
> + self.source = source
> + self.arch = arch
> + self._bpph = None
> +
> + def isArch(self, arch):
> + if not arch:
> + return True
> + if self.arch == 'all':
> + return True
> + return arch == self.arch
> +
> + def getType(self):
> + return 'binary'
> +
> + def getBPPH(self):
> + if not self._bpph:
> + self._bpph = SnapshotBPPH(self)
> + return self._bpph
> +
> +
> +class SnapshotSPPH(object):
> + """Provide the same interface as SourcePackagePublishingHistory"""
> + def __init__(self, snapshot_pkg):
> + self._pkg = snapshot_pkg
> +
> + # LP API defined fields
> +
> + @property
> + def component_name(self):
> + return self.getComponent()
> +
> + @property
> + def display_name(self):
> + return ("{name} {version}"
> + .format(name=self.getPackageName(),
> + version=self.getVersion()))
> +
> + @property
> + def pocket(self):
> + # Debian does not use 'pockets'
> + return 'Release'
> +
> + @property
> + def source_package_name(self):
> + return self.getPackageName()
> +
> + @property
> + def source_package_version(self):
> + return self.getVersion()
> +
> + # SPPH functions
>
> def getPackageName(self):
> - return self.name
> + return self._pkg.name
>
> def getVersion(self):
> - return self.version
> + return self._pkg.version
>
> def getComponent(self):
> - return self.component
> + return self._pkg.component
>
> def getChangelog(self, since_version=None):
> '''
--
https://code.launchpad.net/~ddstreet/ubuntu-dev-tools/+git/ubuntu-dev-tools/+merge/375292
Your team Ubuntu Development Team is subscribed to branch ubuntu-dev-tools:master.
More information about the Ubuntu-reviews mailing list