[Secure-testing-commits] r14640 - in lib/python: . sectracker sectracker_test

Florian Weimer fw at alioth.debian.org
Sat May 8 10:20:31 UTC 2010


Author: fw
Date: 2010-05-08 10:20:31 +0000 (Sat, 08 May 2010)
New Revision: 14640

Added:
   lib/python/sectracker/repo.py
   lib/python/sectracker_test/test_repo.py
Removed:
   lib/python/repo.py
Log:
sectracker.repo: rename from repo


Deleted: lib/python/repo.py
===================================================================
--- lib/python/repo.py	2010-05-08 10:14:00 UTC (rev 14639)
+++ lib/python/repo.py	2010-05-08 10:20:31 UTC (rev 14640)
@@ -1,274 +0,0 @@
-# repo.py -- mirror Debian repository metadata
-# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
-# 
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-# 
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-# 
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-from __future__ import with_statement
-
-import bz2
-import hashlib
-import gzip
-import os
-import re
-import tempfile
-import urllib
-
-import xpickle
-import debian_support
-import parsers
-
-MARKER_NAME = "DEBIAN_REPO_MIRROR"
-
-_re_name = re.compile(r'^[a-z0-9-]+$')
-_re_hashentry = re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
-
-def _splitfield(data, field):
-    tup = tuple(data[field].strip().split())
-    if tup == ():
-        data[field] = ('',)
-    else:
-        data[field] = tup
-
-def _splithashes(path, data, field):
-    result = {}
-    for line in data[field].split('\n'):
-        if line == "":
-            continue
-        match = _re_hashentry.match(line)
-        if match is None:
-            raise ValueError("invalid line in %r: %r" % (path, line))
-        digest, size, name = match.groups()
-        result[name] = digest
-    data[field] = result
-
-def parserelease(path, f):
-    data = {}
-    for p in debian_support.PackageFile(path, f):
-        for k, v in p:
-            data[k.lower()] = v
-        break # file contains only one record
-    _splitfield(data, "components")
-    _splitfield(data, "architectures")
-    _splithashes(path, data, "md5sum")
-    _splithashes(path, data, "sha1")
-    _splithashes(path, data, "sha256")
-    return data
-
-def unbzip2hash(src, dst):
-    dec = bz2.BZ2Decompressor()
-    digest = hashlib.sha256()
-    while True:
-        data = src.read(8192)
-        if data == '':
-            break
-        data = dec.decompress(data)
-        dst.write(data)
-        digest.update(data)
-    return digest.hexdigest()
-
-def downloadbz2(url, target, expecteddigest):
-    try:
-        bz2src = urllib.urlopen(url)
-        try:
-            dgst = xpickle.replacefile(target,
-                                       lambda fname, f: unbzip2hash(bz2src, f))
-            if dgst == expecteddigest:
-                return True
-            return False
-        finally:
-            bz2src.close()
-    except IOError:
-        return False
-
-def downloadgz(url, target, expecteddigest):
-    with tempfile.NamedTemporaryFile() as t:
-        try:
-            (filename, headers) = urllib.urlretrieve(url, t.name)
-        except IOError:
-            return False
-        gfile = gzip.GzipFile(t.name)
-        try:
-            def copy(fname, f):
-                digest = hashlib.sha256()
-                while True:
-                    data = gfile.read(8192)
-                    if data == "":
-                        break
-                    f.write(data)
-                    digest.update(data)
-                if digest.hexdigest() == expecteddigest:
-                    return True
-                return False
-            return xpickle.replacefile(target, copy)
-        finally:
-            gfile.close()
-    return True
-
-class RepoCollection:
-    def __init__(self, root):
-        """Creates a new repository mirror.
-        
-        root: path in the local file system"""
-        self.root = root
-        self.repos = {}
-        self.used = ()
-        self.releases = None
-        self.verbose = False
-
-        if not os.path.exists(root):
-            os.makedirs(root)
-        l = os.listdir(root)
-        if len(l) == 0:
-            file(root + "/" + MARKER_NAME, "w").close()
-        elif MARKER_NAME not in l:
-            raise ValueError("not a Debian repository mirror directory: "
-                             + repr(root))
-
-    def add(self, name, url):
-        """Adds a repository, given its name and the root URL"""
-        if _re_name.match(name) is None:
-            raise ValueError("invalid repository name: " + repr(name))
-        if name in self.repos:
-            raise ValueError("repository already registered: " + repr(name))
-        if url[-1:] != '/':
-            url += '/'
-        self.repos[name] = url
-
-    def update(self):
-        self._initused()
-        for (name, url) in self.repos.items():
-            if not self._updatelrelease(name):
-                continue
-            if not self.hasrelease(name):
-                continue
-            rel = self.release(name)
-            hashes = rel["sha256"]
-            for comp in rel["components"]:
-                for arch in rel["architectures"]:
-                    plainpath = self._plainpath(comp, arch)
-                    plainurl = url + plainpath
-                    if not plainpath in hashes:
-                        self.warn("not downloaded because uncompressed version not present in Release file: " + plainurl)
-                        continue
-                    uncompressed_digest = hashes[plainpath]
-                    listname = self._listname(uncompressed_digest)
-                    if os.path.exists(listname):
-                        continue
-                    success = False
-                    for suffix, method in ((".bz2", downloadbz2),
-                                           (".gz", downloadgz)):
-                        if method(plainurl + suffix, listname,
-                                  uncompressed_digest):
-                            success = True
-                            break
-                    if not success:
-                        self.warn("download failed: " + plainurl)
-
-    def _updatelrelease(self, name):
-        url = self.repos[name]
-        relname = self._relname(name)
-        self._markused(relname)
-        try:
-            def download(fname, f):
-                urllib.urlretrieve(url + 'Release', fname)
-            xpickle.replacefile(relname, download)
-            return True
-        except IOError:
-            self.warn("download of Release file failed: " + url)
-            return False
-
-    def hasrelease(self, name):
-        if name not in self.repos:
-            raise ValueError("name not registered: " + repr(name))
-        return os.path.exists(self._relname(name))
-
-    def release(self, name):
-        if name not in self.repos:
-            raise ValueError("name not registered: " + repr(name))
-        with file(self._relname(name)) as f:
-            return parserelease(name, f)
-
-    def filemap(self):
-        d = {}
-        for name in self.repos:
-            rel = self.release(name)
-            hashes = rel["sha256"]
-            l = []
-            for comp in rel["components"]:
-                for arch in rel["architectures"]:
-                    plainpath = self._plainpath(comp, arch)
-                    if not plainpath in hashes:
-                        self.warn("failed to find %s/%s/%s" % (name, comp, arch))
-                        continue
-                    digest = hashes[plainpath]
-                    listname = self._listname(digest)
-                    if not os.path.exists(listname):
-                        self.warn("file %s for %s/%s/%s not present" %
-                                  (listname, name, comp, arch))
-                        continue
-                    if arch == "source":
-                        method = parsers.sourcepackages
-                    else:
-                        method = parsers.binarypackages
-                    l.append((comp, arch, listname, method))
-            d[name] = l
-        return d
-
-    def _relname(self, name):
-        return "%s/r_%s" % (self.root, name)
-
-    def _plainpath(self, comp, arch):
-        # Hack to deal with the "updates/" special case.
-        comp = comp.split("/")[-1]
-        if arch == "source":
-            return comp + "/source/Sources"
-        return "%s/binary-%s/Packages" % (comp, arch)
-
-    def _listname(self, digest):
-        return "%s/h_%s" % (self.root, digest)
-
-    def _initused(self):
-        self.used = set()
-        self.used.add("%s/%s" % (self.root, MARKER_NAME))
-
-    def _markused(self, name):
-        self.used.add(name)
-        self.used.add(name + xpickle.EXTENSION)
-
-    def _haslist(self, digest):
-        return os.path.exists(self._listname(digest))
-
-    def warn(self, msg):
-        if self.verbose:
-            print msg
-
-def _test():
-    import shutil
-
-    tmp = tempfile.mkdtemp()
-    try:
-        r = RepoCollection(tmp)
-        r.verbose = True
-        mirror = "http://localhost:9999/"
-        r.add("lenny", mirror + "debian/dists/lenny")
-        r.add("lenny-security", mirror + "debian-security/dists/lenny/updates")
-        r.add("lenny-proposed-updates", mirror + "debian/dists/lenny-proposed-updates")
-        r.add("squeeze", mirror + "debian/dists/squeeze")
-        r.add("sid", mirror + "debian/dists/sid")
-        r.update()
-    finally:
-        shutil.rmtree(tmp)
-if __name__ == "__main__":
-    _test()

