diff --git a/Lib/os.py b/Lib/os.py index a5e1d805556998..739292b60a783b 100644 --- a/Lib/os.py +++ b/Lib/os.py @@ -374,6 +374,14 @@ def walk(top, topdown=True, onerror=None, followlinks=False): """ sys.audit("os.walk", top, topdown, onerror, followlinks) + try: + top_stat = stat(fspath(top)) + except OSError: + pass # non-existing: fall through, scandir will handle it via onerror + else: + if not path.isdir(fspath(top)) and not path.islink(fspath(top)): + raise NotADirectoryError(20, "Not a directory", top) + stack = [fspath(top)] islink, join = path.islink, path.join while stack: @@ -540,11 +548,14 @@ def _fwalk(stack, isbytes, topdown, onerror, follow_symlinks): stack.append((_fwalk_close, topfd)) if not follow_symlinks: if isroot and not st.S_ISDIR(orig_st.st_mode): - return + raise NotADirectoryError(20, "Not a directory", toppath) if not path.samestat(orig_st, stat(topfd)): return - scandir_it = scandir(topfd) + try: + scandir_it = scandir(topfd) + except NotADirectoryError: + raise NotADirectoryError(20, "Not a directory", toppath) dirs = [] nondirs = [] entries = None if topdown or follow_symlinks else [] diff --git a/Lib/test/test_os/test_os.py b/Lib/test/test_os/test_os.py index 68cb32cd40be30..d8cdeef0c77b6f 100644 --- a/Lib/test/test_os/test_os.py +++ b/Lib/test/test_os/test_os.py @@ -1981,6 +1981,13 @@ def test_walk_above_recursion_limit(self): self.assertEqual(sorted(dirs), ["SUB1", "SUB2", "d"]) self.assertEqual(all, expected) + def test_walk_on_file_raises_not_a_directory(self): + # gh-101420: os.walk() was silently returning [] when top is a + # regular file instead of raising NotADirectoryError. + with tempfile.NamedTemporaryFile() as f: + with self.assertRaises(NotADirectoryError): + list(os.walk(f.name)) + @unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") class FwalkTests(WalkTests): @@ -2038,6 +2045,19 @@ def test_yields_correct_dir_fd(self): # check that listdir() returns consistent information self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) + def test_fwalk_on_file_raises_not_a_directory(self): + with tempfile.NamedTemporaryFile() as f: + with self.assertRaises(NotADirectoryError): + list(os.fwalk(f.name, follow_symlinks=False)) + + # follow_symlinks=True: raised but with fd int as filename, must now have path + with self.assertRaises(NotADirectoryError) as ctx: + list(os.fwalk(f.name, follow_symlinks=True)) + self.assertEqual( + ctx.exception.filename, f.name, + "filename should be the path string, not a raw fd integer" + ) + @unittest.skipIf( support.is_android, "dup return value is unpredictable on Android" )