i think i have something working

brent s
2019-09-20 12:55:13 -04:00
parent 78254a5970
commit 0695b86add
10 changed files with 513 additions and 228 deletions

ARB/__init__.py Normal file (0 lines added)

ARB/arb_util.py Normal file (9 lines added)

@@ -0,0 +1,9 @@
def xmlBool(xmlobj):
    # lxml may hand us a native bool (e.g. a default value) or an attribute string.
    if isinstance(xmlobj, bool):
        return(xmlobj)
    if xmlobj.lower() in ('1', 'true'):
        return(True)
    elif xmlobj.lower() in ('0', 'false'):
        return(False)
    else:
        return(None)

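A quick illustration of how this helper normalizes the boolean-ish XML attribute values used throughout (a hypothetical REPL-style sketch, assuming arb_util is importable):

import arb_util

arb_util.xmlBool(True)      # -> True (native bools pass through)
arb_util.xmlBool('1')       # -> True
arb_util.xmlBool('FALSE')   # -> False (case-insensitive)
arb_util.xmlBool('maybe')   # -> None (unrecognized values)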
ARB/main.py Executable file (145 lines added)

@@ -0,0 +1,145 @@
#!/usr/bin/env python3

import argparse
import os
import sqlite3
##
from lxml import etree
##
import repo

# TODO: track which versions are built so we don't need to constantly rebuild ALL packages
# TODO: logging
# TODO: check the result of a build and *conditionally* clean up if self.cleanup == True.

_dflts = {'cfgfile': '~/.config/arch_repo_builder/config.xml',
          'cache_db': '~/.cache/arch_repo_builder/packages.sqlite3'}


class Packager(object):
    def __init__(self, cfgfile = _dflts['cfgfile'], cache_db = _dflts['cache_db'], validate = True, *args, **kwargs):
        self.cfgfile = os.path.abspath(os.path.expanduser(cfgfile))
        self.cache_db = os.path.abspath(os.path.expanduser(cache_db))
        self.cfg = None
        self.xml = None
        self.schema = None
        self.ns = None
        self.repos = []
        self.db = None
        self.cur = None
        self.origdir = os.path.abspath(os.path.expanduser(os.getcwd()))
        self._initCfg(validate = validate)
        self._initDB()
        self._initRepos()

    def _initCfg(self, validate = True):
        with open(self.cfgfile, 'rb') as f:
            self.xml = etree.parse(f)
        self.xml.xinclude()
        self.cfg = self.xml.getroot()
        # The default (unprefixed) namespace, wrapped into lxml's {ns}tag form.
        self.ns = self.cfg.nsmap.get(None, 'http://git.square-r00t.net/Arch_Repo_Builder/tree/')
        self.ns = '{{{0}}}'.format(self.ns)
        if validate:
            if not self.schema:
                from urllib.request import urlopen
                xsi = self.cfg.nsmap.get('xsi', 'http://www.w3.org/2001/XMLSchema-instance')
                schemaLocation = '{{{0}}}schemaLocation'.format(xsi)
                schemaURL = self.cfg.attrib.get(schemaLocation,
                                                'http://git.square-r00t.net/Arch_Repo_Builder/plain/archrepo.xsd')
                with urlopen(schemaURL) as url:
                    self.schema = url.read()
                self.schema = etree.XMLSchema(etree.XML(self.schema))
            self.schema.assertValid(self.xml)
        return()

    def _initDB(self):
        is_new = False
        if not os.path.isdir(os.path.dirname(self.cache_db)):
            os.makedirs(os.path.dirname(self.cache_db), exist_ok = True)
            is_new = True
        if not os.path.isfile(self.cache_db):
            is_new = True
        self.db = sqlite3.connect(self.cache_db)
        self.db.row_factory = sqlite3.Row
        self.cur = self.db.cursor()
        if is_new:
            self.cur.execute(('CREATE TABLE IF NOT EXISTS '
                              '"packages" ('
                              '"name" TEXT NOT NULL UNIQUE, '
                              '"version" TEXT NOT NULL, '
                              '"source" TEXT NOT NULL, '
                              '"repo" TEXT NOT NULL, '
                              '"sign" INTEGER NOT NULL, '
                              'PRIMARY KEY("name"))'))
            self.cur.execute(('CREATE TABLE IF NOT EXISTS '
                              '"repos" ('
                              '"name" TEXT NOT NULL UNIQUE, '
                              '"dest" TEXT NOT NULL, '
                              '"path" TEXT NOT NULL, '
                              '"sign" INTEGER NOT NULL, '
                              '"gpghome" TEXT, '
                              '"gpgkeyid" TEXT, '
                              'PRIMARY KEY("name"))'))
            self.db.commit()
        return()

    def _initRepos(self):
        for r in self.xml.findall('{0}repo'.format(self.ns)):
            self.repos.append(repo.Repo(r, ns = self.ns))
        return()

    def build(self):
        # TODO: iterate over self.repos, build each package, and update the repo DBs.
        pass

    def _closeDB(self):
        if self.cur:
            self.cur.close()
        if self.db:
            self.db.close()
        return()


def parseArgs():
    args = argparse.ArgumentParser(description = 'Build Pacman packages and update a local repository')
    args.add_argument('-n', '--no-validate',
                      dest = 'validate',
                      action = 'store_false',
                      help = ('If specified, do NOT attempt to validate the config file (-c/--config)'))
    args.add_argument('-c', '--config',
                      dest = 'cfgfile',
                      default = _dflts['cfgfile'],
                      help = ('The path to the configuration file. Default: {0}').format(_dflts['cfgfile']))
    args.add_argument('-C', '--cache-db',
                      dest = 'cache_db',
                      default = _dflts['cache_db'],
                      help = ('The path to the cache DB file. Default: {0}').format(_dflts['cache_db']))
    return(args)


def main():
    args = parseArgs().parse_args()
    varargs = vars(args)
    pkgr = Packager(**varargs)
    # pkgr.buildPkgs(auronly = varargs['auronly'])
    # pkgr.createRepo()
    import pprint
    # print('PACKAGER:')
    # pprint.pprint(vars(pkgr))
    for r in pkgr.repos:
        # print('\nREPO: {0}'.format(r.name))
        # pprint.pprint(vars(r))
        # for m in r.mirrors:
        #     print('\nREPO/MIRROR: {0}/{1}'.format(r.name, type(m).__name__))
        #     pprint.pprint(vars(m))
        for p in r.packages:
            # p.extract(r.staging_dir)
            p.getPkgInfo()
            print('\nREPO/PACKAGE: {0}/{1} ({2})'.format(r.name, p.name, type(p).__name__))
            pprint.pprint(vars(p))
    return()


if __name__ == '__main__':
    main()

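As a reference point, here is a minimal sketch of inspecting the cache DB that _initDB() creates above; the path is the default from _dflts, and the table stays empty until build() is fleshed out:

import os
import sqlite3

db_path = os.path.expanduser('~/.cache/arch_repo_builder/packages.sqlite3')
db = sqlite3.connect(db_path)
db.row_factory = sqlite3.Row
cur = db.cursor()
# "sign" is stored as an INTEGER (SQLite has no native boolean type).
for row in cur.execute('SELECT name, version, repo, sign FROM packages'):
    print(dict(row))
db.close()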
ARB/mirror.py Normal file (39 lines added)

@@ -0,0 +1,39 @@
import os
import grp
import pwd
##
import paramiko


class Mirror(object):
    def __init__(self, mirror_xml, ns = '', *args, **kwargs):
        self.xml = mirror_xml
        self.ns = ns
        # If running under sudo, default to the invoking user rather than root.
        if os.environ.get('SUDO_USER'):
            _uname = os.environ['SUDO_USER']
        else:
            _uname = pwd.getpwuid(os.geteuid()).pw_name
        self.user = pwd.getpwnam(mirror_xml.attrib.get('user', _uname))
        self.fmode = int(self.xml.attrib.get('fileMode', '0600'), 8)
        self.dmode = int(self.xml.attrib.get('dirMode', '0700'), 8)
        self.dest = self.xml.text


class LocalMirror(Mirror):
    def __init__(self, mirror_xml, ns = '', *args, **kwargs):
        super().__init__(mirror_xml, ns = ns, *args, **kwargs)
        # SUDO_GID is a numeric GID string, so resolve it to a group name
        # before the getgrnam() lookup below.
        if os.environ.get('SUDO_GID'):
            _grpnm = grp.getgrgid(int(os.environ['SUDO_GID'])).gr_name
        else:
            _grpnm = grp.getgrgid(os.getegid()).gr_name
        self.group = grp.getgrnam(mirror_xml.attrib.get('group', _grpnm))
        self.dest = os.path.abspath(os.path.expanduser(self.dest))


class RemoteMirror(Mirror):
    def __init__(self, mirror_xml, ns = '', *args, **kwargs):
        super().__init__(mirror_xml, ns = ns, *args, **kwargs)
        self.port = int(mirror_xml.attrib.get('port', 22))
        self.keyfile = os.path.abspath(os.path.expanduser(mirror_xml.attrib.get('key', '~/.ssh/id_rsa')))
        self.remote_user = mirror_xml.attrib.get('remoteUser')
        self.remote_group = mirror_xml.attrib.get('remoteGroup')

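paramiko is imported but not used yet; presumably RemoteMirror will grow a sync method. A hypothetical sketch of what consuming these attributes might look like (push_file and the host argument are assumptions, not part of this commit):

import paramiko

def push_file(mirror, host, local_path, remote_path):
    # Connect as the configured remote user with the configured keyfile.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host,
                port = mirror.port,
                username = mirror.remote_user,
                key_filename = mirror.keyfile)
    sftp = ssh.open_sftp()
    sftp.put(local_path, remote_path)
    # Apply the fileMode parsed from the mirror XML.
    sftp.chmod(remote_path, mirror.fmode)
    sftp.close()
    ssh.close()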
ARB/package.py Normal file (226 lines added)

@@ -0,0 +1,226 @@
import copy
import io
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
import warnings
##
import Namcap
import requests
##
import arb_util

# TODO: should this be a configuration option?
aurbase = 'https://aur.archlinux.org'

# Maps the AUR API attribute names to their PKGBUILD equivalents.
# Remove all attributes not present here.
_attrmap = {'Description': 'desc',
            'Depends': 'depends',
            'License': 'licenses',
            'Name': 'name',
            'URL': 'url',
            'URLPath': 'pkgurl',
            'Version': 'version'}
# And this is a blank dict as returned by Namcap.package (with useless values removed).
_pkgattrs = {k: None for k in ('base', 'desc', 'install', 'name', 'url', 'version')}
_pkgattrs.update({k: [] for k in ('arch', 'backup', 'conflicts', 'depends', 'groups', 'licenses', 'makedepends',
                                  'md5sums', 'names', 'optdepends', 'options', 'orig_depends', 'orig_makedepends',
                                  'orig_optdepends', 'orig_provides', 'provides', 'replaces', 'sha1sums',
                                  'sha224sums', 'sha256sums', 'sha384sums', 'sha512sums', 'source', 'split',
                                  'validgpgkeys')})
# This one is custom.
_pkgattrs.update({'pkgurl': None})


class Package(object):
    def __init__(self, pkg_xml, ns = '', gpgobj = None, *args, **kwargs):
        self.xml = pkg_xml
        self.always_build = arb_util.xmlBool(pkg_xml.attrib.get('alwaysBuild', True))
        self.cleanup = arb_util.xmlBool(pkg_xml.attrib.get('cleanUp', True))
        self.name = pkg_xml.text
        self.gpg = gpgobj
        self.pkginfo = None
        self.srcdir = None

    def build(self, destdir):
        if not self.srcdir:
            raise RuntimeError('You must run .extract() before running .build()')
        prebuild_files = []
        postbuild_files = []
        # Snapshot the source tree so we can spot what the build adds.
        for root, dirs, files in os.walk(self.srcdir):
            for f in files:
                prebuild_files.append(os.path.join(root, f))
        os.chdir(self.srcdir)
        # customizepkg-scripting in AUR
        try:
            custpkg_out = subprocess.run(['/usr/bin/customizepkg',
                                          '-m'],
                                         stdout = subprocess.PIPE,
                                         stderr = subprocess.PIPE)
        except FileNotFoundError:
            pass  # Not installed
        build_out = subprocess.run(['/usr/bin/multilib-build',
                                    '-c',
                                    '--',
                                    '--',
                                    '--skippgpcheck',
                                    '--syncdeps',
                                    '--noconfirm',
                                    '--log',
                                    '--holdver',
                                    '--skipinteg'],
                                   stdout = subprocess.PIPE,
                                   stderr = subprocess.PIPE)
        # Anything new (minus logs) is a build product.
        for root, dirs, files in os.walk(self.srcdir):
            for f in files:
                fpath = os.path.join(root, f)
                if fpath in prebuild_files:
                    continue
                if fpath.endswith('.log'):
                    continue
                postbuild_files.append(fpath)
        postbuild_files = [i for i in postbuild_files if i.endswith('.pkg.tar.xz')]
        if not postbuild_files:
            warnings.warn('Could not reliably find any built packages for {0}; skipping'.format(self.name))
        else:
            for f in postbuild_files:
                fdest = os.path.join(destdir,
                                     os.path.basename(f))
                if os.path.isfile(fdest):
                    os.remove(fdest)
                shutil.move(f, fdest)
        if self.cleanup:
            shutil.rmtree(self.srcdir)
        return([os.path.basename(f) for f in postbuild_files])

    def extract(self, dest):
        # No-op; this is handled in the subclasses since it's unique to them.
        return(True)

    def getPkgInfo(self):
        # No-op; this is handled in the subclasses since it's unique to them.
        return(True)


class LocalPkg(Package):
    def __init__(self, pkg_xml, ns = '', *args, **kwargs):
        super().__init__(pkg_xml, ns = ns, *args, **kwargs)
        self.source = os.path.abspath(os.path.expanduser(pkg_xml.attrib.get('path',
                                                                            '.')))

    def extract(self, dest):
        self.getPkgInfo()
        if os.path.isfile(self.source):
            try:
                with tarfile.open(name = self.source, mode = 'r|*') as tar:
                    tar.extractall(dest)
                dest = os.path.join(dest, self.name)
            except tarfile.ReadError as e:
                if str(e) != 'invalid header':
                    # TODO: log instead
                    raise(e)
                # "invalid header" means it isn't a tarball. Contextually, that means a PKGBUILD file.
                dest = os.path.join(dest, self.name)
                os.makedirs(dest, exist_ok = True)
                shutil.copy2(self.source, dest)
        elif os.path.isdir(self.source):
            # Already "extracted"; copytree() creates the destination itself.
            dest = os.path.join(dest, self.name)
            shutil.copytree(self.source, dest)
        pkg = dict(Namcap.package.load_from_pkgbuild(os.path.join(dest, 'PKGBUILD')))
        del(pkg['setvars'])
        self.pkginfo.update(pkg)
        self.srcdir = dest
        return(True)

    def getPkgInfo(self):
        pkgbuild = None
        pkgbld_re = re.compile(r'(^|/)PKGBUILD$')
        is_temp = False
        if os.path.isfile(self.source):
            try:
                with tarfile.open(name = self.source, mode = 'r:*') as tar:
                    for f in tar.getmembers():
                        if pkgbld_re.search(f.name):
                            pkgbuild = tempfile.mkstemp()[1]
                            with open(pkgbuild, 'wb') as fh:
                                fh.write(tar.extractfile(f).read())
                            is_temp = True
                            break
            except tarfile.ReadError as e:
                if str(e) != 'file could not be opened successfully':
                    # TODO: log instead
                    raise(e)
                # "file could not be opened successfully" means it isn't a tarball.
                # Contextually, that means a PKGBUILD file.
                pkgbuild = self.source
        elif os.path.isdir(self.source):
            pkgbuild = os.path.join(self.source, 'PKGBUILD')
        if not pkgbuild:
            raise RuntimeError('Could not find a PKGBUILD for {0}'.format(self.name))
        pkg = copy.deepcopy(_pkgattrs)
        pkg.update(dict(Namcap.package.load_from_pkgbuild(pkgbuild)))
        del(pkg['setvars'])
        if is_temp:
            os.remove(pkgbuild)
        if self.pkginfo and isinstance(self.pkginfo, dict):
            self.pkginfo.update(pkg)
        else:
            self.pkginfo = pkg
        return()


class AURPkg(Package):
    def __init__(self, pkg_xml, ns = '', *args, **kwargs):
        super().__init__(pkg_xml, ns = ns, *args, **kwargs)

    def extract(self, dest):
        dl_url = None
        self.getPkgInfo()
        if self.pkginfo['name'] == self.name:
            dl_url = os.path.join(aurbase, re.sub(r'^/+', r'', self.pkginfo['pkgurl']))
        if not dl_url:
            # TODO: log instead?
            warnings.warn('Could not find a download path for {0}; skipping'.format(self.name))
            return(False)
        with requests.get(dl_url, stream = True) as url:
            try:
                with tarfile.open(mode = 'r|*', fileobj = io.BytesIO(url.content)) as tar:
                    tar.extractall(dest)
                dest = os.path.join(dest, self.name)
            # This *technically* does nothing unless the AUR is *very* broken.
            except tarfile.ReadError as e:
                # "invalid header" means it isn't a tarball
                if str(e) != 'invalid header':
                    # TODO: log instead
                    raise(e)
        pkg = dict(Namcap.package.load_from_pkgbuild(os.path.join(dest, 'PKGBUILD')))
        del(pkg['setvars'])
        self.pkginfo.update(pkg)
        self.srcdir = dest
        return(True)

    def getPkgInfo(self):
        pkg_srch = requests.get(os.path.join(aurbase, 'rpc'),
                                params = {'v': 5,
                                          'type': 'info',
                                          'arg': self.name}).json()
        if 'results' not in pkg_srch:
            raise RuntimeError(('AUR request for {0} was unsuccessful. '
                                'Check {1} for status.').format(self.name, aurbase))
        if len(pkg_srch['results']) != 1:
            # TODO: log instead?
            warnings.warn('Package {0} not found in the AUR'.format(self.name))
            return(False)
        pkginfo = copy.deepcopy(_pkgattrs)
        for k, v in pkg_srch['results'][0].items():
            if k in _attrmap:
                pkginfo[_attrmap[k]] = v
        self.pkginfo = pkginfo
        return()

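For reference, the AUR RPC v5 'info' endpoint that AURPkg.getPkgInfo() wraps can be exercised directly; a minimal sketch (the package name 'yay' is just an example):

import requests

resp = requests.get('https://aur.archlinux.org/rpc',
                    params = {'v': 5, 'type': 'info', 'arg': 'yay'}).json()
# Each result dict carries the keys ('Name', 'Version', 'URLPath', ...)
# that _attrmap translates into their PKGBUILD-style equivalents.
for result in resp.get('results', []):
    print(result['Name'], result['Version'], result['URLPath'])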
ARB/repo.py Normal file (79 lines added)

@@ -0,0 +1,79 @@
import os
import re
##
import gpg
##
import arb_util
import mirror
import package


class Repo(object):
    def __init__(self, repo_xml, ns = '', *args, **kwargs):
        self.xml = repo_xml
        self.name = repo_xml.attrib['name']
        self.ns = ns
        self.gpg = None
        self.key = None
        self.mirrors = []
        self.packages = []
        _key_id = self.xml.attrib.get('gpgKeyID')
        self.key_id = (re.sub(r'\s+', '', _key_id) if _key_id else None)
        self.staging_dir = os.path.abspath(os.path.expanduser(self.xml.attrib.get('staging',
                                                                                  '.')))
        self.sign_pkgs = arb_util.xmlBool(self.xml.attrib.get('signPkgs', True))
        self.sign_db = arb_util.xmlBool(self.xml.attrib.get('signDB', True))
        self._initSigner()
        self._initMirrors()
        self._initPackages()

    def _initMirrors(self):
        for m in self.xml.findall('{0}mirrors/{0}mirror.RemoteMirror'.format(self.ns)):
            self.mirrors.append(mirror.RemoteMirror(m, ns = self.ns))
        for m in self.xml.findall('{0}mirrors/{0}mirror.LocalMirror'.format(self.ns)):
            self.mirrors.append(mirror.LocalMirror(m, ns = self.ns))
        return()

    def _initPackages(self):
        for pkg in self.xml.findall('{0}packages/{0}aur'.format(self.ns)):
            self.packages.append(package.AURPkg(pkg, ns = self.ns))
        for pkg in self.xml.findall('{0}packages/{0}pkgbuild'.format(self.ns)):
            self.packages.append(package.LocalPkg(pkg, ns = self.ns))
        return()

    def _initSigner(self):
        if self.key_id:
            # Strip an optional "0x" prefix so we compare bare hex forms.
            squashed_key = re.sub(r'^(?:0X)?([0-9A-Z]+)$', r'\g<1>', self.key_id.upper())
        else:
            squashed_key = None
        gpghome = self.xml.attrib.get('gnupgHome',
                                      os.environ.get('GNUPGHOME',
                                                     '~/.gnupg'))
        gpghome = os.path.abspath(os.path.expanduser(gpghome))
        if not os.path.isdir(gpghome):
            raise FileNotFoundError('{0} does not exist'.format(gpghome))
        self.gpg = gpg.Context(home_dir = gpghome)
        keys = [k for k in self.gpg.keylist(pattern = self.key_id, secret = True)]
        for k in keys:
            # In form of: (fingerprint/full, long, short)
            keyforms = (k.fpr, k.fpr[-16:], k.fpr[-8:])
            if squashed_key:
                if squashed_key in keyforms:
                    if k.can_sign:
                        self.key = k
                        break
                else:
                    # The ID may name a subkey rather than the primary key.
                    for s in k.subkeys:
                        subkeyforms = (s.fpr, s.fpr[-16:], s.fpr[-8:])
                        if squashed_key in subkeyforms:
                            if s.can_sign:
                                self.key = s
                                break
            else:
                if k.can_sign:
                    self.key = k
                    break
        if not self.key:
            raise ValueError('Cannot find a suitable signing GPG key')
        self.gpg.signers = [self.key]
        return()
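
Nothing signs anything yet, but with self.gpg.signers populated as above, producing the detached .sig files that pacman and repo-add expect would look roughly like this sketch (detach_sign is hypothetical, not part of this commit):

import gpg

def detach_sign(ctx, path):
    # Sign with whatever keys are in ctx.signers (set by _initSigner()).
    with open(path, 'rb') as f:
        sig, result = ctx.sign(f.read(), mode = gpg.constants.sig.mode.DETACH)
    with open(path + '.sig', 'wb') as f:
        f.write(sig)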