[Secure-testing-commits] r43769 - bin
Chris Lamb
lamby at moszumanska.debian.org
Thu Aug 4 17:49:11 UTC 2016
Author: lamby
Date: 2016-08-04 17:49:11 +0000 (Thu, 04 Aug 2016)
New Revision: 43769
Modified:
bin/lts-missing-uploads.py
Log:
lts-missing-uploads: Rewrite as a class.
Modified: bin/lts-missing-uploads.py
===================================================================
--- bin/lts-missing-uploads.py 2016-08-04 15:56:55 UTC (rev 43768)
+++ bin/lts-missing-uploads.py 2016-08-04 17:49:11 UTC (rev 43769)
@@ -25,101 +25,109 @@
from debian.deb822 import Sources
from debian.debian_support import Version
-SOURCES = 'http://security.debian.org/dists/wheezy/updates/main/source/Sources.gz'
+class LTSMissingUploads(object):
+ SOURCES = 'http://security.debian.org/dists/wheezy/updates/main/source/Sources.gz'
-re_line = re.compile(
- r'(?P<suffix>msg\d+.html).*\[DLA (?P<dla>[\d-]+)\] (?P<source>[^\s]+) security update.*'
-)
-re_version = re.compile(r'^Version.*: (?P<version>.*)')
+ re_line = re.compile(
+ r'(?P<suffix>msg\d+.html).*\[DLA (?P<dla>[\d-]+)\] (?P<source>[^\s]+) security update.*'
+ )
+ re_version = re.compile(r'^Version.*: (?P<version>.*)')
-session = requests.Session()
+ def __init__(self):
+ self.session = requests.session()
-def get_dlas(year, month):
- url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format(
- year,
- month,
- )
+ def main(self, *args):
+ dlas = {}
- result = parse(session.get(url).content, re_line)
+ for idx in range(3):
+ dt = datetime.datetime.utcnow().replace(day=1) - \
+ dateutil.relativedelta.relativedelta(months=idx)
- # Prepend URL as the indices have relative URIs
- for x in result:
- x['url'] = '{}{}'.format(url, x['suffix'])
+ self.info(
+ "Getting announcements for {}/{:02} ...",
+ dt.year,
+ dt.month,
+ )
- return result
+ # Prefer later DLAs with reversed(..)
+ for x in reversed(self.get_dlas(dt.year, dt.month)):
+ # Only comment on the latest upload
+ if x['source'] in dlas:
+ continue
-def get_dla(url):
- return parse(session.get(url).content, re_version)
+ self.info("{source}: parsing announcement from {url} ...", **x)
+ x.update(self.get_dla(x['url'])[0])
+ dlas[x['source']] = x
-def main(*args):
- dlas = {}
+ if not dlas:
+ return 0
- for idx in range(3):
- dt = datetime.datetime.utcnow().replace(day=1) - \
- dateutil.relativedelta.relativedelta(months=idx)
+ sources = self.get_sources()
- info("Getting announcements for {}/{:02} ...", dt.year, dt.month)
+ for source, dla in sorted(dlas.items()):
+ version = sources[source]
- # Prefer later DLAs with reversed(..)
- for x in reversed(get_dlas(dt.year, dt.month)):
- # Only comment on the latest upload
- if x['source'] in dlas:
- continue
+ if Version(dla['version']) > Version(version):
+ self.warn("{}: DLA-{} announced version {} but LTS has {} <{}>".format(
+ source,
+ dla['dla'],
+ dla['version'],
+ version,
+ dla['url'],
+ ))
- info("{source}: parsing announcement from {url} ...", **x)
- x.update(get_dla(x['url'])[0])
- dlas[x['source']] = x
-
- if not dlas:
return 0
- sources = get_sources()
+ def get_dlas(self, year, month):
+ url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format(
+ year,
+ month,
+ )
- for source, dla in sorted(dlas.items()):
- version = sources[source]
+ result = self.parse(self.session.get(url).content, self.re_line)
- if Version(dla['version']) > Version(version):
- warn("{}: DLA-{} announced version {} but LTS has {} <{}>".format(
- source,
- dla['dla'],
- dla['version'],
- version,
- dla['url'],
- ))
+ # Prepend URL as the indices have relative URIs
+ for x in result:
+ x['url'] = '{}{}'.format(url, x['suffix'])
- return 0
+ return result
-def get_sources():
- info("Downloading Sources from {} ...", SOURCES)
+ def get_dla(self, url):
+ return self.parse(self.session.get(url).content, self.re_version)
- response = requests.get(SOURCES)
- response.raise_for_status()
+ def get_sources(self):
+ self.info("Downloading Sources from {} ...", self.SOURCES)
- val = gzip.decompress(response.content).decode('utf-8')
+ response = self.session.get(self.SOURCES)
+ response.raise_for_status()
- return {x['Package']: x['Version'] for x in Sources.iter_paragraphs(val)}
+ val = gzip.decompress(response.content).decode('utf-8')
-def warn(msg, *args, **kwargs):
- print("W: " + msg.format(*args, **kwargs), file=sys.stderr)
+ return {x['Package']: x['Version'] for x in Sources.iter_paragraphs(val)}
-def info(msg, *args, **kwargs):
- print("I: " + msg.format(*args, **kwargs), file=sys.stderr)
+ def parse(self, content, pattern):
+ result = []
-def parse(content, pattern):
- result = []
+ for x in content.splitlines():
+ m = pattern.search(x.decode('utf8'))
- for x in content.splitlines():
- m = pattern.search(x.decode('utf8'))
+ if m is None:
+ continue
- if m is None:
- continue
+ result.append(m.groupdict())
- result.append(m.groupdict())
+ return result
- return result
+ ##
+ def warn(self, msg, *args, **kwargs):
+ print("W: " + msg.format(*args, **kwargs), file=sys.stderr)
+
+ def info(self, msg, *args, **kwargs):
+ print("I: " + msg.format(*args, **kwargs), file=sys.stderr)
+
if __name__ == '__main__':
try:
- sys.exit(main(*sys.argv[1:]))
+ sys.exit(LTSMissingUploads().main(*sys.argv[1:]))
except KeyboardInterrupt:
sys.exit(1)
More information about the Secure-testing-commits
mailing list