Published on Nov. 10, 2025
Go home — ASGI Path Send Event
ASGI PathSend Application
import mimetypes
import os
from asgiref import typing as t
from django.conf import settings
from django.contrib.staticfiles import finders
from django.core.asgi import get_asgi_application
from django.core.handlers.asgi import ASGIHandler
# Fall back to "abacus.settings" as the Django settings module; setdefault
# leaves any DJANGO_SETTINGS_MODULE already present in the environment untouched.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "abacus.settings")
class PathSendApp:
"""Custom ASGI application which implements the http.response.pathsend event for static files."""
def __init__(self, app: ASGIHandler) -> None:
self.app = app
async def _handle_websocket(
self,
scope: t.WebSocketScope,
receive: t.ASGIReceiveCallable,
send: t.ASGISendCallable,
) -> None:
await self.app(
scope=scope, # type: ignore
receive=receive,
send=send, # type: ignore
)
async def _handle_http(
self,
scope: t.HTTPScope,
receive: t.ASGIReceiveCallable,
send: t.ASGISendCallable,
) -> None:
"""Handle HTTP requests.
If the request PATH starts with the STATIC_URL handle the request suing the http.response.pathsend event.
Note http.response.pathsend is only supported by some ASGI servers.
Riseapp base makes use of Granian which is an ASGI server which http.response.pathsend, if this is switched
out this custom ASGI application should be reconsidered.
"""
path = scope["path"]
if not path.startswith(settings.STATIC_URL):
await self.app(
scope=scope, # type: ignore
receive=receive,
send=send, # type: ignore
)
else:
if settings.DJANGO_ENV == "local":
file_path: str = finders.find(stripped_path_raw) # type: ignore
else:
stripped_path = (settings.STATIC_ROOT / stripped_path_raw).resolve()
if stripped_path.is_relative_to(settings.STATIC_ROOT):
file_path = f"{stripped_path}"
else:
file_path = None
if file_path:
content_type, _ = mimetypes.guess_type(file_path)
if not content_type:
content_type = "text/plain"
content_length = os.path.getsize(file_path)
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", f"{content_type}".encode()),
(b"content-length", f"{content_length}".encode()),
(b"x-served-by", b"PathSend"),
],
} # type: ignore
)
await send(
{
"type": "http.response.pathsend", # type: ignore
"path": file_path,
} # type: ignore
)
else:
await send(
{
"type": "http.response.start",
"status": 404,
"headers": [
(b"content-type", b"text/plain"),
],
} # type: ignore
)
await send(
{
"type": "http.response.body", # type: ignore
"body": b"404 Not Found",
"more_body": False,
} # type: ignore
)
async def __call__(
self,
scope: t.Scope,
receive: t.ASGIReceiveCallable,
send: t.ASGISendCallable,
) -> None:
match scope["type"]:
case "http":
await self._handle_http(scope=scope, receive=receive, send=send)
case "websocket":
await self._handle_websocket(scope=scope, receive=receive, send=send)
case unhandled:
raise ValueError(f"Unhandled ASGI scope {unhandled}")
# Build Django's ASGI application, then wrap it in PathSendApp so requests under
# STATIC_URL are answered by the wrapper and everything else reaches Django.
django_application = get_asgi_application()
# NOTE(review): `application` is presumably the callable the ASGI server is
# pointed at — confirm against the deployment configuration.
application = PathSendApp(django_application)