Securely retrieving dscs from snapshot.debian.org

peter green plugwash at p10link.net
Sat Dec 30 10:57:17 UTC 2017


On 27/12/17 23:42, Paul Wise wrote:
> On Thu, Dec 28, 2017 at 5:41 AM, peter green wrote:
>
>> Unfortunately there doesn't seem to be a good way to securely retrive a dsc
>> from snapshot.debian.org given a package name and version number.
> At this time there isn't any good way to do that securely, until
> #763419 gets implemented.
That may help a little, though it raises questions of it's own, like

* what keys would be used to sign these re-signed release files? You wouldn't want to use a regular Debian archive key because you wouldn't want people to be able to use snapshots to attack Debian users.
* How secure would the re-signing infrastructure be?

And it would only solve one aspect of the problem, the fact that verifying Release signatures may involve old keys. It wouldn't solve the issue of how to find that damn Release/Sources pair in the first place.

I have attatched my attempt at a tool for downloading source packages securely from snapshot.debian.org. It seems to work, comments/improvements welcome.
-------------- next part --------------
#!/usr/bin/python3
import sys
import urllib.request
import urllib.error
import json
from bs4 import BeautifulSoup
import re
import deb822
import io
import gzip
import bz2
import lzma
import os
import subprocess
import hashlib
import argparse

parser = argparse.ArgumentParser(description="retrieve a source package from snapshot.debian.org with gpg verification\n"+
"the source package will be stored in the current directory\n"+
"in the process of verification files source_version_Release and source_version_Release.gpg will be created in the current directory, these will be "+
"removed unless --keepevidence is specified"
)
parser.add_argument("source", help="source package name")
parser.add_argument("version", help="source package version")
parser.add_argument("--keepevidence", help="keep Release.gpg, Release and Sources files as evidence of package integrity", action="store_true")
parser.add_argument("--1024", help="allow 1024 bit keys, this is needed for old packages but may leave you vulnerable to well-funded attackers",action="store_true",dest='allow_1024')
args = parser.parse_args()

package=args.source
version=args.version
scriptdir=os.path.dirname(os.path.realpath(__file__))

colonpos = version.find(':')

#regex used for checking  package name
pnallowed = re.compile('[a-z0-9][a-z0-9\-\+\.]+',re.ASCII)

#regex used for checking version number
#this is not a full implementation of Debian version number rules
#just a sanity check for unexcepted characters
vallowed = re.compile('[a-z0-9\-:\+~\.]+',re.ASCII)

#regex used for checking allowed characters in package filenames
#and a few other things.
pfnallowed = re.compile('[a-z0-9\-_:\+~\.]+',re.ASCII)

#regex used for checked allowed characters in timestamp strings
tsallowed = re.compile('[A-Z0-9]+',re.ASCII)

#regex used for matching duplicate or aincient (no Release.gpg) distribution names
dupain = re.compile('(Debian.*|bo.*|buzz.*|hamm.*|potato|rex|slink.*)/',re.ASCII)

if not pnallowed.fullmatch(package):
	print('package name fails to match required format')

if not vallowed.fullmatch(package):
	print('version number fails to match required format')

if colonpos >= 0:
	versionnoepoch=version[colonpos+1:]
else:
	versionnoepoch=version

url='http://snapshot.debian.org/mr/package/'+package+'/'+version+'/srcfiles?fileinfo=1'

with urllib.request.urlopen(url) as response:
	jsondata = response.read()

jsondecoded = json.loads(jsondata.decode("utf-8"))

instances = []
for sha1, info in jsondecoded['fileinfo'].items():
	for instance in info:
		#print(repr(instance))
		if instance['name'] == package+'_'+versionnoepoch+'.dsc':
			#print(repr(instance))
			instances.append(instance)

#unfortunately snapshot.debian.org doesn't seem to provide a mr interface for file listings, so we have to screen scrape
#the aim here is to only get true subdirs, not files or symlinks, these seem to be indicated by a trailing / in the link
#string. We also need to avoid any links with complex urls, which likely represent page chrome.
def snapshotsubdirlist(url):
	result = []
	with urllib.request.urlopen(url) as response:
		pagedata = response.read()
	soup = BeautifulSoup(pagedata, "lxml")
	p = re.compile('[a-zA-Z0-9\-]+/',re.ASCII)
	for item in soup.find_all('a'):
		if not (finalentry is None): break
		link = item['href']
		if (p.fullmatch(link)):
			result.append(link)
	return result

#ideally we want sha256 but sometimes that doesn't exist
def findmostsecurereleasefiles(deb822):
	if 'SHA256' in deb822:
		return deb822['SHA256']
	if 'SHA1' in deb822:
		return deb822['SHA1']
	return deb822['MD5SUM']

def findmostsecurespfiles(deb822):
	if 'Checksums-Sha256' in deb822:
		return deb822['Checksums-Sha256']
	if 'Checksums-Sha1' in deb822:
		return deb822['Checksums-Sha1']
	return deb822['Files']

