i officially hate netctl now i think
This commit is contained in:
@@ -1,3 +1,213 @@
|
||||
import ipaddress
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
##
|
||||
from pyroute2 import IPDB
|
||||
##
|
||||
import aif.utils
|
||||
|
||||
# Not needed
|
||||
# import gi
|
||||
# gi.require_version('NM', '1.0')
|
||||
# from gi.repository import GObject, NM, GLib
|
||||
|
||||
|
||||
def canonizeEUI(phyaddr):
|
||||
# The easy transformations first.
|
||||
phyaddr = re.sub(r'[.:-]', '', phyaddr.upper().strip())
|
||||
eui = ':'.join(['{0}'.format(phyaddr[i:i+2]) for i in range(0, 12, 2)])
|
||||
return(eui)
|
||||
|
||||
|
||||
def convertIpTuples(addr_xmlobj):
|
||||
# These tuples follow either:
|
||||
# ('dhcp'/'dhcp6'/'slaac', None, None) for auto configuration
|
||||
# (ipaddress.IPv4/6Address(IP), CIDR, ipaddress.IPv4/6Address(GW)) for static configuration
|
||||
if addr_xmlobj.text in ('dhcp', 'dhcp6', 'slaac'):
|
||||
addr = addr_xmlobj.text.strip()
|
||||
net = None
|
||||
gw = None
|
||||
else:
|
||||
components = addr_xmlobj.text.strip().split('/')
|
||||
if len(components) > 2:
|
||||
raise ValueError('Invalid IP/CIDR format: {0}'.format(addr_xmlobj.text))
|
||||
if len(components) == 1:
|
||||
addr = ipaddress.ip_address(components[0])
|
||||
if addr.version == 4:
|
||||
components.append('24')
|
||||
elif addr.version == 6:
|
||||
components.append('64')
|
||||
addr = ipaddress.ip_address(components[0])
|
||||
net = ipaddress.ip_network('/'.join(components), strict = False)
|
||||
try:
|
||||
gw = ipaddress.ip_address(addr_xmlobj.attrib.get('gateway').strip())
|
||||
except (ValueError, AttributeError):
|
||||
gw = next(net.hosts())
|
||||
return((addr, net, gw))
|
||||
|
||||
|
||||
def convertWifiCrypto(crypto_xmlobj):
|
||||
crypto = {'type': crypto_xmlobj.find('type').text.strip()}
|
||||
# if crypto['type'] in ('wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
crypto['mode'] = crypto_xmlobj.find('mode')
|
||||
if not crypto['mode']:
|
||||
crypto['mode'] = 'personal'
|
||||
else:
|
||||
crypto['mode'] = crypto['mode'].text.strip()
|
||||
else:
|
||||
crypto['mode'] = None
|
||||
creds = crypto_xmlobj.find('creds')
|
||||
crypto['auth'] = {'type': creds.attrib.get('type', 'psk').strip()}
|
||||
if crypto['auth']['type'] == 'psk':
|
||||
crypto['auth']['psk'] = creds.text
|
||||
# TODO: enterprise support
|
||||
return(crypto)
|
||||
|
||||
|
||||
def getDefIface(ifacetype):
|
||||
if ifacetype == 'ethernet':
|
||||
if isNotPersistent():
|
||||
prefix = 'eth'
|
||||
else:
|
||||
prefix = 'en'
|
||||
elif ifacetype == 'wireless':
|
||||
prefix = 'wl'
|
||||
else:
|
||||
raise ValueError('ifacetype must be one of "ethernet" or "wireless"')
|
||||
ifname = None
|
||||
with IPDB() as ipdb:
|
||||
for iface in ipdb.interfaces.keys():
|
||||
if iface.startswith(prefix):
|
||||
ifname = iface
|
||||
break
|
||||
if not ifname:
|
||||
return(None)
|
||||
return(ifname)
|
||||
|
||||
|
||||
def isNotPersistent(chroot_base = '/'):
|
||||
chroot_base = pathlib.Path(chroot_base)
|
||||
systemd_override = chroot_base.joinpath('etc',
|
||||
'systemd',
|
||||
'network',
|
||||
'99-default.link')
|
||||
kernel_cmdline = chroot_base.joinpath('proc', 'cmdline')
|
||||
devnull = chroot_base.joinpath('dev', 'null')
|
||||
rootdevnull = pathlib.PosixPath('/dev/null')
|
||||
if os.path.islink(systemd_override) and pathlib.Path(systemd_override).resolve() in (devnull, rootdevnull):
|
||||
return(True)
|
||||
cmds = aif.utils.kernelCmdline(chroot_base)
|
||||
if 'net.ifnames' in cmds.keys() and cmds['net.ifnames'] == '0':
|
||||
return(True)
|
||||
return(False)
|
||||
|
||||
|
||||
class BaseConnection(object):
|
||||
def __init__(self, iface_xml):
|
||||
self.xml = iface_xml
|
||||
self.id = self.xml.attrib['id'].strip()
|
||||
self.device = self.xml.attrib['device'].strip()
|
||||
self.is_defroute = aif.utils.xmlBool(self.xml.attrib.get('defroute', 'false').strip())
|
||||
try:
|
||||
self.domain = self.xml.attrib.get('searchDomain').strip()
|
||||
except AttributeError:
|
||||
self.domain = None
|
||||
self.dhcp_client = self.xml.attrib.get('dhcpClient', 'dhcpcd').strip()
|
||||
self._cfg = None
|
||||
self.connection_type = None
|
||||
self.provider_type = None
|
||||
self.packages = []
|
||||
self.services = {}
|
||||
self.resolvers = []
|
||||
self.addrs = {'ipv4': [],
|
||||
'ipv6': []}
|
||||
self.routes = {'ipv4': [],
|
||||
'ipv6': []}
|
||||
self.auto = {}
|
||||
for x in ('resolvers', 'routes', 'addresses'):
|
||||
self.auto[x] = {}
|
||||
x_xml = self.xml.find(x)
|
||||
for t in ('ipv4', 'ipv6'):
|
||||
if t == 'ipv6' and x == 'addresses':
|
||||
self.auto[x][t] = 'slaac'
|
||||
else:
|
||||
self.auto[x][t] = True
|
||||
if x_xml:
|
||||
t_xml = x_xml.find(t)
|
||||
if t_xml:
|
||||
if t == 'ipv6' and x == 'addresses':
|
||||
a = t_xml.attrib.get('auto', 'slaac').strip()
|
||||
if a.lower() in ('false', '0', 'none'):
|
||||
self.auto[x][t] = False
|
||||
else:
|
||||
self.auto[x][t] = a
|
||||
else:
|
||||
self.auto[x][t] = aif.utils.xmlBool(t_xml.attrib.get('auto', 'true').strip())
|
||||
# These defaults are from the man page. However, we might want to add:
|
||||
# domain-search, netbios-scope, interface-mtu, rfc3442-classless-static-routes, ntp-servers,
|
||||
# dhcp6.fqdn, dhcp6.sntp-servers
|
||||
# under requests and for requires, maybe:
|
||||
# routers, domain-name-servers, domain-name, domain-search, host-name
|
||||
self.dhcp_defaults = {
|
||||
'dhclient': {'requests': {'ipv4': ('subnet-mask', 'broadcast-address', 'time-offset', 'routers',
|
||||
'domain-name', 'domain-name-servers', 'host-name'),
|
||||
'ipv6': ('dhcp6.name-servers',
|
||||
'dhcp6.domain-search')},
|
||||
'requires': {'ipv4': tuple(),
|
||||
'ipv6': tuple()}},
|
||||
'dhcpcd': {'default_opts': ('hostname', 'duid', 'persistent', 'slaac private', 'noipv4ll'),
|
||||
# dhcpcd -V to display variables.
|
||||
# "option <foo>", prepend "dhcp6_" for ipv6. if no ipv6 opts present, same are mapped to ipv6.
|
||||
# But we explicitly add them for munging downstream.
|
||||
'requests': {'ipv4': ('rapid_commit', 'domain_name_servers', 'domain_name', 'domain_search',
|
||||
'host_name', 'classless_static_routes', 'interface_mtu'),
|
||||
'ipv6': ('dhcp6_rapid_commit', 'dhcp6_domain_name_servers', 'dhcp6_domain_name',
|
||||
'dhcp6_domain_search', 'dhcp6_host_name', 'dhcp6_classless_static_routes',
|
||||
'dhcp6_interface_mtu')},
|
||||
# "require <foo>"
|
||||
'requires': {'ipv4': ('dhcp_server_identifier', ),
|
||||
'ipv6': tuple()}}}
|
||||
self._initAddrs()
|
||||
self._initResolvers()
|
||||
self._initRoutes()
|
||||
|
||||
def _initAddrs(self):
|
||||
for addrtype in ('ipv4', 'ipv6'):
|
||||
for a in self.xml.findall('addresses/{0}/address'.format(addrtype)):
|
||||
addrset = convertIpTuples(a)
|
||||
if addrset not in self.addrs[addrtype]:
|
||||
self.addrs[addrtype].append(addrset)
|
||||
return()
|
||||
|
||||
def _initCfg(self):
|
||||
# A dummy method; this is overridden by the subclasses.
|
||||
# It's honestly here to make my IDE stop complaining. :)
|
||||
pass
|
||||
return()
|
||||
|
||||
def _initConnCfg(self):
|
||||
# A dummy method; this is overridden by the subclasses.
|
||||
# It's honestly here to make my IDE stop complaining. :)
|
||||
pass
|
||||
return()
|
||||
|
||||
def _initResolvers(self):
|
||||
resolvers_xml = self.xml.find('resolvers')
|
||||
if resolvers_xml:
|
||||
for r in resolvers_xml.findall('resolver'):
|
||||
resolver = ipaddress.ip_address(r.text.strip())
|
||||
if resolver not in self.resolvers:
|
||||
self.resolvers.append(resolver)
|
||||
return()
|
||||
|
||||
def _initRoutes(self):
|
||||
routes_xml = self.xml.find('routes')
|
||||
if routes_xml:
|
||||
for addrtype in ('ipv4', 'ipv6'):
|
||||
for a in self.xml.findall('routes/{0}/route'.format(addrtype)):
|
||||
addrset = convertIpTuples(a)
|
||||
if addrset not in self.routes[addrtype]:
|
||||
self.routes[addrtype].append(addrset)
|
||||
return()
|
||||
|
||||
Reference in New Issue
Block a user