i'm pretty sure luks non-gi is now done

This commit is contained in:
brent s
2019-11-10 01:37:15 -05:00
parent a1c126847c
commit f68069a25e
6 changed files with 320 additions and 16 deletions

View File

@@ -1,5 +1,6 @@
import os
import secrets
import uuid
##
from . import _common
import aif.disk.block as block
@@ -63,6 +64,7 @@ class LUKS(object):
'aif.disk.mdadm.Array'))
_common.addBDPlugin('crypto')
self.devpath = '/dev/mapper/{0}'.format(self.name)
self.info = None
def addSecret(self, secretobj):
if not isinstance(secretobj, LuksSecret):
@@ -117,19 +119,24 @@ class LUKS(object):
return()
def create(self):
if self.created:
return()
if not self.secrets:
raise RuntimeError('Cannot create a LUKS volume with no secrets added')
for idx, secret in enumerate(self.secrets):
if idx == 0:
_BlockDev.crypto.luks_format_luks2(self.source,
self.cipher,
self.keysize,
self.passphrase,
self.key_file,
self.min_entropy)
# TODO: add support for custom parameters for below?
_BlockDev.crypto.luks_format_luks2_blob(self.source,
None, # cipher (use default)
0, # keysize (use default)
secret.passphrase, # passphrase
0, # minimum entropy (use default)
_BlockDev.CryptoLUKSVersion.LUKS2, # LUKS version
None) # extra args
else:
pass # TODO: *add* keyfile/passphrase/whatev.
pass
_BlockDev.crypto.luks_add_key_blob(self.source,
self.secrets[0].passphrase,
secret.passphrase)
self.created = True
return()
@@ -138,8 +145,7 @@ class LUKS(object):
raise RuntimeError('Cannot lock a LUKS volume before it is created')
if self.locked:
return()
pass
_BlockDev.crypto.luks_close(self.name)
self.locked = True
return()
@@ -148,7 +154,54 @@ class LUKS(object):
raise RuntimeError('Cannot unlock a LUKS volume before it is created')
if not self.locked:
return()
pass
_BlockDev.crypto.luks_open_blob(self.source,
self.name,
self.secrets[0].passphrase,
False) # read-only
self.locked = False
return()
def updateInfo(self):
if self.locked:
raise RuntimeError('Must be unlocked to gather info')
info = {}
_info = _BlockDev.crypto.luks_info(self.devpath)
for k in dir(_info):
if k.startswith('_'):
continue
elif k in ('copy', ):
continue
v = getattr(_info, k)
if k == 'uuid':
v = uuid.UUID(hex = v)
info[k] = v
info['_cipher'] = '{cipher}-{mode}'.format(**info)
self.info = info
return()
def writeConf(self, conf = '/etc/crypttab'):
if not self.secrets:
raise RuntimeError('secrets must be added before the configuration can be written')
conf = os.path.realpath(conf)
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
disk_uuid = None
uuid_dir = '/dev/disk/by-uuid'
for u in os.listdir(uuid_dir):
d = os.path.join(uuid_dir, u)
if os.path.realpath(d) == self.source:
disk_uuid = u
if disk_uuid:
identifer = 'UUID={0}'.format(disk_uuid)
else:
# This is *not* ideal, but better than nothing.
identifer = self.source
primary_key = self.secrets[0]
luksinfo = '{0}\t{1}\t{2}\tluks'.format(self.name,
identifer,
(primary_key.path if primary_key.path else '-'))
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
return()