Python concurrency
Various notes on concurrency in Python. The main information sources are the FastAPI tutorial [1], the Real Python concurrency article [2], and the official documentation [3].
TODO: check [2] and [3] in detail.
The dictionary definition of concurrency is simultaneous occurrence.

> In computer science, concurrency is the ability of different parts or units of a computer, algorithm, or problem to be executed out-of-order or in partial order, without affecting the outcome. This allows for parallel execution of the concurrent units, which can significantly improve the overall speed of execution on multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability of a program, algorithm, or problem into order-independent or partially-ordered components or units of computation.
>
> — Wikipedia
Concurrency types:
- Pre-emptive multitasking (`threading`): single process, single processor. Switch decision: the operating system decides when to switch tasks, externally to Python.
- Cooperative multitasking (`asyncio`): single process, single processor. Switch decision: the tasks themselves decide when to hand over control.
- Multiprocessing (`multiprocessing`): many processes on many processors. The processes all run at the same time on different processors.

Difference between `threading` and `asyncio`? In both cases only one task runs at a time, but with `threading` the operating system can interrupt a task at any moment, while with `asyncio` a task keeps running until it intentionally gives up control at an `await`.
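The cooperative switch points are easy to see in a small sketch (the worker names and step counts here are mine, purely for illustration): two asyncio tasks share one thread and interleave only at their `await`s.

```python
import asyncio

order: list[str] = []

async def worker(name: str):
    for i in range(2):
        order.append(f"{name}:{i}")
        await asyncio.sleep(0)  # explicit switch point: hand control back to the loop

async def main():
    # Both workers run in a single thread; the event loop alternates
    # between them only at the awaits, so the steps interleave.
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
print(order)
```

Remove the `await asyncio.sleep(0)` and each worker runs to completion before the other starts: without an `await` there is simply no point at which the loop can switch.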
Learning path
- Async Python without headaches (part 1) / Habr
- parallel_loops_in_python TODO: prepare book
- Asyncio complete tutorial
- Speeding up Python 2x with multiprocessing, async and MapReduce / Habr
- python - How does asyncio actually work? - Stack Overflow
- An Intro to Threading in Python – Real Python
- asyncio — Asynchronous I/O Python 3.12.5 documentation
- Async IO in Python: A Complete Walkthrough – Real Python
- How Python Asyncio Works: Recreating it from Scratch
When concurrency is useful
The main concurrency use cases are CPU-bound and I/O-bound tasks.
If your program spends most of its time talking to a slow device, such as a network connection, a hard drive, or a printer, it is an ==I/O-bound== process. Speeding it up involves overlapping the time spent waiting for these devices.
If your program spends most of its time doing CPU operations, it is a CPU-bound process. Speeding it up involves finding ways to do more computations in the same amount of time with concurrency (solving the problem with more physical cores).
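As a sketch of the CPU-bound case (the workload and numbers here are invented for illustration), `multiprocessing` lets such a computation use several physical cores at once:

```python
from multiprocessing import Pool

def cpu_bound_sum(n: int) -> int:
    # Pure computation: the CPU is busy the whole time, nothing waits on I/O.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    numbers = [200_000] * 8
    # Each worker is a separate process on its own core, so the work
    # runs in parallel (no GIL contention between processes).
    with Pool() as pool:
        results = pool.map(cpu_bound_sum, numbers)
    print(len(results))  # 8
```

For an I/O-bound workload this would buy little: the processes would all just wait in parallel, at a much higher cost per task than threads or asyncio.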
Concurrency and async / await
More details in the asyncio note.
A coroutine (similar to a goroutine in Go) is just the term for the thing returned by an `async def` function. Python knows that it is something like a function, that it can be started and that it will end at some point, but also that it might be paused internally, whenever there is an `await` inside of it.
Coroutines declared with the async/await syntax are the preferred way of writing asyncio applications. For example, the following snippet of code prints “hello”, waits 1 second, and then prints “world”:
```python
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())  # run the main() coroutine
```
You can only use `await` inside of functions created with ==`async def`==.
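This restriction is enforced at compile time, which a quick check makes visible (a minimal sketch of my own, not from the sources above):

```python
import asyncio

# `await` at module level (outside any `async def`) is rejected by the compiler:
try:
    compile("await asyncio.sleep(1)", "<demo>", "exec")
except SyntaxError as exc:
    print(exc.msg)  # 'await' outside function

async def ok():
    await asyncio.sleep(0)  # fine: we are inside an async def

asyncio.run(ok())
```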
To actually run a coroutine, asyncio provides the following mechanisms:
- The ==`asyncio.run()`== function to run the top-level entry point `main()` function.
- Awaiting on a coroutine. The following snippet of code will print “hello” after waiting for 1 second, and then print “world” after waiting for another 2 seconds:
```python
import asyncio
import time

async def say_after(delay, what):  # say_after coroutine
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')  # await on say_after coroutine
    await say_after(2, 'world')  # await on say_after coroutine
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
```
- The ==`asyncio.create_task()`== function to run coroutines concurrently as asyncio `Task`s:

```python
async def main():
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")
```
- The `asyncio.TaskGroup` class provides a more modern alternative to `create_task()`. Using this API, the last example becomes:

```python
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(say_after(1, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))
        print(f"started at {time.strftime('%X')}")
    # The await is implicit when the context manager exits.
    print(f"finished at {time.strftime('%X')}")
```
An object is awaitable if it can be used in an ==`await`== expression.
There are three main types of awaitable objects:
- Coroutines
- Tasks
- Futures, special low-level awaitable objects that represent an eventual result of an asynchronous operation (a good example of a low-level function that returns a Future object is `loop.run_in_executor()`).
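A minimal sketch of the Future case (the blocking function here is invented for illustration): `loop.run_in_executor()` hands a blocking call to a thread pool and gives back an awaitable Future.

```python
import asyncio
import time

def blocking_io() -> str:
    # A plain synchronous call: it cannot be awaited directly,
    # and calling it inline would block the event loop.
    time.sleep(0.1)
    return "done"

async def main() -> str:
    loop = asyncio.get_running_loop()
    # None means "use the loop's default ThreadPoolExecutor".
    future = loop.run_in_executor(None, blocking_io)
    return await future  # a Future is awaitable, just like a coroutine

print(asyncio.run(main()))  # done
```

While the Future is pending, the event loop is free to run other tasks; `await` just suspends `main()` until the thread pool delivers the result.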
How to speed up an I/O-bound program
Let’s say you have a synchronous version of a program that does an I/O-bound task.
```python
import requests
import time
from requests.sessions import Session

def download_site(url: str, session: Session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")

def download_all_sites(sites: list[str]):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "https://www.wikipedia.org/",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")
```
Why do we use `requests.Session` here?
Creating a Session object allows requests to do some fancy networking tricks and really speed things up: we can reuse the connection between requests.
In my case the I/O-bound task is downloading a bunch of websites, and with my internet connection it takes less than a minute to download all of them.
The big problem here is that it’s relatively slow compared to the other solutions, and sometimes you can’t afford to wait that long. To solve this problem, we can use threads.
Threading
```python
import concurrent.futures
import requests
import threading
import time

thread_local = threading.local()

def get_session():
    """Since Session is not thread-safe, we need to create a new one for each
    thread."""
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")

def download_all_sites(sites):
    # Thread + Pool (a collection of threads) + Executor (the threads' controller)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)  # distribute the URLs across the threads
```
One major possible issue with threading is race conditions.
Race conditions happen because the programmer has not sufficiently protected data accesses to prevent threads from interfering with each other.
```python
import concurrent.futures

counter = 0

def increment_counter(fake_value: int):
    global counter
    for _ in range(100):
        # each thread needs to read the current value, add one to it,
        # and then save that value back to the variable
        counter += 1

if __name__ == "__main__":
    fake_data = [x for x in range(5000)]
    counter = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
        _ = executor.map(increment_counter, fake_data)
    print(counter)
```
Can you explain the problems with this code?
In order to increment `counter`, each of the threads needs to read the current value, add one to it, and then save that value back to the variable. That happens in this line: `counter += 1`.
Because the operating system knows nothing about your code and can swap threads at any point in the execution, it’s possible for this swap to happen after a thread has read the value, but before it has had the chance to write it back. If the code that runs next modifies `counter` as well, then the first thread has a stale copy of the data and trouble will ensue (incorrect results).
Since this is a rare situation, this type of problem is quite difficult to debug.
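One common fix (a sketch of my own, not taken from the articles above) is to make the read-modify-write step atomic with a `threading.Lock`:

```python
import concurrent.futures
import threading

class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self, times: int):
        for _ in range(times):
            # Only one thread at a time may execute the read-modify-write,
            # so no update can be lost to a badly timed thread switch.
            with self._lock:
                self.value += 1

def run_demo(n_tasks: int = 50, per_task: int = 100) -> int:
    counter = SafeCounter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for _ in range(n_tasks):
            executor.submit(counter.increment, per_task)
    # leaving the with-block waits for every submitted task to finish
    return counter.value

print(run_demo())  # 5000, every time
```

The price is that the locked section is serialized, so a lock held too broadly can erase the benefit of using threads at all.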
asyncio
A detailed explanation is available in the asyncio note.
What is the general concept of the event loop?
The event loop object is aware of each task and knows what state it’s in.
An important point of `asyncio` is that the tasks never give up control without intentionally doing so. They never get interrupted in the middle of an operation. This allows us to share resources a bit more easily in `asyncio` than in `threading`: you don’t have to worry about making your code thread-safe.
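A small sketch of the loop tracking task state (the names here are mine, for illustration): a `Task` starts out not done, and it only runs once the current coroutine hands control back to the loop.

```python
import asyncio

async def work() -> int:
    await asyncio.sleep(0.05)
    return 42

async def main() -> int:
    task = asyncio.create_task(work())
    print(task.done())   # False: the loop hasn't run the task to completion yet
    result = await task  # we intentionally give up control here
    print(task.done())   # True: the loop marked the task as finished
    return result

print(asyncio.run(main()))  # 42
```

Between `create_task()` and the `await`, nothing interrupts `main()`: the switch happens exactly where we wrote it, which is what makes shared state safe to touch between awaits.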