Asynchronous Python LLM APIs | FastAPI, Redis, AsyncIO

Code with Irtiza · Intermediate · 🔧 Backend Engineering · 1y ago

Key Takeaways

This video demonstrates building an asynchronous LLM API using FastAPI, Redis, and AsyncIO, allowing for parallel processing of LLM tasks in the background. It covers setting up a task queue, using a thread pool for concurrent task execution, and monitoring task progress using Redis.

Full Transcript

All right, folks. Today I'll show you how to build an API that processes LLM tasks asynchronously in the background. We're going to build two endpoints. The first is a POST endpoint: you pass in your LLM prompts, and it processes all of them in the background. While that processing is happening, it also gives you a tracker you can use to see the progress of each prompt you submitted. The second is a GET endpoint: you pass in your task IDs, which are the IDs the POST endpoint gave you, and for every ID it returns either the result, if the LLM has finished processing your request, or a progress value you can use to see how far along it is.

So these are the two endpoints we're going to build, and I think it's around 120 lines of code for both. First, let's go through a demo of the whole flow end to end. Once we finish the demo, we'll jump into the code and I'll explain every function and how it works.

Let me quickly jump to the UI. This is what FastAPI gives you: a Swagger UI you can use to send requests to your backend, much like you would with curl. We also have the logs showing up here, which should give you an idea of how the API works. Let me restart the API first. Since this is a FastAPI app, I'm using the uvicorn command, and then I'll refresh the GET and POST endpoints. Let's start with the POST endpoint, which takes in a list called prompts; every element in the list is a question you have. We're going to quickly write four questions.
These can be anything, so I'll just write "Tell me a joke about the economy" and copy-paste it four times. For the second, third, and fourth, we'll randomly pick other topics: let's say animals, plants, and food. So we have four prompts, and our goal is to get answers from the LLM for all four of them in parallel. Doing it sequentially is easy: you essentially write a for loop that goes through each prompt, calls OpenAI, gets the answer, returns the response, and only then moves on to the next one. In this example, though, we're going to do things in parallel, asynchronously, using Python's asyncio, with Redis as a database. You'll see where Redis fits into this system.

Let me kick these off. I'm going to execute the request, and in the response you can see we're getting four task IDs. Each task ID tells you about the progress of one prompt. I'm going to copy-paste all the task IDs into our GET endpoint, which takes the task IDs as a single string, separated by commas. So we have the first task ID, then a comma and the second task ID, then the third one, and finally the fourth one. Now we have four task IDs, so let me run the request. You can see that three of them have completed and we're still waiting on the fourth. If I execute it again, it still says processing.
So we're going to keep waiting, and you'll soon see why three of them finished immediately while the last one takes a long time. For the last one, the status is processing with progress 25, so 25%. We keep running the request, and it's still 25. If you look at the logs, you can see that every time I trigger the GET request, this is where the backend receives it. We need to give it a couple of minutes, and there you go: now all four tasks are completed. Once again, three of them completed immediately and the fourth one took some time. We'll soon see why that is the case.

So, to recap: we have a POST endpoint that takes in a list of prompts and returns the same number of task IDs, and a GET endpoint that takes in a set of task IDs and, for each one, gives you the result if it's available, or otherwise the progress and the status. The status can be queued, processing, completed, and so on. Now let's see how both these endpoints work in code.

Our two endpoints are here. The first is the create tasks endpoint; this is the POST endpoint we were using to kick off our requests. Then we have the GET endpoint that shows the progress of each task. Let's start with the POST endpoint. The first thing it does is generate a task ID for every prompt in the list. The task ID is just a UUID, and it's the same UUID you get back after you send the prompts to the backend. Then, for every task, we create an entry in Redis, which we're using as a database or cache.
We're using HSET, which stores a hash (a set of field-value pairs) in Redis. For every entry, the key is the task ID, and the task ID maps to an object containing the status and the progress. If we look at our GET endpoint, that's almost exactly what we get back for each task: the status, progress, and result. That's where Redis comes into this system. We only use Redis to store the status and progress, or the result once processing is done. It has nothing to do with how we're doing asynchronous programming. Its only job is to act as a status board that you can use to see where each of your prompts is: whether it's finished or failed, or how long you can expect to wait.

So for every prompt, we generate a task ID and create an entry in Redis to track its progress. This is where the asynchronous programming starts: we use await to put a task in our task queue. What is our task queue? Let's scroll all the way up. This is where we instantiate it; it's just a Queue object from asyncio, the library we're importing to do asynchronous programming in Python. This global queue is what helps us process everything in parallel. Without it, you would process the prompts sequentially: if one prompt takes 20 seconds and you have four prompts, it will take at least 80 seconds, because you handle one task at a time. When you use an asyncio queue in combination with a thread pool, which we'll come to in a little bit, you can process up to three prompts in parallel, because we're setting the pool size to three.
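The 80-second figure above, and what a pool of three changes about it, can be made explicit with a small back-of-the-envelope helper. It is only a rough model under stated assumptions: every prompt takes exactly the same time, and queueing and network overhead are ignored. With `n` equal-length tasks and `w` workers, the tasks run in ceil(n / w) rounds. The helper name `estimated_wall_time` is hypothetical.

```python
import math


def estimated_wall_time(num_tasks: int, seconds_per_task: float, workers: int) -> float:
    """Back-of-the-envelope wall-clock time for equal-length tasks.

    Assumes every task takes exactly `seconds_per_task` and that at most
    `workers` tasks run at once, so the tasks finish in ceil(n / w) rounds.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1")
    rounds = math.ceil(num_tasks / workers)
    return rounds * seconds_per_task


print(estimated_wall_time(4, 20, workers=1))  # sequential: 80
print(estimated_wall_time(4, 20, workers=3))  # three workers: 40
print(estimated_wall_time(4, 20, workers=4))  # one round: 20
```

With three workers, the fourth prompt needs a second round, so it finishes roughly one task-length after the other three, which is the pattern the demo shows.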
So we have the queue, which is an asyncio Queue. Jumping back to the endpoint: we put the task into the queue and then append the task ID to the list we return in the response. So, to recap: for each prompt we generate a UUID, set an entry in Redis, and put the prompt as a task in our queue for future processing. Then, to send the client a response immediately, we return all the task IDs.

Now let's see how the queue works, because I think that's the key part: how is the queue storing these prompts, and how does OpenAI get called for them? Going all the way up, we have the task queue and the thread pool. Right here we have something called a background worker. Imagine it as a background process running on the same server as your API. It's not something running elsewhere in the cloud; it runs on the same machine as your API. The worker's job is to keep watching the queue, and whenever a new item appears, pick it up and start processing it. I'll show you what kind of processing it does.

It's very simple: an async function called background_worker. It starts with while True, because we want it to keep running until we shut down our API server. It awaits task_queue.get(); this is where it waits on the queue until something becomes available. It unpacks a task ID and a prompt, because that's exactly what we put in the queue for every task. If the queue is empty, the worker simply waits at that line until there's something to act on.
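The POST flow recapped above (one UUID per prompt, a status entry, a `put` on the queue, then an immediate response) can be sketched without FastAPI or Redis so it runs on its own. This is an illustration under assumptions, not the video's code: the in-memory `status_board` dict stands in for the Redis hashes, and the function name `create_tasks` and the response shape `{"task_ids": [...], "status": "queued"}` are hypothetical.

```python
import asyncio
import uuid

# Hypothetical in-memory stand-in for Redis: one dict per task, playing the
# role of the Redis hash the video writes with HSET (task_id -> fields).
status_board: dict[str, dict] = {}


async def create_tasks(prompts: list[str], task_queue: asyncio.Queue) -> dict:
    """Sketch of the POST handler body: register and enqueue every prompt,
    then return the task IDs right away without waiting for any LLM call."""
    task_ids = []
    for prompt in prompts:
        task_id = str(uuid.uuid4())  # one UUID per prompt
        status_board[task_id] = {"status": "queued", "progress": 0}
        await task_queue.put((task_id, prompt))  # hand off to the background workers
        task_ids.append(task_id)
    return {"task_ids": task_ids, "status": "queued"}  # assumed response shape


async def demo() -> tuple[dict, int]:
    task_queue: asyncio.Queue = asyncio.Queue()
    response = await create_tasks(
        ["Tell me a joke about animals", "Tell me a joke about food"], task_queue
    )
    return response, task_queue.qsize()


response, queued = asyncio.run(demo())
print(response["task_ids"], queued)
```

Because `put` on an unbounded `asyncio.Queue` never has to wait, the handler returns as soon as every prompt is registered; all the slow work happens later in the workers.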
The moment it gets an item from the queue, it runs our process_llm_task function, passing the task ID and prompt as arguments. When that's done, it marks the queue item as done. So what exactly does that call do? loop.run_in_executor tells the event loop to run a function, specifically process_llm_task, in an executor.

We haven't yet talked about how many workers are available to pull tasks from the queue, and that's where the thread pool comes in. Going up, the thread pool comes from the concurrent.futures library, and I'm setting max_workers to three. This means there are three workers in the thread pool; think of it as three individual threads pooled together. Every time there's a task in the queue, it gets processed on one of those threads. While that's being processed, if a new task comes into the queue, a second thread picks it up and processes it independently. At that point we have two tasks being worked on concurrently, and if a third task comes in, a third thread does exactly the same thing.

It gets interesting when a fourth task comes in. At that point no thread is available, because all three are busy processing the prompts that already came in. So the fourth task waits in the queue until one of the first three finishes, and then the freed-up thread picks it up. The same goes for the fifth, sixth, and seventh tasks: you can only work on three tasks at a time, because we set max_workers to three. If you want, you can set it to five or 10.
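A self-contained sketch of the background worker and thread pool described above follows. It assumes a stand-in `process_llm_task` that just sleeps instead of calling OpenAI, and the constant names `NUM_WORKERS` and `MAX_THREADS` are hypothetical. In the real app the workers would start with the server and run until shutdown; here `task_queue.join()` plus explicit cancellation lets the demo finish once all four prompts are processed.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

NUM_WORKERS = 3  # background_worker coroutines pulling from the queue
MAX_THREADS = 3  # threads in the pool, like max_workers=3 in the video

results: dict[str, str] = {}


def process_llm_task(task_id: str, prompt: str) -> None:
    """Hypothetical blocking stand-in for the real OpenAI call."""
    time.sleep(0.1)  # simulate a slow, blocking request
    results[task_id] = f"answer to: {prompt}"


async def background_worker(task_queue: asyncio.Queue, pool: ThreadPoolExecutor) -> None:
    loop = asyncio.get_running_loop()
    while True:
        # Suspends this coroutine (not the event loop) until an item is available.
        task_id, prompt = await task_queue.get()
        try:
            # Run the blocking function on a pool thread and await its completion.
            await loop.run_in_executor(pool, process_llm_task, task_id, prompt)
        except Exception as exc:  # keep the worker alive if one task fails
            results[task_id] = f"failed: {exc}"
        finally:
            task_queue.task_done()


async def main() -> None:
    task_queue: asyncio.Queue = asyncio.Queue()
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
        workers = [
            asyncio.create_task(background_worker(task_queue, pool))
            for _ in range(NUM_WORKERS)
        ]
        for i, topic in enumerate(["economy", "animals", "plants", "food"]):
            await task_queue.put((f"task-{i}", f"Tell me a joke about {topic}"))
        await task_queue.join()  # returns once every item has been marked done
        for worker in workers:
            worker.cancel()  # the workers loop forever, so stop them explicitly
        await asyncio.gather(*workers, return_exceptions=True)


asyncio.run(main())
print(sorted(results))
```

Offloading to a thread matters because a blocking client call made directly inside a coroutine would freeze the event loop, and with it the API's ability to answer GET requests.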
I think there's some upper bound depending on the server's resources, but three is a good number to work with if you don't expect your LLM calls to take too long. So that's where the pool comes in. Going back to our background worker: it keeps waiting for a task to become available in the queue. When one does, it hands it to a thread from the pool, and in that thread it runs the process_llm_task function. Once the function finishes, the worker marks the task as done, goes back to the top of the while True loop, and keeps watching the queue to grab the next task.

Now let's look at the process_llm_task function and go through it line by line. The function takes in a task ID and a prompt. Remember, it gets these from the task queue: whenever we write a task to the queue, we give it a task ID and a prompt, and that same task ID and prompt flow through to process_llm_task. Everything is inside a try/except, so let's look at the try block first. We instantiate our OpenAI client with our API key, and then we set the task's progress in Redis to 0 and its status to processing. Next, we simulate some work with a 20-second sleep. This is just to make it easier to test and to show how the tasks are executed in parallel. In this case it's a very simple prompt, so we don't expect OpenAI to take long, but the task could be something much more complicated: say you're processing a 4K image, doing object recognition, or running some complex model for detection, transcription, or translation.
All of those can take significantly longer than a simple question. So we simulate that by explicitly waiting for 20 seconds, and then we set the progress to 25%. After that, we call OpenAI with its gpt-3.5-turbo model. We again simulate a delay with a 30-second sleep and set the progress to 75. Both the sleeping and the progress updates are just there to show you what's happening in the background while you wait for the LLM to complete its processing. Finally, we set the result to whatever OpenAI returned, set the status to completed and the progress to 100, and give the Redis entry a TTL of 24 hours, so it's automatically removed after 24 hours. The reason we remove it is that the whole point of keeping the task in Redis is to monitor its progress; once the task is completed, there's no reason to keep tracking it.

So that's process_llm_task. Going back to the POST endpoint, you'll see that for every prompt the processing happens in the background, but the moment you send the POST request, you get a response with all the task IDs and a status set to queued. The way you monitor the progress of each task is through the GET endpoint, which we'll cover next. The GET endpoint takes a query parameter with the task IDs as a comma-separated list. Switching back to the API UI: this is the POST endpoint that takes in all the prompts, and as you remember, the moment you call it you immediately get a response with four task IDs. These are the same task IDs you use to monitor the progress, or get the result, of each of the four prompts. Jumping back to the code.
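The progress updates described above can be sketched with a plain dict in place of Redis. The `update` helper, the `fake_llm` function, and the `expires_at` field are hypothetical stand-ins: against a real Redis server you would write the fields with HSET and set the 24-hour expiry on the key itself (redis-py exposes this as `expire(name, time)`). The sleeps are shortened from 20 s and 30 s so the example finishes quickly.

```python
import time

TTL_SECONDS = 24 * 60 * 60  # 24-hour expiry, as described in the video

status_board: dict[str, dict] = {}  # hypothetical stand-in for Redis
progress_log: list[int] = []        # records every progress value written


def update(task_id: str, **fields) -> None:
    """Hypothetical helper that merges fields into a task entry, like HSET."""
    status_board.setdefault(task_id, {}).update(fields)
    if "progress" in fields:
        progress_log.append(fields["progress"])


def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for the gpt-3.5-turbo chat completion call."""
    return f"(model answer for {prompt!r})"


def process_llm_task(task_id: str, prompt: str, delay: float = 0.01) -> None:
    try:
        update(task_id, status="processing", progress=0)
        time.sleep(delay)  # the video sleeps 20 s here to simulate slow work
        update(task_id, progress=25)
        answer = fake_llm(prompt)
        time.sleep(delay)  # the video sleeps another 30 s here
        update(task_id, progress=75)
        # With redis-py, the expiry would be set on the key, e.g. expire(task_id, TTL_SECONDS).
        update(task_id, status="completed", progress=100, result=answer,
               expires_at=time.time() + TTL_SECONDS)
    except Exception as exc:
        update(task_id, status="failed", error=str(exc))


process_llm_task("task-0", "Tell me a joke about plants")
print(progress_log)                      # [0, 25, 75, 100]
print(status_board["task-0"]["status"])  # completed
```

The except branch records a failed status instead of letting the exception escape, so a client polling the GET endpoint sees the failure rather than a task stuck at its last progress value.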
You'll see I'm just splitting the query parameter. It starts as a comma-separated string of task IDs, and splitting turns it into a list of task IDs. Then, for every task in the list, we look it up in Redis, get the object we stored for that task ID, and return it to the client. If you pass a task ID that doesn't exist, you get the status not found.

Let's see the GET endpoint in action once more. I'll quickly run it, and you can see we get the same result. Now, if I change one of the task IDs to something that doesn't exist, just some random value, you'll see that for the one I changed, the status is not found, because that ID doesn't exist in our Redis database. So that's how the GET endpoint works.

We've now gone through both the POST and GET endpoints. The last thing remaining is to show you why we waited longer for the fourth task. I'll quickly kick off the same four prompts again; they'll get new task IDs, of course. I execute it, you can see everything has started, and I copy-paste the task IDs: the first one, the second one, the third one, and finally the fourth one. Then I execute the GET request, and you can see that three of them are processing at 25% while the fourth one is still queued. Running it again, it's the same: three processing, and the fourth still queued. I keep running it a couple of times, and finally all three complete at the same time, but the fourth one is still processing, because only after one of the others finished were we able to pick it up. So I'm going to keep waiting a little bit.
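The GET logic described above (split the comma-separated parameter, look up each ID, report not found for unknown IDs) can be sketched as a plain function. The function name `get_tasks`, the dict-keyed response shape, and the sample `board` data are assumptions for illustration; a dict again stands in for Redis.

```python
def get_tasks(task_ids: str, status_board: dict[str, dict]) -> dict[str, dict]:
    """Sketch of the GET handler body.

    `task_ids` is the raw comma-separated query parameter, e.g. "id1,id2,id3".
    `status_board` is a hypothetical dict standing in for Redis.
    """
    response: dict[str, dict] = {}
    for task_id in (part.strip() for part in task_ids.split(",")):
        if not task_id:
            continue  # tolerate stray commas such as "id1,,id2,"
        # redis-py's hgetall() returns an empty dict for a missing key, so an
        # empty entry is treated as "not found" here as well.
        entry = status_board.get(task_id, {})
        response[task_id] = entry if entry else {"status": "not found"}
    return response


board = {
    "a1": {"status": "completed", "progress": 100, "result": "(a joke about plants)"},
    "b2": {"status": "processing", "progress": 25},
}
print(get_tasks("a1, b2,does-not-exist", board))
```

Stripping whitespace around each ID makes the endpoint forgiving of inputs like "id1, id2", which are easy to produce when pasting IDs by hand as in the demo.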
It's starting to make some progress. While it completes, let me go to the code and show you exactly why that's happening. If I go all the way to the top, remember that I'm setting the number of workers to three. This means we can only work on three things at the same time. If a fourth job comes in, we need to wait until one of the first three jobs is completed before we pick up the fourth one. Now, what if I change this to four? Then we should be able to process four tasks at a time. Whereas previously three tasks started immediately and the fourth one sat in the queued state for a long time, this time all four should be processed at the same time. Let's try that out before we close out the video.

Again, same four prompts. I execute it, it's started, and I quickly copy the task IDs: the first one, the second one, the third one, and the last one. I run execute, and this is interesting: you still see three of them processing and the fourth one queued, whereas I told you all four should be processing. Let's wait until it's finished with at least these three, and then I'll show you why this is happening. Yes, it completed all three and then started processing the fourth.

So why did that happen? We did set the number of workers to four, but max_workers on the thread pool is still three. That means even though I'm starting four background workers, the thread pool only has three threads available. So even though the fourth worker is ready to pick up work, it doesn't have a thread in the pool to execute that work in. We need to match the number of workers with max_workers.
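The mismatch explained above can be reproduced by counting how many blocking jobs are ever running at once. In this sketch, `measure_peak_concurrency` and its parameters are hypothetical, and a 0.3-second sleep stands in for the LLM call. With four queue consumers but `max_workers=3`, the fourth job waits inside the executor for a free thread; raising the pool to four threads lets all four run together.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def measure_peak_concurrency(num_workers: int, max_threads: int, num_tasks: int) -> int:
    """Run num_tasks blocking jobs through num_workers queue consumers that share
    a ThreadPoolExecutor(max_workers=max_threads); return the most seen running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def blocking_job() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.3)  # stand-in for a slow LLM call
        with lock:
            running -= 1

    async def worker(queue: asyncio.Queue, pool: ThreadPoolExecutor) -> None:
        loop = asyncio.get_running_loop()
        while True:
            await queue.get()
            try:
                await loop.run_in_executor(pool, blocking_job)
            finally:
                queue.task_done()

    async def main() -> None:
        queue: asyncio.Queue = asyncio.Queue()
        with ThreadPoolExecutor(max_workers=max_threads) as pool:
            workers = [asyncio.create_task(worker(queue, pool)) for _ in range(num_workers)]
            for i in range(num_tasks):
                queue.put_nowait(i)
            await queue.join()
            for w in workers:
                w.cancel()
            await asyncio.gather(*workers, return_exceptions=True)

    asyncio.run(main())
    return peak


print(measure_peak_concurrency(num_workers=4, max_threads=3, num_tasks=4))  # 3: the pool is the cap
print(measure_peak_concurrency(num_workers=4, max_threads=4, num_tasks=4))  # 4: counts match
```

Effective parallelism is the smaller of the two settings, which is why both values have to be raised together.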
So let's set this to four too and run it again. There you go: again we have four task IDs, and I'll quickly copy them over: the first one, the second, the third, and finally the fourth. We run it, and now you can see all four of them are being processed at the same time, unlike last time, when we always had to wait for the fourth task. If we wait a little longer, you'll see all four finish at almost the same time. We'll give it a couple of seconds, and there you go: all four finished around the same time. That's because we made sure we have four workers and four threads available for those workers to work independently.

Finally, everything you saw in this video runs in memory within the API server. If your API server goes down, all the workers stop working too, and a setup like this isn't really scalable. You can build it for a prototype or a demo, like I'm doing here, but if you're productionizing something like this, you need to move these pieces out of your API server. In principle, you have a task queue and a set of workers grabbing tasks from it. In a production setting, the task queue would be some kind of message broker, like Kafka, Amazon SQS, or many others, and the workers that currently live in the thread pool can be anything. If you go with Kafka as your message broker, they can just be Kafka consumers that are scaled up or down independently.
Or if you're using Amazon SQS (Simple Queue Service), you'd run workers that poll tasks from the queue. The next video will be about that: I'll go over a message broker and how you can use it very similarly to how we're doing everything in memory within the API server here. Hopefully that was helpful. If you have any questions, just leave them in the comments below, and I'll see you folks in the next one. Take care. Bye-bye.

Original Description

In this video we build an API that processes LLM responses asynchronously in the background, using Python's AsyncIO library together with thread pools and background workers.

🔴 More from me: https://irtizahafiz.com?utm_source=youtube
🟢 Join my mailing list: https://irtizahafiz.com/newsletter?utm_source=youtube

0:00 API Demos
6:22 Async POST API
10:44 Python AsyncIO Task Queues & Threads
15:50 Async LLM Processing
19:15 GET API to track task progress
21:15 Threads, Workers & Parallel Processing
26:50 Productionizing Using Message Brokers

This video teaches how to build an asynchronous LLM API using FastAPI, Redis, and AsyncIO, enabling parallel processing of LLM tasks. It covers key concepts such as task queues, concurrent task execution, and message brokers.

Key Takeaways
  1. Build an API that processes LLM tasks asynchronously in the background
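  2. Create a POST endpoint that queues LLM prompts for background processing and immediately returns one task ID per prompt
  3. Create a GET endpoint that takes comma-separated task IDs and returns each task's status, progress, and result
  4. Use Redis as a status board that stores each task's status, progress, and result
  5. Use async/await with background worker coroutines so the API hands tasks off without blocking
  6. Set up the task queue with asyncio.Queue (Redis tracks status; it is not the queue)
  7. Use a thread pool (concurrent.futures) for concurrent task execution, with max_workers matched to the number of background workers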
  8. Monitor task progress using Redis
💡 In production, move the queue to a message broker like Kafka or Amazon SQS and run the workers outside the API server, so they keep running if the API goes down and can be scaled independently


Chapters (7)

0:00 API Demos
6:22 Async POST API
10:44 Python AsyncIO Task Queues & Threads
15:50 Async LLM Processing
19:15 GET API to track task progress
21:15 Threads, Workers & Parallel Processing
26:50 Productionizing Using Message Brokers