[apparmor] [PATCH 1/3] utils: Basic support for signal rules

Tyler Hicks tyhicks at canonical.com
Thu Apr 3 19:55:59 UTC 2014


Bug: https://bugs.launchpad.net/bugs/1300316

This patch does bare-bones parsing of signal rules and stores the raw
strings for writing them out later. It is meant to be a simple change to
prevent aa.py from emitting a traceback when encountering signal rules.

Signed-off-by: Tyler Hicks <tyhicks at canonical.com>
---
 utils/apparmor/aa.py             | 49 +++++++++++++++++++++
 utils/apparmor/rules.py          | 12 ++++++
 utils/test/test-regex_matches.py | 92 +++++++++++++++++++++++++++++++++++++++-
 utils/test/test-signal_parse.py  | 63 +++++++++++++++++++++++++++
 4 files changed, 215 insertions(+), 1 deletion(-)
 create mode 100644 utils/test/test-signal_parse.py

diff --git a/utils/apparmor/aa.py b/utils/apparmor/aa.py
index f56d50b..87eb7b0 100644
--- a/utils/apparmor/aa.py
+++ b/utils/apparmor/aa.py
@@ -2624,6 +2624,7 @@ RE_NETWORK_FAMILY_TYPE = re.compile('\s+(\S+)\s+(\S+)\s*,$')
 RE_NETWORK_FAMILY = re.compile('\s+(\S+)\s*,$')
 RE_PROFILE_DBUS = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?(dbus[^#]*\s*,)\s*(#.*)?$')
 RE_PROFILE_MOUNT = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?((mount|remount|umount)[^#]*\s*,)\s*(#.*)?$')
+RE_PROFILE_SIGNAL = re.compile('^\s*(audit\s+)?(allow\s+|deny\s+)?((signal)[^#]*\s*,)\s*(#.*)?$')
 
 # match anything that's not " or #, or matching quotes with anything except quotes inside
 __re_no_or_quoted_hash = '([^#"]|"[^"]*")*'
@@ -2702,6 +2703,7 @@ def parse_profile_data(data, file, do_include):
             profile_data[profile][hat]['allow']['path'] = hasher()
             profile_data[profile][hat]['allow']['dbus'] = list()
             profile_data[profile][hat]['allow']['mount'] = list()
+            profile_data[profile][hat]['allow']['signal'] = list()
             # Save the initial comment
             if initial_comment:
                 profile_data[profile][hat]['initial_comment'] = initial_comment
@@ -3062,6 +3064,28 @@ def parse_profile_data(data, file, do_include):
             mount_rules.append(mount_rule)
             profile_data[profile][hat][allow]['mount'] = mount_rules
 
+        elif RE_PROFILE_SIGNAL.search(line):
+            matches = RE_PROFILE_SIGNAL.search(line).groups()
+
+            if not profile:
+                raise AppArmorException(_('Syntax Error: Unexpected signal entry found in file: %s line: %s') % (file, lineno + 1))
+
+            audit = False
+            if matches[0]:
+                audit = True
+            allow = 'allow'
+            if matches[1] and matches[1].strip() == 'deny':
+                allow = 'deny'
+            signal = matches[2].strip()
+
+            signal_rule = parse_signal_rule(signal)
+            signal_rule.audit = audit
+            signal_rule.deny = (allow == 'deny')
+
+            signal_rules = profile_data[profile][hat][allow].get('signal', list())
+            signal_rules.append(signal_rule)
+            profile_data[profile][hat][allow]['signal'] = signal_rules
+
         elif RE_PROFILE_CHANGE_HAT.search(line):
             matches = RE_PROFILE_CHANGE_HAT.search(line).groups()
 
@@ -3160,6 +3184,10 @@ def parse_mount_rule(line):
     # XXX Do real parsing here
     return aarules.Raw_Mount_Rule(line)
 
+def parse_signal_rule(line):
+    # XXX Do real parsing here
+    return aarules.Raw_Signal_Rule(line)
+
 def separate_vars(vs):
     """Returns a list of all the values for a variable"""
     data = []
@@ -3389,6 +3417,24 @@ def write_mount(prof_data, depth):
     data += write_mount_rules(prof_data, depth, 'allow')
     return data
 
+def write_signal_rules(prof_data, depth, allow):
+    pre = '  ' * depth
+    data = []
+
+    # no signal rules, so return
+    if not prof_data[allow].get('signal', False):
+        return data
+
+    for signal_rule in prof_data[allow]['signal']:
+        data.append('%s%s' % (pre, signal_rule.serialize()))
+    data.append('')
+    return data
+
+def write_signal(prof_data, depth):
+    data = write_signal_rules(prof_data, depth, 'deny')
+    data += write_signal_rules(prof_data, depth, 'allow')
+    return data
+
 def write_link_rules(prof_data, depth, allow):
     pre = '  ' * depth
     data = []
@@ -3495,6 +3541,7 @@ def write_rules(prof_data, depth):
     data += write_netdomain(prof_data, depth)
     data += write_dbus(prof_data, depth)
     data += write_mount(prof_data, depth)
+    data += write_signal(prof_data, depth)
     data += write_links(prof_data, depth)
     data += write_paths(prof_data, depth)
     data += write_change_profile(prof_data, depth)
@@ -3644,6 +3691,7 @@ def serialize_profile_from_old_profile(profile_data, name, options):
                          'netdomain': write_netdomain,
                          'dbus': write_dbus,
                          'mount': write_mount,
+                         'signal': write_signal,
                          'link': write_links,
                          'path': write_paths,
                          'change_profile': write_change_profile,
@@ -3736,6 +3784,7 @@ def serialize_profile_from_old_profile(profile_data, name, options):
                     data += write_netdomain(write_prof_data[name], depth)
                     data += write_dbus(write_prof_data[name], depth)
                     data += write_mount(write_prof_data[name], depth)
+                    data += write_signal(write_prof_data[name], depth)
                     data += write_links(write_prof_data[name], depth)
                     data += write_paths(write_prof_data[name], depth)
                     data += write_change_profile(write_prof_data[name], depth)
diff --git a/utils/apparmor/rules.py b/utils/apparmor/rules.py
index e34100f..64f8e54 100644
--- a/utils/apparmor/rules.py
+++ b/utils/apparmor/rules.py
@@ -67,3 +67,15 @@ class Raw_Mount_Rule(object):
         return "%s%s%s" % ('audit ' if self.audit else '',
                            'deny '  if self.deny else '',
                            self.rule)
+
+class Raw_Signal_Rule(object):
+    audit = False
+    deny = False
+
+    def __init__(self, rule):
+        self.rule = rule
+
+    def serialize(self):
+        return "%s%s%s" % ('audit ' if self.audit else '',
+                           'deny '  if self.deny else '',
+                           self.rule)
diff --git a/utils/test/test-regex_matches.py b/utils/test/test-regex_matches.py
index 0b656cc..7096a50 100644
--- a/utils/test/test-regex_matches.py
+++ b/utils/test/test-regex_matches.py
@@ -41,7 +41,12 @@ regex_has_comma_testcases = [
     ('audit "/tmp/foo, # bar" rw%s # comment', 'comment embedded in quote 02'),
 
     # lifted from parser/tst/simple_tests/vars/vars_alternation_3.sd
-    ('/does/not/@{BAR},exist,notexist} r%s', 'partial alternation')
+    ('/does/not/@{BAR},exist,notexist} r%s', 'partial alternation'),
+
+    ('signal%s', 'bare signal'),
+    ('signal receive%s', 'simple signal'),
+    ('signal (send, receive)%s', 'embedded parens signal 01'),
+    ('signal (send, receive) set=(hup, quit)%s', 'embedded parens signal 02'),
 
     # the following fail due to inadequacies in the regex
     # ('dbus (r, w, %s', 'incomplete dbus action'),
@@ -99,6 +104,8 @@ regex_split_comment_testcases = [
     ('file,', False),
     ('file, # bare', ('file, ', '# bare')),
     ('file /tmp/foo rw, # read-write', ('file /tmp/foo rw, ', '# read-write')),
+    ('signal, # comment', ('signal, ', '# comment')),
+    ('signal receive set=(usr1 usr2) peer=foo,', False),
 ]
 
 def setup_split_comment_testcases():
@@ -276,6 +283,88 @@ class AARegexFile(unittest.TestCase):
         result = aa.RE_PROFILE_FILE_ENTRY.search(line)
         self.assertFalse(result, 'RE_PROFILE_FILE_ENTRY unexpectedly matched "%s"' % line)
 
+class AARegexSignal(unittest.TestCase):
+    '''Tests for RE_PROFILE_SIGNAL'''
+
+    def test_bare_signal_01(self):
+        '''test '   signal,' '''
+
+        rule = 'signal,'
+        line = '   %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_bare_signal_02(self):
+        '''test '   audit signal,' '''
+
+        rule = 'signal,'
+        line = '   audit %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        self.assertTrue(result.groups()[0], 'Couldn\'t find audit modifier in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_simple_signal_01(self):
+        '''test '   signal receive,' '''
+
+        rule = 'signal receive,'
+        line = '   %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_simple_signal_02(self):
+        '''test '   signal (send, receive),' '''
+
+        rule = 'signal (send, receive),'
+        line = '   %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_simple_signal_03(self):
+        '''test '   audit signal (receive),' '''
+
+        rule = 'signal (receive),'
+        line = '   audit %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        self.assertTrue(result.groups()[0], 'Couldn\'t find audit modifier in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_set_signal_01(self):
+        '''test '   signal (send, receive) set=(usr1 usr2),' '''
+
+        rule = 'signal (send, receive) set=(usr1 usr2),'
+        line = '   %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
+    def test_peer_signal_01(self):
+        '''test '   signal send set=(hup, quit) peer=/usr/sbin/daemon,' '''
+
+        rule = 'signal send set=(hup, quit) peer=/usr/sbin/daemon,'
+        line = '   %s' % rule
+        result = aa.RE_PROFILE_SIGNAL.search(line)
+        self.assertTrue(result, 'Couldn\'t find signal rule in "%s"' % line)
+        parsed = result.groups()[2].strip()
+        self.assertEqual(parsed, rule, 'Expected signal rule "%s", got "%s"'
+                         % (rule, parsed))
+
 if __name__ == '__main__':
     verbosity = 2
 
@@ -288,6 +377,7 @@ if __name__ == '__main__':
     test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexCapability))
     test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexPath))
     test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexFile))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AARegexSignal))
     result = unittest.TextTestRunner(verbosity=verbosity).run(test_suite)
     if not result.wasSuccessful():
         exit(1)
