auth = dmda.DomoTokenAuth(
    domo_instance=os.environ['DOMO_INSTANCE'],
    domo_access_token=os.environ["DOMO_ACCESS_TOKEN"],
)
CARD_ID = 1766265020
CARD_ID_APP = 577316875
Card
a class based approach to interacting with DomoCards
DomoCard
DomoCard (id:str, auth:domolibrary.client.DomoAuth.DomoAuth, title:str=None, description:str=None, type:str=None, urn:str=None, chart_type:str=None, dataset_id:str=None, datastore_id:str=None, domo_collections:List[Any]=None, domo_source_code:Any=None, certification:dict=None, owners:List[<built-infunctionany>]=None, datasources:List[Any]=None, Lineage:domolibrary.classes.DomoLineage.DomoLineage=None)
domo_card = await DomoCard.get_by_id(card_id=CARD_ID, auth=auth, return_raw = False)
pprint(domo_card)
DomoCard(id=1766265020,
title='Vendor Amounts Dates',
description='',
type='kpi',
urn='1766265020',
chart_type=None,
dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e',
datastore_id=None,
domo_collections=None,
domo_source_code=None,
certification=DictDot(state='NOT_CERTIFIED', adminCertified=False),
owners=[DomoUser(id='1893952720',
display_name='Jae Wilson1',
email_address='jae@datacrew.space',
role_id=810756122,
department='Business Improvement',
title=None,
avatar_key='/api/content/v1/avatar/USER/1893952720',
phone_number=None,
web_landing_page=None,
web_mobile_landing_page=None,
employee_id=None,
employee_number=None,
hire_date=None,
reports_to=None,
publisher_domain=None,
subscriber_domain=None,
virtual_user_id=None,
created_dt=datetime.datetime(2020, 5, 8, 17, 55, 18),
last_activity_dt=datetime.datetime(2025, 1, 30, 19, 15, 22, 295000),
custom_attributes={},
role=DomoRole(id=810756122,
name='super_admin',
description='all grants - updated on '
'2024-11-07',
is_system_role=0,
is_default_role=None,
grants=[],
membership_ls=[]),
domo_api_clients=None,
domo_access_tokens=None)])
DomoCard.get_datasources
DomoCard.get_datasources (debug_api:bool=False, session:httpx.AsyncClient=None, return_raw:bool=False)
await domo_card.get_datasources(return_raw = False)
[DomoDataset(id='fd70d83b-38dc-40a2-9349-357ec4321d3e', display_type='webform', data_provider_type='webform', name='Vendor Amounts Dates', description=None, row_count=20, column_count=3, stream_id=392, owner={'id': '55874022', 'name': 'Grant Smith', 'type': 'USER', 'group': False}, formula={}, Schema=DomoDataset_Schema(dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e', columns=[]), Stream=DomoStream(id=392, dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e', transport_description=None, transport_version=None, update_method=None, data_provider_name=None, data_provider_key=None, account_id=None, account_display_name=None, account_userid=None, has_mapping=False, configuration=[], configuration_tables=[], configuration_query=None), Tags=DomoTags(tag_ls=['um_REPLACE', 'webform', 's_webform']), PDP=<domolibrary.classes.DomoPDP.Dataset_PDP_Policies object at 0x72389ec76540>, Certification=None)]
DomoCard.get_source_code
DomoCard.get_source_code (debug_api:bool=False, try_auto_share:bool=False)
Exported source
class Card_DownloadSourceCode(de.DomoError):
    """Error raised when a card's source code cannot be retrieved.

    Thin wrapper over ``de.DomoError`` that forwards ``cls``, ``auth`` and
    ``message`` unchanged to the base-class constructor.
    """

    def __init__(self, cls, auth, message):
        super().__init__(cls=cls, auth=auth, message=message)
@patch_to(DomoCard)
async def get_collections(
    self, debug_api: bool = False, return_raw: bool = False, debug_num_stacks_to_drop=2
):
    """Retrieve the AppDb collections attached to this card's datastore.

    Args:
        debug_api: forwarded to the underlying AppDb calls.
        return_raw: when True, return the result of
            ``AppDbCollections.get_collections`` directly without hydrating
            each collection or updating ``self.domo_collections``.
        debug_num_stacks_to_drop: forwarded to
            ``AppDbCollections.get_collections``.

    Returns:
        The raw listing result when ``return_raw`` is True; otherwise the
        list stored on ``self.domo_collections``.
    """
    # NOTE(review): function-scope import, presumably to avoid a circular
    # import between the card and AppDb modules; confirm before hoisting.
    import domolibrary.classes.DomoAppDb as dmdb

    domo_collections = await dmdb.AppDbCollections.get_collections(
        datastore_id=self.datastore_id,
        auth=self.auth,
        debug_api=debug_api,
        debug_num_stacks_to_drop=debug_num_stacks_to_drop,
        return_raw=return_raw,
    )

    if return_raw:
        return domo_collections

    # Re-fetch every collection by id, running at most 60 requests at once.
    self.domo_collections = await dmce.gather_with_concurrency(
        *[
            dmdb.AppDbCollection.get_by_id(
                collection_id=domo_collection.id, auth=self.auth, debug_api=debug_api
            )
            for domo_collection in domo_collections
        ],
        n=60,
    )

    return self.domo_collections