finalentry = None
for instance in instances:
	if not (finalentry is None): break
	if not pfnallowed.fullmatch(instance['archive_name']):
		print("archive name contains unexpected characters")
		sys.exit(10)
	if not tsallowed.fullmatch(instance['first_seen']):
		print("first seen contains unexpected characters")
		sys.exit(11)
	if instance['archive_name'] == 'debian-archive':
		url = 'http://snapshot.debian.org/archive/'+instance['archive_name']+'/'+instance['first_seen']+'/'
		print('searching '+url)
		dirlist = snapshotsubdirlist(url)
		#print(repr(dirlist))
		distsurls=[]
		for dir in dirlist:
			aname = dir[:-1] #strip trailing /
			distsurls.append(('http://snapshot.debian.org/archive/'+instance['archive_name']+'/'+instance['first_seen']+'/'+dir+'dists/',aname))
	else:
		distsurls=[('http://snapshot.debian.org/archive/'+instance['archive_name']+'/'+instance['first_seen']+'/dists/',instance['archive_name'])]
	for (distsurl,aname) in distsurls:
		if not (finalentry is None): break
		print('searching '+distsurl)
		dirlist = snapshotsubdirlist(distsurl)
		for link in dirlist:
			if not (finalentry is None): break
			#regular potato archive doesn't have Release.gpg but potato security archive does
			if dupain.fullmatch(link) and ((aname != 'debian-security') or (link != 'potato/')):
				print('ignoring ancient or duplicate distribution '+link)
				continue
			if aname == 'debian-security':
				link += 'updates/'
			elif (aname == 'debian-non-US') and (link != 'potato-proposed-updates'):
				link += 'non-US/'
			url = distsurl + link
			releaseurl = url + 'Release'
			print('searching '+releaseurl+' aname='+aname)
			try:
				with urllib.request.urlopen(releaseurl) as response:
					releasedata = response.read()
			except urllib.error.URLError as e:
				print('WARNING: failed to fetch '+releaseurl+' continueing search')
				continue
			#print(releasedata)
			release = deb822.Release(releasedata)
			#for key in release:
			#	print(key)
			components = {}
			#print(repr(release))
			releasefiles = findmostsecurereleasefiles(release)
			for file in releasefiles:
				pn = file['name']
				pns = pn.split('/')
				component = pns[0]
				fn = pns[-1]
				# 0=none 1=gz 2=bz2 3=xz
				cl = -1
				if fn == 'Sources': cl = 0
				if fn == 'Sources.gz': cl = 1
				if fn == 'Sources.bz2': cl = 2
				if fn == 'Sources.xz': cl = 3
				if len(pns) == 1:
					component = ''
				if (cl >= 0):
					if (component != '') and (not pfnallowed.fullmatch(component)):
						#print(component)
						print('component name contains unexpected characters')
						sys.exit(12)
					if (component in components):
						if components[component] < cl: components[component] = cl
					else:
						components[component] = cl
			for component, cl in components.items():
				if not (finalentry is None): break
				compressionsuffix = ''
				if cl == 1: compressionsuffix = '.gz'
				if cl == 2: compressionsuffix = '.bz2'
				if cl == 3: compressionsuffix = '.xz'
				if component != '':
					pn = component+'/source/Sources'+compressionsuffix
				else:
					pn = 'Sources'+compressionsuffix
				sourcesurl = url+pn
				#print(sourcesurl)
				sourcescompressed = io.BytesIO()
				with urllib.request.urlopen(sourcesurl) as response:
					sourcescompressed.write(response.read())
				sourcescompressed.seek(0)
				sourcesf = sourcescompressed
				if cl == 1: sourcesf = gzip.open(sourcescompressed)
				if cl == 2: sourcesf = bz2.open(sourcescompressed)
				if cl == 3: sourcesf = lzma.open(sourcescompressed)
				sourcesdata = sourcesf.read()
				for entry in deb822.Sources.iter_paragraphs(sourcesdata):
					if (entry['Package'] == package) and (entry['Version'] == version):
						print('found required entry in '+sourcesurl)
						#print(repr(entry))
						finalentry = (releaseurl,releasedata,sourcesdata,instance['first_seen'],instance['archive_name'],entry['Directory'],findmostsecurespfiles(entry))
						break

