[Secure-testing-commits] r14616 - lib/python
Florian Weimer
fw at alioth.debian.org
Thu May 6 14:04:14 UTC 2010
Author: fw
Date: 2010-05-06 14:04:13 +0000 (Thu, 06 May 2010)
New Revision: 14616
Added:
lib/python/repo.py
Log:
lib/python/repo.py: mirror of Debian repository metadata
This version is still somewhat preliminary.
Package diffs are not supported for now; we can use a close-by mirror instead.
Added: lib/python/repo.py
===================================================================
--- lib/python/repo.py (rev 0)
+++ lib/python/repo.py 2010-05-06 14:04:13 UTC (rev 14616)
@@ -0,0 +1,274 @@
+# repo.py -- mirror Debian repository metadata
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import with_statement
+
+import bz2
+import hashlib
+import gzip
+import os
+import re
+import tempfile
+import urllib
+
+import xpickle
+import debian_support
+import parsers
+
# Name of the marker file which RepoCollection creates in an empty mirror
# directory and requires in a non-empty one.
MARKER_NAME = "DEBIAN_REPO_MIRROR"

# Accepted repository names: lower-case letters, digits and hyphens.
_re_name = re.compile(r'^[a-z0-9-]+$')
# One line of a checksum field: hexadecimal digest (at least 20 digits),
# decimal size, file name.  Raw string, so that \s and \d reach the regex
# engine without relying on Python passing unknown escapes through.
_re_hashentry = re.compile(r'^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
+
def _splitfield(data, field):
    """Replace data[field] by the tuple of its whitespace-separated words.

    data:  dictionary which is modified in place
    field: key whose string value is split

    A value without any words is replaced by ('',) rather than by an
    empty tuple.  Raises KeyError if the field is missing.
    """
    words = tuple(data[field].split())
    data[field] = words or ('',)
+
def _splithashes(path, data, field):
    """Replace data[field] by a dictionary mapping file names to digests.

    path:  only used in the error message
    data:  dictionary which is modified in place
    field: key whose value consists of "digest size name" lines

    Empty lines are skipped; the size column is parsed but not kept.
    Raises ValueError for a line which does not match _re_hashentry and
    KeyError if the field is missing.
    """
    digests = {}
    for entry in data[field].split('\n'):
        if not entry:
            continue
        parsed = _re_hashentry.match(entry)
        if parsed is None:
            raise ValueError("invalid line in %r: %r" % (path, entry))
        # Group 1 is the digest, group 3 the file name.
        digests[parsed.group(3)] = parsed.group(1)
    data[field] = digests
+
def parserelease(path, f):
    """Parse a Release file into a dictionary.

    path: passed to debian_support.PackageFile and used in error messages
    f:    open file object containing the Release data

    The keys of the result are the lower-cased field names of the first
    record.  "components" and "architectures" are turned into tuples of
    words; "md5sum", "sha1" and "sha256" into dictionaries which map file
    names to digests.  All five fields must be present (KeyError
    otherwise).
    """
    fields = {}
    for record in debian_support.PackageFile(path, f):
        fields.update((key.lower(), value) for key, value in record)
        break  # file contains only one record
    for listfield in ("components", "architectures"):
        _splitfield(fields, listfield)
    for hashfield in ("md5sum", "sha1", "sha256"):
        _splithashes(path, fields, hashfield)
    return fields
+
def unbzip2hash(src, dst):
    """Decompress a bzip2 stream from src into dst and hash the output.

    src: file-like object; read(size) returns compressed data and an
         empty result at the end of the input
    dst: file-like object; write(data) receives the decompressed data

    Returns the SHA-256 digest of the decompressed data as a hexadecimal
    string.  Errors raised by the decompressor or by the two file
    objects are propagated.
    """
    dec = bz2.BZ2Decompressor()
    digest = hashlib.sha256()
    while True:
        data = src.read(8192)
        # Test for emptiness instead of comparing with '': a binary stream
        # may signal the end of the input with an empty bytes object, which
        # never compares equal to '' and would keep the loop running.
        if not data:
            break
        data = dec.decompress(data)
        dst.write(data)
        digest.update(data)
    return digest.hexdigest()
+
def downloadbz2(url, target, expecteddigest):
    """Download a bzip2-compressed file and store it uncompressed.

    url:            location of the compressed file
    target:         local path handed to xpickle.replacefile
    expecteddigest: hexadecimal SHA-256 digest of the uncompressed data

    Returns True if the digest of the decompressed data equals
    expecteddigest.  Returns False if it differs or if an IOError occurs
    while opening the URL, decompressing or writing.
    """
    # NOTE(review): the digest is compared only after replacefile() has
    # returned.  If replacefile() installs the new file whenever the
    # callback returns normally, data with a wrong digest stays at
    # target -- TODO confirm against xpickle.replacefile.
    def store(fname, f):
        return unbzip2hash(bz2src, f)

    try:
        bz2src = urllib.urlopen(url)
        try:
            return xpickle.replacefile(target, store) == expecteddigest
        finally:
            bz2src.close()
    except IOError:
        return False
+
def downloadgz(url, target, expecteddigest):
    """Download a gzip-compressed file and store it uncompressed.

    url:            location of the compressed file
    target:         local path handed to xpickle.replacefile
    expecteddigest: hexadecimal SHA-256 digest of the uncompressed data

    The compressed data is first retrieved into a temporary file, which
    is removed again when the function returns.

    Returns True if the digest of the decompressed data equals
    expecteddigest.  Returns False if it differs or if an IOError occurs
    while downloading, decompressing or writing.
    """
    # NOTE(review): the digest is checked inside the replacefile()
    # callback, which reports a mismatch through its return value only.
    # If replacefile() installs the new file whenever the callback
    # returns normally, data with a wrong digest stays at target --
    # TODO confirm against xpickle.replacefile.
    with tempfile.NamedTemporaryFile() as t:
        # A single handler covers download and decompression, so that
        # invalid gzip data is reported as a failed download (as in
        # downloadbz2) instead of raising IOError to the caller.
        try:
            urllib.urlretrieve(url, t.name)
            gfile = gzip.GzipFile(t.name)
            try:
                def copy(fname, f):
                    digest = hashlib.sha256()
                    while True:
                        data = gfile.read(8192)
                        # Emptiness test: works for both '' and b''.
                        if not data:
                            break
                        f.write(data)
                        digest.update(data)
                    return digest.hexdigest() == expecteddigest
                return xpickle.replacefile(target, copy)
            finally:
                gfile.close()
        except IOError:
            return False
+
class RepoCollection:
    """Local mirror of the metadata of a set of Debian repositories.

    Below the root directory, the Release file of a repository is stored
    as r_<name>, and a package or source list as h_<digest>, where
    <digest> is the SHA-256 entry of the uncompressed list in the
    Release file.
    """

    def __init__(self, root):
        """Creates a new repository mirror.

        root: path in the local file system

        The directory is created if it does not exist.  An empty
        directory receives a marker file; a non-empty directory without
        the marker file is rejected with ValueError."""
        self.root = root
        self.repos = {}         # repository name -> root URL, ends in '/'
        self.used = ()          # replaced by a set in _initused()
        self.releases = None
        self.verbose = False    # if true, warn() prints its messages

        if not os.path.exists(root):
            os.makedirs(root)
        entries = os.listdir(root)
        if not entries:
            open(root + "/" + MARKER_NAME, "w").close()
        elif MARKER_NAME not in entries:
            raise ValueError("not a Debian repository mirror directory: "
                             + repr(root))

    def add(self, name, url):
        """Adds a repository, given its name and the root URL

        A trailing slash is appended to the URL if it is missing.
        Raises ValueError if the name is malformed or already in use."""
        if _re_name.match(name) is None:
            raise ValueError("invalid repository name: " + repr(name))
        if name in self.repos:
            raise ValueError("repository already registered: " + repr(name))
        if not url.endswith('/'):
            url += '/'
        self.repos[name] = url

    def update(self):
        """Downloads the Release files and the missing lists.

        For every registered repository, the Release file is fetched
        again.  Each component/architecture list whose digest has no
        local file yet is downloaded, trying the .bz2 version first and
        the .gz version second.  Failures are reported through warn()."""
        self._initused()
        for (name, url) in self.repos.items():
            if not (self._updatelrelease(name) and self.hasrelease(name)):
                continue
            rel = self.release(name)
            hashes = rel["sha256"]
            for comp in rel["components"]:
                for arch in rel["architectures"]:
                    plainpath = self._plainpath(comp, arch)
                    plainurl = url + plainpath
                    if plainpath not in hashes:
                        self.warn("not downloaded because uncompressed version not present in Release file: " + plainurl)
                        continue
                    uncompressed_digest = hashes[plainpath]
                    listname = self._listname(uncompressed_digest)
                    if os.path.exists(listname):
                        # Already mirrored under this digest.
                        continue
                    for suffix, method in ((".bz2", downloadbz2),
                                           (".gz", downloadgz)):
                        if method(plainurl + suffix, listname,
                                  uncompressed_digest):
                            break
                    else:
                        # No download method succeeded.
                        self.warn("download failed: " + plainurl)

    def _updatelrelease(self, name):
        """Downloads the Release file of the repository.

        Returns True on success and False (after a warning) if the
        download raised IOError."""
        url = self.repos[name]
        relname = self._relname(name)
        self._markused(relname)

        def fetch(fname, f):
            # urlretrieve writes to the path; the open file is not used.
            urllib.urlretrieve(url + 'Release', fname)

        try:
            xpickle.replacefile(relname, fetch)
        except IOError:
            self.warn("download of Release file failed: " + url)
            return False
        return True

    def hasrelease(self, name):
        """Returns true if a Release file for the repository exists.

        Raises ValueError if the name is not registered."""
        if name not in self.repos:
            raise ValueError("name not registered: " + repr(name))
        return os.path.exists(self._relname(name))

    def release(self, name):
        """Returns the parsed local Release file of the repository.

        Raises ValueError if the name is not registered."""
        if name not in self.repos:
            raise ValueError("name not registered: " + repr(name))
        with open(self._relname(name)) as f:
            return parserelease(name, f)

    def filemap(self):
        """Returns a dictionary describing the locally available lists.

        Each repository name is mapped to a list of tuples (component,
        architecture, local file name, parser function).  Lists which
        are missing from the Release file or from the local directory
        are skipped after a warning."""
        result = {}
        for name in self.repos:
            rel = self.release(name)
            hashes = rel["sha256"]
            lists = []
            for comp in rel["components"]:
                for arch in rel["architectures"]:
                    plainpath = self._plainpath(comp, arch)
                    if plainpath not in hashes:
                        self.warn("failed to find %s/%s/%s"
                                  % (name, comp, arch))
                        continue
                    listname = self._listname(hashes[plainpath])
                    if not os.path.exists(listname):
                        self.warn("file %s for %s/%s/%s not present" %
                                  (listname, name, comp, arch))
                        continue
                    if arch == "source":
                        method = parsers.sourcepackages
                    else:
                        method = parsers.binarypackages
                    lists.append((comp, arch, listname, method))
            result[name] = lists
        return result

    def _relname(self, name):
        """Returns the local path of the Release file of a repository."""
        return "%s/r_%s" % (self.root, name)

    def _plainpath(self, comp, arch):
        """Returns the path of the uncompressed list for the component
        and architecture, relative to the root URL of the repository."""
        # Hack to deal with the "updates/" special case.
        comp = comp.rsplit("/", 1)[-1]
        if arch == "source":
            return comp + "/source/Sources"
        return "%s/binary-%s/Packages" % (comp, arch)

    def _listname(self, digest):
        """Returns the local path of the list with the given digest."""
        return "%s/h_%s" % (self.root, digest)

    def _initused(self):
        """Resets the set of used files to just the marker file."""
        self.used = set(["%s/%s" % (self.root, MARKER_NAME)])

    def _markused(self, name):
        """Records the file and its xpickle companion as used."""
        self.used.add(name)
        self.used.add(name + xpickle.EXTENSION)

    def _haslist(self, digest):
        """Returns true if the list with the given digest exists."""
        return os.path.exists(self._listname(digest))

    def warn(self, msg):
        """Prints the message if the verbose flag is set."""
        if self.verbose:
            print(msg)
+
def _test():
    """Mirror a few repositories from a local test server.

    A RepoCollection is set up in a fresh temporary directory, filled
    with five repositories below http://localhost:9999/ and updated in
    verbose mode.  The temporary directory is removed afterwards, even
    if the update fails.
    """
    import shutil

    mirror = "http://localhost:9999/"
    repositories = (
        ("lenny", "debian/dists/lenny"),
        ("lenny-security", "debian-security/dists/lenny/updates"),
        ("lenny-proposed-updates", "debian/dists/lenny-proposed-updates"),
        ("squeeze", "debian/dists/squeeze"),
        ("sid", "debian/dists/sid"),
    )

    tmp = tempfile.mkdtemp()
    try:
        r = RepoCollection(tmp)
        r.verbose = True
        for name, path in repositories:
            r.add(name, mirror + path)
        r.update()
    finally:
        shutil.rmtree(tmp)


if __name__ == "__main__":
    _test()
More information about the Secure-testing-commits
mailing list