Upload Data to Domo


source

upload_data

 upload_data (data_fn, instance_auth:domolibrary.client.DomoAuth.DomoAuth,
              consol_ds:domolibrary.classes.DomoDataset.DomoDataset,
              partition_key:str=None, upload_method:str='REPLACE',
              is_index:bool=False, debug_prn:bool=False,
              debug_fn:bool=True, debug_api:bool=False,
              logger:domolibrary.client.Logger.Logger=None,
              max_retry:int=2)
Type Default Details
data_fn data function to execute
instance_auth DomoAuth instance to run the data function against
consol_ds DomoDataset dataset where data should be accumulated
partition_key str None if partition key supplied, will replace existing partition
upload_method str REPLACE
is_index bool False index dataset
debug_prn bool False
debug_fn bool True
debug_api bool False
logger Logger None
max_retry int 2 number of times to attempt upload

sample implementation of upload_data with loop

# import domolibrary.classes.DomoBootstrap as dmbsr
# import httpx
# import pandas as pd


async def data_fn(
    instance_auth: dmda.DomoFullAuth,  # this API requires full auth
    session: httpx.AsyncClient = None,
    debug_api: bool = False,
) -> pd.DataFrame:
    """Fetch the instance's feature flags and return them as a DataFrame.

    Best-effort: any failure is printed and None is returned instead of
    raising, so the upload loop can skip this instance and continue.
    """
    try:
        bootstrap = dmbsr.DomoBootstrap(auth=instance_auth)

        features = await bootstrap.get_features(
            debug_api=debug_api, session=session
        )

        result_df = pd.DataFrame(features)
        # tag every row with its source instance so rows stay attributable
        # after consolidation into the shared dataset
        result_df["instance"] = instance_auth.domo_instance
        return result_df

    except Exception as e:
        print(f"getting data : unexpected error: {e}")
        return None


### get_auth
auth = dmda.DomoTokenAuth(
    domo_instance=os.environ['DOMO_INSTANCE'],
    domo_access_token=os.environ["DOMO_ACCESS_TOKEN"],
)

# confirm retrieves token
assert isinstance(await auth.get_auth_token(), str)

ds_id = "44c5af30-ea04-49e4-9d7a-529afd223590"
ds = await dmds.DomoDataset.get_by_id(dataset_id=ds_id, auth=auth)

await upload_data(
    instance_auth=auth,  # instance where the data_fn function will execute against
    consol_ds=ds,
    partition_key=auth.domo_instance,
    data_fn=data_fn,
    is_index=True,
    debug_fn=True,
    debug_api=False,
    max_retry=2,
)
warning this token has not been validated by who_am_i, run get_auth_token first
adjusting num_stacks_to_drop, consider revising `get_traceback` call
{'stack_length': 18, 'module_index': 12, 'num_stacks_to_drop_passed': 5}
๐Ÿ starting domo-community - data_fn
getting data : unexpected error: ๐Ÿ›‘  InvalidAuthTypeError ๐Ÿ›‘ - function: get_bootstrap || This API rquires DomoFullAuth at domo-community
adjusting num_stacks_to_drop, consider revising `get_traceback` call
{'stack_length': 18, 'module_index': 12, 'num_stacks_to_drop_passed': 5}
no data to upload for domo-community: 44c5af30-ea04-49e4-9d7a-529afd223590 in domo-community
adjusting num_stacks_to_drop, consider revising `get_traceback` call
{'stack_length': 18, 'module_index': 12, 'num_stacks_to_drop_passed': 5}
๐Ÿฅซ successfully indexed demo_instance_features in domo-community
# # | export
# async def upload_data_with_date(
#     instance_auth,
#     data_fn,
#     consol_ds,
#     partition_date_col,
#     partition_delimiter,
#     start_date,
#     end_date,
#     debug_api: bool = False,
#     debug_prn: bool = False,
# ):

#     instance_session = httpx.AsyncClient()

#     print(
#         f"'๐ŸŽฌ upload_with_data: starting retrieval {start_date}, {end_date}, {instance_auth.domo_instance}"
#     )

#     upload_df = await data_fn(
#         instance_auth=instance_auth,
#         session=instance_session,
#         start_date=start_date,
#         end_date=end_date,
#         debug_api=debug_api,
#     )

#     await instance_session.aclose()

#     if not isinstance(upload_df, pd.DataFrame):
#         print(f"๐Ÿ›‘ error no data returned {instance_auth.domo_instance}")
#         print(upload_df)
#         return None

#     if debug_prn:
#         print(
#             f"๐Ÿงป upload_with_data: starting upload {len(upload_df)} rows for {instance_auth.domo_instance}"
#         )

#     task = []

#     for index, partition_set in upload_df.drop_duplicates(
#         subset=[partition_date_col]
#     ).iterrows():
#         partition_date = partition_set[partition_date_col]

#         partition_key = (
#             f"{instance_auth.domo_instance}{partition_delimiter}{str(partition_date)}"
#         )

#         task.append(
#             consol_ds.upload_data(
#                 upload_df=upload_df[(upload_df[partition_date_col] == partition_date)],
#                 upload_method="REPLACE",
#                 partition_key=partition_key,
#                 is_index=False,
#                 debug_api=debug_api,
#                 debug_prn=debug_prn,
#             )
#         )

#     res = await asyncio.gather(*task)

#     if debug_prn:
#         print(
#             f"๐ŸŽ‰ upload_with_data : finished uploading {len(upload_df)} rows for {instance_auth.domo_instance}"
#         )
#     return res