#search complete, now on to the verification
if not (finalentry is None):
	(releaseurl,releasedata,sourcesdata,seents,archivename,directory,files) = finalentry
	gpgurl = releaseurl+'.gpg'
	print(gpgurl)
	with urllib.request.urlopen(gpgurl) as response:
		gpgdata = response.read()
	
	
	f = open(package+'_'+version+'_Release.gpg','wb')
	f.write(gpgdata)
	f.close
	
	f = open(package+'_'+version+'_Release','wb')
	f.write(releasedata)
	f.close
	
	#f = open('snapshotsecuretmp/Sources','wb')
	#f.write(sourcesdata)
	#f.close
	
	#print(scriptdir)
	#first verify the gpg signature on relese file
	command = ['gpgv','--keyring', scriptdir+'/snapshotsecure.gpg']
	if args.allow_1024:
		command += ['--keyring', scriptdir+'/snapshotsecure-1024.gpg']
	command += [package+'_'+version+'_Release.gpg',package+'_'+version+'_Release']
	if (subprocess.call(command) != 0):
		print('gpg validation failed')
		if not args.keepevidence:
			os.remove(package+'_'+version+'_Release.gpg')
			os.remove(package+'_'+version+'_Release')
		sys.exit(2)
	
	#next verify that the Sources file matches the release file.
	releasefiles = findmostsecurereleasefiles(release)
	if 'sha256' in releasefiles[0]:
		hashalg = 'sha256'
		m = hashlib.sha256()
	elif 'sha1' in releasefiles[0]:
		hashalg = 'sha1'
		m = hashlib.sha1()
	else:
		hashalg = 'md5sum'
		m = hashlib.md5()
	m.update(sourcesdata)
	hash = m.hexdigest();
	print('sources file has hash '+hash)
	release = deb822.Release(releasedata)
	foundsources = False
	for file in releasefiles:
		pn = file['name']
		pns = pn.split('/')
		component = pns[0]
		fn = pns[-1]
		#if (fn == 'Sources'):
		#	print(repr(file))
		if (fn == 'Sources') and (hash == file[hashalg]):
			foundsources = True
			#print(repr(file))
	if (foundsources):
		print('successfully matched sources file to release file')
	else:
		print('failed to match Sources file to Release file')
		sys.exit(3)
	
	if args.keepevidence:
		f = open(package+'_'+version+'_Sources','wb')
		f.write(sourcesdata)
		f.close
	
	for fileentry in files:
		#print(repr(fileentry))
		if 'sha256' in fileentry:
			hashalg = 'sha256'
			m = hashlib.sha256()
		elif 'sha1' in fileentry:
			hashalg = 'sha1'
			m = hashlib.sha1()
		else:
			hashalg = 'md5sum'
			m = hashlib.md5()
		filehash = fileentry[hashalg]
		filesize = int(fileentry['size'])
		filename = fileentry['name']
		#sanity check, filename must begin with the package name
		if not filename.startswith(package):
			print('filename in source package does not begin with source package name')
			sys.exit(4)
		#sanity check, filename shouldn't contain any unwanted characters
		if not pfnallowed.fullmatch(filename):
			#print(repr(pfnallowed.match(filename)))
			print('filename in source package contains unwanted characters')
			sys.exit(5)
		if os.path.exists(filename):
			f = open(filename,'rb')
			filedata = f.read()
			f.close
			#m = hashlib.sha256()
			m.update(filedata)
			hash = m.hexdigest();
			if hash != filehash:
				print("hash sum mismatch when verifying existing file "+filename)
				sys.exit(7)
			if (len(filedata) != filesize):
				print("fize mismatch when verifying existing file "+filename)
				sys.exit(8)
			print("verified existing file "+filename)
		else:
			#file does not exist, download it
			fileurl='http://snapshot.debian.org/archive/'+archivename+'/'+seents+'/'+directory+'/'+filename
			#print(fileurl)
			with urllib.request.urlopen(fileurl) as response:
				filedata = response.read()
			#m = hashlib.sha256()
			m.update(filedata)
			hash = m.hexdigest();
			if hash != filehash:
				print("hash sum mismatch when downloading "+filename)
				sys.exit(6)
			if (len(filedata) != filesize):
				print("fize mismatch when downloading "+filename)
				sys.exit(9)
			f = open(filename,'wb')
			f.write(filedata)
			f.close
			print("successfully downloaded "+filename)
		
else:
	print('unable to locate Sources file containing package, most likely it belongs to a very old distribution that does not provide Release.gpg')
	sys.exit(1)


-------------- next part --------------
#!/bin/bash
cp /usr/share/keyrings/debian-archive-keyring.gpg snapshotsecure.gpg
RMKEYS=/usr/share/keyrings/debian-archive-removed-keys.gpg
gpg --keyring $RMKEYS --no-default-keyring --export `gpg --keyring $RMKEYS --no-default-keyring --list-keys --with-colons | grep '^pub:[^:]:1024:' | cut -d ':' -f 5` > snapshotsecure-1024.gpg
gpg --keyring $RMKEYS --no-default-keyring --export `gpg --keyring $RMKEYS --no-default-keyring --list-keys --with-colons | grep '^pub:' | grep -v '^pub:[^:]:1024:' | cut -d ':' -f 5` >> snapshotsecure.gpg


More information about the vcs-pkg-discuss mailing list