Bradley Kirton's Blog

Published on Jan. 20, 2021

Go home

Server sent events in Django 3

If you want to skip directly to the code, the repository can be found here: django-sse-poc.

This project is a proof of concept and is missing a few important aspects of any web application, such as authentication. The project was created following standard Django practices; however, I have included just one simple view directly within the project's urls file. This view displays a simple HTML page which listens for server-sent events. The remainder of the project can be found within the asgi module, which includes a custom ASGI application that routes all requests for the sse endpoint to a custom handler and all other traffic to Django.

This work is inspired by the work of Tom Christie and others on the Starlette project, specifically this issue thread.

"""
ASGI config for config project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/3.1/howto/deployment/asgi/
"""

import asyncio
import json
import os
from typing import Any, Dict

import aioredis
from django.core.asgi import get_asgi_application

# Default to the project's settings module unless DJANGO_SETTINGS_MODULE is
# already set in the environment.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")


class SSE:
    """Stream messages from a redis pub/sub channel to one client as server sent events.

    Usage:
        sse = SSE(receive, send)

        try:
            await sse.setup()
            await sse.stream_response()
        finally:
            await sse.tear_down()

    Args:
        receive: ASGI ``receive`` awaitable callable for the connection.
        send: ASGI ``send`` awaitable callable for the connection.
        channel: Name of the redis pub/sub channel to relay.
        redis_url: Address passed to ``aioredis.create_redis_pool``.
    """

    def __init__(
        self,
        receive: Any,
        send: Any,
        channel: str = "channel:sse",
        redis_url: str = "redis://localhost",
    ) -> None:
        self.receive = receive
        self.send = send
        self.disconnected = False
        self.channel = channel
        self.redis_url = redis_url
        # Populated by setup(); stays None if setup() fails before connecting
        # so that tear_down() can always be called safely from a ``finally``.
        self.redis: Any = None
        self._response_started = False

    async def setup(self) -> None:
        """Begin the http response and create a redis instance."""

        await self.send(
            {
                "type": "http.response.start",
                "status": 200,
                # The ASGI specification requires header names to be lowercased.
                "headers": [
                    [b"content-type", b"text/event-stream"],
                    [b"connection", b"keep-alive"],
                    [b"cache-control", b"no-cache"],
                ],
            }
        )
        self._response_started = True
        self.redis = await aioredis.create_redis_pool(self.redis_url)

    async def tear_down(self) -> None:
        """Finish the http response and release the redis connection.

        Safe to call even when ``setup`` failed part way through: the closing
        body message is only sent if the response was started, and the redis
        clean up is skipped if no connection was ever created.
        """

        try:
            if self._response_started:
                await self.send(
                    {"type": "http.response.body", "body": b"", "more_body": False}
                )
        finally:
            # Run the redis clean up even if the final send raised.
            redis, self.redis = self.redis, None
            if redis is not None:
                try:
                    # Unsubscribe from the channel this instance actually uses.
                    await redis.unsubscribe(self.channel)
                finally:
                    # wait_closed() only completes once close() has been called.
                    redis.close()
                    await redis.wait_closed()

    async def _listen_for_disconnect(self) -> None:
        """Coroutine which listens for the client disconnect."""

        while not self.disconnected:
            message = await self.receive()

            if message["type"] == "http.disconnect":
                self.disconnected = True

    async def _subscribe(self) -> None:
        """Coroutine to subscribe to redis channel and push server sent events."""

        channel, *_ = await self.redis.subscribe(self.channel)

        while await channel.wait_message():
            if self.disconnected:
                break

            message = await channel.get_json()

            if message is None:
                continue

            payload = json.dumps(message)

            await self.send(
                {
                    "type": "http.response.body",
                    "body": f"data: {payload}\n\n".encode(),
                    "more_body": True,
                }
            )

    async def stream_response(self) -> None:
        """Entrypoint for streaming pubsub events to clients.

        Runs the redis subscriber and the disconnect listener concurrently and
        returns as soon as either finishes; the other one is cancelled.

        Raises:
            Exception: whatever the task that finished first raised, if any.
        """

        # asyncio.wait() needs tasks/futures; passing bare coroutines is
        # deprecated since Python 3.8 and rejected from 3.11 onwards.
        tasks = [
            asyncio.ensure_future(self._subscribe()),
            asyncio.ensure_future(self._listen_for_disconnect()),
        ]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also reached when this coroutine itself is cancelled, so neither
            # task is left running in the background.
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Let the cancellations finish before the caller tears redis down.
            await asyncio.gather(*tasks, return_exceptions=True)

        for task in done:
            # Surface a failure in the finished task instead of dropping it.
            task.result()


async def publish(receive: Any, send: Any) -> None:
    """Serve one server sent events connection from start to finish.

    Builds an ``SSE`` helper for the connection, starts the response, streams
    events, and always runs the helper's clean up afterwards.
    """

    stream = SSE(receive, send)

    try:
        await stream.setup()
        await stream.stream_response()
    finally:
        # Clean up runs whether streaming ended normally or raised.
        await stream.tear_down()


# Django ASGI application, created on first use and then reused.
_django_application: Any = None


def _get_django_application() -> Any:
    """Return the Django ASGI application, building it only once."""

    global _django_application
    if _django_application is None:
        _django_application = get_asgi_application()
    return _django_application


async def application(scope: Dict[str, Any], receive: Any, send: Any) -> None:
    """Custom django asgi application which routes server sent
    events to the publish function and other requests to Django.

    Args:
        scope: ASGI connection scope.
        receive: ASGI ``receive`` awaitable callable.
        send: ASGI ``send`` awaitable callable.
    """

    # Only HTTP requests can be answered with an event stream, and some scopes
    # (e.g. lifespan) have no "path" key, so check the type and use .get().
    if scope.get("type") == "http" and scope.get("path") == "/sse/":
        await publish(receive, send)
    else:
        await _get_django_application()(scope, receive, send)

Details for project setup can be found in the GitLab repository.