Async Code Execution

Utility functions for scaling code execution.

source

run_with_retry

 run_with_retry (max_retry:int=1, errors_to_retry_tp:Tuple=None)
class e(Exception):
    """Bare ``Exception`` subclass that adds no behavior of its own."""

class b(Exception):
    """Bare ``Exception`` subclass that adds no behavior of its own."""



# NOTE(review): `()` is an empty tuple, which is falsy, so this branch never
# runs and 'hi' is never printed -- confirm whether that is intentional.
if ():
    print('hi')

source

gather_with_concurrency

 gather_with_concurrency (*coros, n=60)

Limits the number of coroutines that are open at any one time.

Type Default Details
coros VAR_POSITIONAL list of coroutines to await
n int 60 number of open coroutines

Sample usage of gather_with_concurrency

async def sleeper(duration):
    """Print ``duration`` and then suspend for that many seconds.

    Args:
        duration: number of seconds to sleep; echoed to stdout first.
    """
    print(duration)
    await asyncio.sleep(delay=duration)


await gather_with_concurrency(
    *[sleeper(random.randint(0, 3)) for index in list(range(1, 20))], n=5
)
3
3
0
3
0
2
0
2
2
2
3
3
2
0
3
2
3
3
2
[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

source

run_sequence

 run_sequence (*functions:Awaitable[Any])

Executes a sequence of awaitables one after another, in the order given.

Type Details
functions Awaitable comma separated list of functions
Returns list results of the awaited functions, in call order (the sample below returns [None, None, None])

Sample usage of run_sequence

async def t1():
    """Announce the start of t1, pause for one second, then announce completion."""
    pause_seconds = 1
    print("running t1")
    await asyncio.sleep(pause_seconds)
    print("done running t1")


async def t2():
    """Announce the start of t2, pause for three seconds, then announce completion."""
    pause_seconds = 3
    print("running t2 next")
    await asyncio.sleep(pause_seconds)
    print("done running t2")


async def t3():
    """Announce the start of t3, pause for two seconds, then announce completion."""
    pause_seconds = 2
    print("running t3 next")
    await asyncio.sleep(pause_seconds)
    print("done running t3")


await_ls = [t1(), t2(), t3()]

await run_sequence(*await_ls)

# run_sequence uses the same syntax as asyncio.gather().  the following code sample is the same
# await run_sequence( t1(),t2(),t3())

# run_sequence forces sequential code execution as opposed to asyncio.gather
# await asyncio.gather(*await_ls)
running t1
done running t1
running t2 next
done running t2
running t3 next
done running t3
[None, None, None]

source

chunk_list

 chunk_list (obj_ls:list[any], chunk_size:int)
Type Details
obj_ls list list of entities to split into n chunks
chunk_size int entities per sub list
Returns list returns a list of sub-lists, each holding up to chunk_size objects

Sample usage of chunk_list

# Fifty consecutive integers, 0 through 49.
num_ls = [*range(50)]

# Ask for sub-lists of six elements each; the final sub-list holds whatever
# is left over (two elements here).
chunk_list(num_ls, 6)
[[0, 1, 2, 3, 4, 5],
 [6, 7, 8, 9, 10, 11],
 [12, 13, 14, 15, 16, 17],
 [18, 19, 20, 21, 22, 23],
 [24, 25, 26, 27, 28, 29],
 [30, 31, 32, 33, 34, 35],
 [36, 37, 38, 39, 40, 41],
 [42, 43, 44, 45, 46, 47],
 [48, 49]]
# #| export
# async def chunk_fn(chunk, api_fun, idx, sleep_time, session, list_id):
#     print(f'sleeping {idx} and {len(chunk)}')

#     res = await asyncio.gather(*[api_fun(row=row, session=session, list_id=list_id) for row in chunk])

#     await asyncio.sleep(sleep_time)
#     print(f'end_sleep {idx}')
#     return res


# #| export
# async def api_request_in_chunks(full_list, api_fn, api_limit_size, list_id, sleep_time=10):
#     chunked_list = chunk_list(obj_ls=full_list, chunk_size=api_limit_size)
#     session = httpx.AsyncClient(request_class=OAuthRequest)
#     res = await run_sequence(
#         *[chunk_fn(chunk, api_fn, idx, sleep_time=sleep_time, session=session, list_id=list_id) for idx, chunk in
#           enumerate(chunked_list)])
#     await session.aclose()
#     return res