Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions kernelguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,96 @@ def _has_ver(expr: ast.AST | None) -> bool:
),
}]

# Fourth pass: replay through local storage loaded from the
# entrypoint's mutable __defaults__ tuple.
#
# Pattern: store = custom_kernel.__defaults__[0]
# if store: return store[0]
# store.append(data.clone())
params = {
arg.arg
for args in (node.args.posonlyargs, node.args.args, node.args.kwonlyargs)
for arg in args
}
if node.args.vararg is not None:
params.add(node.args.vararg.arg)
if node.args.kwarg is not None:
params.add(node.args.kwarg.arg)

defaults_mutated = False
for stmt in getattr(tree, "body", []):
if not isinstance(stmt, ast.Assign):
continue
if not any(
isinstance(target, ast.Attribute)
and target.attr == "__defaults__"
and isinstance(target.value, ast.Name)
and is_entrypoint_name(target.value.id)
for target in stmt.targets
):
continue
if any(isinstance(part, (ast.List, ast.Dict, ast.Set)) for part in ast.walk(stmt.value)):
defaults_mutated = True
break

default_aliases: set[str] = set()
mutated_aliases: set[str] = set()
mutating_methods = {
"add", "append", "clear", "extend", "insert", "pop", "popitem",
"remove", "rotate", "setdefault", "update",
}
if defaults_mutated:
for child in ast.walk(node):
if isinstance(child, ast.Assign):
if (len(child.targets) == 1
and isinstance(child.targets[0], ast.Name)
and isinstance(child.value, ast.Subscript)
and isinstance(child.value.value, ast.Attribute)
and child.value.value.attr == "__defaults__"
and isinstance(child.value.value.value, ast.Name)
and is_entrypoint_name(child.value.value.value.id)):
default_aliases.add(child.targets[0].id)
for target in child.targets:
if _ast_root_name(target) in default_aliases:
mutated_aliases.add(_ast_root_name(target))
elif isinstance(child, ast.AugAssign):
target_root = _ast_root_name(child.target)
if target_root in default_aliases:
mutated_aliases.add(target_root)
elif (isinstance(child, ast.Call)
and isinstance(child.func, ast.Attribute)
and child.func.attr in mutating_methods
and _ast_root_name(child.func.value) in default_aliases):
mutated_aliases.add(_ast_root_name(child.func.value))

if mutated_aliases:
for child in ast.walk(node):
if not isinstance(child, ast.If):
continue
if _body_has_calls(child.body):
continue
if _expr_names(child.test) & params:
continue
if not (_expr_names(child.test) & mutated_aliases):
continue
if not any(
isinstance(stmt, ast.Return)
and stmt.value is not None
and isinstance(stmt.value, ast.Subscript)
and _ast_root_name(stmt.value) in mutated_aliases
for stmt in child.body
):
continue

return [{
"pattern": "LAST_CALL_REPLAY",
"severity": "critical",
"evidence": (
f"{entrypoint_name} replays output through mutable "
"__defaults__ storage"
),
}]

return []


Expand Down