Job

A job refers to a scheduled task in an application.

In this beta version of the class, Watchdog is the only Job type implemented. The class will be heavily revised in a future development sprint.

A possible additional Job type is the PDP Automation Toolkit.

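import os
import domolibrary.client.DomoAuth as dmda

# Authenticate with an access token read from the environment
token_auth = dmda.DomoTokenAuth(
    domo_instance="domo-alpha",
    domo_access_token=os.environ["ALPHA_ACCESS_TOKEN"],
)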

DomoTrigger

 DomoTrigger (id:str, job_id:str,
              schedule:List[__main__.DomoTrigger_Schedule]=None)

DomoTrigger_Schedule

 DomoTrigger_Schedule (schedule_text:str=None,
                       schedule_type:str='scheduleTriggered',
                       minute:int=None, hour:int=None,
                       minute_str:str=None, hour_str:str=None)
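
The two signatures above suggest that a trigger holds a list of schedules. The sketch below mirrors that nesting with stand-in dataclasses so the shape is easier to see. `TriggerSketch` and `TriggerScheduleSketch` are hypothetical names, not the library's classes, and the example values (including the `schedule_text` string) are invented for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TriggerScheduleSketch:
    """Stand-in with the same fields as the DomoTrigger_Schedule signature."""
    schedule_text: Optional[str] = None
    schedule_type: str = "scheduleTriggered"
    minute: Optional[int] = None
    hour: Optional[int] = None
    minute_str: Optional[str] = None
    hour_str: Optional[str] = None


@dataclass
class TriggerSketch:
    """Stand-in with the same fields as the DomoTrigger signature."""
    id: str
    job_id: str
    schedule: Optional[List[TriggerScheduleSketch]] = None


# Illustrative values only; the real schedule_text format is not shown in this page.
daily_schedule = TriggerScheduleSketch(schedule_text="daily 06:30", minute=30, hour=6)
trigger = TriggerSketch(id="trigger-1", job_id="job-1", schedule=[daily_schedule])
```

A trigger built this way keeps the default `schedule_type` unless one is passed explicitly.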

Domo Job

DomoJob_Base is an abstract base class that describes all Domo Job types. A Job captures the parameters of one implementation of a task in the application.

The Job will be extended by implementations of an Application such as RemoteDomoStats or DatasetWatchdog.


DomoJob_Base

 DomoJob_Base (auth:domolibrary.client.DomoAuth.DomoAuth, name:str,
               application_id:str, logs_dataset_id:str=None, id:str=None,
               user_id:str=None, execution_timeout:int=1440,
               is_enabled:bool=False, customer_id:str=None,
               created_dt:datetime.datetime=None,
               updated_dt:datetime.datetime=None, description:str=None,
               execution_payload:dict=<factory>,
               share_state:dict=<factory>, accounts:List[str]=<factory>,
               triggers:List[__main__.DomoTrigger]=<factory>)

The base class only captures attributes applicable to all jobs (i.e. it does not destructure execution_payload onto the class). Build Application / Job extensions by creating children of the DomoJob_Base class.
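
As a rough illustration of the subclassing approach described above, the sketch below uses a stand-in base class with a few of the shared attributes and a child class that lifts values out of `execution_payload`. Everything here is an assumption made for the example: `JobBaseSketch` and `WatchdogJobSketch` are hypothetical names, and the payload keys `watchedDatasetId` and `notifyEmails` are invented, not taken from the Domo API.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class JobBaseSketch:
    """Stand-in for DomoJob_Base: only attributes shared by every job."""
    name: str
    application_id: str
    id: Optional[str] = None
    execution_timeout: int = 1440
    is_enabled: bool = False
    execution_payload: dict = field(default_factory=dict)
    accounts: List[str] = field(default_factory=list)


@dataclass
class WatchdogJobSketch(JobBaseSketch):
    """Hypothetical child class that destructures the payload onto itself."""
    # Both attributes are invented for illustration.
    watched_dataset_id: Optional[str] = None
    notify_emails: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Copy job-specific values out of the generic payload (assumed keys).
        payload = self.execution_payload or {}
        if self.watched_dataset_id is None:
            self.watched_dataset_id = payload.get("watchedDatasetId")
        if not self.notify_emails:
            self.notify_emails = list(payload.get("notifyEmails", []))


job = WatchdogJobSketch(
    name="nightly watchdog",
    application_id="app-123",
    execution_payload={
        "watchedDatasetId": "ds-1",
        "notifyEmails": ["a@example.com"],
    },
)
```

Keeping the payload untouched on the base class and interpreting it only in the child means each Application can define its own payload shape without changing the shared attributes.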

# test_job = await DomoJob_Base.get_by_id(
#     job_id="743c1c6f-80d5-4b47-b02e-0ea28f6a5683",
#     application_id="a99c3fd8-a0f6-4d06-9a1d-74f3d12293d4",
#     auth=token_auth,
#     return_raw=False,
# )

# test_job.to_json()