[Merge] ~ddstreet/ubuntu-dev-tools/+git/ubuntu-dev-tools:megamerge_request into ubuntu-dev-tools:master

Mattia Rizzolo mattia at mapreri.org
Sun Nov 24 13:23:34 UTC 2019



Diff comments:

> diff --git a/ubuntutools/archive.py b/ubuntutools/archive.py
> index 3bc54b3..35b14bc 100644
> --- a/ubuntutools/archive.py
> +++ b/ubuntutools/archive.py
> @@ -517,39 +719,447 @@ class UbuntuSourcePackage(SourcePackage):
>      distribution = 'ubuntu'
>  
>  
> -class UbuntuCloudArchiveSourcePackage(UbuntuSourcePackage):
> -    "Download / unpack an Ubuntu Cloud Archive source package"
> -    def __init__(self, uca_release, *args, **kwargs):
> -        super(UbuntuCloudArchiveSourcePackage, self).__init__(*args, **kwargs)
> -        self._uca_release = uca_release
> -        self.masters = ["http://ubuntu-cloud.archive.canonical.com/ubuntu/"]
> +class PersonalPackageArchiveSourcePackage(UbuntuSourcePackage):
> +    "Download / unpack an Ubuntu Personal Package Archive source package"
> +    def __init__(self, *args, **kwargs):
> +        super(PersonalPackageArchiveSourcePackage, self).__init__(*args, **kwargs)
> +        assert 'ppa' in kwargs
> +        ppa = kwargs['ppa'].split('/')
> +        if len(ppa) != 2:
> +            raise ValueError('Invalid PPA value "%s", '
> +                             'must be "<USER>/<PPA>"' % kwargs['ppa'])
> +        self._set_ppa(ppa[0], ppa[1])
> +        self.masters = []
> +
> +    def getArchive(self):
> +        if not self._ppa:
> +            try:
> +                self._team = PersonTeam.fetch(self._ppateam)
> +            except KeyError:
> +                raise ValueError('No user/team "%s" found on Launchpad' % self._ppateam)
> +            self._ppa = self._team.getPPAByName(self._ppaname)
> +            Logger.debug('Using PPA %s' % self._ppa.web_link)
> +        return self._ppa
> +
> +    def _set_ppa(self, team, name):
> +        self._ppateam = team
> +        self._ppaname = name
> +        self._team = None
> +        self._ppa = None
>  
>      def _lp_url(self, filename):
>          "Build a source package URL on Launchpad"
> -        return os.path.join('https://launchpad.net', "~ubuntu-cloud-archive",
> -                            '+archive', ("%s-staging" % self._uca_release),
> +        return os.path.join('https://launchpad.net', '~' + self._ppateam,
> +                            '+archive', self.distribution, self._ppaname,
>                              '+files', filename)
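
Just to illustrate the URL shape this now builds (the user/PPA names below are
made up):

    import os
    ppateam, ppaname = 'foo', 'bar'          # hypothetical ~foo/bar PPA
    filename = 'hello_2.10-1.dsc'
    print(os.path.join('https://launchpad.net', '~' + ppateam,
                       '+archive', 'ubuntu', ppaname, '+files', filename))
    # https://launchpad.net/~foo/+archive/ubuntu/bar/+files/hello_2.10-1.dsc
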
>  
>  
> -class FakeSPPH(object):
> -    """Provide the same interface as
> -    ubuntutools.lpapicache.SourcePackagePublishingHistory
> -    """
> -    def __init__(self, name, version, component, distribution):
> +class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
> +    "Download / unpack an Ubuntu Cloud Archive source package"
> +    _ppateam = 'ubuntu-cloud-archive'
> +    _ppas = None
> +
> +    def __init__(self, *args, **kwargs):
> +        series = kwargs.pop('series', None)
> +        check_all_series = series is None
> +        if not series:
> +            series = UbuntuCloudArchiveSourcePackage.getDevelSeries()
> +        kwargs['ppa'] = ('%s/%s-staging' %
> +                         (UbuntuCloudArchiveSourcePackage._ppateam, series))
> +        super(UbuntuCloudArchiveSourcePackage, self).__init__(*args, **kwargs)
> +        self._uca_release = series
> +        self._check_all_series = check_all_series
> +        self.masters = ["http://ubuntu-cloud.archive.canonical.com/ubuntu/"]
> +
> +    @classmethod
> +    def getDevelSeries(cls):
> +        return cls.ppas()[0]
> +
> +    @classmethod
> +    def ppas(cls):
> +        if not cls._ppas:
> +            ppas = PersonTeam.fetch(cls._ppateam).getPPAs().keys()
> +            ppas = filter(lambda p: p.endswith('-staging'), ppas)
> +            ppas = map(lambda p: p.rsplit('-', 1)[0], ppas)
> +            ppas = sorted(ppas, reverse=True)
> +            if not ppas:
> +                raise SeriesNotFoundException('Internal Error: No UCA series found...?')
> +            cls._ppas = ppas
> +        return list(cls._ppas)
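
A quick worked example of the series derivation (the PPA names here are
hypothetical); this presumably relies on OpenStack series names advancing
alphabetically, so the reverse sort puts the newest series first:

    ppas = ['queens-staging', 'rocky-staging', 'stein-staging', 'stein-proposed']
    ppas = filter(lambda p: p.endswith('-staging'), ppas)
    ppas = map(lambda p: p.rsplit('-', 1)[0], ppas)
    print(sorted(ppas, reverse=True))
    # ['stein', 'rocky', 'queens']  ->  getDevelSeries() would return 'stein'
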
> +
> +    @classmethod
> +    def isValidRelease(cls, release):
> +        return release in cls.ppas()
> +
> +    @property
> +    def lp_spph(self):
> +        "Return the LP Source Package Publishing History entry"
> +        while True:
> +            try:
> +                return super(UbuntuCloudArchiveSourcePackage, self).lp_spph
> +            except PackageNotFoundException as pnfe:
> +                if self._check_all_series and self._set_next_release():
> +                    continue
> +                raise pnfe
> +
> +    def _set_next_release(self):
> +        ppas = UbuntuCloudArchiveSourcePackage.ppas()
> +        try:
> +            r = ppas[ppas.index(self._uca_release) + 1]
> +        except IndexError:
> +            return False
> +        self._uca_release = r
> +        self._set_ppa(UbuntuCloudArchiveSourcePackage._ppateam, '%s-staging' % r)
> +        return True
> +
> +    def getArchive(self):
> +        try:
> +            return super(UbuntuCloudArchiveSourcePackage, self).getArchive()
> +        except ValueError:
> +            raise SeriesNotFoundException('UCA release {} not found.'.format(self._uca_release))
> +
> +
> +class _WebJSON(object):
> +    def getHostUrl(self):
> +        raise Exception("Not implemented")
> +
> +    def load(self, path=''):
> +        reader = codecs.getreader('utf-8')
> +        url = self.getHostUrl() + path
> +        Logger.debug("Loading %s" % url)
> +        with closing(urlopen(url)) as data:
> +            return json.load(reader(data))
> +
> +
> +# DAKweb madison API
> +# https://github.com/Debian/dak/blob/master/dakweb/queries/madison.py
> +# This is really only useful to easily find the latest version of a
> +# package for a specific series (or unstable).  This does not provide
> +# any details at all for older-than-latest package versions.
> +class Madison(_WebJSON):
> +    urls = {
> +        'debian': 'https://api.ftp-master.debian.org/madison',
> +        'ubuntu': 'http://people.canonical.com/~ubuntu-archive/madison.cgi',
> +    }
> +
> +    def __init__(self, distro='debian'):
> +        super(Madison, self).__init__()
> +        self._distro = distro
> +        # This currently will NOT work with ubuntu; it doesn't support f=json
> +        if distro != 'debian':
> +            raise InvalidDistroValueError("Madison currently only supports Debian")
> +
> +    def getHostUrl(self):
> +        return self.urls[self._distro]
> +
> +    def getSourcePackage(self, name, series='unstable'):
> +        url = "?f=json&package={name}&s={series}".format(name=name, series=series)
> +        try:
> +            result = self.load(url)
> +        except HTTPError:
> +            result = None
> +        if not result:
> +            msg = "Package {} not found in '{}'".format(name, series)
> +            raise PackageNotFoundException(msg)
> +        versions = list(result[0][name].values())[0]
> +        latest = versions[sorted(versions.keys(), reverse=True)[0]]
> +        return Snapshot.getSourcePackage(name=latest['source'],
> +                                         version=latest['source_version'])
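
For context, this is roughly the request/response that getSourcePackage()
parses; untested sketch, with the response structure inferred from the code
above:

    import json
    from urllib.request import urlopen

    url = ('https://api.ftp-master.debian.org/madison'
           '?f=json&package=hello&s=unstable')
    result = json.loads(urlopen(url).read())
    # roughly: [{'hello': {'unstable': {'2.10-2': {'source': 'hello',
    #                                              'source_version': '2.10-2',
    #                                              ...}}}}]
    versions = list(result[0]['hello'].values())[0]
    latest = versions[sorted(versions.keys(), reverse=True)[0]]
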
> +
> +
> +# Snapshot API
> +# https://anonscm.debian.org/cgit/mirror/snapshot.debian.org.git/plain/API

please point this to a current URL

> +class _Snapshot(_WebJSON):
> +    DEBIAN_COMPONENTS = ["main", "contrib", "non-free"]
> +
> +    def getHostUrl(self):
> +        return "http://snapshot.debian.org"
> +
> +    def getComponent(self, name, version):
> +        # unfortunately there is no (easy) way to find the component for older
> +        # package versions (madison only lists the most recent versions).
> +        # so we have to parse the file path to determine the component :(
> +        url = "/mr/package/{}/{}/srcfiles".format(name, version)
> +        try:
> +            response = self.load("{}?fileinfo=1".format(url))
> +        except HTTPError:
> +            msg = "Package {} version {} not found"
> +            raise PackageNotFoundException(msg.format(name, version))
> +        result = response.get('result')
> +        info = response.get('fileinfo')
> +        if len(result) < 1:
> +            msg = "No source files for package {} version {}"
> +            raise PackageNotFoundException(msg.format(name, version))
> +        path = info[result[0]['hash']][0]['path']
> +        # this expects the 'component' to follow 'pool[-*]' in the path
> +        found_pool = False
> +        component = None
> +        for s in path.split('/'):
> +            if found_pool:
> +                component = s
> +                break
> +            if s.startswith('pool'):
> +                found_pool = True
> +        if not component:
> +            Logger.warning("could not determine component from path %s" % path)
> +            return self.DEBIAN_COMPONENTS[0]
> +        if component not in self.DEBIAN_COMPONENTS:
> +            Logger.warning("unexpected component %s" % component)
> +        return component
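
In other words, for a typical srcfiles path (the example path is made up but
representative), the element following 'pool' is taken as the component:

    path = '/pool/main/h/hello'    # as found in fileinfo[hash][0]['path']
    found_pool = False
    component = None
    for s in path.split('/'):
        if found_pool:
            component = s          # -> 'main'
            break
        if s.startswith('pool'):
            found_pool = True
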
> +
> +    def _get_package(self, name, url, pkginit, version, sort_key):
> +        try:
> +            results = self.load("/mr/{}/{}/".format(url, name))['result']
> +        except HTTPError:
> +            raise PackageNotFoundException("Package {} not found.".format(name))
> +
> +        results = sorted(results, key=lambda r: r[sort_key], reverse=True)
> +        results = [pkginit(r) for r in results if version == r['version']]
> +        if not results:
> +            msg = "Package {name} version {version} not found."
> +            raise PackageNotFoundException(msg.format(name=name, version=version))
> +        return results
> +
> +    def getSourcePackages(self, name, version):
> +        return self._get_package(name, "package",
> +                                 lambda obj: SnapshotSourcePackage(obj, name),
> +                                 version, "version")
> +
> +    def getSourcePackage(self, name, version):
> +        return self.getSourcePackages(name, version)[0]
> +
> +    def getBinaryPackages(self, name, version):
> +        return self._get_package(name, "binary",
> +                                 lambda obj: SnapshotBinaryPackage(obj),
> +                                 version, "binary_version")
> +
> +    def getBinaryPackage(self, name, version):
> +        return self.getBinaryPackages(name, version)[0]
> +
> +
> +Snapshot = _Snapshot()
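
Untested usage sketch of the new module-level singleton (the package/version
used here are plausible but unverified):

    from ubuntutools.archive import Snapshot

    src = Snapshot.getSourcePackage('hello', '2.10-1')
    print(src.component)                     # e.g. 'main'
    for f in src.getFiles():
        print(f.name, f.getHash(), f.getUrl())
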
> +
> +
> +class SnapshotPackage(object):
> +    def __init__(self, obj):
> +        self._obj = obj
> +        self._files = None
> +        self._component = None
> +
> +    @property
> +    def version(self):
> +        return self._obj['version']
> +
> +    @property
> +    def component(self):
> +        if not self._component:
> +            self._component = Snapshot.getComponent(self.name, self.version)
> +        return self._component
> +
> +
> +class SnapshotSourcePackage(SnapshotPackage):
> +    def __init__(self, obj, name):
> +        # obj required fields: 'version'
> +        super(SnapshotSourcePackage, self).__init__(obj)
>          self.name = name
> -        self.version = version
> +        self._binary_files = None
> +        self._spph = None
> +
> +    def getSPPH(self):
> +        if not self._spph:
> +            self._spph = SnapshotSPPH(self)
> +        return self._spph
> +
> +    def getAllFiles(self):
> +        return self.getFiles() + self.getBinaryFiles()
> +
> +    def getBinaryFiles(self, arch=None, name=None):
> +        if not self._binary_files:
> +            url = "/mr/package/{}/{}/allfiles".format(self.name, self.version)
> +            response = Snapshot.load("{}?fileinfo=1".format(url))
> +            info = response['fileinfo']
> +            files = [SnapshotBinaryFile(b['name'], b['version'], self.component,
> +                                        info[r['hash']][0], r['hash'],
> +                                        r['architecture'], self.name)
> +                     for b in response['result']['binaries'] for r in b['files']]
> +            self._binary_files = files
> +        bins = list(self._binary_files)
> +        if arch:
> +            bins = list(filter(lambda b: b.isArch(arch), bins))
> +        if name:
> +            bins = list(filter(lambda b: re.match(name, b.name), bins))
> +        return bins
> +
> +    def getFiles(self):
> +        if not self._files:
> +            url = "/mr/package/{}/{}/srcfiles".format(self.name, self.version)
> +            response = Snapshot.load("{}?fileinfo=1".format(url))
> +            info = response['fileinfo']
> +            self._files = [SnapshotSourceFile(self.name, self.version, self.component,
> +                                              info[r['hash']][0], r['hash'])
> +                           for r in response['result']]
> +        return list(self._files)
> +
> +
> +class SnapshotBinaryPackage(SnapshotPackage):
> +    def __init__(self, obj):
> +        # obj required fields: 'version', 'binary_version', 'name', 'source'
> +        super(SnapshotBinaryPackage, self).__init__(obj)
> +
> +    @property
> +    def name(self):
> +        return self._obj['name']
> +
> +    @property
> +    def binary_version(self):
> +        return self._obj['binary_version']
> +
> +    @property
> +    def source(self):
> +        return self._obj['source']
> +
> +    def getBPPH(self, arch):
> +        f = self.getFiles(arch)
> +        if not f:
> +            return None
> +        if not arch:
> +            raise RuntimeError("Must specify arch")
> +        # Can only be 1 binary file for this pkg name/version/arch
> +        return f[0].getBPPH()
> +
> +    def getFiles(self, arch=None):
> +        if not self._files:
> +            url = "/mr/binary/{}/{}/binfiles".format(self.name, self.version)
> +            response = Snapshot.load("{}?fileinfo=1".format(url))
> +            info = response['fileinfo']
> +            self._files = [SnapshotBinaryFile(self.name, self.version, self.component,
> +                                              info[r['hash']][0], r['hash'],
> +                                              r['architecture'], self.source)
> +                           for r in response['result']]
> +        if not arch:
> +            return list(self._files)
> +        return list(filter(lambda f: f.isArch(arch), self._files))
> +
> +
> +class SnapshotFile(object):
> +    def __init__(self, pkg_name, pkg_version, component, obj, h):
> +        self.package_name = pkg_name
> +        self.package_version = pkg_version
>          self.component = component
> -        self.distribution = distribution
> -        self._changelog = None
> +        self._obj = obj
> +        self._hash = h
> +
> +    def getType(self):
> +        return None
> +
> +    @property
> +    def archive_name(self):
> +        return self._obj['archive_name']
> +
> +    @property
> +    def name(self):
> +        return self._obj['name']
> +
> +    @property
> +    def path(self):
> +        return self._obj['path']
> +
> +    @property
> +    def size(self):
> +        return self._obj['size']
> +
> +    @property
> +    def date(self):
> +        if 'run' in self._obj:
> +            return self._obj['run']
> +        elif 'first_seen' in self._obj:
> +            return self._obj['first_seen']
> +        else:
> +            Logger.error('File %s has no date information', self.name)
> +            return 'unknown'
> +
> +    def getHash(self):
> +        return self._hash
> +
> +    def getUrl(self):
> +        return "{}/file/{}".format(Snapshot.getHostUrl(), self.getHash())
> +
> +    def __repr__(self):
> +        return "{}/{} {} bytes {}".format(self.path, self.name, self.size, self.date)
> +
> +
> +class SnapshotSourceFile(SnapshotFile):
> +    def __init__(self, name, version, component, obj, h):
> +        super(SnapshotSourceFile, self).__init__(name, version, component, obj, h)
> +
> +    def getType(self):
> +        return 'source'
> +
> +
> +class SnapshotBinaryFile(SnapshotFile):
> +    def __init__(self, name, version, component, obj, h, arch, source):
> +        super(SnapshotBinaryFile, self).__init__(name, version, component, obj, h)
> +        self.source = source
> +        self.arch = arch
> +        self._bpph = None
> +
> +    def isArch(self, arch):
> +        if not arch:
> +            return True
> +        if self.arch == 'all':
> +            return True
> +        return arch == self.arch
> +
> +    def getType(self):
> +        return 'binary'
> +
> +    def getBPPH(self):
> +        if not self._bpph:
> +            self._bpph = SnapshotBPPH(self)
> +        return self._bpph
> +
> +
> +class SnapshotSPPH(object):
> +    """Provide the same interface as SourcePackagePublishingHistory"""
> +    def __init__(self, snapshot_pkg):
> +        self._pkg = snapshot_pkg
> +
> +    # LP API defined fields
> +
> +    @property
> +    def component_name(self):
> +        return self.getComponent()
> +
> +    @property
> +    def display_name(self):
> +        return ("{name} {version}"
> +                .format(name=self.getPackageName(),
> +                        version=self.getVersion()))
> +
> +    @property
> +    def pocket(self):
> +        # Debian does not use 'pockets'
> +        return 'Release'
> +
> +    @property
> +    def source_package_name(self):
> +        return self.getPackageName()
> +
> +    @property
> +    def source_package_version(self):
> +        return self.getVersion()
> +
> +    # SPPH functions
>  
>      def getPackageName(self):
> -        return self.name
> +        return self._pkg.name
>  
>      def getVersion(self):
> -        return self.version
> +        return self._pkg.version
>  
>      def getComponent(self):
> -        return self.component
> +        return self._pkg.component
>  
>      def getChangelog(self, since_version=None):
>          '''


-- 
https://code.launchpad.net/~ddstreet/ubuntu-dev-tools/+git/ubuntu-dev-tools/+merge/375292
Your team Ubuntu Development Team is subscribed to branch ubuntu-dev-tools:master.