@patch_to(DomoCard)
async def get_source_code(self, debug_api: bool = False, try_auto_share: bool = False):
    """Fetch the source-code document stored for this card.

    Loads the card's collections, picks the one named
    ``"ddx_app_client_code"``, queries its documents and stores the first
    one on ``self.domo_source_code``.

    Args:
        debug_api: forwarded to ``get_collections`` and ``query_documents``.
        try_auto_share: forwarded to ``query_documents``.

    Returns:
        The first document returned by the code collection.

    Raises:
        Card_DownloadSourceCode: if no collection with the expected name
            exists, or if the collection yields no documents.
    """
    await self.get_collections(debug_api=debug_api)

    collection_name = "ddx_app_client_code"
    code_collection = next(
        (
            domo_collection
            for domo_collection in self.domo_collections
            if domo_collection.name == collection_name
        ),
        None,
    )

    if not code_collection:
        raise Card_DownloadSourceCode(
            cls=deepcopy(self),
            auth=self.auth,
            message=f"collection - {collection_name} not found for {self.title} - {self.id}",
        )

    documents = await code_collection.query_documents(
        debug_api=debug_api, try_auto_share=try_auto_share
    )

    if not documents:
        raise Card_DownloadSourceCode(
            cls=deepcopy(self),
            auth=self.auth,
            message=f"collection - {collection_name} - {code_collection.id} - unable to retrieve documents for {self.title} - {self.id}",
        )

    self.domo_source_code = documents[0]

    return self.domo_source_code
DomoCard.get_collections
DomoCard.get_collections (debug_api:bool=False, return_raw:bool=False, debug_num_stacks_to_drop=2)
Card_DownloadSourceCode
Card_DownloadSourceCode (cls, auth, message)
base exception
domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False)

