-
Notifications
You must be signed in to change notification settings - Fork 18
Add async support for Dataverse SDK #171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning ADO PR pipeline YAML change detected This PR modifies Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT license. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT license. | ||
|
|
||
| """ | ||
| Async credential helper for the async example scripts. | ||
|
|
||
| azure-identity's InteractiveBrowserCredential is only available in the sync | ||
| namespace (azure.identity), not the async one (azure.identity.aio). This | ||
| module wraps the sync credential so it satisfies the AsyncTokenCredential | ||
| protocol required by AsyncDataverseClient. | ||
|
|
||
| Usage:: | ||
|
|
||
| from _auth import AsyncInteractiveBrowserCredential | ||
|
|
||
| credential = AsyncInteractiveBrowserCredential() | ||
| try: | ||
| async with AsyncDataverseClient(org_url, credential) as client: | ||
| ... | ||
| finally: | ||
| await credential.close() | ||
| """ | ||
|
|
||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| from azure.identity import InteractiveBrowserCredential | ||
|
|
||
|
|
||
| class AsyncInteractiveBrowserCredential: | ||
| """ | ||
| Async wrapper around the sync InteractiveBrowserCredential. | ||
|
|
||
| get_token() is dispatched to a dedicated thread so the event loop stays | ||
| free during the browser popup / token exchange. Subsequent calls hit the | ||
| in-process token cache and return almost immediately. | ||
| """ | ||
|
|
||
| def __init__(self, **kwargs): | ||
| self._credential = InteractiveBrowserCredential(**kwargs) | ||
| self._executor = ThreadPoolExecutor(max_workers=1) | ||
|
|
||
| async def get_token(self, *scopes, **kwargs): | ||
| loop = asyncio.get_running_loop() | ||
| return await loop.run_in_executor( | ||
| self._executor, | ||
| lambda: self._credential.get_token(*scopes, **kwargs), | ||
| ) | ||
|
|
||
| async def close(self): | ||
| self._executor.shutdown(wait=False) | ||
|
|
||
| async def __aenter__(self): | ||
| return self | ||
|
|
||
| async def __aexit__(self, *_): | ||
| await self.close() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT license. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,275 @@ | ||
| #!/usr/bin/env python3 | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT license. | ||
|
|
||
| """ | ||
| PowerPlatform Dataverse Client - Async Alternate Keys & Upsert Example | ||
|
|
||
| Async equivalent of examples/advanced/alternate_keys_upsert.py. | ||
|
|
||
| Demonstrates the full workflow of creating alternate keys and using | ||
| them for upsert operations: | ||
| 1. Create a custom table with columns | ||
| 2. Define an alternate key on a column | ||
| 3. Wait for the key index to become Active | ||
| 4. Upsert records using the alternate key | ||
| 5. Verify records were created/updated correctly | ||
| 6. Clean up | ||
|
|
||
| Prerequisites: | ||
| pip install PowerPlatform-Dataverse-Client | ||
| pip install azure-identity | ||
| """ | ||
|
|
||
| import asyncio | ||
| import sys | ||
|
|
||
| from PowerPlatform.Dataverse.aio.async_client import AsyncDataverseClient | ||
| from PowerPlatform.Dataverse.models.upsert import UpsertItem | ||
| from pathlib import Path | ||
|
|
||
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | ||
| from _auth import AsyncInteractiveBrowserCredential | ||
|
|
||
| # --- Config --- | ||
| TABLE_NAME = "new_AltKeyDemo" | ||
| KEY_COLUMN = "new_externalid" | ||
| KEY_NAME = "new_ExternalIdKey" | ||
| BACKOFF_DELAYS = (0, 3, 10, 20, 35) | ||
|
|
||
|
|
||
| # --- Helpers --- | ||
| async def backoff(coro_fn, *, delays=BACKOFF_DELAYS): | ||
| """Retry *coro_fn* with exponential-ish backoff on any exception.""" | ||
| last = None | ||
| total_delay = 0 | ||
| attempts = 0 | ||
| for d in delays: | ||
| if d: | ||
| await asyncio.sleep(d) | ||
| total_delay += d | ||
| attempts += 1 | ||
| try: | ||
| result = await coro_fn() | ||
| if attempts > 1: | ||
| retry_count = attempts - 1 | ||
| print(f" [INFO] Backoff succeeded after {retry_count} retry(s); " f"waited {total_delay}s total.") | ||
| return result | ||
| except Exception as ex: # noqa: BLE001 | ||
| last = ex | ||
| continue | ||
| if last: | ||
| if attempts: | ||
| retry_count = max(attempts - 1, 0) | ||
| print(f" [WARN] Backoff exhausted after {retry_count} retry(s); " f"waited {total_delay}s total.") | ||
| raise last | ||
|
|
||
|
|
||
| async def wait_for_key_active(client, table, key_name, max_wait=120): | ||
| """Poll get_alternate_keys until the key status is Active.""" | ||
| import time | ||
|
|
||
| start = time.time() | ||
| while time.time() - start < max_wait: | ||
| keys = await client.tables.get_alternate_keys(table) | ||
| for k in keys: | ||
| if k.schema_name == key_name: | ||
| print(f" Key status: {k.status}") | ||
| if k.status == "Active": | ||
| return k | ||
| if k.status == "Failed": | ||
| raise RuntimeError(f"Alternate key index failed: {k.schema_name}") | ||
| await asyncio.sleep(5) | ||
| raise TimeoutError(f"Key {key_name} did not become Active within {max_wait}s") | ||
|
|
||
|
|
||
| # --- Main --- | ||
| async def main(): | ||
| """Run the async alternate-keys & upsert E2E walkthrough.""" | ||
| print("PowerPlatform Dataverse Client - Async Alternate Keys & Upsert Example") | ||
| print("=" * 70) | ||
| print("This script demonstrates:") | ||
| print(" - Creating a custom table with columns") | ||
| print(" - Defining an alternate key on a column") | ||
| print(" - Waiting for the key index to become Active") | ||
| print(" - Upserting records via alternate key (create + update)") | ||
| print(" - Verifying records and listing keys") | ||
| print(" - Cleaning up (delete key, delete table)") | ||
| print("=" * 70) | ||
|
|
||
| entered = input("Enter Dataverse org URL (e.g. https://yourorg.crm.dynamics.com): ").strip() | ||
| if not entered: | ||
| print("No URL entered; exiting.") | ||
| sys.exit(1) | ||
|
|
||
| base_url = entered.rstrip("/") | ||
| credential = AsyncInteractiveBrowserCredential() | ||
| try: | ||
| async with AsyncDataverseClient(base_url, credential) as client: | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 1: Create table (skip if already exists) | ||
| # ------------------------------------------------------------------ | ||
| print("\n1. Creating table...") | ||
| table_info = await client.tables.get(TABLE_NAME) | ||
| if table_info: | ||
| print(f" Table already exists: {TABLE_NAME} (skipped)") | ||
| else: | ||
| table_info = await backoff( | ||
| lambda: client.tables.create( | ||
| TABLE_NAME, | ||
| columns={ | ||
| KEY_COLUMN: "string", | ||
| "new_ProductName": "string", | ||
| "new_Price": "decimal", | ||
| }, | ||
| ) | ||
| ) | ||
| print(f" Created: {table_info.get('table_schema_name', TABLE_NAME)}") | ||
| await asyncio.sleep(10) # Wait for metadata propagation | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 2: Create alternate key (skip if already exists) | ||
| # ------------------------------------------------------------------ | ||
| print("\n2. Creating alternate key...") | ||
| existing_keys = await client.tables.get_alternate_keys(TABLE_NAME) | ||
| existing_key = next((k for k in existing_keys if k.schema_name == KEY_NAME), None) | ||
| if existing_key: | ||
| print(f" Alternate key already exists: {KEY_NAME} (skipped)") | ||
| else: | ||
| key_info = await backoff( | ||
| lambda: client.tables.create_alternate_key(TABLE_NAME, KEY_NAME, [KEY_COLUMN.lower()]) | ||
| ) | ||
| print(f" Key created: {key_info.schema_name} (id={key_info.metadata_id})") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 3: Wait for key to become Active | ||
| # ------------------------------------------------------------------ | ||
| print("\n3. Waiting for key index to become Active...") | ||
| active_key = await wait_for_key_active(client, TABLE_NAME, KEY_NAME) | ||
| print(f" Key is Active: {active_key.schema_name}") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 4: Upsert records (creates new) | ||
| # ------------------------------------------------------------------ | ||
| print("\n4a. Upsert single record (PATCH, creates new)...") | ||
| await client.records.upsert( | ||
| TABLE_NAME, | ||
| [ | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-001"}, | ||
| record={"new_productname": "Widget A", "new_price": 9.99}, | ||
| ), | ||
| ], | ||
| ) | ||
| print(" Upserted EXT-001 (single)") | ||
|
|
||
| print("\n4b. Upsert second record (single PATCH)...") | ||
| await client.records.upsert( | ||
| TABLE_NAME, | ||
| [ | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-002"}, | ||
| record={"new_productname": "Widget B", "new_price": 19.99}, | ||
| ), | ||
| ], | ||
| ) | ||
| print(" Upserted EXT-002 (single)") | ||
|
|
||
| print("\n4c. Upsert multiple records (UpsertMultiple bulk)...") | ||
| await client.records.upsert( | ||
| TABLE_NAME, | ||
| [ | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-003"}, | ||
| record={"new_productname": "Widget C", "new_price": 29.99}, | ||
| ), | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-004"}, | ||
| record={"new_productname": "Widget D", "new_price": 39.99}, | ||
| ), | ||
| ], | ||
| ) | ||
| print(" Upserted EXT-003, EXT-004 (bulk)") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 5a: Upsert single update (PATCH, record exists) | ||
| # ------------------------------------------------------------------ | ||
| print("\n5a. Upsert single record (update existing via PATCH)...") | ||
| await client.records.upsert( | ||
| TABLE_NAME, | ||
| [ | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-001"}, | ||
| record={"new_productname": "Widget A v2", "new_price": 12.99}, | ||
| ), | ||
| ], | ||
| ) | ||
| print(" Updated EXT-001 (single)") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 5b: Upsert multiple update (UpsertMultiple, records exist) | ||
| # ------------------------------------------------------------------ | ||
| print("\n5b. Upsert multiple records (update existing via UpsertMultiple)...") | ||
| await client.records.upsert( | ||
| TABLE_NAME, | ||
| [ | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-003"}, | ||
| record={"new_productname": "Widget C v2", "new_price": 31.99}, | ||
| ), | ||
| UpsertItem( | ||
| alternate_key={KEY_COLUMN.lower(): "EXT-004"}, | ||
| record={"new_productname": "Widget D v2", "new_price": 41.99}, | ||
| ), | ||
| ], | ||
| ) | ||
| print(" Updated EXT-003, EXT-004 (bulk)") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 6: Verify | ||
| # ------------------------------------------------------------------ | ||
| print("\n6. Verifying records...") | ||
| async for record in client.records.list_pages( | ||
| TABLE_NAME, | ||
| select=["new_productname", "new_price", KEY_COLUMN.lower()], | ||
| ): | ||
| for item in record: | ||
| ext_id = item.get(KEY_COLUMN.lower(), "?") | ||
| name = item.get("new_productname", "?") | ||
| price = item.get("new_price", "?") | ||
| print(f" {ext_id}: {name} @ ${price}") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 7: List alternate keys | ||
| # ------------------------------------------------------------------ | ||
| print("\n7. Listing alternate keys...") | ||
| keys = await client.tables.get_alternate_keys(TABLE_NAME) | ||
| for k in keys: | ||
| print(f" {k.schema_name}: columns={k.key_attributes}, status={k.status}") | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Step 8: Cleanup | ||
| # ------------------------------------------------------------------ | ||
| cleanup = input("\n8. Delete table and cleanup? (Y/n): ").strip() or "y" | ||
| if cleanup.lower() in ("y", "yes"): | ||
| try: | ||
| # Delete alternate key first | ||
| for k in keys: | ||
| await client.tables.delete_alternate_key(TABLE_NAME, k.metadata_id) | ||
| print(f" Deleted key: {k.schema_name}") | ||
| await asyncio.sleep(5) | ||
| await backoff(lambda: client.tables.delete(TABLE_NAME)) | ||
| print(f" Deleted table: {TABLE_NAME}") | ||
| except Exception as e: # noqa: BLE001 | ||
| print(f" Cleanup error: {e}") | ||
| else: | ||
| print(" Table kept for inspection.") | ||
| finally: | ||
| await credential.close() | ||
|
|
||
| print("\nDone.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Warning
ADO PR pipeline YAML change detected
This PR modifies
.azdo/ci-pr.yaml. After merge, Azure DevOps may disable or require approval for the PR validation pipeline.Action required (post-merge): Re-enable / approve the updated YAML for:
Please resolve this comment after completing the post-merge steps.