[apparmor] [PATCH 2/2] parser: Verify cache file mtime in caching tests

Tyler Hicks tyhicks at canonical.com
Wed Aug 12 02:53:54 UTC 2015


This makes several improvements to the parser caching tests to verify
that the parser is properly consuming the mtime of profiles and
abstractions when dealing with the policy cache.

It introduces a simple abstraction file and tests the mtime handling by
changing the mtime on the profile, abstraction, apparmor_parser, and
cache file in various combinations to check the parser's behavior.

Signed-off-by: Tyler Hicks <tyhicks at canonical.com>
---
 parser/tst/caching.py | 138 +++++++++++++++++++++++++++++++++++++++++---------
 parser/tst/testlib.py |   4 +-
 2 files changed, 117 insertions(+), 25 deletions(-)

diff --git a/parser/tst/caching.py b/parser/tst/caching.py
index 7e82cb6..df854d7 100755
--- a/parser/tst/caching.py
+++ b/parser/tst/caching.py
@@ -1,8 +1,9 @@
 #!/usr/bin/env python3
 # ------------------------------------------------------------------
 #
-#   Copyright (C) 2013 Canonical Ltd.
-#   Author: Steve Beattie <steve at nxnw.org>
+#   Copyright (C) 2013-2015 Canonical Ltd.
+#   Authors: Steve Beattie <steve at nxnw.org>
+#            Tyler Hicks <tyhicks at canonical.com>
 #
 #   This program is free software; you can redistribute it and/or
 #   modify it under the terms of version 2 of the GNU General Public
@@ -12,7 +13,6 @@
 
 # TODO
 # - check cache not used if parser in $PATH is newer
-# - check cache used/not used if includes are newer/older
 # - check cache used for force-complain, disable symlink, etc.
 
 from argparse import ArgumentParser
@@ -24,19 +24,24 @@ import unittest
 
 import testlib
 
