class e(Exception):
pass
class b(Exception):
pass
if ():
print('hi')
Async Code Execution
utility functions for scaling code execution
run_with_retry
run_with_retry (max_retry:int=1, errors_to_retry_tp:Tuple=None)
gather_with_concurrency
gather_with_concurrency (*coros, n=60)
limits the number of open coroutines at a time.
Type | Default | Details | |
---|---|---|---|
coros | VAR_POSITIONAL | list of coroutines to await | |
n | int | 60 | number of open coroutines |
sample implementation of gather_with_concurrency
async def sleeper(duration):
print(duration)
await asyncio.sleep(duration)
await gather_with_concurrency(
*[sleeper(random.randint(0, 3)) for index in list(range(1, 20))], n=5
)
3
3
0
3
0
2
0
2
2
2
3
3
2
0
3
2
3
3
2
[None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None]
run_sequence
run_sequence (*functions:Awaitable[Any])
executes a sequence of functions
Type | Details | |
---|---|---|
functions | Awaitable | comma separated list of functions |
Returns | None | no explicit return |
Sample Implementation of run_sequence
async def t1():
print("running t1")
await asyncio.sleep(1)
print("done running t1")
async def t2():
print("running t2 next")
await asyncio.sleep(3)
print("done running t2")
async def t3():
print("running t3 next")
await asyncio.sleep(2)
print("done running t3")
= [t1(), t2(), t3()]
await_ls
await run_sequence(*await_ls)
# run_sequence uses the same syntax as asyncio.gather(). the following code sample is the same
# await run_sequence( t1(),t2(),t3())
# run_sequence forces sequential code execution as opposed to asyncio.gather
# await asyncio.gather(*await_ls)
running t1
done running t1
running t2 next
done running t2
running t3 next
done running t3
[None, None, None]
chunk_list
chunk_list (obj_ls:list[any], chunk_size:int)
Type | Details | |
---|---|---|
obj_ls | list | list of entities to split into n chunks |
chunk_size | int | entities per sub list |
Returns | list | returns a list of chunk_size lists of objects |
sample implementation of chunk_list
= list(range(50))
num_ls
# each list contains six elements
6) chunk_list(num_ls,
[[0, 1, 2, 3, 4, 5],
[6, 7, 8, 9, 10, 11],
[12, 13, 14, 15, 16, 17],
[18, 19, 20, 21, 22, 23],
[24, 25, 26, 27, 28, 29],
[30, 31, 32, 33, 34, 35],
[36, 37, 38, 39, 40, 41],
[42, 43, 44, 45, 46, 47],
[48, 49]]
# #| export
# async def chunk_fn(chunk, api_fun, idx, sleep_time, session, list_id):
# print(f'sleeping {idx} and {len(chunk)}')
# res = await asyncio.gather(*[api_fun(row=row, session=session, list_id=list_id) for row in chunk])
# await asyncio.sleep(sleep_time)
# print(f'end_sleep {idx}')
# return res
# #| export
# async def api_request_in_chunks(full_list, api_fn, api_limit_size, list_id, sleep_time=10):
# chunked_list = chunk_list(tlist=full_list, chunk_size=api_limit_size)
# session = httpx.AsyncClient(request_class=OAuthRequest)
# res = await run_sequence(
# *[chunk_fn(chunk, api_fn, idx, sleep_time=sleep_time, session=session, list_id=list_id) for idx, chunk in
# enumerate(chunked_list)])
# await session.aclose()
# return res