a rocket launch being celebrated by robots to signify the launch of the company

Technical

Why we use Server Sent Events and how to implement them in FastAPI

Jan 9, 2024

CTO Aneesh

Aneesh Arora

Cofounder and CTO

Learn why and how to set up real-time updates in your web app using Server-Sent Events using FastAPI

Challenges in AI Model Integration
  • Speed and Content Limitation: When building an application with AI models, a key challenge is the speed of content creation and the limit on the amount of content that can be processed at once.
Enhancing User Experience in Afterword
  • Dual Tasks: Afterword must scrape web articles and condense their content, both time-consuming tasks.
  • Managing User Expectations: Users expect quick results. It's vital to:
    • Provide partial summaries during processing.
    • Update users on the process status.
  • Purpose: This ensures users know the application is active and not stalled.
Technical Solutions for Timely Updates
  • WebSockets vs. Server-Sent Events (SSE):
    • WebSockets: Used in bidirectional communication (e.g., WhatsApp), but resource-intensive.
    • Server-Sent Events (SSE): Offers a unidirectional communication path.
      • Server broadcasts updates.
      • Client receives updates passively, different from traditional HTTP requests where client has to make a new request for every update.
Why Choose SSE?
  • Efficient Broadcasting: Chosen for its efficiency in broadcasting updates.
  • Industry Validation: Similar approach used by OpenAI for ChatGPT.
  • Early Adoption: We implemented SSE in July 2022, predating ChatGPT's launch.
Implementing SSE in Afterword

Having settled on Server-Sent Events (SSE) for Afterword, our next step was to implement it practically. For Afterword's backend, we chose FastAPI, a leading Python framework, aligning with our decision to use Python for our AI functionalities. I'll delve into Afterword's complete tech stack in a subsequent, more detailed blog post.

SSE implementation involves two main components: the server and the client. On the server side, we needed to integrate SSE within the FastAPI framework. Regarding the client side, the implementation of SSE in JavaScript will be the same no matter which frontend framework you choose to work with unless you use HTMX (the frontend framework for javascript haters. Just kidding, it’s really cool do check it out).

When it came to implementing SEE using FastAPI back in July 2022 I could find only two articles on the internet and neither of them were very helpful. There might be more articles today but all of them fail to explain certain basic concepts clearly. Hence I am going to attempt to mitigate that gap.

FastAPI, built upon the Starlette framework, can leverage a third-party library specifically designed for Server-Sent Events (SSE), namely sse-starlette . Implementing SSE in FastAPI using this library is straightforward. Instead of returning a typical response from a FastAPI route, you use EventSourceResponse(generator_function()).

To understand this better, let's clarify what a generator function is. In Python, a generator function is used to return a sequence of values over time, each time it's iterated, by employing the yield statement. This differs from a normal function, which returns a single value using the return statement.

For instance, a standard function is structured as follows:

python
def normal_function():
    #Perform some action
    return final_value

In contrast, a generator function looks like this:

python
def generator_function():
    #Perform some action
    yield first_value
    #Perform more action
    yield second_value
    #Perform even more action
    yield final_value

Each yield in the generator function produces a value that can be sent to the client as part of an SSE stream. This allows for real-time data transmission to the client, making it an ideal approach for applications that require continuous data updates, like Afterword.

Standard vs SSE route

In FastAPI, a standard route for a simple request-response cycle looks quite straightforward. Here's an example of such a route:

python
@app.get("/normal")
def hello_world():
    return {"Hello": "World"}

This route responds with a simple JSON object upon a GET request.

However, for a route that implements Server-Sent Events (SSE), the structure changes to accommodate the SSE pattern. This is achieved by returning an `EventSourceResponse` with a generator function. Here's how the SSE route would look:

python
@app.get("/SSE")
def server_sent_events():
    return EventSourceResponse(generator_function())

With the generator function you've described, this SSE route will send three separate updates to the client (browser) before ceasing transmission. Each yield in your generator function corresponds to a separate update sent to the client.

Message format for SSE

When using SSE, it's crucial that the messages sent to the browser adhere to the structure specified in the HTML format. Each message consists of various fields, each on a new line, defined as fieldname: value. The key fields include:

  1. event: Specifies the event type. If this is set, it triggers a browser event for which listeners can be added using addEventListener(). If not set, the onmessage handler is used.
  2. data: Sets the event ID.
  3. id: Contains the actual message data.
  4. retry: Defines the reconnection time in milliseconds, instructing the browser how long to wait before attempting to reconnect if the connection is lost.

All other field names are ignored by the browser.

Events are optional but they are helpful. For example by setting an event name - “end” we can tell the browser when to stop listening for more messages. Of course the same can be achieved based on certain input received through the data field as well.