pprint(await domo_card.get_source_code())
--------------------------------------------------------------------------- CancelledError Traceback (most recent call last) Cell In[14], line 3 1 domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False) ----> 3 pprint(await domo_card.get_source_code()) Cell In[13], line 41, in get_source_code(self, debug_api, try_auto_share) 38 @patch_to(DomoCard) 39 async def get_source_code(self, debug_api: bool = False, try_auto_share: bool = False): ---> 41 await self.get_collections(debug_api=debug_api) 43 collection_name = "ddx_app_client_code" 44 code_collection = next( 45 ( 46 domo_collection (...) 50 None, 51 ) Cell In[13], line 26, in get_collections(self, debug_api, return_raw, debug_num_stacks_to_drop) 23 if return_raw: 24 return domo_collections ---> 26 self.domo_collections = await dmce.gather_with_concurrency( 27 *[ 28 dmdb.AppDbCollection.get_by_id( 29 collection_id=domo_collection.id, auth=self.auth, debug_api=debug_api 30 ) 31 for domo_collection in domo_collections 32 ], 33 n=60 34 ) 36 return self.domo_collections File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:70, in gather_with_concurrency(n, *coros) 67 async with semaphore: 68 return await coro ---> 70 return await asyncio.gather(*(sem_coro(c) for c in coros)) File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:68, in gather_with_concurrency.<locals>.sem_coro(coro) 66 async def sem_coro(coro): 67 async with semaphore: ---> 68 return await coro File /workspaces/domolibrary/domolibrary/classes/DomoAppDb.py:283, in AppDbCollection.get_by_id(cls, auth, collection_id, debug_api, session, debug_num_stacks_to_drop, return_raw) 272 @classmethod 273 async def get_by_id( 274 cls, (...) 
280 return_raw: bool = False, 281 ): --> 283 res = await appdb_routes.get_collection_by_id( 284 auth=auth, 285 collection_id=collection_id, 286 parent_class=cls.__name__, 287 debug_api=debug_api, 288 session=session, 289 debug_num_stacks_to_drop=debug_num_stacks_to_drop, 290 ) 292 if return_raw: 293 return res File /workspaces/domolibrary/domolibrary/client/get_data.py:447, in route_function.<locals>.wrapper(parent_class, debug_num_stacks_to_drop, debug_api, session, *args, **kwargs) 438 @wraps(func) 439 async def wrapper( 440 *args: Any, (...) 445 **kwargs: Any, 446 ) -> Any: --> 447 result = await func( 448 *args, 449 parent_class=parent_class, 450 debug_num_stacks_to_drop=debug_num_stacks_to_drop, 451 debug_api=debug_api, 452 session=session, 453 **kwargs, 454 ) 456 if not isinstance(result, rgd.ResponseGetData): 457 raise RouteFunction_ResponseTypeError(result) File /workspaces/domolibrary/domolibrary/routes/appdb.py:276, in get_collection_by_id(auth, collection_id, parent_class, debug_api, session, debug_num_stacks_to_drop) 264 @gd.route_function 265 async def get_collection_by_id( 266 auth: dmda.DomoAuth, (...) 
271 debug_num_stacks_to_drop=1, 272 ): 274 url = f"https://{auth.domo_instance}.domo.com/api/datastores/v1/collections/{collection_id}" --> 276 res = await gd.get_data( 277 auth=auth, 278 method="GET", 279 url=url, 280 parent_class=parent_class, 281 debug_api=debug_api, 282 session=session, 283 num_stacks_to_drop=debug_num_stacks_to_drop, 284 ) 286 if not res.is_success: 287 raise AppDb_GET_Exception(res) File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:31, in run_with_retry.<locals>.actual_decorator.<locals>.wrapper(*args, **kwargs) 29 try: 30 retry +=1 ---> 31 return await run_fn( *args, **kwargs) 33 except httpx.ConnectTimeout as e: 34 print(f"connect timeout - retry attempt {retry} - {e}") File /workspaces/domolibrary/domolibrary/client/get_data.py:167, in get_data(url, method, auth, content_type, headers, body, params, debug_api, session, return_raw, is_follow_redirects, timeout, parent_class, num_stacks_to_drop, debug_traceback, is_verify) 164 if debug_api: 165 print("get_data: no body") --> 167 res = await getattr(session, method.lower())( 168 url=url, 169 headers=headers, 170 params=params, 171 follow_redirects=is_follow_redirects, 172 timeout=timeout, 173 ) 175 if debug_api: 176 print("get_data_response", res) File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1768, in AsyncClient.get(self, url, params, headers, cookies, auth, follow_redirects, timeout, extensions) 1751 async def get( 1752 self, 1753 url: URL | str, (...) 1761 extensions: RequestExtensions | None = None, 1762 ) -> Response: 1763 """ 1764 Send a `GET` request. 1765 1766 **Parameters**: See `httpx.request`. 
1767 """ -> 1768 return await self.request( 1769 "GET", 1770 url, 1771 params=params, 1772 headers=headers, 1773 cookies=cookies, 1774 auth=auth, 1775 follow_redirects=follow_redirects, 1776 timeout=timeout, 1777 extensions=extensions, 1778 ) File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1540, in AsyncClient.request(self, method, url, content, data, files, json, params, headers, cookies, auth, follow_redirects, timeout, extensions) 1525 warnings.warn(message, DeprecationWarning, stacklevel=2) 1527 request = self.build_request( 1528 method=method, 1529 url=url, (...) 1538 extensions=extensions, 1539 ) -> 1540 return await self.send(request, auth=auth, follow_redirects=follow_redirects) File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1629, in AsyncClient.send(self, request, stream, auth, follow_redirects) 1625 self._set_timeout(request) 1627 auth = self._build_request_auth(request, auth) -> 1629 response = await self._send_handling_auth( 1630 request, 1631 auth=auth, 1632 follow_redirects=follow_redirects, 1633 history=[], 1634 ) 1635 try: 1636 if not stream: File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1657, in AsyncClient._send_handling_auth(self, request, auth, follow_redirects, history) 1654 request = await auth_flow.__anext__() 1656 while True: -> 1657 response = await self._send_handling_redirects( 1658 request, 1659 follow_redirects=follow_redirects, 1660 history=history, 1661 ) 1662 try: 1663 try: File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1694, in AsyncClient._send_handling_redirects(self, request, follow_redirects, history) 1691 for hook in self._event_hooks["request"]: 1692 await hook(request) -> 1694 response = await self._send_single_request(request) 1695 try: 1696 for hook in self._event_hooks["response"]: File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1730, in AsyncClient._send_single_request(self, request) 1725 raise RuntimeError( 1726 "Attempted to send an sync request with an 
AsyncClient instance." 1727 ) 1729 with request_context(request=request): -> 1730 response = await transport.handle_async_request(request) 1732 assert isinstance(response.stream, AsyncByteStream) 1733 response.request = request File ~/.local/lib/python3.12/site-packages/httpx/_transports/default.py:394, in AsyncHTTPTransport.handle_async_request(self, request) 381 req = httpcore.Request( 382 method=request.method, 383 url=httpcore.URL( (...) 391 extensions=request.extensions, 392 ) 393 with map_httpcore_exceptions(): --> 394 resp = await self._pool.handle_async_request(req) 396 assert isinstance(resp.stream, typing.AsyncIterable) 398 return Response( 399 status_code=resp.status, 400 headers=resp.headers, 401 stream=AsyncResponseStream(resp.stream), 402 extensions=resp.extensions, 403 ) File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py:256, in AsyncConnectionPool.handle_async_request(self, request) 253 closing = self._assign_requests_to_connections() 255 await self._close_connections(closing) --> 256 raise exc from None 258 # Return the response. Note that in this case we still have to manage 259 # the point at which the response is closed. 260 assert isinstance(response.stream, typing.AsyncIterable) File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py:236, in AsyncConnectionPool.handle_async_request(self, request) 232 connection = await pool_request.wait_for_connection(timeout=timeout) 234 try: 235 # Send the request on the assigned connection. --> 236 response = await connection.handle_async_request( 237 pool_request.request 238 ) 239 except ConnectionNotAvailable: 240 # In some cases a connection may initially be available to 241 # handle a request, but then become unavailable. 242 # 243 # In this case we clear the connection and try again. 
244 pool_request.clear_connection() File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:101, in AsyncHTTPConnection.handle_async_request(self, request) 99 except BaseException as exc: 100 self._connect_failed = True --> 101 raise exc 103 return await self._connection.handle_async_request(request) File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:78, in AsyncHTTPConnection.handle_async_request(self, request) 76 async with self._request_lock: 77 if self._connection is None: ---> 78 stream = await self._connect(request) 80 ssl_object = stream.get_extra_info("ssl_object") 81 http2_negotiated = ( 82 ssl_object is not None 83 and ssl_object.selected_alpn_protocol() == "h2" 84 ) File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:124, in AsyncHTTPConnection._connect(self, request) 116 kwargs = { 117 "host": self._origin.host.decode("ascii"), 118 "port": self._origin.port, (...) 121 "socket_options": self._socket_options, 122 } 123 async with Trace("connect_tcp", logger, request, kwargs) as trace: --> 124 stream = await self._network_backend.connect_tcp(**kwargs) 125 trace.return_value = stream 126 else: File ~/.local/lib/python3.12/site-packages/httpcore/_backends/auto.py:31, in AutoBackend.connect_tcp(self, host, port, timeout, local_address, socket_options) 22 async def connect_tcp( 23 self, 24 host: str, (...) 
28 socket_options: typing.Iterable[SOCKET_OPTION] | None = None, 29 ) -> AsyncNetworkStream: 30 await self._init_backend() ---> 31 return await self._backend.connect_tcp( 32 host, 33 port, 34 timeout=timeout, 35 local_address=local_address, 36 socket_options=socket_options, 37 ) File ~/.local/lib/python3.12/site-packages/httpcore/_backends/anyio.py:115, in AnyIOBackend.connect_tcp(self, host, port, timeout, local_address, socket_options) 113 with map_exceptions(exc_map): 114 with anyio.fail_after(timeout): --> 115 stream: anyio.abc.ByteStream = await anyio.connect_tcp( 116 remote_host=host, 117 remote_port=port, 118 local_host=local_address, 119 ) 120 # By default TCP sockets opened in `asyncio` include TCP_NODELAY. 121 for option in socket_options: File ~/.local/lib/python3.12/site-packages/anyio/_core/_sockets.py:227, in connect_tcp(remote_host, remote_port, local_host, tls, ssl_context, tls_standard_compatible, tls_hostname, happy_eyeballs_delay) 224 target_addrs.append((af, sa[0])) 226 oserrors: list[OSError] = [] --> 227 async with create_task_group() as tg: 228 for i, (af, addr) in enumerate(target_addrs): 229 event = Event() File ~/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:819, in TaskGroup.__aexit__(***failed resolving arguments***) 815 raise BaseExceptionGroup( 816 "unhandled errors in a TaskGroup", self._exceptions 817 ) 818 elif exc_val: --> 819 raise exc_val 820 except BaseException as exc: 821 if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__): File ~/.local/lib/python3.12/site-packages/anyio/_core/_sockets.py:232, in connect_tcp(remote_host, remote_port, local_host, tls, ssl_context, tls_standard_compatible, tls_hostname, happy_eyeballs_delay) 230 tg.start_soon(try_connect, addr, event) 231 with move_on_after(happy_eyeballs_delay): --> 232 await event.wait() 234 if connected_stream is None: 235 cause = ( 236 oserrors[0] 237 if len(oserrors) == 1 238 else ExceptionGroup("multiple connection attempts failed", 
oserrors) 239 ) File ~/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:1809, in Event.wait(self) 1807 await AsyncIOBackend.checkpoint() 1808 else: -> 1809 await self._event.wait() File /usr/local/python/3.12.1/lib/python3.12/asyncio/locks.py:212, in Event.wait(self) 210 self._waiters.append(fut) 211 try: --> 212 await fut 213 return True 214 finally: CancelledError:
DomoCard.download_source_code
DomoCard.download_source_code (download_folder='./EXPORT/', file_name=None, debug_api:bool=False, try_auto_share:bool=False)
Exported source
@patch_to(DomoCard)
async def download_source_code(
    self,
    download_folder="./EXPORT/",
    file_name=None,
    debug_api: bool = False,
    try_auto_share: bool = False,
):
    """Download this card's source code to disk.

    Args:
        download_folder: base folder the files are written under.
        file_name: when given, the whole document content is written as a
            single JSON file with this name (suffix changed to ``.json``).
            When omitted, each entry of the document is written to its own
            file under ``<download_folder>/<ddx_type>/<card id>/``.
        debug_api: forwarded to ``get_source_code``.
        try_auto_share: forwarded to ``get_source_code``.

    Returns:
        The source-code document returned by ``get_source_code``.
    """
    doc = await self.get_source_code(debug_api=debug_api, try_auto_share=try_auto_share)

    if file_name:
        # Single-file mode: dump the full content as JSON and stop.
        download_path = os.path.join(
            download_folder, dmut.change_suffix(file_name, new_extension=".json")
        )
        dmfi.upsert_folder(download_path)

        with open(download_path, "w+", encoding="utf-8") as f:
            f.write(json.dumps(doc.content))

        return doc

    # The first key of the content dict is used as the type folder name.
    ddx_type = next(iter(doc.content))

    for key, value in doc.content[ddx_type].items():
        if key == "js":
            file_name = "app.js"
        elif key == "html":
            file_name = "index.html"
        elif key == "css":
            file_name = "styles.css"
        else:
            file_name = f"{key}.txt"

        download_path = os.path.join(
            download_folder, f"{ddx_type}/{self.id}/{file_name}"
        )
        dmfi.upsert_folder(download_path)

        with open(download_path, "w+", encoding="utf-8") as f:
            f.write(value)

    return doc
domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False)

pprint(
    (await domo_card.download_source_code(download_folder="../../test/ddx/"))
)
AppDbDocument(_collection_id='475cc3b8-4318-406a-8070-c023bf0b9152',
_identity_columns=[],
_id='3ed44deb-ce70-4e88-9fc7-985cb7f21435',
_created_on_dt=datetime.datetime(2023, 5, 5, 21, 54, 30, 178000, tzinfo=tzlocal()),
_updated_on_dt=datetime.datetime(2023, 11, 2, 15, 18, 2, 629000, tzinfo=tzlocal()),
content={'htmlBlank': {'css': '',
'html': '',
'js': '// DDX Bricks Wiki - See '
'https://developer.domo.com/docs/ddx-bricks/getting-started-using-ddx-bricks\n'
'// for tips on getting started, '
'linking to Domo data and debugging '
'your app\n'
' \n'
'//Available globals\n'
'var domo = window.domo; // For '
'more on domo.js: '
'https://developer.domo.com/docs/dev-studio-guides/domo-js#domo.get\n'
'var datasets = window.datasets;\n'
'\n'
'//Step 1. Select your dataset(s) '
'from the button in the bottom left '
'corner\n'
'\n'
'\n'
'\n'
'//Step 2. Query your dataset(s): '
'https://developer.domo.com/docs/dev-studio-references/data-api\n'
"var fields = ['state', "
"'revenue'];\n"
"var groupby = ['state'];\n"
'var query = '
'`/data/v1/${datasets[0]}?fields=${fields.join()}&groupby=${groupby.join()}`;\n'
'domo.get(query).then(handleResult);\n'
'\n'
'\n'
'\n'
'//Step 3. Do something with the '
'data from the query result\n'
'function handleResult(data){\n'
' console && console.log(data);\n'
'}'}})