Concurrent API requests with Python by example
APIs are one of the things we often come across when working with data. In this blog post we’ll look at some ways of extracting data from them efficiently with Python.

For small amounts of data we could write a simple script that sends one request at a time. For large amounts of data this is no longer a recommended approach: each response arrives with some latency, and the script sits idle during that waiting period. When we send a lot of requests, the total waiting time adds up. Retrieving all the information in one big request is not a solution either, since that would often result in timeout errors. We’ll need to tackle this problem differently. Luckily there are some options, and we’ll look at three of them: threading, asyncio and multiprocessing.

In our examples we’ll only use very basic GET requests. For those who want to dive deeper into the requests library: check out the docs (link in further reading). Time for some action, but first let’s make sure that all the required software is installed.
- 1. Brief introduction to concurrency
- 2. Configuring the mock API
- 3. Python code examples
- 4. From dataframe to… almost everything
- 5. Conclusion and further reading
- 6. Python setup used
Required software
- Visual Studio Code with Python installed (freeware)
- Mockoon (freeware / open source): tool for mocking API requests locally
1. Brief introduction to concurrency
1.1. Threading
1.1.1 What is threading?
A thread is a separate flow of execution. By using threads, Python can handle multiple tasks concurrently. However, only one thread can run at a time because of the Global Interpreter Lock (GIL), so threads will not speed up every task. They’re mostly useful for tasks that spend much of their time waiting, such as GUIs and applications with a lot of I/O. For CPU-intensive tasks it’s better to use multiprocessing.
It’s also important to note that switching between threads is done by the operating system and can take place at any point in time, which makes threaded applications harder to debug. Variables that are shared between threads can also cause race conditions when not handled properly. This can be avoided by adding locking mechanisms, but choosing the appropriate lock granularity is not always easy, and adding locks increases the risk of deadlocks. So working with threads always requires a certain amount of caution.
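To make the race condition concrete, here is a minimal sketch (not part of the API example): four threads increment a shared counter, and the lock guarantees that no updates are lost.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        # Without the lock, the read-modify-write below could interleave
        # between threads and lose updates (a race condition).
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000 with the lock; often less without it
```

Dropping the `with lock:` line makes the final count unpredictable, which is exactly the kind of subtle bug that makes threaded code hard to debug.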
1.1.2. The Queue/Worker flow
We want communication between our threads, but we also want to stay away from manual lock management. This is the perfect scenario to bring in queues: Python’s queue.Queue class implements all the required locking semantics for you. We populate the queue from the main thread and start up some worker threads to process the queue items.
The same flow can be used with asyncio, but there the worker threads are replaced by worker tasks. Simply put, tasks are wrappers for coroutines, and coroutines are objects scheduled in the asyncio event loop. All tasks run in the same process and thread, but context switching is possible at places that you specify, which reduces the idle time.
For multiprocessing the same flow can be applied. This time the worker threads will be replaced by worker processes.
The Queue/Worker-flow is visualized in Figure 1.
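As a minimal illustration of this flow (with a hypothetical squaring job standing in for the API request), the Queue/Worker pattern with threads looks like this:

```python
import queue
import threading

q = queue.Queue()
results = []

def worker() -> None:
    while True:
        item = q.get()               # blocks until an item is available
        results.append(item * item)  # the "work": here just squaring
        q.task_done()                # signal that this item is processed

# Populate the queue from the main thread
for i in range(10):
    q.put(i)

# Start the worker threads; daemon=True lets the program exit
# while they sit blocked on q.get()
for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

q.join()  # block until every item has been marked task_done
print(sorted(results))
```

The API examples in section 3 follow exactly this skeleton, with the squaring replaced by a GET request.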
1.2. Asyncio
1.2.1 What is asyncio?
We had a look at threading. But what is asyncio, and why would it be useful here? Asyncio can be used to run a program asynchronously within a single thread. The core of asyncio is the event loop. This loop keeps track of all the running tasks and wakes them up when needed. Tasks can be inspected to see whether they are, for example, done or cancelled. A Task, in turn, wraps a coroutine: the code that actually gets executed when the Task is activated.
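A minimal sketch of these concepts, with asyncio.sleep standing in for a real network wait:

```python
import asyncio
import time

async def fake_request(delay: float) -> float:
    # await is a checkpoint: control goes back to the event loop
    # while this coroutine is "waiting" (here simulated with sleep)
    await asyncio.sleep(delay)
    return delay

async def main() -> list:
    # create_task wraps each coroutine in a Task and schedules it on the loop
    tasks = [asyncio.create_task(fake_request(0.1)) for _ in range(3)]
    return await asyncio.gather(*tasks)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

# The three 0.1 s waits overlap, so the total is close to 0.1 s, not 0.3 s
print(results, round(elapsed, 2))
```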
1.2.2 Asyncio versus threading
The main difference with threading is that you choose when the program is allowed to switch to another task. This is done by setting checkpoints, recognizable by the keyword await. With threading, switching is handled automatically by the operating system.
Implementing asyncio can be challenging at first, but it can make the application perform better while keeping debugging relatively easy. Another thing to keep in mind is that you need dedicated libraries for asynchronous programming: for our API requests we’ll use aiohttp instead of requests.
1.3. Multiprocessing
Although this option is not recommended for CPU-light tasks with a lot of I/O, I’ll demonstrate it to show the similarities and differences with the two previous options. With multiprocessing the limitations of the GIL can be circumvented, which makes it the go-to solution for CPU-intensive tasks. Multiprocessing gives each process its own Python interpreter and memory space, so it involves more overhead than threading. In this example you’ll also see that communication between the processes adds some complexity to the script, because the workers do not share memory by default.
2. Configuring the mock API
To have full control, we first configure a mock API locally with Mockoon, a free open-source tool that can be downloaded from: https://mockoon.com/download/
- Install and open the application
- Add a new route named: inventory/:type to the demo API
- Change the body to:
```
{
  "id": "{{queryParam 'id'}}",
  {{#switch (urlParam 'type')}}
  {{#case 'products'}}
  "name": "{{faker 'commerce.product'}}",
  "price": "{{faker 'commerce.price'}} EUR",
  "quantity": "{{faker 'random.number' 50}}"
  {{/case}}
  {{#case 'materials'}}
  "name": "{{faker 'commerce.productMaterial'}}",
  "price": "{{faker 'commerce.price'}} EUR",
  "quantity": "{{faker 'random.number' 50}}"
  {{/case}}
  {{/switch}}
}
```
- Set a delay to make it all a bit more realistic. I’ve chosen a delay of 200 ms in this case.
- Make sure that the response code is configured to 200. The Python script checks the response code after each request before adding the output to the results. The complete configuration is shown in figure 2.
Make sure that the server is started (press the play button) and test the API in the web browser by entering the following two URLs. The results are shown in figure 3.
- http://localhost:3000/inventory/materials?id=1
- http://localhost:3000/inventory/products?id=1
The mock-API is running. Now we’re ready to start in Python.
3. Python code examples
3.1. Python code example for threading
We start with the threading example. 50 requests for materials and 50 requests for products are added to the queue. This queue is then processed by 3 workers that each have their own thread. Each worker consists of a while loop that continuously sends GET requests to the API. A requests session is opened before starting the loop to make sure that the connection to the server stays open and configured for all consecutive requests.
When the entire queue is processed, all results are brought together in one dataframe. For all failed requests the category and id are stored in a list. A set is created to store the unique combinations of worker id, process id and thread id. The code and results are shown below.
Code
```python
import threading, queue
import os
import requests
import pandas as pd
import time

base_url = 'http://localhost:3000'
num_workers = 3
records = 50

results = []
failed_requests = []
Ids = set()

def worker(worker_num: int, q: queue.Queue) -> None:
    with requests.Session() as session:
        while True:
            Ids.add(f'Worker: {worker_num}, PID: {os.getpid()}, TID: {threading.get_ident()}')
            category, id = q.get()
            endpoint = f'/inventory/{category}?id={id}'
            print(f'WORKER {worker_num}: API request for cat: {category}, id: {id} started ...')
            response = session.get(url=base_url + endpoint)
            if response.ok:
                results.append(response.json())
            else:
                failed_requests.append((category, id))
            q.task_done()

def main() -> pd.DataFrame:
    # Create the queue and add the items
    q = queue.Queue()
    for category in ('materials', 'products'):
        for id in range(records):
            q.put((category, id))

    # Turn on the worker threads
    # daemon: runs without blocking the main program from exiting
    for i in range(num_workers):
        threading.Thread(target=worker, args=(i, q), daemon=True).start()

    # Block until all tasks are done
    q.join()

    # Make a dataframe of the results
    return pd.DataFrame(results)

if __name__ == "__main__":
    print('THREADING')
    start_time = time.time()
    df = main()
    print(f'\nDataframe ({len(failed_requests)} failed requests, {len(results)} successful requests)\n {df.head()}')
    print("\n--- %s seconds ---" % (time.time() - start_time))
    print(list(Ids))
```
Result
```text
THREADING
WORKER 0: API request for cat: materials, id: 0 started ...
...
WORKER 2: API request for cat: products, id: 49 started ...

Dataframe (0 failed requests, 100 successful requests)

   id     name       price quantity
0   2   Wooden  970.00 EUR       49
1   0    Metal  813.00 EUR       26
2   1   Frozen  387.00 EUR       28
3   3  Granite  537.00 EUR       12
4   4    Fresh  865.00 EUR       10

--- 9.845561742782593 seconds ---
['Worker: 1, PID: 17920, TID: 17488', 'Worker: 0, PID: 17920, TID: 19796', 'Worker: 2, PID: 17920, TID: 10608']
```
As you can see in the results, all workers ran under the same process id (PID), while the thread id (TID) was different for each worker. Increasing the number of workers lowers the runtime: with a 200 ms delay per response, 100 requests spread over 3 workers take at least 34 × 0.2 s ≈ 6.8 s, and more workers shrink that lower bound further.
3.2. Python code example for AsyncIO
We do the same, but this time the workers are coroutines placed in an event loop. The code and results are shown below:
Code
```python
import asyncio
import aiohttp
import threading, os
import time
import pandas as pd

base_url = 'http://localhost:3000'
num_tasks = 3
records = 50

results = []
failed_requests = []
Ids = set()

async def task(task_id: int, work_queue: asyncio.Queue) -> None:
    async with aiohttp.ClientSession() as session:
        while True:
            Ids.add(f'Task: {task_id}, PID: {os.getpid()}, TID: {threading.get_ident()}')
            try:
                # get_nowait avoids blocking forever when another task
                # grabbed the last item just before us
                category, id = work_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            endpoint = f'/inventory/{category}?id={id}'
            async with session.get(base_url + endpoint) as response:
                print(f'TASK {task_id}: API request for cat: {category}, id: {id}')
                j = await response.json()
                if response.status < 400:
                    results.append(j)
                else:
                    failed_requests.append((category, id))
            work_queue.task_done()

async def main() -> pd.DataFrame:
    # Create the queue of work
    q = asyncio.Queue()

    # Put some work in the queue
    for category in ('materials', 'products'):
        for id in range(records):
            await q.put((category, id))

    # Schedule the tasks concurrently
    await asyncio.gather(
        *[asyncio.create_task(task(task_id, q)) for task_id in range(num_tasks)]
    )

    # Make a dataframe of the results
    return pd.DataFrame(results)

if __name__ == "__main__":
    print('ASYNCIO')
    start_time = time.time()
    df = asyncio.run(main())
    print(f'\nDataframe ({len(failed_requests)} failed requests, {len(results)} successful requests)\n {df.head()}')
    print("\n--- %s seconds ---" % (time.time() - start_time))
    print(list(Ids))
```
Result
```text
ASYNCIO
TASK 0: API request for cat: materials, id: 0
...
TASK 0: API request for cat: products, id: 49

Dataframe (0 failed requests, 100 successful requests)

   id      name       price quantity
0   0  Concrete  590.00 EUR       24
1   2      Soft  737.00 EUR       16
2   1    Rubber  637.00 EUR       33
3   3   Plastic  551.00 EUR       41
4   4   Granite  641.00 EUR       50

--- 9.883800745010376 seconds ---
['Task: 1, PID: 536, TID: 27600', 'Task: 0, PID: 536, TID: 27600', 'Task: 2, PID: 536, TID: 27600']
```
3.3. Python code example for multiprocessing
The last example shows the same flow, but this time with multiprocessing. I wouldn’t recommend multiprocessing for this kind of application, but I show it to complete the comparison. The code and results are shown below.
Code
```python
import multiprocessing
import requests
import pandas as pd
import time
import threading, os

base_url = 'http://localhost:3000'
num_workers = 3
records = 50

def worker(worker_num, q, results, Ids, failed_requests) -> None:
    with requests.Session() as session:
        while True:
            Ids[f'Worker: {worker_num}, PID: {os.getpid()}, TID: {threading.get_ident()}'] = 1
            category, id = q.get()
            endpoint = f'/inventory/{category}?id={id}'
            print(f'WORKER {worker_num}: API request for cat: {category}, id: {id} started ...')
            response = session.get(url=base_url + endpoint)
            if response.ok:
                results.append(response.json())
            else:
                failed_requests.append((category, id))
            q.task_done()

def main(results, Ids, failed_requests) -> pd.DataFrame:
    q = multiprocessing.JoinableQueue()
    for category in ('materials', 'products'):
        for id in range(records):
            q.put((category, id))
    for i in range(num_workers):
        multiprocessing.Process(target=worker, args=(i, q, results, Ids, failed_requests), daemon=True).start()

    q.join()
    return pd.DataFrame(list(results))

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    results = manager.list()
    failed_requests = manager.list()
    Ids = manager.dict()

    print('MULTIPROCESSING')
    start_time = time.time()
    df = main(results, Ids, failed_requests)
    print(f'\nDataframe ({len(failed_requests)} failed requests, {len(results)} successful requests)\n {df.head()}')
    print("\n--- %s seconds ---" % (time.time() - start_time))
    print(list(dict(Ids).keys()))
```
Result
```text
MULTIPROCESSING
WORKER 1: API request for cat: materials, id: 0 started ...
...
WORKER 1: API request for cat: products, id: 49 started ...

Dataframe (0 failed requests, 100 successful requests)

   id     name       price quantity
0   0     Soft  153.00 EUR       17
1   1  Granite  732.00 EUR       42
2   2  Granite  893.00 EUR       39
3   3  Plastic  786.00 EUR       47
4   4   Cotton  260.00 EUR       43

--- 10.39824652671814 seconds ---
['Worker: 1, PID: 7116, TID: 21532', 'Worker: 0, PID: 28516, TID: 2228', 'Worker: 2, PID: 27424, TID: 21780']
```
Sharing objects between processes is a bit more difficult than in the previous examples. To achieve this, I added a multiprocessing Manager to share lists between the worker processes. A set cannot be created via the Manager, so instead I created a dictionary and added the Worker/PID/TID combinations as keys to get the same effect: only the unique combinations of worker id, process id and thread id are stored.
I used the JoinableQueue class from the multiprocessing library, since the default Queue object from this package does not have the join and task_done methods. As expected, the results show that each worker has its own process and thread id. The total runtime is also higher due to the extra overhead of multiprocessing.
4. From dataframe to… almost everything
All the sample scripts ended by returning a Pandas dataframe object. You can think of this as a Python equivalent of a spreadsheet. Just like in Excel, it’s very easy to apply transformations or make subselections.
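For example, here are a couple of spreadsheet-style operations on a small frame shaped like the API results (the values below are made up for illustration):

```python
import pandas as pd

# A small frame shaped like the API results above (made-up values)
df = pd.DataFrame({
    "id": [0, 1, 2],
    "name": ["Wooden", "Metal", "Granite"],
    "price": ["970.00 EUR", "813.00 EUR", "537.00 EUR"],
    "quantity": [49, 26, 12],
})

# Transformation: turn the price string into a numeric column
df["price_eur"] = df["price"].str.replace(" EUR", "", regex=False).astype(float)

# Subselection: filter rows, just like in a spreadsheet
cheap = df[df["price_eur"] < 900]
print(cheap[["name", "price_eur"]])
```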
When the dataframe has the desired format, you can easily push it to its next destination: an Excel or CSV file, a JSON or XML file, a database, the clipboard, markdown or HTML, …
The best part is that all of this can be done with a single instruction:

```python
[DATAFRAME OBJECT].to_...(args)
```
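A few of these to_ destinations in action, on a small made-up frame:

```python
import pandas as pd

df = pd.DataFrame({"id": [0, 1], "name": ["Wooden", "Metal"]})

csv_text = df.to_csv(index=False)         # CSV as a string (pass a path to write a file)
json_text = df.to_json(orient="records")  # JSON array of row objects
html_text = df.to_html(index=False)       # HTML table

print(csv_text)
```

Other targets such as to_excel or to_sql work the same way, although some need an extra package (for example openpyxl for Excel files or SQLAlchemy for databases).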
For Power BI users there’s even more: Power BI allows you to use Python scripts as a data source in Power Query. The dataframe produced by the script is converted to a table. All you need to do for this is:
- Open Power BI, go to Options, open the Python scripting menu and configure the Python home directory.
- Click Get data in the Home tab, go to Other and select Python script.
- Paste the script and press OK.
5. Conclusion and further reading
We had a look at three different ways of achieving concurrency, each with its advantages and disadvantages. Threading is good for CPU-light tasks with a lot of I/O, or for building GUIs. The biggest disadvantage is the lack of control over context switching, which can make debugging harder, especially for larger applications.
Then we had a look at asyncio. This setup allowed us to run tasks asynchronously within the same thread. The advantage here is the ability to specify where context switching can take place; the disadvantages are the added programming complexity and the need for async libraries.
Lastly we had a look at multiprocessing, although it is really meant for CPU-intensive tasks rather than I/O-heavy tasks like our API requests. Here we saw that it’s a bit more complex to share objects between processes. The total runtime was also higher, which suggests more overhead than in the previous two methods.
That was the brief introduction by example. For those who wish to explore the subject further, here are some good references to start with:
Further reading requests and aiohttp packages
- https://requests.readthedocs.io/
- https://docs.aiohttp.org/
Further reading concurrency
- https://realpython.com/python-concurrency/
- https://realpython.com/python-gil/
- https://realpython.com/intro-to-python-threading/
- https://realpython.com/async-io-python/
- https://docs.python.org/3/library/threading.html
- https://docs.python.org/3/library/asyncio.html
- https://docs.python.org/3/library/multiprocessing.html
Further reading Pandas dataframes
- https://pandas.pydata.org/docs/search.html?q=Dataframe.to_
- https://docs.microsoft.com/en-us/power-bi/connect-data/desktop-python-scripts
6. Python setup used
- Python V3.8.6 64bit
- aiohttp==3.7.4.post0
- async-timeout==3.0.1
- pandas==1.2.2
- requests==2.25.1
- Microsoft Windows 10 64bit