[apparmor] [patch 3/3] Convert existing code to use NetworkRule and NetworkRuleset
Christian Boltz
apparmor at cboltz.de
Tue Apr 14 21:41:33 UTC 2015
Hello,
Change aa.py to use NetworkRule and NetworkRuleset instead of a
sub-hasher to store, check and write network rules. In detail:
- drop profile_known_network() and use is_known_rule() instead
- replace match_net_includes() usage with match_includes() calls
- drop delete_net_duplicates(), use the code in NetworkRule(set) instead
- make match_net_includes() (still used by aa-mergeprof) a wrapper for
match_includes()
- drop all the network rule parsing from parse_profile_data() and
serialize_profile_from_old_profile() - instead, just call
NetworkRule.parse
- now that write_net_rules() got fixed, drop it ;-)
- change write_netdomain to use NetworkRuleset
- drop netrules_access_check() - that's is_covered() now
- use 'network' instead of 'netdomain' as storage keyword (log events
still use 'netdomain')
Also update cleanprofile.py to use the NetworkRuleset class.
This also means to delete the (now superfluous) delete_net_duplicates()
function.
Finally, there are some changes in regex.py:
- change RE_PROFILE_NETWORK in regex.py to named matches and to use
RE_COMMA_EOL (not only RE_EOL)
- drop the no longer needed RE_NETWORK_FAMILY and RE_NETWORK_FAMILY_TYPE
(rule/network.py has regexes that check against the list of available
keywords)
Note: Some parts of this patch will only apply if you apply my other
pending patches first.
Diffstat for all 3 patches:
apparmor/aa.py | 224 +++---------------------
apparmor/cleanprofile.py | 38 ----
apparmor/regex.py | 4
apparmor/rule/network.py | 210 +++++++++++++++++++++++
test/test-network.py | 428 +++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 673 insertions(+), 231 deletions(-)
[ 46-convert-to-use-NetworkRule.diff ]
=== modified file 'utils/apparmor/aa.py'
--- utils/apparmor/aa.py 2015-04-11 00:20:31 +0000
+++ utils/apparmor/aa.py 2015-04-11 18:04:51 +0000
@@ -45,7 +45,7 @@
RE_PROFILE_BOOLEAN, RE_PROFILE_VARIABLE, RE_PROFILE_CONDITIONAL,
RE_PROFILE_CONDITIONAL_VARIABLE, RE_PROFILE_CONDITIONAL_BOOLEAN,
RE_PROFILE_BARE_FILE_ENTRY, RE_PROFILE_PATH_ENTRY, RE_PROFILE_NETWORK,
- RE_NETWORK_FAMILY_TYPE, RE_NETWORK_FAMILY, RE_PROFILE_CHANGE_HAT,
+ RE_PROFILE_CHANGE_HAT,
RE_PROFILE_HAT_DEF, RE_PROFILE_DBUS, RE_PROFILE_MOUNT,
RE_PROFILE_SIGNAL, RE_PROFILE_PTRACE, RE_PROFILE_PIVOT_ROOT,
RE_PROFILE_UNIX, RE_RULE_HAS_COMMA, RE_HAS_COMMENT_SPLIT,
@@ -54,6 +54,7 @@
import apparmor.rules as aarules
from apparmor.rule.capability import CapabilityRuleset, CapabilityRule
+from apparmor.rule.network import NetworkRuleset, NetworkRule
from apparmor.rule import parse_modifiers
from apparmor.yasti import SendDataToYast, GetDataFromYast, shutdown_yast
@@ -1450,8 +1451,6 @@
if stub_profile[hat][hat].get('include', False):
aa[profile][hat]['include'] = stub_profile[hat][hat]['include']
- aa[profile][hat]['allow']['netdomain'] = hasher()
-
file_name = aa[profile][profile]['filename']
filelist[file_name]['profiles'][profile][hat] = True
@@ -1958,11 +1957,12 @@
for family in sorted(log_dict[aamode][profile][hat]['netdomain'].keys()):
# severity handling for net toggles goes here
for sock_type in sorted(log_dict[aamode][profile][hat]['netdomain'][family].keys()):
- if profile_known_network(aa[profile][hat], family, sock_type):
+ network_obj = NetworkRule(family, sock_type)
+ if is_known_rule(aa[profile][hat], 'network', network_obj):
continue
default_option = 1
options = []
- newincludes = match_net_includes(aa[profile][hat], family, sock_type)
+ newincludes = match_includes(aa[profile][hat], 'network', network_obj)
q = aaui.PromptQuestion()
if newincludes:
options += list(map(lambda s: '#include <%s>' % s, sorted(set(newincludes))))
@@ -2031,8 +2031,7 @@
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
else:
- aa[profile][hat]['allow']['netdomain']['audit'][family][sock_type] = audit_toggle
- aa[profile][hat]['allow']['netdomain']['rule'][family][sock_type] = True
+ aa[profile][hat]['network'].add(NetworkRule(family, sock_type, audit=audit_toggle))
changed[profile] = True
@@ -2040,7 +2039,7 @@
elif ans == 'CMD_DENY':
done = True
- aa[profile][hat]['deny']['netdomain']['rule'][family][sock_type] = True
+ aa[profile][hat]['network'].add(NetworkRule(family, sock_type, audit=audit_toggle, deny=True))
changed[profile] = True
aaui.UI_Info(_('Denying network access %(family)s %(type)s to profile') % { 'family': family, 'type': sock_type })
@@ -2103,31 +2102,6 @@
newpath = re.sub('/[^/]+(\.[^/]+)$', '/*' + match.groups()[0], newpath)
return newpath
-def delete_net_duplicates(netrules, incnetrules):
- deleted = 0
- hasher_obj = hasher()
- copy_netrules = deepcopy(netrules)
- if incnetrules and netrules:
- incnetglob = False
- # Delete matching rules from abstractions
- if incnetrules.get('all', False):
- incnetglob = True
- for fam in copy_netrules['rule'].keys():
- if incnetglob or (type(incnetrules['rule'][fam]) != type(hasher_obj) and incnetrules['rule'][fam]):
- if type(netrules['rule'][fam]) == type(hasher_obj):
- deleted += len(netrules['rule'][fam].keys())
- else:
- deleted += 1
- netrules['rule'].pop(fam)
- elif type(netrules['rule'][fam]) != type(hasher_obj) and netrules['rule'][fam]:
- continue
- else:
- for socket_type in copy_netrules['rule'][fam].keys():
- if incnetrules['rule'][fam].get(socket_type, False):
- netrules['rule'][fam].pop(socket_type)
- deleted += 1
- return deleted
-
def delete_path_duplicates(profile, incname, allow):
deleted = []
for entry in profile[allow]['path'].keys():
@@ -2150,20 +2124,14 @@
# only a subset allow rules may actually be denied
if include.get(incname, False):
- deleted += delete_net_duplicates(profile['allow']['netdomain'], include[incname][incname]['allow']['netdomain'])
-
- deleted += delete_net_duplicates(profile['deny']['netdomain'], include[incname][incname]['deny']['netdomain'])
-
+ deleted += profile['network'].delete_duplicates(include[incname][incname]['network'])
deleted += profile['capability'].delete_duplicates(include[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
deleted += delete_path_duplicates(profile, incname, 'deny')
elif filelist.get(incname, False):
- deleted += delete_net_duplicates(profile['allow']['netdomain'], filelist[incname][incname]['allow']['netdomain'])
-
- deleted += delete_net_duplicates(profile['deny']['netdomain'], filelist[incname][incname]['deny']['netdomain'])
-
+ deleted += profile['network'].delete_duplicates(filelist[incname][incname]['network'])
deleted += profile['capability'].delete_duplicates(filelist[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
@@ -2172,25 +2140,10 @@
return deleted
def match_net_include(incname, family, type):
- includelist = [incname]
- checked = []
- name = None
- if includelist:
- name = includelist.pop(0)
- while name:
- checked.append(name)
- if netrules_access_check(include[name][name]['allow']['netdomain'], family, type):
- return True
-
- if include[name][name]['include'].keys() and name not in checked:
- includelist += include[name][name]['include'].keys()
-
- if len(includelist):
- name = includelist.pop(0)
- else:
- name = False
-
- return False
+ # still used by aa-mergeprof
+ network_obj = NetworkRule(family, type)
+ return match_includes(incname, 'network', network_obj)
+
def match_cap_includes(profile, capability):
# still used by aa-mergeprof
@@ -2537,7 +2490,7 @@
nd = prelog[aamode][profile][hat]['netdomain']
for family in nd.keys():
for sock_type in nd[family].keys():
- if not profile_known_network(aa[profile][hat], family, sock_type):
+ if not is_known_rule(aa[profile][hat], 'network', NetworkRule(family, sock_type)):
log_dict[aamode][profile][hat]['netdomain'][family][sock_type] = True
@@ -2713,7 +2666,7 @@
profile_data[profile][hat]['flags'] = flags
- profile_data[profile][hat]['allow']['netdomain'] = hasher()
+ profile_data[profile][hat]['network'] = NetworkRuleset()
profile_data[profile][hat]['allow']['path'] = hasher()
profile_data[profile][hat]['allow']['dbus'] = list()
profile_data[profile][hat]['allow']['mount'] = list()
@@ -2963,34 +2916,14 @@
load_include(include_name)
elif RE_PROFILE_NETWORK.search(line):
- matches = RE_PROFILE_NETWORK.search(line).groups()
-
if not profile:
raise AppArmorException(_('Syntax Error: Unexpected network entry found in file: %(file)s line: %(line)s') % { 'file': file, 'line': lineno + 1 })
- audit = False
- if matches[0]:
- audit = True
- allow = 'allow'
- if matches[1] and matches[1].strip() == 'deny':
- allow = 'deny'
- network = matches[2]
+ # init rule class (if not done yet)
+ if not profile_data[profile][hat].get('network', False):
+ profile_data[profile][hat]['network'] = CapabilityRuleset()
- if RE_NETWORK_FAMILY_TYPE.search(network):
- nmatch = RE_NETWORK_FAMILY_TYPE.search(network).groups()
- fam, typ = nmatch[:2]
- ##Simply ignore any type subrules if family has True (seperately for allow and deny)
- ##This will lead to those type specific rules being lost when written
- #if type(profile_data[profile][hat][allow]['netdomain']['rule'].get(fam, False)) == dict:
- profile_data[profile][hat][allow]['netdomain']['rule'][fam][typ] = 1
- profile_data[profile][hat][allow]['netdomain']['audit'][fam][typ] = audit
- elif RE_NETWORK_FAMILY.search(network):
- fam = RE_NETWORK_FAMILY.search(network).groups()[0]
- profile_data[profile][hat][allow]['netdomain']['rule'][fam] = True
- profile_data[profile][hat][allow]['netdomain']['audit'][fam] = audit
- else:
- profile_data[profile][hat][allow]['netdomain']['rule']['all'] = True
- profile_data[profile][hat][allow]['netdomain']['audit']['all'] = audit # True
+ profile_data[profile][hat]['network'].add(NetworkRule.parse(line))
elif RE_PROFILE_DBUS.search(line):
matches = RE_PROFILE_DBUS.search(line).groups()
@@ -3386,39 +3319,10 @@
data = prof_data['capability'].get_clean(depth)
return data
-def write_net_rules(prof_data, depth, allow):
- pre = ' ' * depth
- data = []
- allowstr = set_allow_str(allow)
- audit = ''
- if prof_data[allow].get('netdomain', False):
- if prof_data[allow]['netdomain'].get('rule', False) == 'all':
- if prof_data[allow]['netdomain']['audit'].get('all', False):
- audit = 'audit '
- data.append('%s%snetwork,' % (pre, audit))
- else:
- for fam in sorted(prof_data[allow]['netdomain']['rule'].keys()):
- audit = ''
- if prof_data[allow]['netdomain']['rule'][fam] is True:
- if prof_data[allow]['netdomain']['audit'][fam]:
- audit = 'audit '
- if fam == 'all':
- data.append('%s%s%snetwork,' % (pre, audit, allowstr))
- else:
- data.append('%s%s%snetwork %s,' % (pre, audit, allowstr, fam))
- else:
- for typ in sorted(prof_data[allow]['netdomain']['rule'][fam].keys()):
- if prof_data[allow]['netdomain']['audit'][fam].get(typ, False):
- audit = 'audit '
- data.append('%s%s%snetwork %s %s,' % (pre, audit, allowstr, fam, typ))
- if prof_data[allow].get('netdomain', False):
- data.append('')
-
- return data
-
def write_netdomain(prof_data, depth):
- data = write_net_rules(prof_data, depth, 'deny')
- data += write_net_rules(prof_data, depth, 'allow')
+ data = []
+ if prof_data.get('network', False):
+ data = prof_data['network'].get_clean(depth)
return data
def write_dbus_rules(prof_data, depth, allow):
@@ -3776,7 +3680,7 @@
'include': write_includes,
'rlimit': write_rlimits,
'capability': write_capabilities,
- 'netdomain': write_netdomain,
+ 'network': write_netdomain,
'dbus': write_dbus,
'mount': write_mount,
'signal': write_signal,
@@ -3791,7 +3695,7 @@
'include',
'rlimit',
'capability',
- 'netdomain',
+ 'network',
'dbus',
'mount',
'signal',
@@ -3807,7 +3711,7 @@
'include': False,
'rlimit': False,
'capability': False,
- 'netdomain': False,
+ 'network': False,
'dbus': False,
'mount': True, # not handled otherwise yet
'signal': True, # not handled otherwise yet
@@ -4158,44 +4062,13 @@
data.append(line)
elif RE_PROFILE_NETWORK.search(line):
- matches = RE_PROFILE_NETWORK.search(line).groups()
- audit = False
- if matches[0]:
- audit = True
- allow = 'allow'
- if matches[1] and matches[1].strip() == 'deny':
- allow = 'deny'
- network = matches[2]
- if RE_NETWORK_FAMILY_TYPE.search(network):
- nmatch = RE_NETWORK_FAMILY_TYPE.search(network).groups()
- fam, typ = nmatch[:2]
- if write_prof_data[hat][allow]['netdomain']['rule'][fam][typ] and write_prof_data[hat][allow]['netdomain']['audit'][fam][typ] == audit:
- write_prof_data[hat][allow]['netdomain']['rule'][fam].pop(typ)
- write_prof_data[hat][allow]['netdomain']['audit'][fam].pop(typ)
- data.append(line)
- else:
- correct = False
-
- elif RE_NETWORK_FAMILY.search(network):
- fam = RE_NETWORK_FAMILY.search(network).groups()[0]
- if write_prof_data[hat][allow]['netdomain']['rule'][fam] and write_prof_data[hat][allow]['netdomain']['audit'][fam] == audit:
- write_prof_data[hat][allow]['netdomain']['rule'].pop(fam)
- write_prof_data[hat][allow]['netdomain']['audit'].pop(fam)
- data.append(line)
- else:
- correct = False
- else:
- if write_prof_data[hat][allow]['netdomain']['rule']['all'] and write_prof_data[hat][allow]['netdomain']['audit']['all'] == audit:
- write_prof_data[hat][allow]['netdomain']['rule'].pop('all')
- write_prof_data[hat][allow]['netdomain']['audit'].pop('all')
- data.append(line)
- else:
- correct = False
-
- if correct:
- if not segments['netdomain'] and True in segments.values():
+ network_obj = NetworkRule.parse(line)
+ if write_prof_data[hat]['network'].is_covered(network_obj, True, True):
+ if not segments['network'] and True in segments.values():
data += write_prior_segments(write_prof_data[name], segments, line)
- segments['netdomain'] = True
+ segments['network'] = True
+ write_prof_data[hat]['network'].delete(network_obj)
+ data.append(line)
elif RE_PROFILE_CHANGE_HAT.search(line):
matches = RE_PROFILE_CHANGE_HAT.search(line).groups()
@@ -4321,41 +4194,6 @@
return False
-def profile_known_network(profile, family, sock_type):
- if netrules_access_check(profile['deny']['netdomain'], family, sock_type):
- return -1
- if netrules_access_check(profile['allow']['netdomain'], family, sock_type):
- return 1
-
- for incname in profile['include'].keys():
- if netrules_access_check(include[incname][incname]['deny']['netdomain'], family, sock_type):
- return -1
- if netrules_access_check(include[incname][incname]['allow']['netdomain'], family, sock_type):
- return 1
-
- return 0
-
-def netrules_access_check(netrules, family, sock_type):
- if not netrules:
- return 0
- all_net = False
- all_net_family = False
- net_family_sock = False
- if netrules['rule'].get('all', False):
- all_net = True
- if netrules['rule'].get(family, False) is True:
- all_net_family = True
- if (netrules['rule'].get(family, False) and
- type(netrules['rule'][family]) == type(hasher()) and
- sock_type in netrules['rule'][family].keys() and
- netrules['rule'][family][sock_type]):
- net_family_sock = True
-
- if all_net or all_net_family or net_family_sock:
- return True
- else:
- return False
-
def reload_base(bin_path):
if not check_for_apparmor():
return None
=== modified file 'utils/apparmor/cleanprofile.py'
--- utils/apparmor/cleanprofile.py 2014-12-16 22:13:25 +0000
+++ utils/apparmor/cleanprofile.py 2015-04-11 22:35:00 +0000
@@ -64,20 +64,18 @@
apparmor.aa.load_include(inc)
deleted += apparmor.aa.delete_duplicates(self.other.aa[program][hat], inc)
- #Clean the duplicates of caps in other profile
+ #Clean duplicate rules in other profile
if not self.same_file:
deleted += self.other.aa[program][hat]['capability'].delete_duplicates(self.profile.aa[program][hat]['capability'])
+ deleted += self.other.aa[program][hat]['network'].delete_duplicates(self.profile.aa[program][hat]['network'])
else:
deleted += self.other.aa[program][hat]['capability'].delete_duplicates(None)
+ deleted += self.other.aa[program][hat]['network'].delete_duplicates(None)
#Clean the duplicates of path in other profile
deleted += delete_path_duplicates(self.profile.aa[program][hat], self.other.aa[program][hat], 'allow', self.same_file)
deleted += delete_path_duplicates(self.profile.aa[program][hat], self.other.aa[program][hat], 'deny', self.same_file)
- #Clean the duplicates of net rules in other profile
- deleted += delete_net_duplicates(self.profile.aa[program][hat]['allow']['netdomain'], self.other.aa[program][hat]['allow']['netdomain'], self.same_file)
- deleted += delete_net_duplicates(self.profile.aa[program][hat]['deny']['netdomain'], self.other.aa[program][hat]['deny']['netdomain'], self.same_file)
-
return deleted
def delete_path_duplicates(profile, profile_other, allow, same_profile=True):
@@ -108,33 +106,3 @@
return len(deleted)
-def delete_net_duplicates(netrules, netrules_other, same_profile=True):
- deleted = 0
- hasher_obj = apparmor.aa.hasher()
- if netrules_other and netrules:
- netglob = False
- # Delete matching rules
- if netrules.get('all', False):
- netglob = True
- # Iterate over a copy of the rules in the other profile
- for fam in list(netrules_other['rule'].keys()):
- if netglob or (type(netrules['rule'][fam]) != type(hasher_obj) and netrules['rule'][fam]):
- if not same_profile:
- if type(netrules_other['rule'][fam]) == type(hasher_obj):
- deleted += len(netrules_other['rule'][fam].keys())
- else:
- deleted += 1
- netrules_other['rule'].pop(fam)
- elif type(netrules_other['rule'][fam]) != type(hasher_obj) and netrules_other['rule'][fam]:
- if type(netrules['rule'][fam]) != type(hasher_obj) and netrules['rule'][fam]:
- if not same_profile:
- netrules_other['rule'].pop(fam)
- deleted += 1
- else:
- for sock_type in list(netrules_other['rule'][fam].keys()):
- if netrules['rule'].get(fam, False):
- if netrules['rule'][fam].get(sock_type, False):
- if not same_profile:
- netrules_other['rule'][fam].pop(sock_type)
- deleted += 1
- return deleted
=== modified file 'utils/apparmor/regex.py'
--- utils/apparmor/regex.py 2015-04-03 15:28:03 +0000
+++ utils/apparmor/regex.py 2015-04-12 00:33:41 +0000
@@ -39,9 +39,7 @@
RE_PROFILE_CONDITIONAL_BOOLEAN = re.compile('^\s*if\s+(not\s+)?defined\s+(\$\{?\w+\}?)\s*\{\s*(#.*)?$')
RE_PROFILE_BARE_FILE_ENTRY = re.compile(RE_AUDIT_DENY + RE_OWNER + 'file' + RE_COMMA_EOL)
RE_PROFILE_PATH_ENTRY = re.compile(RE_AUDIT_DENY + RE_OWNER + '(file\s+)?([\"@/].*?)\s+(\S+)(\s+->\s*(.*?))?' + RE_COMMA_EOL)
-RE_PROFILE_NETWORK = re.compile(RE_AUDIT_DENY + 'network(.*)' + RE_EOL)
-RE_NETWORK_FAMILY_TYPE = re.compile('\s+(\S+)\s+(\S+)\s*,$')
-RE_NETWORK_FAMILY = re.compile('\s+(\S+)\s*,$')
+RE_PROFILE_NETWORK = re.compile(RE_AUDIT_DENY + 'network(?P<details>\s+.*)?' + RE_COMMA_EOL)
RE_PROFILE_CHANGE_HAT = re.compile('^\s*\^(\"??.+?\"??)' + RE_COMMA_EOL)
RE_PROFILE_HAT_DEF = re.compile('^\s*(\^|hat\s+)(?P<hat>\"??.+?\"??)\s+((flags=)?\((?P<flags>.+)\)\s+)*\{' + RE_EOL)
RE_PROFILE_DBUS = re.compile(RE_AUDIT_DENY + '(dbus\s*,|dbus\s+[^#]*\s*,)' + RE_EOL)
Regards,
Christian Boltz
--
> (Beschwerden bitte an die Verbrecher des jeweiligen Programms)
:) Die KDE4-Entwickler sind vermutlich eh noch eingebunkert...
[> Karl Thomas Schmidt und Helga Fischer in opensuse-de]
More information about the AppArmor
mailing list