+ABSTRACTION_CONTENTS = '''
+  # Simple example abstraction
+  capability setuid,
+'''
+ABSTRACTION = 'suid-abstraction'
 
 PROFILE_CONTENTS = '''
 # Simple example profile for caching tests
 
 /bin/pingy {
+  #include <%s>
   capability net_raw,
-  capability setuid,
   network inet raw,
 
   /bin/ping mixr,
   /etc/modules.conf r,
 }
-'''
+''' % (ABSTRACTION)
 PROFILE = 'sbin.pingy'
 config = None
 
@@ -63,7 +68,11 @@ class AAParserCachingCommon(testlib.AATestTemplate):
         self.cache_dir = os.path.join(self.tmp_dir, 'cache')
         os.mkdir(self.cache_dir)
 
-        # write our sample profile out
+        # default path of the output cache file
+        self.cache_file = os.path.join(self.cache_dir, PROFILE)
+
+        # write our sample abstraction and profile out
+        self.abstraction = testlib.write_file(self.tmp_dir, ABSTRACTION, ABSTRACTION_CONTENTS)
         self.profile = testlib.write_file(self.tmp_dir, PROFILE, PROFILE_CONTENTS)
 
         if config.debug:
@@ -203,21 +212,20 @@ class AAParserCachingTests(AAParserCachingCommon):
     def setUp(self):
         super(AAParserCachingTests, self).setUp()
 
-        # need separation of length timeout between generating profile
-        # and generating cache entry, as the parser distinguishes
-        # between ctime, not mtime.
-        if not 'timeout' in dir(config):
-            r = testlib.filesystem_time_resolution()
-            config.timeout = r[1]
-
-        time.sleep(config.timeout)
+        r = testlib.filesystem_time_resolution()
+        self.mtime_res = r[1]
 
     def _generate_cache_file(self):
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '-r', self.profile])
         self.run_cmd_check(cmd)
-        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+        self.assert_path_exists(self.cache_file)
+
+    def _set_mtime(self, path, mtime):
+        atime = os.stat(path).st_atime
+        os.utime(path, (atime, mtime))
+        self.assertEquals(os.stat(path).st_mtime, mtime)
 
     def test_cache_loaded_when_exists(self):
         '''test cache is loaded when it exists, is newer than profile,  and features match'''
@@ -277,7 +285,7 @@ class AAParserCachingTests(AAParserCachingCommon):
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
         self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
-        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+        self.assert_path_exists(self.cache_file, expected=False)
 
     def test_cache_writing_updates_features(self):
         '''test cache writing updates features'''
@@ -294,18 +302,18 @@ class AAParserCachingTests(AAParserCachingCommon):
         '''test cache writing updates cache file'''
 
         cache_file = testlib.write_file(self.cache_dir, PROFILE, 'monkey\n')
-        orig_size = os.stat(cache_file).st_size
+        orig_stat = os.stat(cache_file)
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '-r', self.profile])
         self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(cache_file)
-        with open(cache_file, 'rb') as f:
-            new_size = os.fstat(f.fileno()).st_size
+        stat = os.stat(cache_file)
         # We check sizes here rather than whether the string monkey is
         # in cache_contents because of the difficulty coercing cache
         # file bytes into strings in python3
-        self.assertNotEquals(orig_size, new_size, 'Expected cache file to be updated, size is not changed.')
+        self.assertNotEquals(orig_stat.st_size, stat.st_size, 'Expected cache file to be updated, size is not changed.')
+        self.assertEquals(os.stat(self.profile).st_mtime, stat.st_mtime)
 
     def test_cache_writing_clears_all_files(self):
         '''test cache writing clears all cache files'''
@@ -317,27 +325,110 @@ class AAParserCachingTests(AAParserCachingCommon):
         self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(check_file, expected=False)
 
+    def test_profile_mtime_preserved(self):
+        '''test profile mtime is preserved when it is newest'''
+        expected = 1
+        self._set_mtime(self.abstraction, 0)
+        self._set_mtime(self.profile, expected)
+        self._generate_cache_file()
+        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
+
+    def test_abstraction_mtime_preserved(self):
+        '''test abstraction mtime is preserved when it is newest'''
+        expected = 1000
+        self._set_mtime(self.profile, 0)
+        self._set_mtime(self.abstraction, expected)
+        self._generate_cache_file()
+        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
+
+    def test_equal_mtimes_preserved(self):
+        '''test equal profile and abstraction mtimes are preserved'''
+        expected = 10000 + self.mtime_res
+        self._set_mtime(self.profile, expected)
+        self._set_mtime(self.abstraction, expected)
+        self._generate_cache_file()
+        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
+
     def test_profile_newer_skips_cache(self):
         '''test cache is skipped if profile is newer'''
 
         self._generate_cache_file()
-        time.sleep(config.timeout)
-        testlib.touch(self.profile)
+        profile_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
+        self._set_mtime(self.profile, profile_mtime)
+
+        orig_stat = os.stat(self.cache_file)
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', self.profile])
+        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+        stat = os.stat(self.cache_file)
+        self.assertEquals(orig_stat.st_size, stat.st_size)
+        self.assertEquals(orig_stat.st_ino, stat.st_ino)
+        self.assertEquals(orig_stat.st_mtime, stat.st_mtime)
+
+    def test_abstraction_newer_skips_cache(self):
+        '''test cache is skipped if abstraction is newer'''
+
+        self._generate_cache_file()
+        abstraction_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
+        self._set_mtime(self.abstraction, abstraction_mtime)
+
+        orig_stat = os.stat(self.cache_file)
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '-r', self.profile])
         self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
+        stat = os.stat(self.cache_file)
+        self.assertEquals(orig_stat.st_size, stat.st_size)
+        self.assertEquals(orig_stat.st_ino, stat.st_ino)
+        self.assertEquals(orig_stat.st_mtime, stat.st_mtime)
+
+    def test_profile_newer_rewrites_cache(self):
+        '''test cache is rewritten if profile is newer'''
+
+        self._generate_cache_file()
+        profile_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
+        self._set_mtime(self.profile, profile_mtime)
+
+        orig_stat = os.stat(self.cache_file)
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', '-W', self.profile])
+        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+        stat = os.stat(self.cache_file)
+        self.assertNotEquals(orig_stat.st_ino, stat.st_ino)
+        self.assertEquals(profile_mtime, stat.st_mtime)
+
+    def test_abstraction_newer_rewrites_cache(self):
+        '''test cache is rewritten if abstraction is newer'''
+
+        self._generate_cache_file()
+        abstraction_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
+        self._set_mtime(self.abstraction, abstraction_mtime)
+
+        orig_stat = os.stat(self.cache_file)
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', '-W', self.profile])
+        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+        stat = os.stat(self.cache_file)
+        self.assertNotEquals(orig_stat.st_ino, stat.st_ino)
+        self.assertEquals(abstraction_mtime, stat.st_mtime)
+
     def test_parser_newer_uses_cache(self):
         '''test cache is not skipped if parser is newer'''
 
         self._generate_cache_file()
-        time.sleep(config.timeout)
 
         # copy parser
         os.mkdir(os.path.join(self.tmp_dir, 'parser'))
         new_parser = os.path.join(self.tmp_dir, 'parser', 'apparmor_parser')
         shutil.copy(config.parser, new_parser)
+        self._set_mtime(new_parser, os.stat(self.cache_file).st_mtime + self.mtime_res)
 
         cmd = list(self.cmd_prefix)
         cmd[0] = new_parser
@@ -379,6 +470,7 @@ class AAParserAltCacheTests(AAParserCachingTests):
 
         self.orig_cache_dir = self.cache_dir
         self.cache_dir = alt_cache_dir
+        self.cache_file = os.path.join(self.cache_dir, PROFILE)
         self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
 
     def tearDown(self):
diff --git a/parser/tst/testlib.py b/parser/tst/testlib.py
index b3474f3..90694f9 100644
--- a/parser/tst/testlib.py
+++ b/parser/tst/testlib.py
@@ -135,7 +135,7 @@ class TimeoutFunction:
 
 
 def filesystem_time_resolution():
-    '''detect whether the filesystem stores sub 1 second timestamps'''
+    '''detect whether the filesystem stores subsecond timestamps'''
 
     default_diff = 0.1
     result = (True, default_diff)
@@ -150,7 +150,7 @@ def filesystem_time_resolution():
                 s = os.fstat(f.fileno())
 
             if (s.st_mtime == last_stamp):
-                print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
+                print('\n===> WARNING: TMPDIR lacks subsecond timestamp resolution, falling back to slower test')
                 result = (False, 1.0)
                 break
 
-- 
2.1.4




More information about the AppArmor mailing list