Skip to content

Commit 12d6dd0

Browse files
committed
Update to patch _execute and _executescript & to make them more dynamic
1 parent 103b1f8 commit 12d6dd0

2 files changed

Lines changed: 27 additions & 24 deletions

File tree

aikido_zen/sinks/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,12 @@ async def decorator(func, instance, args, kwargs):
138138

139139

140140
def patch_immutable_class(base_cls, method_patches):
141-
class_name = f"{base_cls.__name__}"
142-
143141
modifiable_attributes = {}
144142
for name in method_patches:
145143
modifiable_attributes[name] = getattr(base_cls, name)
146144

147145
cls = type(
148-
class_name,
146+
base_cls.__name__,
149147
(base_cls,),
150148
# this modifiable_attributes object contains a python (not c) map of functions, so we can apply the
151149
# patch_function to these attributes of our new class.

aikido_zen/sinks/sqlite3.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,29 @@ def _cursor_execute(func, instance, args, kwargs):
2323

2424

2525
@before
26-
def _cursor_executemany(func, instance, args, kwargs):
26+
def _execute(func, instance, args, kwargs):
27+
op = f"sqlite3.{type(instance).__name__}.{func.__name__}"
2728
query = get_argument(args, kwargs, 0, "sql")
28-
29-
register_call("sqlite3.Cursor.executemany", "sql_op")
30-
vulns.run_vulnerability_scan(
31-
kind="sql_injection",
32-
op="sqlite3.Cursor.executemany",
33-
args=(query, "sqlite"),
34-
)
29+
register_call(op, "sql_op")
30+
vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite"))
3531

3632

3733
@before
38-
def _cursor_executescript(func, instance, args, kwargs):
34+
def _executescript(func, instance, args, kwargs):
35+
op = f"sqlite3.{type(instance).__name__}.{func.__name__}"
3936
query = get_argument(args, kwargs, 0, "sql_script")
40-
41-
register_call("sqlite3.Cursor.executescript", "sql_op")
42-
vulns.run_vulnerability_scan(
43-
kind="sql_injection",
44-
op="sqlite3.Cursor.executescript",
45-
args=(query, "sqlite"),
46-
)
37+
register_call(op, "sql_op")
38+
vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite"))
4739

4840

4941
def _cursor_patch(func, instance, args, kwargs):
5042
factory = get_argument(args, kwargs, 0, "factory") or _sqlite3.Cursor
5143
patched_factory = patch_immutable_class(
5244
factory,
5345
{
54-
"execute": _cursor_execute,
55-
"executemany": _cursor_executemany,
56-
"executescript": _cursor_executescript,
46+
"execute": _execute,
47+
"executemany": _execute,
48+
"executescript": _executescript,
5749
},
5850
)
5951

@@ -63,8 +55,21 @@ def _cursor_patch(func, instance, args, kwargs):
6355

6456
def _connect(func, instance, args, kwargs):
6557
factory = get_argument(args, kwargs, 5, "factory") or _sqlite3.Connection
66-
patched_factory = patch_immutable_class(factory, {"cursor": _cursor_patch})
67-
58+
connection_patches = {
59+
"cursor": _cursor_patch
60+
}
61+
62+
if _PATCH_CONNECTION_EXECUTE:
63+
# Since py 3.11 there are more ways than using the cursor to execute (e.g. using the connection)
64+
connection_patches.update(
65+
{
66+
"execute": _execute,
67+
"executemany": _execute,
68+
"executescript": _executescript,
69+
}
70+
)
71+
72+
patched_factory = patch_immutable_class(factory, connection_patches)
6873
new_args, new_kwargs = modify_arguments(args, kwargs, 5, "factory", patched_factory)
6974
return func(*new_args, **new_kwargs)
7075

0 commit comments

Comments
 (0)