In your generator function, the messages are structured to include these key fields. For instance:

python
def generator_function():
    yield {
        "event": "event_name",
        "id": event_id, #any unique id
        "retry": 15000, #15s
        "data": json.dumps({"message": "message text"}) #actual data being sent
    }

Now the reason we use json.dumps is because Python data structures cannot be directly sent over a network. json.dumps serializes these structures into a string representation, which can be transmitted over the network.

Handling SSE in browser using javascript

Receiving Server-Sent Events (SSE) in the browser using JavaScript is relatively straightforward and involves setting up an EventSource object to listen for messages from the server. Here's a breakdown of how it works based on your provided code:

  1. Initialize the EventSource:
    javascript
    const evtSource = new EventSource(URL);

    This line creates a new EventSource instance, connecting to the specified URL (where your FastAPI server sends SSE).

  2. Listen for Specific Events:
    javascript
    evtSource.addEventListener("event_name", function(event) {
        let data = JSON.parse(event.data);
        // Use the data
    });

    Here, you're adding an event listener for a specific event type, "event_name". When the event is received, the callback function is executed. The event data, which is a JSON string, is parsed back into a JavaScript object.

  3. Listen for a Termination Event:
    javascript
    evtSource.addEventListener("end", function(event) {
        evtSource.close();
    });

    This listener waits for an "end" event, signaling that no more messages will be sent. Upon receiving this event, it closes the EventSource connection.

  4. Error Handling:
    javascript
    evtSource.onerror = function(event) {
        // Handle error
    };

    This function will be called if an error occurs with the EventSource connection. You can implement specific error handling logic here.

This setup ensures that your browser client is continuously listening for messages sent from your FastAPI server via SSE. It processes each message as it arrives and responds accordingly, whether that's updating the UI, triggering further actions, or closing the connection. This method is efficient for real-time applications, as it reduces the need for repeated polling and provides a more interactive user experience.

Complete working example

Now that we know how to send and receive SSE let us look at a full example.

In this we will assume that we are looping over an array and generating data and sending to the browser.

  1. Install all dependencies:
    terminal
    pip install fastapi
    pip install uvicorn[standard]
    pip install sse-starlette
  2. Create a python file 'sse.py' and copy the below code to it:
    python
    from fastapi import FastAPI
    from sse_starlette.sse import EventSourceResponse
    import time
    import json
    from fastapi.middleware.cors import CORSMiddleware
    
    app = FastAPI()
    
    # Set up CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # Allows all origins
        allow_credentials=True,
        allow_methods=["*"],  # Allows all methods
        allow_headers=["*"],  # Allows all headers
    )
    
    @app.get("/SSE")
    async def SSE():
        return EventSourceResponse(generator_function())
    
    RETRY_TIMEOUT = 15000 #15s
    
    async def generator_function():
        #We are sleeping to simulate CPU processing
        time.sleep(2) #sleep for 2 seconds
        yield {"event": "message","id": 1,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"1st SSE"})}
        time.sleep(1)
        yield {"event": "message","id": 2,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"2nd SSE"})}
        #Loop over a list and send SSE
        messages = ["data 1","data 2","data 3"]
        for message in messages:
            time.sleep(1)
            yield {"event": "message","id": 3,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":message})}
        time.sleep(1)
        yield {"event": "end","id": 4,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"last SSE"})}
  3. CORS Middleware Configuration:
    python
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # Allows all origins
        allow_credentials=True,
        allow_methods=["*"],  # Allows all methods
        allow_headers=["*"],  # Allows all headers
    )

    This sets up CORS to allow requests from any origin, which is crucial for API accessibility from different domains. Since we will be making requests using javascript.

  4. Start FastAPI server:
    terminal
    uvicorn sse:app
  5. The server should start on port 8000:
    terminal
    INFO:     Started server process [22876]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
  6. Setup a svelte project: You can use any framework you like
    terminal
    npm create svelte@latest sse
    cd sse
    npm install
  7. Overwrite content of sse/src/routes/+page.svelte with the code below:
    svelte
    <script>
    let message = "";
    async function SSE()
    {
      const evtSource = new EventSource("http://localhost:8000/SSE");
      evtSource.addEventListener("message", function(event) {
          let data = JSON.parse(event.data);
          message = data.message;
      });
      evtSource.addEventListener("end", function(event) {
          let data = JSON.parse(event.data);
          message = data.message;
          evtSource.close();
      });
    }
    </script>
    <h1>SSE</h1>
    <p><a on:click={SSE} href="#">Click</a> to start SSE</p>
    <p>{message}</p>
  8. Start svelte server:
    terminal
    npm run dev -- --open
  9. Open http://localhost:5173 and click to start SSE: You should be able to see the messages in your browser Browser showing SSE working