Technical
Jan 9, 2024
Aneesh Arora
Cofounder and CTO
Learn why and how to set up real-time updates in your web app using Server-Sent Events using FastAPI
Having settled on Server-Sent Events (SSE) for Afterword, our next step was to implement it practically. For Afterword's backend, we chose FastAPI, a leading Python framework, aligning with our decision to use Python for our AI functionalities. I'll delve into Afterword's complete tech stack in a subsequent, more detailed blog post.
SSE implementation involves two main components: the server and the client. On the server side, we needed to integrate SSE within the FastAPI framework. Regarding the client side, the implementation of SSE in JavaScript will be the same no matter which frontend framework you choose to work with unless you use HTMX (the frontend framework for javascript haters. Just kidding, it’s really cool do check it out).
When it came to implementing SEE using FastAPI back in July 2022 I could find only two articles on the internet and neither of them were very helpful. There might be more articles today but all of them fail to explain certain basic concepts clearly. Hence I am going to attempt to mitigate that gap.
FastAPI, built upon the Starlette framework, can leverage a third-party library specifically designed for Server-Sent Events (SSE), namely sse-starlette . Implementing SSE in FastAPI using this library is straightforward. Instead of returning a typical response from a FastAPI route, you use EventSourceResponse(generator_function()).
To understand this better, let's clarify what a generator function is. In Python, a generator function is used to return a sequence of values over time, each time it's iterated, by employing the yield statement. This differs from a normal function, which returns a single value using the return statement.
For instance, a standard function is structured as follows:
def normal_function():
#Perform some action
return final_value
In contrast, a generator function looks like this:
def generator_function():
#Perform some action
yield first_value
#Perform more action
yield second_value
#Perform even more action
yield final_value
Each yield in the generator function produces a value that can be sent to the client as part of an SSE stream. This allows for real-time data transmission to the client, making it an ideal approach for applications that require continuous data updates, like Afterword.
In FastAPI, a standard route for a simple request-response cycle looks quite straightforward. Here's an example of such a route:
@app.get("/normal")
def hello_world():
return {"Hello": "World"}
This route responds with a simple JSON object upon a GET request.
However, for a route that implements Server-Sent Events (SSE), the structure changes to accommodate the SSE pattern. This is achieved by returning an `EventSourceResponse` with a generator function. Here's how the SSE route would look:
@app.get("/SSE")
def server_sent_events():
return EventSourceResponse(generator_function())
With the generator function you've described, this SSE route will send three separate updates to the client (browser) before ceasing transmission. Each yield in your generator function corresponds to a separate update sent to the client.
When using SSE, it's crucial that the messages sent to the browser adhere to the structure specified in the HTML format. Each message consists of various fields, each on a new line, defined as fieldname: value. The key fields include:
All other field names are ignored by the browser.
Events are optional but they are helpful. For example by setting an event name - “end” we can tell the browser when to stop listening for more messages. Of course the same can be achieved based on certain input received through the data field as well.
In your generator function, the messages are structured to include these key fields. For instance:
def generator_function():
yield {
"event": "event_name",
"id": event_id, #any unique id
"retry": 15000, #15s
"data": json.dumps({"message": "message text"}) #actual data being sent
}
Now the reason we use json.dumps is because Python data structures cannot be directly sent over a network. json.dumps serializes these structures into a string representation, which can be transmitted over the network.
Receiving Server-Sent Events (SSE) in the browser using JavaScript is relatively straightforward and involves setting up an EventSource object to listen for messages from the server. Here's a breakdown of how it works based on your provided code:
const evtSource = new EventSource(URL);
This line creates a new EventSource instance, connecting to the specified URL (where your FastAPI server sends SSE).
evtSource.addEventListener("event_name", function(event) {
let data = JSON.parse(event.data);
// Use the data
});
Here, you're adding an event listener for a specific event type, "event_name". When the event is received, the callback function is executed. The event data, which is a JSON string, is parsed back into a JavaScript object.
evtSource.addEventListener("end", function(event) {
evtSource.close();
});
This listener waits for an "end" event, signaling that no more messages will be sent. Upon receiving this event, it closes the EventSource connection.
evtSource.onerror = function(event) {
// Handle error
};
This function will be called if an error occurs with the EventSource connection. You can implement specific error handling logic here.
This setup ensures that your browser client is continuously listening for messages sent from your FastAPI server via SSE. It processes each message as it arrives and responds accordingly, whether that's updating the UI, triggering further actions, or closing the connection. This method is efficient for real-time applications, as it reduces the need for repeated polling and provides a more interactive user experience.
Now that we know how to send and receive SSE let us look at a full example.
In this we will assume that we are looping over an array and generating data and sending to the browser.
pip install fastapi
pip install uvicorn[standard]
pip install sse-starlette
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time
import json
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
@app.get("/SSE")
async def SSE():
return EventSourceResponse(generator_function())
RETRY_TIMEOUT = 15000 #15s
async def generator_function():
#We are sleeping to simulate CPU processing
time.sleep(2) #sleep for 2 seconds
yield {"event": "message","id": 1,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"1st SSE"})}
time.sleep(1)
yield {"event": "message","id": 2,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"2nd SSE"})}
#Loop over a list and send SSE
messages = ["data 1","data 2","data 3"]
for message in messages:
time.sleep(1)
yield {"event": "message","id": 3,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":message})}
time.sleep(1)
yield {"event": "end","id": 4,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"last SSE"})}
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
This sets up CORS to allow requests from any origin, which is crucial for API accessibility from different domains. Since we will be making requests using javascript.
uvicorn sse:app
INFO: Started server process [22876]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
npm create svelte@latest sse
cd sse
npm install
<script>
let message = "";
async function SSE()
{
const evtSource = new EventSource("http://localhost:8000/SSE");
evtSource.addEventListener("message", function(event) {
let data = JSON.parse(event.data);
message = data.message;
});
evtSource.addEventListener("end", function(event) {
let data = JSON.parse(event.data);
message = data.message;
evtSource.close();
});
}
</script>
<h1>SSE</h1>
<p><a on:click={SSE} href="#">Click</a> to start SSE</p>
<p>{message}</p>
npm run dev -- --open