[Secure-testing-commits] r14642 - in lib/python: sectracker sectracker_test
Florian Weimer
fw at alioth.debian.org
Sat May 8 14:49:39 UTC 2010
Author: fw
Date: 2010-05-08 14:49:37 +0000 (Sat, 08 May 2010)
New Revision: 14642
Modified:
lib/python/sectracker/repo.py
lib/python/sectracker_test/test_repo.py
Log:
sectracker.repo: export public API only
Modified: lib/python/sectracker/repo.py
===================================================================
--- lib/python/sectracker/repo.py 2010-05-08 10:35:44 UTC (rev 14641)
+++ lib/python/sectracker/repo.py 2010-05-08 14:49:37 UTC (rev 14642)
@@ -17,22 +17,22 @@
from __future__ import with_statement
-import bz2
-import hashlib
-import gzip
-import os
-import re
-import tempfile
-import urllib
+import bz2 as _bz2
+import hashlib as _hashlib
+import gzip as _gzip
+import os as _os
+import re as _re
+import tempfile as _tempfile
+import urllib as _urllib
-import debian_support
+import debian_support as _debian_support
import sectracker.xpickle as _xpickle
import sectracker.parsers as _parsers
MARKER_NAME = "DEBIAN_REPO_MIRROR"
-_re_name = re.compile(r'^[a-z0-9-]+$')
-_re_hashentry = re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
+_re_name = _re.compile(r'^[a-z0-9-]+$')
+_re_hashentry = _re.compile('^\s*([0-9a-fA-F]{20,})\s+(\d+)\s+(\S+)$')
def _splitfield(data, field):
tup = tuple(data[field].strip().split())
@@ -53,9 +53,9 @@
result[name] = digest
data[field] = result
-def parserelease(path, f):
+def _parserelease(path, f):
data = {}
- for p in debian_support.PackageFile(path, f):
+ for p in _debian_support.PackageFile(path, f):
for k, v in p:
data[k.lower()] = v
break # file contains only one record
@@ -66,9 +66,9 @@
_splithashes(path, data, "sha256")
return data
-def unbzip2hash(src, dst):
- dec = bz2.BZ2Decompressor()
- digest = hashlib.sha256()
+def _unbzip2hash(src, dst):
+ dec = _bz2.BZ2Decompressor()
+ digest = _hashlib.sha256()
while True:
data = src.read(8192)
if data == '':
@@ -78,12 +78,12 @@
digest.update(data)
return digest.hexdigest()
-def downloadbz2(url, target, expecteddigest):
+def _downloadbz2(url, target, expecteddigest):
try:
- bz2src = urllib.urlopen(url)
+ bz2src = _urllib.urlopen(url)
try:
dgst = _xpickle.replacefile(
- target, lambda fname, f: unbzip2hash(bz2src, f))
+ target, lambda fname, f: _unbzip2hash(bz2src, f))
if dgst == expecteddigest:
return True
return False
@@ -92,16 +92,16 @@
except IOError:
return False
-def downloadgz(url, target, expecteddigest):
- with tempfile.NamedTemporaryFile() as t:
+def _downloadgz(url, target, expecteddigest):
+ with _tempfile.NamedTemporaryFile() as t:
try:
- (filename, headers) = urllib.urlretrieve(url, t.name)
+ (filename, headers) = _urllib.urlretrieve(url, t.name)
except IOError:
return False
- gfile = gzip.GzipFile(t.name)
+ gfile = _gzip.GzipFile(t.name)
try:
def copy(fname, f):
- digest = hashlib.sha256()
+ digest = _hashlib.sha256()
while True:
data = gfile.read(8192)
if data == "":
@@ -116,7 +116,7 @@
gfile.close()
return True
-class RepoCollection:
+class RepoCollection(object):
def __init__(self, root):
"""Creates a new repository mirror.
@@ -127,9 +127,9 @@
self.releases = None
self.verbose = False
- if not os.path.exists(root):
- os.makedirs(root)
- l = os.listdir(root)
+ if not _os.path.exists(root):
+ _os.makedirs(root)
+ l = _os.listdir(root)
if len(l) == 0:
file(root + "/" + MARKER_NAME, "w").close()
elif MARKER_NAME not in l:
@@ -163,11 +163,11 @@
continue
uncompressed_digest = hashes[plainpath]
listname = self._listname(uncompressed_digest)
- if os.path.exists(listname):
+ if _os.path.exists(listname):
continue
success = False
- for suffix, method in ((".bz2", downloadbz2),
- (".gz", downloadgz)):
+ for suffix, method in ((".bz2", _downloadbz2),
+ (".gz", _downloadgz)):
if method(plainurl + suffix, listname,
uncompressed_digest):
success = True
@@ -181,7 +181,7 @@
self._markused(relname)
try:
def download(fname, f):
- urllib.urlretrieve(url + 'Release', fname)
+ _urllib.urlretrieve(url + 'Release', fname)
_xpickle.replacefile(relname, download)
return True
except IOError:
@@ -191,13 +191,13 @@
def hasrelease(self, name):
if name not in self.repos:
raise ValueError("name not registered: " + repr(name))
- return os.path.exists(self._relname(name))
+ return _os.path.exists(self._relname(name))
def release(self, name):
if name not in self.repos:
raise ValueError("name not registered: " + repr(name))
with file(self._relname(name)) as f:
- return parserelease(name, f)
+ return _parserelease(name, f)
def filemap(self):
d = {}
@@ -212,7 +212,7 @@
continue
digest = hashes[plainpath]
listname = self._listname(digest)
- if not os.path.exists(listname):
+ if not _os.path.exists(listname):
self.warn("file %s for %s/%s not present" %
(listname, name, comp))
continue
@@ -240,7 +240,7 @@
self.used.add(name + _xpickle.EXTENSION)
def _haslist(self, digest):
- return os.path.exists(self._listname(digest))
+ return _os.path.exists(self._listname(digest))
def warn(self, msg):
if self.verbose:
Modified: lib/python/sectracker_test/test_repo.py
===================================================================
--- lib/python/sectracker_test/test_repo.py 2010-05-08 10:35:44 UTC (rev 14641)
+++ lib/python/sectracker_test/test_repo.py 2010-05-08 14:49:37 UTC (rev 14642)
@@ -16,6 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import shutil
+import tempfile
from sectracker.repo import *
import sectracker.parsers as p
More information about the Secure-testing-commits mailing list