diff --git a/utils/test/test-signal_parse.py b/utils/test/test-signal_parse.py
new file mode 100644
index 0000000..392fb32
--- /dev/null
+++ b/utils/test/test-signal_parse.py
@@ -0,0 +1,63 @@
+#! /usr/bin/env python
+# ------------------------------------------------------------------
+#
+#    Copyright (C) 2014 Canonical Ltd.
+#
+#    This program is free software; you can redistribute it and/or
+#    modify it under the terms of version 2 of the GNU General Public
+#    License published by the Free Software Foundation.
+#
+# ------------------------------------------------------------------
+
+import apparmor.aa as aa
+import unittest
+
+class AAParseSignalTest(unittest.TestCase):
+
+    def _test_parse_signal_rule(self, rule):
+        signal = aa.parse_signal_rule(rule)
+        print(signal.serialize())
+        self.assertEqual(rule, signal.serialize(),
+                'signal object returned "%s", expected "%s"' % (signal.serialize(), rule))
+
+    def test_parse_plain_signal_rule(self):
+        self._test_parse_signal_rule('signal,')
+
+    def test_parse_receive_signal_rule(self):
+        self._test_parse_signal_rule('signal (receive),')
+
+    def test_parse_send_signal_rule(self):
+        self._test_parse_signal_rule('signal (send),')
+
+    def test_parse_send_receive_signal_rule(self):
+        self._test_parse_signal_rule('signal (send receive),')
+
+    def test_parse_r_signal_rule(self):
+        self._test_parse_signal_rule('signal r,')
+
+    def test_parse_w_signal_rule(self):
+        self._test_parse_signal_rule('signal w,')
+
+    def test_parse_rw_signal_rule(self):
+        self._test_parse_signal_rule('signal rw,')
+
+    def test_parse_set_1_signal_rule(self):
+        self._test_parse_signal_rule('signal send set=("hup"),')
+
+    def test_parse_set_2_signal_rule(self):
+        self._test_parse_signal_rule('signal (receive) set=kill,')
+
+    def test_parse_set_3_signal_rule(self):
+        self._test_parse_signal_rule('signal w set=(quit int),')
+
+    def test_parse_peer_1_signal_rule(self):
+        self._test_parse_signal_rule('signal receive peer=foo,')
+
+    def test_parse_peer_2_signal_rule(self):
+        self._test_parse_signal_rule('signal (send receive) peer=/usr/bin/bar,')
+
+    def test_parse_peer_3_signal_rule(self):
+        self._test_parse_signal_rule('signal wr set=(pipe, usr1) peer=/sbin/baz,')
+
+if __name__ == '__main__':
+    unittest.main()
-- 
1.9.1




More information about the AppArmor mailing list