Card

A class-based approach to interacting with Domo cards.
auth = dmda.DomoTokenAuth(
    domo_instance=os.environ['DOMO_INSTANCE'],
    domo_access_token=os.environ["DOMO_ACCESS_TOKEN"],
)

CARD_ID = 1766265020
CARD_ID_APP = 577316875

source

DomoCard

 DomoCard (id:str, auth:domolibrary.client.DomoAuth.DomoAuth,
           title:str=None, description:str=None, type:str=None,
           urn:str=None, chart_type:str=None, dataset_id:str=None,
           datastore_id:str=None, domo_collections:List[Any]=None,
           domo_source_code:Any=None, certification:dict=None,
           owners:List[Any]=None,
           datasources:List[Any]=None,
           Lineage:domolibrary.classes.DomoLineage.DomoLineage=None)
domo_card = await DomoCard.get_by_id(card_id=CARD_ID, auth=auth, return_raw = False)
pprint(domo_card)
DomoCard(id=1766265020,
         title='Vendor Amounts Dates',
         description='',
         type='kpi',
         urn='1766265020',
         chart_type=None,
         dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e',
         datastore_id=None,
         domo_collections=None,
         domo_source_code=None,
         certification=DictDot(state='NOT_CERTIFIED', adminCertified=False),
         owners=[DomoUser(id='1893952720',
                          display_name='Jae Wilson1',
                          email_address='jae@datacrew.space',
                          role_id=810756122,
                          department='Business Improvement',
                          title=None,
                          avatar_key='/api/content/v1/avatar/USER/1893952720',
                          phone_number=None,
                          web_landing_page=None,
                          web_mobile_landing_page=None,
                          employee_id=None,
                          employee_number=None,
                          hire_date=None,
                          reports_to=None,
                          publisher_domain=None,
                          subscriber_domain=None,
                          virtual_user_id=None,
                          created_dt=datetime.datetime(2020, 5, 8, 17, 55, 18),
                          last_activity_dt=datetime.datetime(2025, 1, 30, 19, 15, 22, 295000),
                          custom_attributes={},
                          role=DomoRole(id=810756122,
                                        name='super_admin',
                                        description='all grants - updated on '
                                                    '2024-11-07',
                                        is_system_role=0,
                                        is_default_role=None,
                                        grants=[],
                                        membership_ls=[]),
                          domo_api_clients=None,
                          domo_access_tokens=None)])

source

DomoCard.get_datasources

 DomoCard.get_datasources (debug_api:bool=False,
                           session:httpx.AsyncClient=None,
                           return_raw:bool=False)
