[apparmor] [patch] [utils] Refactor Severity module [Refactor series]
Kshitij Gupta
kgupta8592 at gmail.com
Wed Jun 1 21:32:43 UTC 2016
Hello,
The following patch:
1. Refactors Severity module in hopes of making it more readable and
simplifying up some of the terse logic. (This resulted in re-write of some
segments)
2. Updates relevant modules/tools accordingly (checked with grep for usages)
Side notes:
1. The tests were ultra-useful while refactoring, hopefully they cover
sufficient stuff.
2. I missed not working with git (but then I am not familiar with bzr)
3. Comments are welcome regarding bugs and style
=== modified file 'utils/aa-mergeprof'
--- utils/aa-mergeprof 2016-05-10 12:32:46 +0000
+++ utils/aa-mergeprof 2016-06-01 21:16:50 +0000
@@ -18,14 +18,18 @@
import os
import apparmor.aa
-from apparmor.aa import available_buttons, combine_name,
delete_duplicates, get_profile_filename, is_known_rule, match_includes
import apparmor.aamode
-from apparmor.common import AppArmorException
-from apparmor.regex import re_match_include
+
import apparmor.severity
import apparmor.cleanprofile as cleanprofile
import apparmor.ui as aaui
+from apparmor.aa import (available_buttons, combine_name,
delete_duplicates,
+ get_profile_filename, is_known_rule,
match_includes)
+from apparmor.common import AppArmorException
+from apparmor.regex import re_match_include
+
+
# setup exception handling
from apparmor.fail import enable_aa_exception_handler
enable_aa_exception_handler()
@@ -660,7 +664,7 @@
# Load variables into sev_db? Not needed/used
for capabilities and network rules.
severity = rule_obj.severity(sev_db)
- if severity != sev_db.NOT_IMPLEMENTED:
+ if severity !=
apparmor.severity.NOT_IMPLEMENTED:
q.headers += [_('Severity'), severity]
q.functions = available_buttons(rule_obj)
=== modified file 'utils/apparmor/aa.py'
--- utils/apparmor/aa.py 2016-05-10 12:32:46 +0000
+++ utils/apparmor/aa.py 2016-05-29 18:40:31 +0000
@@ -1644,7 +1644,7 @@
# Load variables into sev_db? Not needed/used
for capabilities and network rules.
severity = rule_obj.severity(sev_db)
- if severity != sev_db.NOT_IMPLEMENTED:
+ if severity !=
apparmor.severity.NOT_IMPLEMENTED:
q.headers += [_('Severity'), severity]
q.functions = available_buttons(rule_obj)
=== modified file 'utils/apparmor/rule/__init__.py'
--- utils/apparmor/rule/__init__.py 2016-01-25 22:48:34 +0000
+++ utils/apparmor/rule/__init__.py 2016-05-29 18:42:17 +0000
@@ -13,6 +13,8 @@
#
# ----------------------------------------------------------------------
+import apparmor.severity
+
from apparmor.aare import AARE
from apparmor.common import AppArmorBug, type_is_str
@@ -237,9 +239,9 @@
'''return severity of this rule, which can be:
- a number between 0 and 10, where 0 means harmless and 10
means critical,
- "unknown" (to be exact: the value specified for "unknown" as
set when loading the severity database), or
- - sev_db.NOT_IMPLEMENTED if no severity check is implemented
for this rule type.
+ - apparmor.severity.NOT_IMPLEMENTED if no severity check is
implemented for this rule type.
sev_db must be an apparmor.severity.Severity object.'''
- return sev_db.NOT_IMPLEMENTED
+ return apparmor.severity.NOT_IMPLEMENTED
def logprof_header(self):
'''return the headers (human-readable version of the rule) to
display in aa-logprof for this rule object
=== modified file 'utils/apparmor/rule/capability.py'
--- utils/apparmor/rule/capability.py 2016-01-25 22:48:34 +0000
+++ utils/apparmor/rule/capability.py 2016-06-01 21:17:54 +0000
@@ -13,10 +13,11 @@
#
# ----------------------------------------------------------------------
+import re
+
from apparmor.regex import RE_PROFILE_CAP
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all,
parse_modifiers
-import re
# setup module translations
from apparmor.translations import init_translation
=== modified file 'utils/apparmor/severity.py'
--- utils/apparmor/severity.py 2015-06-19 19:43:19 +0000
+++ utils/apparmor/severity.py 2016-06-01 21:11:08 +0000
@@ -12,134 +12,170 @@
#
# ----------------------------------------------------------------------
from __future__ import with_statement
+
import os
import re
-from apparmor.common import AppArmorException, open_file_read, warn,
convert_regexp # , msg, error, debug
+
+from collections import defaultdict
+
+from apparmor.common import AppArmorException, open_file_read, warn,
convert_regexp
from apparmor.regex import re_match_include
+from apparmor.utility import Trie, TrieNode
+
+CAPABILITY_ALL = '__ALL__'
+DEFAULT_PROFILE_DIR = '/etc/apparmor.d'
+DEFAULT_RANK = 10;
+NOT_IMPLEMENTED = '_-*not*implemented*-_' # used for rule types that
don't have severity ratings
+VALID_SEVERITY_RANGE = range(0,11)
+MINIMUM_SEVERITY = float('-inf')
+
+
+class SeverityRegexTrieNode(TrieNode):
+ """A Trie node to store information for regex trie used for
calculating severity"""
+ def __init__(self, path_piece):
+ self.mode = None
+ self.path_piece = path_piece
+ self.child = dict()
+ self.globs = dict()
+
+ def get_mode(self):
+ return self.mode
+
+ def set_mode(self, mode):
+ if self.mode: raise Exception("Mode already exists for path piece:
" + self.path_piece)
+
+ self.mode = mode
+
+ def update_mode(self, mode):
+ if not self.mode: raise Exception("Mode was never set for path
piece: " + self.path_piece)
+
+ self.mode = mode
+
+ def add_glob(self, glob, mode):
+ self.globs[glob] = mode
+
+ def get_globs(self):
+ return self.globs
+
+ def is_terminal_node(self):
+ return self.mode is not None
+
class Severity(object):
- def __init__(self, dbname=None, default_rank=10):
- """Initialises the class object"""
- self.PROF_DIR = '/etc/apparmor.d' # The profile directory
- self.NOT_IMPLEMENTED = '_-*not*implemented*-_' # used for rule
types that don't have severity ratings
- self.severity = dict()
- self.severity['DATABASENAME'] = dbname
- self.severity['CAPABILITIES'] = {}
- self.severity['FILES'] = {}
- self.severity['REGEXPS'] = {}
- self.severity['DEFAULT_RANK'] = default_rank
- # For variable expansions for the profile
- self.severity['VARIABLES'] = dict()
- if not dbname:
- raise AppArmorException("No severity db file given")
-
- with open_file_read(dbname) as database: # open(dbname, 'r')
- for lineno, line in enumerate(database, start=1):
- line = line.strip() # or only rstrip and lstrip?
+ def __init__(self, dbname=None, default_rank=DEFAULT_RANK,
profile_dir=DEFAULT_PROFILE_DIR):
+ """Initialises the Severity object"""
+
+ if not dbname or not os.path.isfile(dbname):
+ raise AppArmorException("Invalid severity db file given: " +
str(dbname))
+
+ self.profile_dir = profile_dir
+ self.database_name = dbname
+ self.default_rank = default_rank
+
+ self.capabilities = defaultdict(lambda: self.default_rank)
+ self.files = dict()
+ self.regex_trie = Trie(SeverityRegexTrieNode)
+ # For variable expansions in a profile
+ self.variables = dict()
+
+ # Init the the database
+ self.load_database()
+
+ def load_database(self):
+ with open_file_read(self.database_name) as database:
+ for line_no, line in enumerate(database, start=1):
+ line = line.strip()
if line == '' or line.startswith('#'):
continue
+
if line.startswith('/'):
- try:
- path, read, write, execute = line.split()
- read, write, execute = int(read), int(write),
int(execute)
- except ValueError:
- raise AppArmorException("Insufficient values for
permissions in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))
- else:
- if read not in range(0, 11) or write not in
range(0, 11) or execute not in range(0, 11):
- raise AppArmorException("Inappropriate values
for permissions in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))
- path = path.lstrip('/')
- if '*' not in path:
- self.severity['FILES'][path] = {'r': read,
'w': write, 'x': execute}
- else:
- ptr = self.severity['REGEXPS']
- pieces = path.split('/')
- for index, piece in enumerate(pieces):
- if '*' in piece:
- path = '/'.join(pieces[index:])
- regexp = convert_regexp(path)
- ptr[regexp] = {'AA_RANK': {'r': read,
'w': write, 'x': execute}}
- break
- else:
- ptr[piece] = ptr.get(piece, {})
- ptr = ptr[piece]
- elif line.startswith('CAP_'):
- try:
- resource, severity = line.split()
- severity = int(severity)
- except ValueError:
- error_message = 'No severity value present in
file: %s\n\t[Line %s]: %s' % (dbname, lineno, line)
- #error(error_message)
- raise AppArmorException(error_message) # from None
- else:
- if severity not in range(0, 11):
- raise AppArmorException("Inappropriate
severity value present in file: %s\n\t[Line %s]: %s" % (dbname, lineno,
line))
- self.severity['CAPABILITIES'][resource] = severity
+ path, mode_severities =
self.get_path_and_mode_severities(line, line_no)
+
+ if not path_contains_regex(path):
+ self.files[path] = mode_severities
+ else:
+ add_path_to_trie(self.regex_trie, path,
mode_severities)
+
+ elif is_capability(line):
+ resource, severity =
self.get_resource_and_severity(line, line_no)
+ self.capabilities[resource] = severity
+
else:
- raise AppArmorException("Unexpected line in file:
%s\n\t[Line %s]: %s" % (dbname, lineno, line))
+ raise AppArmorException("Unexpected line in file:
%(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+ % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+ def get_path_and_mode_severities(self, line, line_no):
+ try:
+ path, read, write, execute = line.split()
+ read, write, execute = int(read), int(write), int(execute)
+ except ValueError:
+ raise AppArmorException("Insufficient values for permissions
in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+ % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+ else:
+ if not is_severity_valid(read) or not is_severity_valid(write)
or not is_severity_valid(execute):
+ raise AppArmorException("Inappropriate values for
permissions in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+ % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+ return path.lstrip('/'), {'r': read, 'w': write, 'x': execute}
+
+ def get_resource_and_severity(self, line, line_no):
+ try:
+ resource, severity = line.split()
+ severity = int(severity)
+ except ValueError:
+ raise AppArmorException("No severity value present in file:
%(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+ % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+ else:
+ if not is_severity_valid(severity):
+ raise AppArmorException("Inappropriate severity value
present in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+ % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+ return resource, severity
def rank_capability(self, resource):
"""Returns the severity of for the capability resource, default
value if no match"""
+ resource = resource.upper()
cap = 'CAP_%s' % resource.upper()
- if resource == '__ALL__':
- return max(self.severity['CAPABILITIES'].values())
- if cap in self.severity['CAPABILITIES'].keys():
- return self.severity['CAPABILITIES'][cap]
- # raise ValueError("unexpected capability rank input: %s"%resource)
- warn("unknown capability: %s" % resource)
- return self.severity['DEFAULT_RANK']
-
- def check_subtree(self, tree, mode, sev, segments):
- """Returns the max severity from the regex tree"""
- if len(segments) == 0:
- first = ''
- else:
- first = segments[0]
- rest = segments[1:]
- path = '/'.join([first] + rest)
- # Check if we have a matching directory tree to descend into
- if tree.get(first, False):
- sev = self.check_subtree(tree[first], mode, sev, rest)
- # If severity still not found, match against globs
- if sev is None:
- # Match against all globs at this directory level
- for chunk in tree.keys():
- if '*' in chunk:
- # Match rest of the path
- if re.search("^" + chunk, path):
- # Find max rank
- if "AA_RANK" in tree[chunk].keys():
- for m in mode:
- if sev is None or
tree[chunk]["AA_RANK"].get(m, -1) > sev:
- sev = tree[chunk]["AA_RANK"].get(m,
None)
- return sev
+
+ if resource == CAPABILITY_ALL:
+ return max(self.capabilities.values())
+
+ if not cap in self.capabilities.keys():
+ warn("unknown capability: %s" % resource)
+
+ return self.capabilities[cap]
def handle_file(self, resource, mode):
"""Returns the severity for the file, default value if no match
found"""
resource = resource[1:] # remove initial / from path
pieces = resource.split('/') # break path into directory level
chunks
- sev = None
- # Check for an exact match in the db
- if resource in self.severity['FILES'].keys():
- # Find max value among the given modes
- for m in mode:
- if sev is None or self.severity['FILES'][resource].get(m,
-1) > sev:
- sev = self.severity['FILES'][resource].get(m, None)
+ severity = None
+ # Check for an exact match in the sverity db db
+ if resource in self.files:
+ severity = find_max_severity_for_modes(mode,
self.files[resource])
else:
- # Search regex tree for matching glob
- sev = self.check_subtree(self.severity['REGEXPS'], mode, sev,
pieces)
- if sev is None:
+ # Search regex trie for a matching glob
+ modes = self.regex_trie.find_node(pieces,
find_matching_modes_for_path)
+ if modes is None:
+ severity = None
+ else:
+ modes = filter(lambda x: x is not None, modes)
+ for mode_severities in modes:
+ severity = find_max_severity_for_modes(mode,
mode_severities, severity)
+
+ if severity is None:
# Return default rank if severity cannot be found
- return self.severity['DEFAULT_RANK']
+ return self.default_rank
else:
- return sev
+ return severity
def rank(self, resource, mode=None):
"""Returns the rank for the resource file/capability"""
if '@' in resource: # path contains variable
return self.handle_variable_rank(resource, mode)
- elif resource[0] == '/': # file resource
+ elif resource.startswith('/'): # file resource
return self.handle_file(resource, mode)
- elif resource[0:4] == 'CAP_': # capability resource
+ elif is_capability(resource):
return self.rank_capability(resource[4:])
else:
raise AppArmorException("Unexpected rank input: %s" % resource)
@@ -149,16 +185,18 @@
regex_variable = re.compile('@{([^{.]*)}')
matches = regex_variable.search(resource)
if matches:
- rank = self.severity['DEFAULT_RANK']
+ rank = self.default_rank
variable = '@{%s}' % matches.groups()[0]
- #variables = regex_variable.findall(resource)
- for replacement in self.severity['VARIABLES'][variable]:
+
+ for replacement in self.variables[variable]:
resource_replaced = self.variable_replace(variable,
replacement, resource)
rank_new = self.handle_variable_rank(resource_replaced,
mode)
- if rank == self.severity['DEFAULT_RANK']:
- rank = rank_new
- elif rank_new != self.severity['DEFAULT_RANK'] and
rank_new > rank:
- rank = rank_new
+
+ if rank == self.default_rank:
+ rank = rank_new
+ elif rank_new != self.default_rank and rank_new > rank:
+ rank = rank_new
+
return rank
else:
return self.handle_file(resource, mode)
@@ -178,35 +216,104 @@
replacement = replacement[:-1]
return resource.replace(variable, replacement)
- def load_variables(self, prof_path):
- """Loads the variables for the given profile"""
- if os.path.isfile(prof_path):
- with open_file_read(prof_path) as f_in:
- for line in f_in:
- line = line.strip()
- # If any includes, load variables from them first
- match = re_match_include(line)
- if match:
- new_path = self.PROF_DIR + '/' + match
- self.load_variables(new_path)
- else:
- # Remove any comments
- if '#' in line:
- line = line.split('#')[0].rstrip()
- # Expected format is @{Variable} = value1 value2 ..
- if line.startswith('@') and '=' in line:
- if '+=' in line:
- line = line.split('+=')
- try:
- self.severity['VARIABLES'][line[0]] +=
[i.strip('"') for i in line[1].split()]
- except KeyError:
- raise AppArmorException("Variable %s
was not previously declared, but is being assigned additional value in
file: %s" % (line[0], prof_path))
- else:
- line = line.split('=')
- if line[0] in
self.severity['VARIABLES'].keys():
- raise AppArmorException("Variable %s
was previously declared in file: %s" % (line[0], prof_path))
- self.severity['VARIABLES'][line[0]] =
[i.strip('"') for i in line[1].split()]
+ def load_variables(self, file_path):
+ """Loads the variables from the given file"""
+ if not os.path.isfile(file_path):
+ return
+
+ with open_file_read(file_path) as f_in:
+ for line in f_in:
+ line = line.strip()
+ # If any includes, load variables from them first
+ match = re_match_include(line)
+ if match:
+ new_path = self.profile_dir + '/' + match
+ self.load_variables(new_path)
+ else:
+ # Remove any comments
+ if '#' in line:
+ line = line.split('#')[0].rstrip()
+ # Expected format is @{Variable} = value1 value2 ..
+ if line.startswith('@') and '=' in line:
+ if '+=' in line:
+ line = line.split('+=')
+ try:
+ self.variables[line[0]] += [i.strip('"')
for i in line[1].split()]
+ except KeyError:
+ raise AppArmorException("Variable %s was
not previously declared, but is being assigned additional value in file:
%s" % (line[0], file_path))
+ else:
+ line = line.split('=')
+ if line[0] in self.variables.keys():
+ raise AppArmorException("Variable %s was
previously declared in file: %s" % (line[0], file_path))
+ self.variables[line[0]] = [i.strip('"') for i
in line[1].split()]
def unload_variables(self):
"""Clears all loaded variables"""
- self.severity['VARIABLES'] = dict()
+ self.variables = dict()
+
+
+def is_severity_valid(severity):
+ return severity in VALID_SEVERITY_RANGE
+
+def path_contains_regex(path):
+ return '*' in path
+
+def is_capability(resource):
+ return resource.startswith('CAP_')
+
+def find_max_severity_for_modes(modes, mode_severities,
severity=MINIMUM_SEVERITY):
+ if severity is None:
+ severity = MINIMUM_SEVERITY
+
+ for mode in modes:
+ severity = max(severity, mode_severities.get(mode,
MINIMUM_SEVERITY))
+
+ if severity == MINIMUM_SEVERITY:
+ return None
+
+ return severity
+
+def add_path_to_trie(trie, path, mode_severities):
+ node = trie.get_head()
+
+ path_pieces = path.split('/')
+
+ for index, path_piece in enumerate(path_pieces):
+ if path_contains_regex(path_piece):
+ path_onwards = '/'.join(path_pieces[index:])
+
+ node.add_glob(convert_regexp(path_onwards), mode_severities)
+ break
+ else:
+ if node.is_present(path_piece):
+ node = node.get_child(path_piece)
+ else:
+ child_node = SeverityRegexTrieNode(path_piece)
+ node.add_child(path_piece, child_node)
+ node = child_node
+
+ return trie
+
+def find_matching_modes_for_path(node, keys):
+ if not keys:
+ ## TODO: Possibly merge file paths into the trie
+ #if node.has_value():
+ # return node.get_mode()
+ return None
+
+ key = keys[0]
+ path = '/'.join(keys)
+ modes = None
+
+ if node.is_present(key):
+ modes = find_matching_modes_for_path(node.get_child(key), keys[1:])
+
+ globs_to_mode = node.get_globs()
+ if modes is None and globs_to_mode is not None:
+ modes = []
+
+ for glob in globs_to_mode:
+ if re.search("^" + glob, path):
+ modes.append(globs_to_mode[glob])
+
+ return modes
=== added file 'utils/apparmor/utility.py'
--- utils/apparmor/utility.py 1970-01-01 00:00:00 +0000
+++ utils/apparmor/utility.py 2016-06-01 21:04:40 +0000
@@ -0,0 +1,43 @@
+class Trie(object):
+ """A basic skeleton of a trie data structure
+ supports basic head, find operations for a
+ type of node using custom search function
+ """
+ def __init__(self, node_type):
+ self.root = node_type(None)
+
+ def get_head(self):
+ return self.root
+
+ def find_node(self, keys, search_function):
+ if len(keys) > 0 and self.root.is_present(keys[0]):
+ return search_function(self.root.get_child(keys[0]), keys[1:])
+
+ return None
+
+
+class TrieNode(object):
+ """A basic trie node supporting basic operations to add child,
+ search presence of a child, find if it is a leaf node
+ """
+ def __init__(self):
+ self.child = dict()
+ self.is_value_present = False
+
+ def add_child(self, key, child_node):
+ # Currently sev_db has multiple entries for same path,
+ # maybe we want to do this in a strict mode or never
+ # if key in self.child: raise Exception("TrieNode already exists
for key: " + key)
+ self.child[key] = child_node
+
+ def is_present(self, key):
+ return key in self.child
+
+ def get_child(self, key):
+ return self.child[key]
+
+ def has_value(self):
+ return self.is_value_present
+
+ def is_terminal_node(self):
+ return self.child.keys()
=== modified file 'utils/test/test-severity.py'
--- utils/test/test-severity.py 2015-06-06 11:59:11 +0000
+++ utils/test/test-severity.py 2016-06-01 20:35:09 +0000
@@ -65,6 +65,7 @@
('UNKNOWN', 'unknown'),
('K*', 'unknown'),
('__ALL__', 10),
+ ('__all__', 10)
]
def _run_test(self, params, expected):
@@ -167,7 +168,7 @@
self.assertRaises(AppArmorException, severity.Severity,
'severity_broken.db')
def test_nonexistent_db(self):
- self.assertRaises(IOError, severity.Severity,
'severity.db.does.not.exist')
+ self.assertRaises(AppArmorException, severity.Severity,
'severity.db.does.not.exist')
def test_no_arg_to_severity(self):
with self.assertRaises(AppArmorException):
--
Regards,
Kshitij Gupta
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ubuntu.com/archives/apparmor/attachments/20160602/b5b8b142/attachment-0001.html>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: severity-patch
Type: application/octet-stream
Size: 25886 bytes
Desc: not available
URL: <https://lists.ubuntu.com/archives/apparmor/attachments/20160602/b5b8b142/attachment-0001.obj>
More information about the AppArmor
mailing list