[apparmor] [PATCH 2/2] parser: Verify cache file mtime in caching tests

John Johansen john.johansen at canonical.com
Wed Aug 12 16:46:11 UTC 2015


On 08/11/2015 07:53 PM, Tyler Hicks wrote:
> This makes several improvements to the parser caching tests to verify
> that the parser is properly consuming the mtime of profiles and
> abstractions when dealing with the policy cache.
> 
> It introduces a simple abstraction file and tests the mtime handling by
> changing the mtime on the profile, abstraction, apparmor_parser, and
> cache file in various combinations to check the parser's behavior.
> 
> Signed-off-by: Tyler Hicks <tyhicks at canonical.com>

Acked-by: John Johansen <john.johansen at canonical.com>

> ---
>  parser/tst/caching.py | 138 +++++++++++++++++++++++++++++++++++++++++---------
>  parser/tst/testlib.py |   4 +-
>  2 files changed, 117 insertions(+), 25 deletions(-)
> 
> diff --git a/parser/tst/caching.py b/parser/tst/caching.py
> index 7e82cb6..df854d7 100755
> --- a/parser/tst/caching.py
> +++ b/parser/tst/caching.py
> @@ -1,8 +1,9 @@
>  #!/usr/bin/env python3
>  # ------------------------------------------------------------------
>  #
> -#   Copyright (C) 2013 Canonical Ltd.
> -#   Author: Steve Beattie <steve at nxnw.org>
> +#   Copyright (C) 2013-2015 Canonical Ltd.
> +#   Authors: Steve Beattie <steve at nxnw.org>
> +#            Tyler Hicks <tyhicks at canonical.com>
>  #
>  #   This program is free software; you can redistribute it and/or
>  #   modify it under the terms of version 2 of the GNU General Public
> @@ -12,7 +13,6 @@
>  
>  # TODO
>  # - check cache not used if parser in $PATH is newer
> -# - check cache used/not used if includes are newer/older
>  # - check cache used for force-complain, disable symlink, etc.
>  
>  from argparse import ArgumentParser
> @@ -24,19 +24,24 @@ import unittest
>  
>  import testlib
>  
> +ABSTRACTION_CONTENTS = '''
> +  # Simple example abstraction
> +  capability setuid,
> +'''
> +ABSTRACTION = 'suid-abstraction'
>  
>  PROFILE_CONTENTS = '''
>  # Simple example profile for caching tests
>  
>  /bin/pingy {
> +  #include <%s>
>    capability net_raw,
> -  capability setuid,
>    network inet raw,
>  
>    /bin/ping mixr,
>    /etc/modules.conf r,
>  }
> -'''
> +''' % (ABSTRACTION)
>  PROFILE = 'sbin.pingy'
>  config = None
>  
> @@ -63,7 +68,11 @@ class AAParserCachingCommon(testlib.AATestTemplate):
>          self.cache_dir = os.path.join(self.tmp_dir, 'cache')
>          os.mkdir(self.cache_dir)
>  
> -        # write our sample profile out
> +        # default path of the output cache file
> +        self.cache_file = os.path.join(self.cache_dir, PROFILE)
> +
> +        # write our sample abstraction and profile out
> +        self.abstraction = testlib.write_file(self.tmp_dir, ABSTRACTION, ABSTRACTION_CONTENTS)
>          self.profile = testlib.write_file(self.tmp_dir, PROFILE, PROFILE_CONTENTS)
>  
>          if config.debug:
> @@ -203,21 +212,20 @@ class AAParserCachingTests(AAParserCachingCommon):
>      def setUp(self):
>          super(AAParserCachingTests, self).setUp()
>  
> -        # need separation of length timeout between generating profile
> -        # and generating cache entry, as the parser distinguishes
> -        # between ctime, not mtime.
> -        if not 'timeout' in dir(config):
> -            r = testlib.filesystem_time_resolution()
> -            config.timeout = r[1]
> -
> -        time.sleep(config.timeout)
> +        r = testlib.filesystem_time_resolution()
> +        self.mtime_res = r[1]
>  
>      def _generate_cache_file(self):
>  
>          cmd = list(self.cmd_prefix)
>          cmd.extend(['-q', '--write-cache', '-r', self.profile])
>          self.run_cmd_check(cmd)
> -        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
> +        self.assert_path_exists(self.cache_file)
> +
> +    def _set_mtime(self, path, mtime):
> +        atime = os.stat(path).st_atime
> +        os.utime(path, (atime, mtime))
> +        self.assertEquals(os.stat(path).st_mtime, mtime)
>  
>      def test_cache_loaded_when_exists(self):
>          '''test cache is loaded when it exists, is newer than profile,  and features match'''
> @@ -277,7 +285,7 @@ class AAParserCachingTests(AAParserCachingCommon):
>          cmd = list(self.cmd_prefix)
>          cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
>          self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
> -        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
> +        self.assert_path_exists(self.cache_file, expected=False)
>  
>      def test_cache_writing_updates_features(self):
>          '''test cache writing updates features'''
> @@ -294,18 +302,18 @@ class AAParserCachingTests(AAParserCachingCommon):
>          '''test cache writing updates cache file'''
>  
>          cache_file = testlib.write_file(self.cache_dir, PROFILE, 'monkey\n')
> -        orig_size = os.stat(cache_file).st_size
> +        orig_stat = os.stat(cache_file)
>  
>          cmd = list(self.cmd_prefix)
>          cmd.extend(['-v', '--write-cache', '-r', self.profile])
>          self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
>          self.assert_path_exists(cache_file)
> -        with open(cache_file, 'rb') as f:
> -            new_size = os.fstat(f.fileno()).st_size
> +        stat = os.stat(cache_file)
>          # We check sizes here rather than whether the string monkey is
>          # in cache_contents because of the difficulty coercing cache
>          # file bytes into strings in python3
> -        self.assertNotEquals(orig_size, new_size, 'Expected cache file to be updated, size is not changed.')
> +        self.assertNotEquals(orig_stat.st_size, stat.st_size, 'Expected cache file to be updated, size is not changed.')
> +        self.assertEquals(os.stat(self.profile).st_mtime, stat.st_mtime)
>  
>      def test_cache_writing_clears_all_files(self):
>          '''test cache writing clears all cache files'''
> @@ -317,27 +325,110 @@ class AAParserCachingTests(AAParserCachingCommon):
>          self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
>          self.assert_path_exists(check_file, expected=False)
>  
> +    def test_profile_mtime_preserved(self):
> +        '''test profile mtime is preserved when it is newest'''
> +        expected = 1
> +        self._set_mtime(self.abstraction, 0)
> +        self._set_mtime(self.profile, expected)
> +        self._generate_cache_file()
> +        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
> +
> +    def test_abstraction_mtime_preserved(self):
> +        '''test abstraction mtime is preserved when it is newest'''
> +        expected = 1000
> +        self._set_mtime(self.profile, 0)
> +        self._set_mtime(self.abstraction, expected)
> +        self._generate_cache_file()
> +        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
> +
> +    def test_equal_mtimes_preserved(self):
> +        '''test equal profile and abstraction mtimes are preserved'''
> +        expected = 10000 + self.mtime_res
> +        self._set_mtime(self.profile, expected)
> +        self._set_mtime(self.abstraction, expected)
> +        self._generate_cache_file()
> +        self.assertEquals(expected, os.stat(self.cache_file).st_mtime)
> +
>      def test_profile_newer_skips_cache(self):
>          '''test cache is skipped if profile is newer'''
>  
>          self._generate_cache_file()
> -        time.sleep(config.timeout)
> -        testlib.touch(self.profile)
> +        profile_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
> +        self._set_mtime(self.profile, profile_mtime)
> +
> +        orig_stat = os.stat(self.cache_file)
> +
> +        cmd = list(self.cmd_prefix)
> +        cmd.extend(['-v', '-r', self.profile])
> +        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
> +
> +        stat = os.stat(self.cache_file)
> +        self.assertEquals(orig_stat.st_size, stat.st_size)
> +        self.assertEquals(orig_stat.st_ino, stat.st_ino)
> +        self.assertEquals(orig_stat.st_mtime, stat.st_mtime)
> +
> +    def test_abstraction_newer_skips_cache(self):
> +        '''test cache is skipped if abstraction is newer'''
> +
> +        self._generate_cache_file()
> +        abstraction_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
> +        self._set_mtime(self.abstraction, abstraction_mtime)
> +
> +        orig_stat = os.stat(self.cache_file)
>  
>          cmd = list(self.cmd_prefix)
>          cmd.extend(['-v', '-r', self.profile])
>          self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
>  
> +        stat = os.stat(self.cache_file)
> +        self.assertEquals(orig_stat.st_size, stat.st_size)
> +        self.assertEquals(orig_stat.st_ino, stat.st_ino)
> +        self.assertEquals(orig_stat.st_mtime, stat.st_mtime)
> +
> +    def test_profile_newer_rewrites_cache(self):
> +        '''test cache is rewritten if profile is newer'''
> +
> +        self._generate_cache_file()
> +        profile_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
> +        self._set_mtime(self.profile, profile_mtime)
> +
> +        orig_stat = os.stat(self.cache_file)
> +
> +        cmd = list(self.cmd_prefix)
> +        cmd.extend(['-v', '-r', '-W', self.profile])
> +        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
> +
> +        stat = os.stat(self.cache_file)
> +        self.assertNotEquals(orig_stat.st_ino, stat.st_ino)
> +        self.assertEquals(profile_mtime, stat.st_mtime)
> +
> +    def test_abstraction_newer_rewrites_cache(self):
> +        '''test cache is rewritten if abstraction is newer'''
> +
> +        self._generate_cache_file()
> +        abstraction_mtime = os.stat(self.cache_file).st_mtime + self.mtime_res
> +        self._set_mtime(self.abstraction, abstraction_mtime)
> +
> +        orig_stat = os.stat(self.cache_file)
> +
> +        cmd = list(self.cmd_prefix)
> +        cmd.extend(['-v', '-r', '-W', self.profile])
> +        self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
> +
> +        stat = os.stat(self.cache_file)
> +        self.assertNotEquals(orig_stat.st_ino, stat.st_ino)
> +        self.assertEquals(abstraction_mtime, stat.st_mtime)
> +
>      def test_parser_newer_uses_cache(self):
>          '''test cache is not skipped if parser is newer'''
>  
>          self._generate_cache_file()
> -        time.sleep(config.timeout)
>  
>          # copy parser
>          os.mkdir(os.path.join(self.tmp_dir, 'parser'))
>          new_parser = os.path.join(self.tmp_dir, 'parser', 'apparmor_parser')
>          shutil.copy(config.parser, new_parser)
> +        self._set_mtime(new_parser, os.stat(self.cache_file).st_mtime + self.mtime_res)
>  
>          cmd = list(self.cmd_prefix)
>          cmd[0] = new_parser
> @@ -379,6 +470,7 @@ class AAParserAltCacheTests(AAParserCachingTests):
>  
>          self.orig_cache_dir = self.cache_dir
>          self.cache_dir = alt_cache_dir
> +        self.cache_file = os.path.join(self.cache_dir, PROFILE)
>          self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
>  
>      def tearDown(self):
> diff --git a/parser/tst/testlib.py b/parser/tst/testlib.py
> index b3474f3..90694f9 100644
> --- a/parser/tst/testlib.py
> +++ b/parser/tst/testlib.py
> @@ -135,7 +135,7 @@ class TimeoutFunction:
>  
>  
>  def filesystem_time_resolution():
> -    '''detect whether the filesystem stores sub 1 second timestamps'''
> +    '''detect whether the filesystem stores subsecond timestamps'''
>  
>      default_diff = 0.1
>      result = (True, default_diff)
> @@ -150,7 +150,7 @@ def filesystem_time_resolution():
>                  s = os.fstat(f.fileno())
>  
>              if (s.st_mtime == last_stamp):
> -                print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
> +                print('\n===> WARNING: TMPDIR lacks subsecond timestamp resolution, falling back to slower test')
>                  result = (False, 1.0)
>                  break
>  
> 




More information about the AppArmor mailing list