Copied: lib/python/sectracker/repo.py (from rev 14616, lib/python/repo.py)
===================================================================
--- lib/python/sectracker/repo.py	                        (rev 0)
+++ lib/python/sectracker/repo.py	2010-05-08 10:20:31 UTC (rev 14640)
@@ -0,0 +1,255 @@
+# sectracker.repo -- mirror Debian repository metadata
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import with_statement
+
+import bz2
+import hashlib
+import gzip
+import os
+import re
+import tempfile
+import urllib
+
+import debian_support
+import sectracker.xpickle as _xpickle
+import sectracker.parsers as _parsers
+
+MARKER_NAME = "DEBIAN_REPO_MIRROR"
+
+_re_name = re.compile(r'^[a-z0-9-]+$')
+_re_hashentry = re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
+
+def _splitfield(data, field):
+    tup = tuple(data[field].strip().split())
+    if tup == ():
+        data[field] = ('',)
+    else:
+        data[field] = tup
+
+def _splithashes(path, data, field):
+    result = {}
+    for line in data[field].split('\n'):
+        if line == "":
+            continue
+        match = _re_hashentry.match(line)
+        if match is None:
+            raise ValueError("invalid line in %r: %r" % (path, line))
+        digest, size, name = match.groups()
+        result[name] = digest
+    data[field] = result
+
+def parserelease(path, f):
+    data = {}
+    for p in debian_support.PackageFile(path, f):
+        for k, v in p:
+            data[k.lower()] = v
+        break # file contains only one record
+    _splitfield(data, "components")
+    _splitfield(data, "architectures")
+    _splithashes(path, data, "md5sum")
+    _splithashes(path, data, "sha1")
+    _splithashes(path, data, "sha256")
+    return data
+
+def unbzip2hash(src, dst):
+    dec = bz2.BZ2Decompressor()
+    digest = hashlib.sha256()
+    while True:
+        data = src.read(8192)
+        if data == '':
+            break
+        data = dec.decompress(data)
+        dst.write(data)
+        digest.update(data)
+    return digest.hexdigest()
+
+def downloadbz2(url, target, expecteddigest):
+    try:
+        bz2src = urllib.urlopen(url)
+        try:
+            dgst = _xpickle.replacefile(
+                target, lambda fname, f: unbzip2hash(bz2src, f))
+            if dgst == expecteddigest:
+                return True
+            return False
+        finally:
+            bz2src.close()
+    except IOError:
+        return False
+
+def downloadgz(url, target, expecteddigest):
+    with tempfile.NamedTemporaryFile() as t:
+        try:
+            (filename, headers) = urllib.urlretrieve(url, t.name)
+        except IOError:
+            return False
+        gfile = gzip.GzipFile(t.name)
+        try:
+            def copy(fname, f):
+                digest = hashlib.sha256()
+                while True:
+                    data = gfile.read(8192)
+                    if data == "":
+                        break
+                    f.write(data)
+                    digest.update(data)
+                if digest.hexdigest() == expecteddigest:
+                    return True
+                return False
+            return _xpickle.replacefile(target, copy)
+        finally:
+            gfile.close()
+    return True
+
+class RepoCollection:
+    def __init__(self, root):
+        """Creates a new repository mirror.
+        
+        root: path in the local file system"""
+        self.root = root
+        self.repos = {}
+        self.used = ()
+        self.releases = None
+        self.verbose = False
+
+        if not os.path.exists(root):
+            os.makedirs(root)
+        l = os.listdir(root)
+        if len(l) == 0:
+            file(root + "/" + MARKER_NAME, "w").close()
+        elif MARKER_NAME not in l:
+            raise ValueError("not a Debian repository mirror directory: "
+                             + repr(root))
+
+    def add(self, name, url):
+        """Adds a repository, given its name and the root URL"""
+        if _re_name.match(name) is None:
+            raise ValueError("invalid repository name: " + repr(name))
+        if name in self.repos:
+            raise ValueError("repository already registered: " + repr(name))
+        if url[-1:] != '/':
+            url += '/'
+        self.repos[name] = url
+
+    def update(self):
+        self._initused()
+        for (name, url) in self.repos.items():
+            if not self._updatelrelease(name):
+                continue
+            if not self.hasrelease(name):
+                continue
+            rel = self.release(name)
+            hashes = rel["sha256"]
+            for comp in rel["components"]:
+                for arch in rel["architectures"]:
+                    plainpath = self._plainpath(comp, arch)
+                    plainurl = url + plainpath
+                    if not plainpath in hashes:
+                        self.warn("not downloaded because uncompressed version not present in Release file: " + plainurl)
+                        continue
+                    uncompressed_digest = hashes[plainpath]
+                    listname = self._listname(uncompressed_digest)
+                    if os.path.exists(listname):
+                        continue
+                    success = False
+                    for suffix, method in ((".bz2", downloadbz2),
+                                           (".gz", downloadgz)):
+                        if method(plainurl + suffix, listname,
+                                  uncompressed_digest):
+                            success = True
+                            break
+                    if not success:
+                        self.warn("download failed: " + plainurl)
+
+    def _updatelrelease(self, name):
+        url = self.repos[name]
+        relname = self._relname(name)
+        self._markused(relname)
+        try:
+            def download(fname, f):
+                urllib.urlretrieve(url + 'Release', fname)
+            _xpickle.replacefile(relname, download)
+            return True
+        except IOError:
+            self.warn("download of Release file failed: " + url)
+            return False
+
+    def hasrelease(self, name):
+        if name not in self.repos:
+            raise ValueError("name not registered: " + repr(name))
+        return os.path.exists(self._relname(name))
+
+    def release(self, name):
+        if name not in self.repos:
+            raise ValueError("name not registered: " + repr(name))
+        with file(self._relname(name)) as f:
+            return parserelease(name, f)
+
+    def filemap(self):
+        d = {}
+        for name in self.repos:
+            rel = self.release(name)
+            hashes = rel["sha256"]
+            l = []
+            for comp in rel["components"]:
+                for arch in rel["architectures"]:
+                    plainpath = self._plainpath(comp, arch)
+                    if not plainpath in hashes:
+                        self.warn("failed to find %s/%s/%s" % (name, comp, arch))
+                        continue
+                    digest = hashes[plainpath]
+                    listname = self._listname(digest)
+                    if not os.path.exists(listname):
+                        self.warn("file %s for %s/%s/%s not present" %
+                                  (listname, name, comp, arch))
+                        continue
+                    if arch == "source":
+                        method = _parsers.sourcepackages
+                    else:
+                        method = _parsers.binarypackages
+                    l.append((comp, arch, listname, method))
+            d[name] = l
+        return d
+
+    def _relname(self, name):
+        return "%s/r_%s" % (self.root, name)
+
+    def _plainpath(self, comp, arch):
+        # Hack to deal with the "updates/" special case.
+        comp = comp.split("/")[-1]
+        if arch == "source":
+            return comp + "/source/Sources"
+        return "%s/binary-%s/Packages" % (comp, arch)
+
+    def _listname(self, digest):
+        return "%s/h_%s" % (self.root, digest)
+
+    def _initused(self):
+        self.used = set()
+        self.used.add("%s/%s" % (self.root, MARKER_NAME))
+
+    def _markused(self, name):
+        self.used.add(name)
+        self.used.add(name + _xpickle.EXTENSION)
+
+    def _haslist(self, digest):
+        return os.path.exists(self._listname(digest))
+
+    def warn(self, msg):
+        if self.verbose:
+            print msg

Added: lib/python/sectracker_test/test_repo.py
===================================================================
--- lib/python/sectracker_test/test_repo.py	                        (rev 0)
+++ lib/python/sectracker_test/test_repo.py	2010-05-08 10:20:31 UTC (rev 14640)
@@ -0,0 +1,34 @@
+# Test cases for sectracker.repo
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import shutil
+
+from sectracker.repo import *
+
+tmp = tempfile.mkdtemp()
+try:
+    r = RepoCollection(tmp)
+    r.verbose = True
+    mirror = "http://localhost:9999/"
+    r.add("lenny", mirror + "debian/dists/lenny")
+    r.add("lenny-security", mirror + "debian-security/dists/lenny/updates")
+    r.add("lenny-proposed-updates", mirror + "debian/dists/lenny-proposed-updates")
+    r.add("squeeze", mirror + "debian/dists/squeeze")
+    r.add("sid", mirror + "debian/dists/sid")
+    r.update()
+finally:
+    shutil.rmtree(tmp)




More information about the Secure-testing-commits mailing list