[Secure-testing-commits] r14640 - in lib/python: . sectracker sectracker_test
Florian Weimer
fw at alioth.debian.org
Sat May 8 10:20:31 UTC 2010
Author: fw
Date: 2010-05-08 10:20:31 +0000 (Sat, 08 May 2010)
New Revision: 14640
Added:
lib/python/sectracker/repo.py
lib/python/sectracker_test/test_repo.py
Removed:
lib/python/repo.py
Log:
sectracker.repo: rename from repo
Deleted: lib/python/repo.py
===================================================================
--- lib/python/repo.py 2010-05-08 10:14:00 UTC (rev 14639)
+++ lib/python/repo.py 2010-05-08 10:20:31 UTC (rev 14640)
@@ -1,274 +0,0 @@
-# repo.py -- mirror Debian repository metadata
-# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-from __future__ import with_statement
-
-import bz2
-import hashlib
-import gzip
-import os
-import re
-import tempfile
-import urllib
-
-import xpickle
-import debian_support
-import parsers
-
-MARKER_NAME = "DEBIAN_REPO_MIRROR"
-
-_re_name = re.compile(r'^[a-z0-9-]+$')
-_re_hashentry = re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
-
-def _splitfield(data, field):
- tup = tuple(data[field].strip().split())
- if tup == ():
- data[field] = ('',)
- else:
- data[field] = tup
-
-def _splithashes(path, data, field):
- result = {}
- for line in data[field].split('\n'):
- if line == "":
- continue
- match = _re_hashentry.match(line)
- if match is None:
- raise ValueError("invalid line in %r: %r" % (path, line))
- digest, size, name = match.groups()
- result[name] = digest
- data[field] = result
-
-def parserelease(path, f):
- data = {}
- for p in debian_support.PackageFile(path, f):
- for k, v in p:
- data[k.lower()] = v
- break # file contains only one record
- _splitfield(data, "components")
- _splitfield(data, "architectures")
- _splithashes(path, data, "md5sum")
- _splithashes(path, data, "sha1")
- _splithashes(path, data, "sha256")
- return data
-
-def unbzip2hash(src, dst):
- dec = bz2.BZ2Decompressor()
- digest = hashlib.sha256()
- while True:
- data = src.read(8192)
- if data == '':
- break
- data = dec.decompress(data)
- dst.write(data)
- digest.update(data)
- return digest.hexdigest()
-
-def downloadbz2(url, target, expecteddigest):
- try:
- bz2src = urllib.urlopen(url)
- try:
- dgst = xpickle.replacefile(target,
- lambda fname, f: unbzip2hash(bz2src, f))
- if dgst == expecteddigest:
- return True
- return False
- finally:
- bz2src.close()
- except IOError:
- return False
-
-def downloadgz(url, target, expecteddigest):
- with tempfile.NamedTemporaryFile() as t:
- try:
- (filename, headers) = urllib.urlretrieve(url, t.name)
- except IOError:
- return False
- gfile = gzip.GzipFile(t.name)
- try:
- def copy(fname, f):
- digest = hashlib.sha256()
- while True:
- data = gfile.read(8192)
- if data == "":
- break
- f.write(data)
- digest.update(data)
- if digest.hexdigest() == expecteddigest:
- return True
- return False
- return xpickle.replacefile(target, copy)
- finally:
- gfile.close()
- return True
-
-class RepoCollection:
- def __init__(self, root):
- """Creates a new repository mirror.
-
- root: path in the local file system"""
- self.root = root
- self.repos = {}
- self.used = ()
- self.releases = None
- self.verbose = False
-
- if not os.path.exists(root):
- os.makedirs(root)
- l = os.listdir(root)
- if len(l) == 0:
- file(root + "/" + MARKER_NAME, "w").close()
- elif MARKER_NAME not in l:
- raise ValueError("not a Debian repository mirror directory: "
- + repr(root))
-
- def add(self, name, url):
- """Adds a repository, given its name and the root URL"""
- if _re_name.match(name) is None:
- raise ValueError("invalid repository name: " + repr(name))
- if name in self.repos:
- raise ValueError("repository already registered: " + repr(name))
- if url[-1:] != '/':
- url += '/'
- self.repos[name] = url
-
- def update(self):
- self._initused()
- for (name, url) in self.repos.items():
- if not self._updatelrelease(name):
- continue
- if not self.hasrelease(name):
- continue
- rel = self.release(name)
- hashes = rel["sha256"]
- for comp in rel["components"]:
- for arch in rel["architectures"]:
- plainpath = self._plainpath(comp, arch)
- plainurl = url + plainpath
- if not plainpath in hashes:
- self.warn("not downloaded because uncompressed version not present in Release file: " + plainurl)
- continue
- uncompressed_digest = hashes[plainpath]
- listname = self._listname(uncompressed_digest)
- if os.path.exists(listname):
- continue
- success = False
- for suffix, method in ((".bz2", downloadbz2),
- (".gz", downloadgz)):
- if method(plainurl + suffix, listname,
- uncompressed_digest):
- success = True
- break
- if not success:
- self.warn("download failed: " + plainurl)
-
- def _updatelrelease(self, name):
- url = self.repos[name]
- relname = self._relname(name)
- self._markused(relname)
- try:
- def download(fname, f):
- urllib.urlretrieve(url + 'Release', fname)
- xpickle.replacefile(relname, download)
- return True
- except IOError:
- self.warn("download of Release file failed: " + url)
- return False
-
- def hasrelease(self, name):
- if name not in self.repos:
- raise ValueError("name not registered: " + repr(name))
- return os.path.exists(self._relname(name))
-
- def release(self, name):
- if name not in self.repos:
- raise ValueError("name not registered: " + repr(name))
- with file(self._relname(name)) as f:
- return parserelease(name, f)
-
- def filemap(self):
- d = {}
- for name in self.repos:
- rel = self.release(name)
- hashes = rel["sha256"]
- l = []
- for comp in rel["components"]:
- for arch in rel["architectures"]:
- plainpath = self._plainpath(comp, arch)
- if not plainpath in hashes:
- self.warn("failed to find %s/%s/%s" % (name, comp, arch))
- continue
- digest = hashes[plainpath]
- listname = self._listname(digest)
- if not os.path.exists(listname):
- self.warn("file %s for %s/%s/%s not present" %
- (listname, name, comp, arch))
- continue
- if arch == "source":
- method = parsers.sourcepackages
- else:
- method = parsers.binarypackages
- l.append((comp, arch, listname, method))
- d[name] = l
- return d
-
- def _relname(self, name):
- return "%s/r_%s" % (self.root, name)
-
- def _plainpath(self, comp, arch):
- # Hack to deal with the "updates/" special case.
- comp = comp.split("/")[-1]
- if arch == "source":
- return comp + "/source/Sources"
- return "%s/binary-%s/Packages" % (comp, arch)
-
- def _listname(self, digest):
- return "%s/h_%s" % (self.root, digest)
-
- def _initused(self):
- self.used = set()
- self.used.add("%s/%s" % (self.root, MARKER_NAME))
-
- def _markused(self, name):
- self.used.add(name)
- self.used.add(name + xpickle.EXTENSION)
-
- def _haslist(self, digest):
- return os.path.exists(self._listname(digest))
-
- def warn(self, msg):
- if self.verbose:
- print msg
-
-def _test():
- import shutil
-
- tmp = tempfile.mkdtemp()
- try:
- r = RepoCollection(tmp)
- r.verbose = True
- mirror = "http://localhost:9999/"
- r.add("lenny", mirror + "debian/dists/lenny")
- r.add("lenny-security", mirror + "debian-security/dists/lenny/updates")
- r.add("lenny-proposed-updates", mirror + "debian/dists/lenny-proposed-updates")
- r.add("squeeze", mirror + "debian/dists/squeeze")
- r.add("sid", mirror + "debian/dists/sid")
- r.update()
- finally:
- shutil.rmtree(tmp)
-if __name__ == "__main__":
- _test()
Copied: lib/python/sectracker/repo.py (from rev 14616, lib/python/repo.py)
===================================================================
--- lib/python/sectracker/repo.py (rev 0)
+++ lib/python/sectracker/repo.py 2010-05-08 10:20:31 UTC (rev 14640)
@@ -0,0 +1,255 @@
+# sectracker.repo -- mirror Debian repository metadata
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import with_statement
+
+import bz2
+import hashlib
+import gzip
+import os
+import re
+import tempfile
+import urllib
+
+import debian_support
+import sectracker.xpickle as _xpickle
+import sectracker.parsers as _parsers
+
+MARKER_NAME = "DEBIAN_REPO_MIRROR"
+
+_re_name = re.compile(r'^[a-z0-9-]+$')
+_re_hashentry = re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
+
+def _splitfield(data, field):
+ tup = tuple(data[field].strip().split())
+ if tup == ():
+ data[field] = ('',)
+ else:
+ data[field] = tup
+
+def _splithashes(path, data, field):
+ result = {}
+ for line in data[field].split('\n'):
+ if line == "":
+ continue
+ match = _re_hashentry.match(line)
+ if match is None:
+ raise ValueError("invalid line in %r: %r" % (path, line))
+ digest, size, name = match.groups()
+ result[name] = digest
+ data[field] = result
+
+def parserelease(path, f):
+ data = {}
+ for p in debian_support.PackageFile(path, f):
+ for k, v in p:
+ data[k.lower()] = v
+ break # file contains only one record
+ _splitfield(data, "components")
+ _splitfield(data, "architectures")
+ _splithashes(path, data, "md5sum")
+ _splithashes(path, data, "sha1")
+ _splithashes(path, data, "sha256")
+ return data
+
+def unbzip2hash(src, dst):
+ dec = bz2.BZ2Decompressor()
+ digest = hashlib.sha256()
+ while True:
+ data = src.read(8192)
+ if data == '':
+ break
+ data = dec.decompress(data)
+ dst.write(data)
+ digest.update(data)
+ return digest.hexdigest()
+
+def downloadbz2(url, target, expecteddigest):
+ try:
+ bz2src = urllib.urlopen(url)
+ try:
+ dgst = _xpickle.replacefile(
+ target, lambda fname, f: unbzip2hash(bz2src, f))
+ if dgst == expecteddigest:
+ return True
+ return False
+ finally:
+ bz2src.close()
+ except IOError:
+ return False
+
+def downloadgz(url, target, expecteddigest):
+ with tempfile.NamedTemporaryFile() as t:
+ try:
+ (filename, headers) = urllib.urlretrieve(url, t.name)
+ except IOError:
+ return False
+ gfile = gzip.GzipFile(t.name)
+ try:
+ def copy(fname, f):
+ digest = hashlib.sha256()
+ while True:
+ data = gfile.read(8192)
+ if data == "":
+ break
+ f.write(data)
+ digest.update(data)
+ if digest.hexdigest() == expecteddigest:
+ return True
+ return False
+ return _xpickle.replacefile(target, copy)
+ finally:
+ gfile.close()
+ return True
+
+class RepoCollection:
+ def __init__(self, root):
+ """Creates a new repository mirror.
+
+ root: path in the local file system"""
+ self.root = root
+ self.repos = {}
+ self.used = ()
+ self.releases = None
+ self.verbose = False
+
+ if not os.path.exists(root):
+ os.makedirs(root)
+ l = os.listdir(root)
+ if len(l) == 0:
+ file(root + "/" + MARKER_NAME, "w").close()
+ elif MARKER_NAME not in l:
+ raise ValueError("not a Debian repository mirror directory: "
+ + repr(root))
+
+ def add(self, name, url):
+ """Adds a repository, given its name and the root URL"""
+ if _re_name.match(name) is None:
+ raise ValueError("invalid repository name: " + repr(name))
+ if name in self.repos:
+ raise ValueError("repository already registered: " + repr(name))
+ if url[-1:] != '/':
+ url += '/'
+ self.repos[name] = url
+
+ def update(self):
+ self._initused()
+ for (name, url) in self.repos.items():
+ if not self._updatelrelease(name):
+ continue
+ if not self.hasrelease(name):
+ continue
+ rel = self.release(name)
+ hashes = rel["sha256"]
+ for comp in rel["components"]:
+ for arch in rel["architectures"]:
+ plainpath = self._plainpath(comp, arch)
+ plainurl = url + plainpath
+ if not plainpath in hashes:
+ self.warn("not downloaded because uncompressed version not present in Release file: " + plainurl)
+ continue
+ uncompressed_digest = hashes[plainpath]
+ listname = self._listname(uncompressed_digest)
+ if os.path.exists(listname):
+ continue
+ success = False
+ for suffix, method in ((".bz2", downloadbz2),
+ (".gz", downloadgz)):
+ if method(plainurl + suffix, listname,
+ uncompressed_digest):
+ success = True
+ break
+ if not success:
+ self.warn("download failed: " + plainurl)
+
+ def _updatelrelease(self, name):
+ url = self.repos[name]
+ relname = self._relname(name)
+ self._markused(relname)
+ try:
+ def download(fname, f):
+ urllib.urlretrieve(url + 'Release', fname)
+ _xpickle.replacefile(relname, download)
+ return True
+ except IOError:
+ self.warn("download of Release file failed: " + url)
+ return False
+
+ def hasrelease(self, name):
+ if name not in self.repos:
+ raise ValueError("name not registered: " + repr(name))
+ return os.path.exists(self._relname(name))
+
+ def release(self, name):
+ if name not in self.repos:
+ raise ValueError("name not registered: " + repr(name))
+ with file(self._relname(name)) as f:
+ return parserelease(name, f)
+
+ def filemap(self):
+ d = {}
+ for name in self.repos:
+ rel = self.release(name)
+ hashes = rel["sha256"]
+ l = []
+ for comp in rel["components"]:
+ for arch in rel["architectures"]:
+ plainpath = self._plainpath(comp, arch)
+ if not plainpath in hashes:
+ self.warn("failed to find %s/%s/%s" % (name, comp, arch))
+ continue
+ digest = hashes[plainpath]
+ listname = self._listname(digest)
+ if not os.path.exists(listname):
+ self.warn("file %s for %s/%s/%s not present" %
+ (listname, name, comp, arch))
+ continue
+ if arch == "source":
+ method = _parsers.sourcepackages
+ else:
+ method = _parsers.binarypackages
+ l.append((comp, arch, listname, method))
+ d[name] = l
+ return d
+
+ def _relname(self, name):
+ return "%s/r_%s" % (self.root, name)
+
+ def _plainpath(self, comp, arch):
+ # Hack to deal with the "updates/" special case.
+ comp = comp.split("/")[-1]
+ if arch == "source":
+ return comp + "/source/Sources"
+ return "%s/binary-%s/Packages" % (comp, arch)
+
+ def _listname(self, digest):
+ return "%s/h_%s" % (self.root, digest)
+
+ def _initused(self):
+ self.used = set()
+ self.used.add("%s/%s" % (self.root, MARKER_NAME))
+
+ def _markused(self, name):
+ self.used.add(name)
+ self.used.add(name + _xpickle.EXTENSION)
+
+ def _haslist(self, digest):
+ return os.path.exists(self._listname(digest))
+
+ def warn(self, msg):
+ if self.verbose:
+ print msg
Added: lib/python/sectracker_test/test_repo.py
===================================================================
--- lib/python/sectracker_test/test_repo.py (rev 0)
+++ lib/python/sectracker_test/test_repo.py 2010-05-08 10:20:31 UTC (rev 14640)
@@ -0,0 +1,34 @@
+# Test cases for sectracker.repo
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import shutil
+
+from sectracker.repo import *
+
+tmp = tempfile.mkdtemp()
+try:
+ r = RepoCollection(tmp)
+ r.verbose = True
+ mirror = "http://localhost:9999/"
+ r.add("lenny", mirror + "debian/dists/lenny")
+ r.add("lenny-security", mirror + "debian-security/dists/lenny/updates")
+ r.add("lenny-proposed-updates", mirror + "debian/dists/lenny-proposed-updates")
+ r.add("squeeze", mirror + "debian/dists/squeeze")
+ r.add("sid", mirror + "debian/dists/sid")
+ r.update()
+finally:
+ shutil.rmtree(tmp)
More information about the Secure-testing-commits mailing list