[apparmor] [patch] [utils] Refactor Severity module [Refactor series]

Kshitij Gupta kgupta8592 at gmail.com
Wed Jun 1 21:32:43 UTC 2016


Hello,

The following patch:
1. Refactors Severity module in hopes of making it more readable and
simplifying up some of the terse logic. (This resulted in re-write of some
segments)
2. Updates relevant modules/tools accordingly (checked with grep for usages)

Side notes:
1. The tests were ultra-useful while refactoring, hopefully they cover
sufficient stuff.
2. I missed not working with git (but then I am not familiar with bzr)
3. Comments are welcome regarding bugs and style


=== modified file 'utils/aa-mergeprof'
--- utils/aa-mergeprof    2016-05-10 12:32:46 +0000
+++ utils/aa-mergeprof    2016-06-01 21:16:50 +0000
@@ -18,14 +18,18 @@
 import os

 import apparmor.aa
-from apparmor.aa import available_buttons, combine_name,
delete_duplicates, get_profile_filename, is_known_rule, match_includes
 import apparmor.aamode
-from apparmor.common import AppArmorException
-from apparmor.regex import re_match_include
+
 import apparmor.severity
 import apparmor.cleanprofile as cleanprofile
 import apparmor.ui as aaui

+from apparmor.aa import (available_buttons, combine_name,
delete_duplicates,
+                         get_profile_filename, is_known_rule,
match_includes)
+from apparmor.common import AppArmorException
+from apparmor.regex import re_match_include
+
+
 # setup exception handling
 from apparmor.fail import enable_aa_exception_handler
 enable_aa_exception_handler()
@@ -660,7 +664,7 @@

                             # Load variables into sev_db? Not needed/used
for capabilities and network rules.
                             severity = rule_obj.severity(sev_db)
-                            if severity != sev_db.NOT_IMPLEMENTED:
+                            if severity !=
apparmor.severity.NOT_IMPLEMENTED:
                                 q.headers += [_('Severity'), severity]

                             q.functions = available_buttons(rule_obj)

=== modified file 'utils/apparmor/aa.py'
--- utils/apparmor/aa.py    2016-05-10 12:32:46 +0000
+++ utils/apparmor/aa.py    2016-05-29 18:40:31 +0000
@@ -1644,7 +1644,7 @@

                             # Load variables into sev_db? Not needed/used
for capabilities and network rules.
                             severity = rule_obj.severity(sev_db)
-                            if severity != sev_db.NOT_IMPLEMENTED:
+                            if severity !=
apparmor.severity.NOT_IMPLEMENTED:
                                 q.headers += [_('Severity'), severity]

                             q.functions = available_buttons(rule_obj)

=== modified file 'utils/apparmor/rule/__init__.py'
--- utils/apparmor/rule/__init__.py    2016-01-25 22:48:34 +0000
+++ utils/apparmor/rule/__init__.py    2016-05-29 18:42:17 +0000
@@ -13,6 +13,8 @@
 #
 # ----------------------------------------------------------------------

+import apparmor.severity
+
 from apparmor.aare import AARE
 from apparmor.common import AppArmorBug, type_is_str

@@ -237,9 +239,9 @@
         '''return severity of this rule, which can be:
            - a number between 0 and 10, where 0 means harmless and 10
means critical,
            - "unknown" (to be exact: the value specified for "unknown" as
set when loading the severity database), or
-           - sev_db.NOT_IMPLEMENTED if no severity check is implemented
for this rule type.
+           - apparmor.severity.NOT_IMPLEMENTED if no severity check is
implemented for this rule type.
            sev_db must be an apparmor.severity.Severity object.'''
