from dataclasses import dataclass, field
from typing import List
import mbison.client.core as dmda
Dataset
Dataset
Routes
get_datasets
get_datasets (auth:mbison.client.core.DomoAuth, debug_api:bool=False, return_raw:bool=False)
Exported source
def get_datasets(
bool = False, return_raw: bool = False
auth: dmda.DomoAuth, debug_api:
):= "/api/data/ui/v3/datasources/search"
endpoint
= {
payload "entities": ["DATASET"],
"filters": [],
"combineResults": "true",
"query": "*",
"count": 1000,
"offset": 0,
"sort": {
"isRelevance": "false",
"fieldSorts": [{"field": "create_date", "sortOrder": "DESC"}],
},
}
= dmda.domo_api_request(
res =auth,
auth=endpoint,
endpoint="POST",
request_type=debug_api,
debug_api=payload,
body
)
if return_raw:
return res
= res.response["dataSources"]
res.response
return res
= dmda.DomoAuth(
auth =os.environ["DOMO_INSTANCE"],
domo_instance=os.environ["DOMO_ACCESS_TOKEN"],
access_token
)
= get_datasets(auth=auth, debug_api=False)
res
= res.response
datasets 0:1] datasets[
*** Using Access Token Auth
[{'id': 'fedef9a2-eddb-44d1-b0c9-a0023224e8d1',
'displayType': 'webform',
'dataProviderType': 'webform',
'type': 'webform',
'name': 'Example Sales Data',
'owner': {'id': '1728973208',
'name': 'Peter Shull',
'type': 'USER',
'group': False},
'status': 'VALID',
'created': 1721655313000,
'lastUpdated': 1721655316205,
'dataLastUpdated': 0,
'rowCount': 19378,
'columnCount': 5,
'cardInfo': {'cardCount': 2, 'cardViewCount': 0},
'state': 'VALID',
'validConfiguration': True,
'validAccount': True,
'transportType': 'WEBFORM',
'adc': False,
'adcExternal': False,
'cloudId': 'domo',
'cloudName': 'Domo',
'hidden': False,
'tagsList': [],
'schema': {'name': 'fedef9a2-eddb-44d1-b0c9-a0023224e8d1',
'tables': [{'columns': [{'name': 'date_ymd',
'id': 'date_ymd',
'type': 'DATE',
'visible': True,
'order': 0},
{'name': 'revenue',
'id': 'revenue',
'type': 'DOUBLE',
'visible': True,
'order': 0},
{'name': 'sales_rep',
'id': 'sales_rep',
'type': 'STRING',
'visible': True,
'order': 0},
{'name': 'department',
'id': 'department',
'type': 'STRING',
'visible': True,
'order': 0},
{'name': 'state',
'id': 'state',
'type': 'STRING',
'visible': True,
'order': 0}]}]},
'alertCount': 0,
'dataFlowCount': 0,
'dataSourceCount': 0,
'impactCardCount': 2,
'impactAlertCount': 0,
'impactDataFlowCount': 0,
'impactDataSourceCount': 0}]
get_pdp_policy
get_pdp_policy (auth:mbison.client.core.DomoAuth, datasetId, policy_id)
Exported source
def get_pdp_policies(auth: dmda.DomoAuth, datasetId):
= f"/api/query/v1/data-control/{datasetId}/filter-groups"
endpoint
= {"options": "load_associations,load_filters,include_open_policy"}
params
= dmda.domo_api_request(
response =endpoint, auth=auth, request_type="GET", params=params
endpoint
)
return response
def get_pdp_policy(auth: dmda.DomoAuth, datasetId, policy_id):
= f"/api/query/v1/data-control/{datasetId}/filter-groups/{policy_id}"
endpoint
= dmda.domo_api_request(
res =auth,
auth="GET",
method=endpoint,
endpoint={"options": "load_associations,load_filters,include_open_policy"},
params
)
return res
get_pdp_policies
get_pdp_policies (auth:mbison.client.core.DomoAuth, datasetId)
=auth, datasetId=datasets[0]["id"]) get_pdp_policies(auth
ResponseGetData(response=[{'name': 'All Rows', 'filterGroupId': 2380, 'dataSourceId': 'fedef9a2-eddb-44d1-b0c9-a0023224e8d1', 'type': 'open', 'dataSourcePermissions': False, 'order': 0}], is_success=True, status=200)
delete_pdp_policy
delete_pdp_policy (auth:mbison.client.core.DomoAuth, datasetId, policyId)
Exported source
def create_pdp_policy(auth: dmda.DomoAuth, datasetId: str, pdp_object: dict):
= f"/api/query/v1/data-control/{datasetId}/filter-groups"
endpoint
= dmda.domo_api_request(
res =auth, request_type="POST", endpoint=endpoint, body=pdp_object
auth
)
return res
def update_pdp_policy(auth: dmda.DomoAuth, datasetId, policyId, pdp_object: dict):
= f"/api/query/v1/data-control/{datasetId}/filter-groups/{policyId}"
endpoint
= dmda.domo_api_request(
res =auth,
auth="PUT",
request_type=endpoint,
endpoint=pdp_object,
body
)
return res
def delete_pdp_policy(auth: dmda.DomoAuth, datasetId, policyId):
= f"/api/query/v1/data-control/{datasetId}/filter-groups/{policyId}"
endpoint
= dmda.domo_api_request(auth=auth, endpoint=endpoint, request_type="DELETE")
res
return res
update_pdp_policy
update_pdp_policy (auth:mbison.client.core.DomoAuth, datasetId, policyId, pdp_object:dict)
create_pdp_policy
create_pdp_policy (auth:mbison.client.core.DomoAuth, datasetId:str, pdp_object:dict)
Classes
DomoDataset
DomoDataset (id:str, name:str, auth:mbison.client.core.DomoAuth, pdp_policies:List[dict]=None, domo_policies:List[__main__.DomoDatasetPolicy]=None)
Exported source
@dataclass
class DomoDatasetPolicy:
str
name:
@dataclass
class DomoDataset:
id: str
str
name:
auth: dmda.DomoAuth
dict] = None
pdp_policies: List[= None
domo_policies: List[DomoDatasetPolicy]
@classmethod
def _from_json(cls, auth: dmda.DomoAuth, obj: dict):
return cls(auth=auth, id=obj["id"], name=obj["name"])
def get_pdp_policies(self, return_raw: bool = False):
= get_pdp_policies(auth=self.auth, datasetId=self.id)
res
if return_raw:
return res
self.pdp_policies = res.response
self.domo_policies = [
=obj["name"]) for obj in self.pdp_policies
DomoDatasetPolicy(name
]
return self.domo_policies
DomoDatasetPolicy
DomoDatasetPolicy (name:str)
DomoDatasets
DomoDatasets (auth:mbison.client.core.DomoAuth, raw_datasets:List[dict]=<factory>, domo_datasets:List[__main__.DomoDataset]=<factory>)
Exported source
@dataclass
class DomoDatasets:
auth: dmda.DomoAuthdict] = field(default_factory=lambda: [])
raw_datasets: List[
= field(default_factory=lambda: [])
domo_datasets: List[DomoDataset]
def get_datasets(self, debug_api: bool = False, return_raw: bool = False):
= get_datasets(self.auth, debug_api=debug_api)
res
if return_raw:
return res
self.raw_datasets = res.response
self.domo_datasets = [
=obj, auth=self.auth) for obj in self.raw_datasets
DomoDataset._from_json(obj
]
return self.domo_datasets
def print_datasets(self):
print(self.domo_datasets)
= DomoDatasets(auth=auth)
domo_datasets 0:5] domo_datasets.get_datasets()[
[DomoDataset(id='fedef9a2-eddb-44d1-b0c9-a0023224e8d1', name='Example Sales Data', auth=DomoAuth(domo_instance='domo-community', username=None), pdp_policies=None, domo_policies=None),
DomoDataset(id='37fdfa92-ba60-4ab0-8f95-a03c6af543f0', name='Output2', auth=DomoAuth(domo_instance='domo-community', username=None), pdp_policies=None, domo_policies=None),
DomoDataset(id='4823950f-d754-474b-bddc-a6e7fec6df34', name='Output1', auth=DomoAuth(domo_instance='domo-community', username=None), pdp_policies=None, domo_policies=None),
DomoDataset(id='f17bae29-3caa-4408-afbd-e2130b7281a5', name='HarryPotter', auth=DomoAuth(domo_instance='domo-community', username=None), pdp_policies=None, domo_policies=None),
DomoDataset(id='7e8c4e24-6169-4e3b-ae51-65e4305b2e80', name='form_data_a2693a83-a265-4a27-8a4f-a0d4778273ba_APP_DB', auth=DomoAuth(domo_instance='domo-community', username=None), pdp_policies=None, domo_policies=None)]
0].get_pdp_policies() domo_datasets.domo_datasets[
[DomoDatasetPolicy(name='All Rows')]