await domo_card.get_datasources(return_raw = False)
[DomoDataset(id='fd70d83b-38dc-40a2-9349-357ec4321d3e', display_type='webform', data_provider_type='webform', name='Vendor Amounts Dates', description=None, row_count=20, column_count=3, stream_id=392, owner={'id': '55874022', 'name': 'Grant Smith', 'type': 'USER', 'group': False}, formula={}, Schema=DomoDataset_Schema(dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e', columns=[]), Stream=DomoStream(id=392, dataset_id='fd70d83b-38dc-40a2-9349-357ec4321d3e', transport_description=None, transport_version=None, update_method=None, data_provider_name=None, data_provider_key=None, account_id=None, account_display_name=None, account_userid=None, has_mapping=False, configuration=[], configuration_tables=[], configuration_query=None), Tags=DomoTags(tag_ls=['um_REPLACE', 'webform', 's_webform']), PDP=<domolibrary.classes.DomoPDP.Dataset_PDP_Policies object at 0x72389ec76540>, Certification=None)]

sample share_card

# import domolibrary.classes.DomoGroup as dmg

domo_group = await dmg.DomoGroup.get_by_id(group_id=1324037627, auth=auth)
domo_group

domo_card = DomoCard(id=1766265020, auth=auth)

await domo_card.share(
    auth=auth,
    domo_groups=[domo_group],
    message=None,
    debug_api=False,
)
ResponseGetData(status=200, response='badge 1766265020 successfully shared with 1324037627', is_success=True, parent_class=None)

source

DomoCard.get_source_code

 DomoCard.get_source_code (debug_api:bool=False,
                           try_auto_share:bool=False)
Exported source
class Card_DownloadSourceCode(de.DomoError):
    """Error raised when the source code of a card cannot be downloaded."""

    def __init__(self, cls, auth, message):
        # Pure pass-through: hand all three values to the parent by keyword.
        super().__init__(
            cls=cls,
            auth=auth,
            message=message,
        )


@patch_to(DomoCard)
async def get_collections(
    self, debug_api: bool = False, return_raw: bool = False, debug_num_stacks_to_drop=2
):
    """Load the AppDb collections that belong to this card's datastore.

    The collections are first listed for ``self.datastore_id``. When
    ``return_raw`` is true that listing result is returned untouched and
    ``self.domo_collections`` is not modified. Otherwise every listed
    collection is fetched again by id, the fetched objects are stored on
    ``self.domo_collections`` and that same value is returned.
    """
    # Imported inside the function, as in the original implementation.
    import domolibrary.classes.DomoAppDb as dmdb

    listing = await dmdb.AppDbCollections.get_collections(
        datastore_id=self.datastore_id,
        auth=self.auth,
        debug_api=debug_api,
        debug_num_stacks_to_drop=debug_num_stacks_to_drop,
        return_raw=return_raw,
    )

    if return_raw:
        return listing

    # One pending get_by_id call per listed collection, in listing order.
    pending = []
    for listed in listing:
        pending.append(
            dmdb.AppDbCollection.get_by_id(
                collection_id=listed.id, auth=self.auth, debug_api=debug_api
            )
        )

    # All fetches are awaited together through the shared helper with n=60.
    self.domo_collections = await dmce.gather_with_concurrency(*pending, n=60)

    return self.domo_collections

@patch_to(DomoCard)
async def get_source_code(self, debug_api: bool = False, try_auto_share: bool = False):
    """Retrieve the source-code document stored for this card.

    Refreshes ``self.domo_collections``, picks the first collection named
    ``ddx_app_client_code``, queries its documents and keeps the first one
    on ``self.domo_source_code``, which is also the return value.

    Raises:
        Card_DownloadSourceCode: when no collection with that name exists
            for the card, or when the document query returns nothing.
    """
    collection_name = "ddx_app_client_code"

    await self.get_collections(debug_api=debug_api)

    # First collection whose name matches; stays None when there is none.
    code_collection = None
    for candidate in self.domo_collections:
        if candidate.name == collection_name:
            code_collection = candidate
            break

    if not code_collection:
        raise Card_DownloadSourceCode(
            cls=deepcopy(self),
            auth=self.auth,
            message=f"collection - {collection_name} not found for {self.title} - {self.id}",
        )

    found_documents = await code_collection.query_documents(
        debug_api=debug_api, try_auto_share=try_auto_share
    )

    if not found_documents:
        raise Card_DownloadSourceCode(
            cls=deepcopy(self),
            auth=self.auth,
            message=f"collection - {collection_name} - {code_collection.id} - unable to retrieve documents for {self.title} - {self.id}",
        )

    # Only the first returned document is kept.
    self.domo_source_code = found_documents[0]

    return self.domo_source_code

source

DomoCard.get_collections

 DomoCard.get_collections (debug_api:bool=False, return_raw:bool=False,
                           debug_num_stacks_to_drop=2)

source

Card_DownloadSourceCode

 Card_DownloadSourceCode (cls, auth, message)

Raised when a card's source code cannot be retrieved (extends the base `DomoError` exception).

domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False)

pprint(await domo_card.get_source_code())
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[14], line 3
      1 domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False)
----> 3 pprint(await domo_card.get_source_code())

Cell In[13], line 41, in get_source_code(self, debug_api, try_auto_share)
     38 @patch_to(DomoCard)
     39 async def get_source_code(self, debug_api: bool = False, try_auto_share: bool = False):
---> 41     await self.get_collections(debug_api=debug_api)
     43     collection_name = "ddx_app_client_code"
     44     code_collection = next(
     45         (
     46             domo_collection
   (...)
     50         None,
     51     )

Cell In[13], line 26, in get_collections(self, debug_api, return_raw, debug_num_stacks_to_drop)
     23 if return_raw:
     24     return domo_collections
---> 26 self.domo_collections = await dmce.gather_with_concurrency(
     27     *[
     28         dmdb.AppDbCollection.get_by_id(
     29             collection_id=domo_collection.id, auth=self.auth, debug_api=debug_api
     30         )
     31         for domo_collection in domo_collections
     32     ],
     33     n=60
     34 )
     36 return self.domo_collections

File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:70, in gather_with_concurrency(n, *coros)
     67     async with semaphore:
     68         return await coro
---> 70 return await asyncio.gather(*(sem_coro(c) for c in coros))

File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:68, in gather_with_concurrency.<locals>.sem_coro(coro)
     66 async def sem_coro(coro):
     67     async with semaphore:
---> 68         return await coro

File /workspaces/domolibrary/domolibrary/classes/DomoAppDb.py:283, in AppDbCollection.get_by_id(cls, auth, collection_id, debug_api, session, debug_num_stacks_to_drop, return_raw)
    272 @classmethod
    273 async def get_by_id(
    274     cls,
   (...)
    280     return_raw: bool = False,
    281 ):
--> 283     res = await appdb_routes.get_collection_by_id(
    284         auth=auth,
    285         collection_id=collection_id,
    286         parent_class=cls.__name__,
    287         debug_api=debug_api,
    288         session=session,
    289         debug_num_stacks_to_drop=debug_num_stacks_to_drop,
    290     )
    292     if return_raw:
    293         return res

File /workspaces/domolibrary/domolibrary/client/get_data.py:447, in route_function.<locals>.wrapper(parent_class, debug_num_stacks_to_drop, debug_api, session, *args, **kwargs)
    438 @wraps(func)
    439 async def wrapper(
    440     *args: Any,
   (...)
    445     **kwargs: Any,
    446 ) -> Any:
--> 447     result = await func(
    448         *args,
    449         parent_class=parent_class,
    450         debug_num_stacks_to_drop=debug_num_stacks_to_drop,
    451         debug_api=debug_api,
    452         session=session,
    453         **kwargs,
    454     )
    456     if not isinstance(result, rgd.ResponseGetData):
    457         raise RouteFunction_ResponseTypeError(result)

File /workspaces/domolibrary/domolibrary/routes/appdb.py:276, in get_collection_by_id(auth, collection_id, parent_class, debug_api, session, debug_num_stacks_to_drop)
    264 @gd.route_function
    265 async def get_collection_by_id(
    266     auth: dmda.DomoAuth,
   (...)
    271     debug_num_stacks_to_drop=1,
    272 ):
    274     url = f"https://{auth.domo_instance}.domo.com/api/datastores/v1/collections/{collection_id}"
--> 276     res = await gd.get_data(
    277         auth=auth,
    278         method="GET",
    279         url=url,
    280         parent_class=parent_class,
    281         debug_api=debug_api,
    282         session=session,
    283         num_stacks_to_drop=debug_num_stacks_to_drop,
    284     )
    286     if not res.is_success:
    287         raise AppDb_GET_Exception(res)

File /workspaces/domolibrary/domolibrary/utils/chunk_execution.py:31, in run_with_retry.<locals>.actual_decorator.<locals>.wrapper(*args, **kwargs)
     29 try:
     30     retry +=1
---> 31     return await run_fn( *args, **kwargs)
     33 except httpx.ConnectTimeout as e:
     34     print(f"connect timeout - retry attempt {retry} - {e}")

File /workspaces/domolibrary/domolibrary/client/get_data.py:167, in get_data(url, method, auth, content_type, headers, body, params, debug_api, session, return_raw, is_follow_redirects, timeout, parent_class, num_stacks_to_drop, debug_traceback, is_verify)
    164     if debug_api:
    165         print("get_data: no body")
--> 167     res = await getattr(session, method.lower())(
    168         url=url,
    169         headers=headers,
    170         params=params,
    171         follow_redirects=is_follow_redirects,
    172         timeout=timeout,
    173     )
    175 if debug_api:
    176     print("get_data_response", res)

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1768, in AsyncClient.get(self, url, params, headers, cookies, auth, follow_redirects, timeout, extensions)
   1751 async def get(
   1752     self,
   1753     url: URL | str,
   (...)
   1761     extensions: RequestExtensions | None = None,
   1762 ) -> Response:
   1763     """
   1764     Send a `GET` request.
   1765 
   1766     **Parameters**: See `httpx.request`.
   1767     """
-> 1768     return await self.request(
   1769         "GET",
   1770         url,
   1771         params=params,
   1772         headers=headers,
   1773         cookies=cookies,
   1774         auth=auth,
   1775         follow_redirects=follow_redirects,
   1776         timeout=timeout,
   1777         extensions=extensions,
   1778     )

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1540, in AsyncClient.request(self, method, url, content, data, files, json, params, headers, cookies, auth, follow_redirects, timeout, extensions)
   1525     warnings.warn(message, DeprecationWarning, stacklevel=2)
   1527 request = self.build_request(
   1528     method=method,
   1529     url=url,
   (...)
   1538     extensions=extensions,
   1539 )
-> 1540 return await self.send(request, auth=auth, follow_redirects=follow_redirects)

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1629, in AsyncClient.send(self, request, stream, auth, follow_redirects)
   1625 self._set_timeout(request)
   1627 auth = self._build_request_auth(request, auth)
-> 1629 response = await self._send_handling_auth(
   1630     request,
   1631     auth=auth,
   1632     follow_redirects=follow_redirects,
   1633     history=[],
   1634 )
   1635 try:
   1636     if not stream:

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1657, in AsyncClient._send_handling_auth(self, request, auth, follow_redirects, history)
   1654 request = await auth_flow.__anext__()
   1656 while True:
-> 1657     response = await self._send_handling_redirects(
   1658         request,
   1659         follow_redirects=follow_redirects,
   1660         history=history,
   1661     )
   1662     try:
   1663         try:

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1694, in AsyncClient._send_handling_redirects(self, request, follow_redirects, history)
   1691 for hook in self._event_hooks["request"]:
   1692     await hook(request)
-> 1694 response = await self._send_single_request(request)
   1695 try:
   1696     for hook in self._event_hooks["response"]:

File ~/.local/lib/python3.12/site-packages/httpx/_client.py:1730, in AsyncClient._send_single_request(self, request)
   1725     raise RuntimeError(
   1726         "Attempted to send an sync request with an AsyncClient instance."
   1727     )
   1729 with request_context(request=request):
-> 1730     response = await transport.handle_async_request(request)
   1732 assert isinstance(response.stream, AsyncByteStream)
   1733 response.request = request

File ~/.local/lib/python3.12/site-packages/httpx/_transports/default.py:394, in AsyncHTTPTransport.handle_async_request(self, request)
    381 req = httpcore.Request(
    382     method=request.method,
    383     url=httpcore.URL(
   (...)
    391     extensions=request.extensions,
    392 )
    393 with map_httpcore_exceptions():
--> 394     resp = await self._pool.handle_async_request(req)
    396 assert isinstance(resp.stream, typing.AsyncIterable)
    398 return Response(
    399     status_code=resp.status,
    400     headers=resp.headers,
    401     stream=AsyncResponseStream(resp.stream),
    402     extensions=resp.extensions,
    403 )

File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py:256, in AsyncConnectionPool.handle_async_request(self, request)
    253         closing = self._assign_requests_to_connections()
    255     await self._close_connections(closing)
--> 256     raise exc from None
    258 # Return the response. Note that in this case we still have to manage
    259 # the point at which the response is closed.
    260 assert isinstance(response.stream, typing.AsyncIterable)

File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py:236, in AsyncConnectionPool.handle_async_request(self, request)
    232 connection = await pool_request.wait_for_connection(timeout=timeout)
    234 try:
    235     # Send the request on the assigned connection.
--> 236     response = await connection.handle_async_request(
    237         pool_request.request
    238     )
    239 except ConnectionNotAvailable:
    240     # In some cases a connection may initially be available to
    241     # handle a request, but then become unavailable.
    242     #
    243     # In this case we clear the connection and try again.
    244     pool_request.clear_connection()

File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:101, in AsyncHTTPConnection.handle_async_request(self, request)
     99 except BaseException as exc:
    100     self._connect_failed = True
--> 101     raise exc
    103 return await self._connection.handle_async_request(request)

File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:78, in AsyncHTTPConnection.handle_async_request(self, request)
     76 async with self._request_lock:
     77     if self._connection is None:
---> 78         stream = await self._connect(request)
     80         ssl_object = stream.get_extra_info("ssl_object")
     81         http2_negotiated = (
     82             ssl_object is not None
     83             and ssl_object.selected_alpn_protocol() == "h2"
     84         )

File ~/.local/lib/python3.12/site-packages/httpcore/_async/connection.py:124, in AsyncHTTPConnection._connect(self, request)
    116     kwargs = {
    117         "host": self._origin.host.decode("ascii"),
    118         "port": self._origin.port,
   (...)
    121         "socket_options": self._socket_options,
    122     }
    123     async with Trace("connect_tcp", logger, request, kwargs) as trace:
--> 124         stream = await self._network_backend.connect_tcp(**kwargs)
    125         trace.return_value = stream
    126 else:

File ~/.local/lib/python3.12/site-packages/httpcore/_backends/auto.py:31, in AutoBackend.connect_tcp(self, host, port, timeout, local_address, socket_options)
     22 async def connect_tcp(
     23     self,
     24     host: str,
   (...)
     28     socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
     29 ) -> AsyncNetworkStream:
     30     await self._init_backend()
---> 31     return await self._backend.connect_tcp(
     32         host,
     33         port,
     34         timeout=timeout,
     35         local_address=local_address,
     36         socket_options=socket_options,
     37     )

File ~/.local/lib/python3.12/site-packages/httpcore/_backends/anyio.py:115, in AnyIOBackend.connect_tcp(self, host, port, timeout, local_address, socket_options)
    113 with map_exceptions(exc_map):
    114     with anyio.fail_after(timeout):
--> 115         stream: anyio.abc.ByteStream = await anyio.connect_tcp(
    116             remote_host=host,
    117             remote_port=port,
    118             local_host=local_address,
    119         )
    120         # By default TCP sockets opened in `asyncio` include TCP_NODELAY.
    121         for option in socket_options:

File ~/.local/lib/python3.12/site-packages/anyio/_core/_sockets.py:227, in connect_tcp(remote_host, remote_port, local_host, tls, ssl_context, tls_standard_compatible, tls_hostname, happy_eyeballs_delay)
    224             target_addrs.append((af, sa[0]))
    226 oserrors: list[OSError] = []
--> 227 async with create_task_group() as tg:
    228     for i, (af, addr) in enumerate(target_addrs):
    229         event = Event()

File ~/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:819, in TaskGroup.__aexit__(***failed resolving arguments***)
    815         raise BaseExceptionGroup(
    816             "unhandled errors in a TaskGroup", self._exceptions
    817         )
    818     elif exc_val:
--> 819         raise exc_val
    820 except BaseException as exc:
    821     if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__):

File ~/.local/lib/python3.12/site-packages/anyio/_core/_sockets.py:232, in connect_tcp(remote_host, remote_port, local_host, tls, ssl_context, tls_standard_compatible, tls_hostname, happy_eyeballs_delay)
    230         tg.start_soon(try_connect, addr, event)
    231         with move_on_after(happy_eyeballs_delay):
--> 232             await event.wait()
    234 if connected_stream is None:
    235     cause = (
    236         oserrors[0]
    237         if len(oserrors) == 1
    238         else ExceptionGroup("multiple connection attempts failed", oserrors)
    239     )

File ~/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py:1809, in Event.wait(self)
   1807     await AsyncIOBackend.checkpoint()
   1808 else:
-> 1809     await self._event.wait()

File /usr/local/python/3.12.1/lib/python3.12/asyncio/locks.py:212, in Event.wait(self)
    210 self._waiters.append(fut)
    211 try:
--> 212     await fut
    213     return True
    214 finally:

CancelledError: 

source

DomoCard.download_source_code

 DomoCard.download_source_code (download_folder='./EXPORT/',
                                file_name=None, debug_api:bool=False,
                                try_auto_share:bool=False)
Exported source
@patch_to(DomoCard)
async def download_source_code(
    self,
    download_folder="./EXPORT/",
    file_name=None,
    debug_api: bool = False,
    try_auto_share: bool = False,
):
    """Fetch this card's source-code document and write it to disk.

    Two output layouts are supported:

    * ``file_name`` given: the whole ``doc.content`` is dumped as JSON to
      ``<download_folder>/<file_name with a .json suffix>``.
    * ``file_name`` omitted: the first top-level key of ``doc.content`` is
      taken as the DDX type and each entry under it is written to
      ``<download_folder>/<ddx_type>/<card id>/<name>``, where ``js``,
      ``html`` and ``css`` map to ``app.js``, ``index.html`` and
      ``styles.css`` and any other key is written as ``<key>.txt``.

    Args:
        download_folder: folder the output path(s) are built under.
        file_name: optional name for the single-file JSON export.
        debug_api: forwarded to ``get_source_code``.
        try_auto_share: forwarded to ``get_source_code``.

    Returns:
        The document returned by ``get_source_code``.

    Raises:
        Card_DownloadSourceCode: propagated from ``get_source_code``, or
            raised here when the document has no content to split into
            files.
    """
    doc = await self.get_source_code(debug_api=debug_api, try_auto_share=try_auto_share)

    if file_name:
        download_path = os.path.join(
            download_folder, dmut.change_suffix(file_name, new_extension=".json")
        )
        # presumably ensures the destination folder exists - confirm in dmfi
        dmfi.upsert_folder(download_path)

        with open(download_path, "w+", encoding="utf-8") as f:
            f.write(json.dumps(doc.content))

        return doc

    # next(iter(...)) on empty content would raise StopIteration, which a
    # coroutine turns into an opaque RuntimeError (PEP 479); None content
    # would raise a bare TypeError. Fail with the module's own error instead.
    if not doc.content:
        raise Card_DownloadSourceCode(
            cls=deepcopy(self),
            auth=self.auth,
            message=f"source code document has no content for {self.title} - {self.id}",
        )

    ddx_type = next(iter(doc.content))

    # Well-known asset keys get conventional file names; others fall back to .txt.
    asset_file_names = {"js": "app.js", "html": "index.html", "css": "styles.css"}

    for key, value in doc.content[ddx_type].items():
        # A separate local keeps the ``file_name`` parameter from being rebound.
        asset_name = asset_file_names.get(key, f"{key}.txt")

        download_path = os.path.join(
            download_folder, f"{ddx_type}/{self.id}/{asset_name}"
        )
        dmfi.upsert_folder(download_path)

        with open(download_path, "w+", encoding="utf-8") as f:
            f.write(value)

    return doc
domo_card = await DomoCard.get_by_id(card_id=CARD_ID_APP, auth=auth, return_raw=False)

pprint((
    await domo_card.download_source_code(download_folder="../../test/ddx/")
))
AppDbDocument(_collection_id='475cc3b8-4318-406a-8070-c023bf0b9152',
              _identity_columns=[],
              _id='3ed44deb-ce70-4e88-9fc7-985cb7f21435',
              _created_on_dt=datetime.datetime(2023, 5, 5, 21, 54, 30, 178000, tzinfo=tzlocal()),
              _updated_on_dt=datetime.datetime(2023, 11, 2, 15, 18, 2, 629000, tzinfo=tzlocal()),
              content={'htmlBlank': {'css': '',
                                     'html': '',
                                     'js': '// DDX Bricks Wiki - See '
                                           'https://developer.domo.com/docs/ddx-bricks/getting-started-using-ddx-bricks\n'
                                           '// for tips on getting started, '
                                           'linking to Domo data and debugging '
                                           'your app\n'
                                           ' \n'
                                           '//Available globals\n'
                                           'var domo = window.domo; // For '
                                           'more on domo.js: '
                                           'https://developer.domo.com/docs/dev-studio-guides/domo-js#domo.get\n'
                                           'var datasets = window.datasets;\n'
                                           '\n'
                                           '//Step 1. Select your dataset(s) '
                                           'from the button in the bottom left '
                                           'corner\n'
                                           '\n'
                                           '\n'
                                           '\n'
                                           '//Step 2. Query your dataset(s): '
                                           'https://developer.domo.com/docs/dev-studio-references/data-api\n'
                                           "var fields = ['state', "
                                           "'revenue'];\n"
                                           "var groupby = ['state'];\n"
                                           'var query = '
                                           '`/data/v1/${datasets[0]}?fields=${fields.join()}&groupby=${groupby.join()}`;\n'
                                           'domo.get(query).then(handleResult);\n'
                                           '\n'
                                           '\n'
                                           '\n'
                                           '//Step 3. Do something with the '
                                           'data from the query result\n'
                                           'function handleResult(data){\n'
                                           '  console && console.log(data);\n'
                                           '}'}})