-        return sev_db.NOT_IMPLEMENTED
+        return apparmor.severity.NOT_IMPLEMENTED

     def logprof_header(self):
         '''return the headers (human-readable version of the rule) to
display in aa-logprof for this rule object

=== modified file 'utils/apparmor/rule/capability.py'
--- utils/apparmor/rule/capability.py    2016-01-25 22:48:34 +0000
+++ utils/apparmor/rule/capability.py    2016-06-01 21:17:54 +0000
@@ -13,10 +13,11 @@
 #
 # ----------------------------------------------------------------------

+import re
+
 from apparmor.regex import RE_PROFILE_CAP
 from apparmor.common import AppArmorBug, AppArmorException, type_is_str
 from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all,
parse_modifiers
-import re

 # setup module translations
 from apparmor.translations import init_translation

=== modified file 'utils/apparmor/severity.py'
--- utils/apparmor/severity.py    2015-06-19 19:43:19 +0000
+++ utils/apparmor/severity.py    2016-06-01 21:11:08 +0000
@@ -12,134 +12,170 @@
 #
 # ----------------------------------------------------------------------
 from __future__ import with_statement
+
 import os
 import re
-from apparmor.common import AppArmorException, open_file_read, warn,
convert_regexp  # , msg, error, debug
+
+from collections import defaultdict
+
+from apparmor.common import AppArmorException, open_file_read, warn,
convert_regexp
 from apparmor.regex import re_match_include
+from apparmor.utility import Trie, TrieNode
+
+CAPABILITY_ALL = '__ALL__'
+DEFAULT_PROFILE_DIR = '/etc/apparmor.d'
+DEFAULT_RANK = 10;
+NOT_IMPLEMENTED = '_-*not*implemented*-_'  # used for rule types that
don't have severity ratings
+VALID_SEVERITY_RANGE = range(0,11)
+MINIMUM_SEVERITY = float('-inf')
+
+
+class SeverityRegexTrieNode(TrieNode):
+    """A Trie node to store information for regex trie used for
calculating severity"""
+    def __init__(self, path_piece):
+        self.mode = None
+        self.path_piece = path_piece
+        self.child = dict()
+        self.globs = dict()
+
+    def get_mode(self):
+        return self.mode
+
+    def set_mode(self, mode):
+        if self.mode: raise Exception("Mode already exists for path piece:
" + self.path_piece)
+
+        self.mode = mode
+
+    def update_mode(self, mode):
+        if not self.mode: raise Exception("Mode was never set for path
piece: " + self.path_piece)
+
+        self.mode = mode
+
+    def add_glob(self, glob, mode):
+        self.globs[glob] = mode
+
+    def get_globs(self):
+        return self.globs
+
+    def is_terminal_node(self):
+        return self.mode is not None
+

 class Severity(object):
-    def __init__(self, dbname=None, default_rank=10):
-        """Initialises the class object"""
-        self.PROF_DIR = '/etc/apparmor.d'  # The profile directory
-        self.NOT_IMPLEMENTED = '_-*not*implemented*-_'  # used for rule
types that don't have severity ratings
-        self.severity = dict()
-        self.severity['DATABASENAME'] = dbname
-        self.severity['CAPABILITIES'] = {}
-        self.severity['FILES'] = {}
-        self.severity['REGEXPS'] = {}
-        self.severity['DEFAULT_RANK'] = default_rank
-        # For variable expansions for the profile
-        self.severity['VARIABLES'] = dict()
-        if not dbname:
-            raise AppArmorException("No severity db file given")
-
-        with open_file_read(dbname) as database:  # open(dbname, 'r')
-            for lineno, line in enumerate(database, start=1):
-                line = line.strip()  # or only rstrip and lstrip?
+    def __init__(self, dbname=None, default_rank=DEFAULT_RANK,
profile_dir=DEFAULT_PROFILE_DIR):
+        """Initialises the Severity object"""
+
+        if not dbname or not os.path.isfile(dbname):
+            raise AppArmorException("Invalid severity db file given: " +
str(dbname))
+
+        self.profile_dir = profile_dir
+        self.database_name = dbname
+        self.default_rank = default_rank
+
+        self.capabilities = defaultdict(lambda: self.default_rank)
+        self.files = dict()
+        self.regex_trie = Trie(SeverityRegexTrieNode)
+        # For variable expansions in a profile
+        self.variables = dict()
+
+        # Init the the database
+        self.load_database()
+
+    def load_database(self):
+        with open_file_read(self.database_name) as database:
+            for line_no, line in enumerate(database, start=1):
+                line = line.strip()
                 if line == '' or line.startswith('#'):
                     continue
+
                 if line.startswith('/'):
-                    try:
-                        path, read, write, execute = line.split()
-                        read, write, execute = int(read), int(write),
int(execute)
-                    except ValueError:
-                        raise AppArmorException("Insufficient values for
permissions in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))
-                    else:
-                        if read not in range(0, 11) or write not in
range(0, 11) or execute not in range(0, 11):
-                            raise AppArmorException("Inappropriate values
for permissions in file: %s\n\t[Line %s]: %s" % (dbname, lineno, line))
-                        path = path.lstrip('/')
-                        if '*' not in path:
-                            self.severity['FILES'][path] = {'r': read,
'w': write, 'x': execute}
-                        else:
-                            ptr = self.severity['REGEXPS']
-                            pieces = path.split('/')
-                            for index, piece in enumerate(pieces):
-                                if '*' in piece:
-                                    path = '/'.join(pieces[index:])
-                                    regexp = convert_regexp(path)
-                                    ptr[regexp] = {'AA_RANK': {'r': read,
'w': write, 'x': execute}}
-                                    break
-                                else:
-                                    ptr[piece] = ptr.get(piece, {})
-                                    ptr = ptr[piece]
-                elif line.startswith('CAP_'):
-                    try:
-                        resource, severity = line.split()
-                        severity = int(severity)
-                    except ValueError:
-                        error_message = 'No severity value present in
file: %s\n\t[Line %s]: %s' % (dbname, lineno, line)
-                        #error(error_message)
-                        raise AppArmorException(error_message)  # from None
-                    else:
-                        if severity not in range(0, 11):
-                            raise AppArmorException("Inappropriate
severity value present in file: %s\n\t[Line %s]: %s" % (dbname, lineno,
line))
-                        self.severity['CAPABILITIES'][resource] = severity
+                    path, mode_severities =
self.get_path_and_mode_severities(line, line_no)
+
+                    if not path_contains_regex(path):
+                        self.files[path] = mode_severities
+                    else:
+                        add_path_to_trie(self.regex_trie, path,
mode_severities)
+
+                elif is_capability(line):
+                    resource, severity =
self.get_resource_and_severity(line, line_no)
+                    self.capabilities[resource] = severity
+
                 else:
-                    raise AppArmorException("Unexpected line in file:
%s\n\t[Line %s]: %s" % (dbname, lineno, line))
+                    raise AppArmorException("Unexpected line in file:
%(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+                                                   % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+    def get_path_and_mode_severities(self, line, line_no):
+        try:
+            path, read, write, execute = line.split()
+            read, write, execute = int(read), int(write), int(execute)
+        except ValueError:
+            raise AppArmorException("Insufficient values for permissions
in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+                                    % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+        else:
+            if not is_severity_valid(read) or not is_severity_valid(write)
or not is_severity_valid(execute):
+                raise AppArmorException("Inappropriate values for
permissions in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+                                        % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+            return path.lstrip('/'), {'r': read, 'w': write, 'x': execute}
+
+    def get_resource_and_severity(self, line, line_no):
+        try:
+            resource, severity = line.split()
+            severity = int(severity)
+        except ValueError:
+            raise AppArmorException("No severity value present in file:
%(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+                                                   % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+        else:
+            if not is_severity_valid(severity):
+                raise AppArmorException("Inappropriate severity value
present in file: %(database_name)s\n\t[Line %(line_no)d]: %(line)s"
+                                                   % {'database_name' :
self.database_name, 'line_no' : line_no, 'line': line})
+
+            return resource, severity

     def rank_capability(self, resource):
         """Returns the severity of for the capability resource, default
value if no match"""
+        resource = resource.upper()
         cap = 'CAP_%s' % resource.upper()
-        if resource == '__ALL__':
-            return max(self.severity['CAPABILITIES'].values())
-        if cap in self.severity['CAPABILITIES'].keys():
-            return self.severity['CAPABILITIES'][cap]
-        # raise ValueError("unexpected capability rank input: %s"%resource)
-        warn("unknown capability: %s" % resource)
-        return self.severity['DEFAULT_RANK']
-
-    def check_subtree(self, tree, mode, sev, segments):
-        """Returns the max severity from the regex tree"""
-        if len(segments) == 0:
-            first = ''
-        else:
-            first = segments[0]
-        rest = segments[1:]
-        path = '/'.join([first] + rest)
-        # Check if we have a matching directory tree to descend into
-        if tree.get(first, False):
-            sev = self.check_subtree(tree[first], mode, sev, rest)
-        # If severity still not found, match against globs
-        if sev is None:
-            # Match against all globs at this directory level
-            for chunk in tree.keys():
-                if '*' in chunk:
-                    # Match rest of the path
-                    if re.search("^" + chunk, path):
-                        # Find max rank
-                        if "AA_RANK" in tree[chunk].keys():
-                            for m in mode:
-                                if sev is None or
tree[chunk]["AA_RANK"].get(m, -1) > sev:
-                                    sev = tree[chunk]["AA_RANK"].get(m,
None)
-        return sev
+
+        if resource == CAPABILITY_ALL:
+            return max(self.capabilities.values())
+
+        if not cap in self.capabilities.keys():
+            warn("unknown capability: %s" % resource)
+
+        return self.capabilities[cap]

     def handle_file(self, resource, mode):
         """Returns the severity for the file, default value if no match
found"""
         resource = resource[1:]    # remove initial / from path
         pieces = resource.split('/')    # break path into directory level
chunks
-        sev = None
-        # Check for an exact match in the db
-        if resource in self.severity['FILES'].keys():
-            # Find max value among the given modes
-            for m in mode:
-                if sev is None or self.severity['FILES'][resource].get(m,
-1) > sev:
-                    sev = self.severity['FILES'][resource].get(m, None)
+        severity = None
+        # Check for an exact match in the sverity db db
+        if resource in self.files:
+            severity = find_max_severity_for_modes(mode,
self.files[resource])
         else:
-            # Search regex tree for matching glob
-            sev = self.check_subtree(self.severity['REGEXPS'], mode, sev,
pieces)
-        if sev is None:
+            # Search regex trie for a matching glob
+            modes = self.regex_trie.find_node(pieces,
find_matching_modes_for_path)
+            if modes is None:
+                severity = None
+            else:
+                modes = filter(lambda x: x is not None, modes)
+                for mode_severities in modes:
+                    severity = find_max_severity_for_modes(mode,
mode_severities, severity)
+
+        if severity is None:
             # Return default rank if severity cannot be found
-            return self.severity['DEFAULT_RANK']
+            return self.default_rank
         else:
-            return sev
+            return severity

     def rank(self, resource, mode=None):
         """Returns the rank for the resource file/capability"""
         if '@' in resource:    # path contains variable
             return self.handle_variable_rank(resource, mode)
-        elif resource[0] == '/':    # file resource
+        elif resource.startswith('/'):    # file resource
             return self.handle_file(resource, mode)
-        elif resource[0:4] == 'CAP_':    # capability resource
+        elif is_capability(resource):
             return self.rank_capability(resource[4:])
         else:
             raise AppArmorException("Unexpected rank input: %s" % resource)
@@ -149,16 +185,18 @@
         regex_variable = re.compile('@{([^{.]*)}')
         matches = regex_variable.search(resource)
         if matches:
-            rank = self.severity['DEFAULT_RANK']
+            rank = self.default_rank
             variable = '@{%s}' % matches.groups()[0]
-            #variables = regex_variable.findall(resource)
-            for replacement in self.severity['VARIABLES'][variable]:
+
+            for replacement in self.variables[variable]:
                 resource_replaced = self.variable_replace(variable,
replacement, resource)
                 rank_new = self.handle_variable_rank(resource_replaced,
mode)
-                if rank == self.severity['DEFAULT_RANK']:
-                    rank = rank_new
-                elif rank_new != self.severity['DEFAULT_RANK'] and
rank_new > rank:
-                    rank = rank_new
+
+                if rank == self.default_rank:
+                    rank = rank_new
+                elif rank_new != self.default_rank and rank_new > rank:
+                    rank = rank_new
+
             return rank
         else:
             return self.handle_file(resource, mode)
@@ -178,35 +216,104 @@
             replacement = replacement[:-1]
         return resource.replace(variable, replacement)

-    def load_variables(self, prof_path):
-        """Loads the variables for the given profile"""
-        if os.path.isfile(prof_path):
-            with open_file_read(prof_path) as f_in:
-                for line in f_in:
-                    line = line.strip()
-                    # If any includes, load variables from them first
-                    match = re_match_include(line)
-                    if match:
-                        new_path = self.PROF_DIR + '/' + match
-                        self.load_variables(new_path)
-                    else:
-                        # Remove any comments
-                        if '#' in line:
-                            line = line.split('#')[0].rstrip()
-                        # Expected format is @{Variable} = value1 value2 ..
-                        if line.startswith('@') and '=' in line:
-                            if '+=' in line:
-                                line = line.split('+=')
-                                try:
-                                    self.severity['VARIABLES'][line[0]] +=
[i.strip('"') for i in line[1].split()]
-                                except KeyError:
-                                    raise AppArmorException("Variable %s
was not previously declared, but is being assigned additional value in
file: %s" % (line[0], prof_path))
-                            else:
-                                line = line.split('=')
-                                if line[0] in
self.severity['VARIABLES'].keys():
-                                    raise AppArmorException("Variable %s
was previously declared in file: %s" % (line[0], prof_path))
-                                self.severity['VARIABLES'][line[0]] =
[i.strip('"') for i in line[1].split()]
+    def load_variables(self, file_path):
+        """Loads the variables from the given file"""
+        if not os.path.isfile(file_path):
+            return
+
+        with open_file_read(file_path) as f_in:
+            for line in f_in:
+                line = line.strip()
+                # If any includes, load variables from them first
+                match = re_match_include(line)
+                if match:
+                    new_path = self.profile_dir + '/' + match
+                    self.load_variables(new_path)
+                else:
+                    # Remove any comments
+                    if '#' in line:
+                        line = line.split('#')[0].rstrip()
+                    # Expected format is @{Variable} = value1 value2 ..
+                    if line.startswith('@') and '=' in line:
+                        if '+=' in line:
+                            line = line.split('+=')
+                            try:
+                                self.variables[line[0]] += [i.strip('"')
for i in line[1].split()]
+                            except KeyError:
+                                raise AppArmorException("Variable %s was
not previously declared, but is being assigned additional value in file:
%s" % (line[0], file_path))
+                        else:
+                            line = line.split('=')
+                            if line[0] in self.variables.keys():
+                                raise AppArmorException("Variable %s was
previously declared in file: %s" % (line[0], file_path))
+                            self.variables[line[0]] = [i.strip('"') for i
in line[1].split()]

     def unload_variables(self):
         """Clears all loaded variables"""
-        self.severity['VARIABLES'] = dict()
+        self.variables = dict()
+
+
+def is_severity_valid(severity):
+    return severity in VALID_SEVERITY_RANGE
+
+def path_contains_regex(path):
+    return '*' in path
+
+def is_capability(resource):
+    return resource.startswith('CAP_')
+
+def find_max_severity_for_modes(modes, mode_severities,
severity=MINIMUM_SEVERITY):
+    if severity is None:
+        severity = MINIMUM_SEVERITY
+
+    for mode in modes:
+        severity = max(severity, mode_severities.get(mode,
MINIMUM_SEVERITY))
+
+    if severity == MINIMUM_SEVERITY:
+        return None
+
+    return severity
+
+def add_path_to_trie(trie, path, mode_severities):
+    node = trie.get_head()
+
+    path_pieces = path.split('/')
+
+    for index, path_piece in enumerate(path_pieces):
+        if path_contains_regex(path_piece):
+            path_onwards = '/'.join(path_pieces[index:])
+
+            node.add_glob(convert_regexp(path_onwards), mode_severities)
+            break
+        else:
+            if node.is_present(path_piece):
+                node = node.get_child(path_piece)
+            else:
+                child_node = SeverityRegexTrieNode(path_piece)
+                node.add_child(path_piece, child_node)
+                node = child_node
+
+    return trie
+
+def find_matching_modes_for_path(node, keys):
+    if not keys:
+        ## TODO: Possibly merge file paths into the trie
+        #if node.has_value():
+        #    return node.get_mode()
+        return None
+
+    key = keys[0]
+    path = '/'.join(keys)
+    modes = None
+
+    if node.is_present(key):
+        modes = find_matching_modes_for_path(node.get_child(key), keys[1:])
+
+    globs_to_mode = node.get_globs()
+    if modes is None and globs_to_mode is not None:
+        modes = []
+
+        for glob in globs_to_mode:
+            if re.search("^" + glob, path):
+                modes.append(globs_to_mode[glob])
+
+    return modes

=== added file 'utils/apparmor/utility.py'
--- utils/apparmor/utility.py    1970-01-01 00:00:00 +0000
+++ utils/apparmor/utility.py    2016-06-01 21:04:40 +0000
@@ -0,0 +1,43 @@
+class Trie(object):
+    """A basic skeleton of a trie data structure
+        supports basic head, find operations for a
+        type of node using custom search function
+    """
+    def __init__(self, node_type):
+        self.root = node_type(None)
+
+    def get_head(self):
+        return self.root
+
+    def find_node(self, keys, search_function):
+        if len(keys) > 0 and self.root.is_present(keys[0]):
+            return search_function(self.root.get_child(keys[0]), keys[1:])
+
+        return None
+
+
+class TrieNode(object):
+    """A basic trie node supporting basic operations to add child,
+        search presence of a child, find if it is a leaf node
+    """
+    def __init__(self):
+        self.child = dict()
+        self.is_value_present = False
+
+    def add_child(self, key, child_node):
+        # Currently sev_db has multiple entries for same path,
+        # maybe we want to do this in a strict mode or never
+        # if key in self.child: raise Exception("TrieNode already exists
for key: " + key)
+        self.child[key] = child_node
+
+    def is_present(self, key):
+        return key in self.child
+
+    def get_child(self, key):
+        return self.child[key]
+
+    def has_value(self):
+        return self.is_value_present
+
+    def is_terminal_node(self):
+        return self.child.keys()

=== modified file 'utils/test/test-severity.py'
--- utils/test/test-severity.py    2015-06-06 11:59:11 +0000
+++ utils/test/test-severity.py    2016-06-01 20:35:09 +0000
@@ -65,6 +65,7 @@
         ('UNKNOWN', 'unknown'),
         ('K*', 'unknown'),
         ('__ALL__', 10),
+        ('__all__', 10)
     ]

     def _run_test(self, params, expected):
@@ -167,7 +168,7 @@
         self.assertRaises(AppArmorException, severity.Severity,
'severity_broken.db')

     def test_nonexistent_db(self):
-        self.assertRaises(IOError, severity.Severity,
'severity.db.does.not.exist')
+        self.assertRaises(AppArmorException, severity.Severity,
'severity.db.does.not.exist')

     def test_no_arg_to_severity(self):
         with self.assertRaises(AppArmorException):




-- 
Regards,

Kshitij Gupta
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.ubuntu.com/archives/apparmor/attachments/20160602/b5b8b142/attachment-0001.html>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: severity-patch
Type: application/octet-stream
Size: 25886 bytes
Desc: not available
URL: <https://lists.ubuntu.com/archives/apparmor/attachments/20160602/b5b8b142/attachment-0001.obj>


More information about the AppArmor mailing list