A Python client library for Microsoft Dataverse that provides a unified interface for CRUD operations, SQL queries, table metadata management, and file uploads through the Dataverse Web API.
Source code | Package (PyPI) | API reference documentation | Product documentation | Samples
- Key features
- Getting started
- Key concepts
- Examples
- Async client
- Next steps
- Troubleshooting
- Contributing
- 🔄 CRUD Operations: Create, read, update, and delete records with support for bulk operations and automatic retry
- ⚡ True Bulk Operations: Automatically uses Dataverse's native `CreateMultiple`, `UpdateMultiple`, `UpsertMultiple`, and `BulkDelete` Web API operations for maximum performance and transactional integrity
- 🔍 Fluent QueryBuilder: Type-safe query construction with method chaining, composable filter expressions, and automatic OData generation
- 📊 SQL Queries: Execute read-only SQL queries via the Dataverse Web API `?sql=` parameter
- 🏗️ Table Management: Create, inspect, and delete custom tables and columns programmatically
- 🔗 Relationship Management: Create one-to-many and many-to-many relationships between tables with full metadata control
- 🐼 DataFrame Support: Pandas wrappers for all CRUD operations, returning DataFrames and Series
- 📎 File Operations: Upload files to Dataverse file columns with automatic chunking for large files
- 📦 Batch Operations: Send multiple CRUD, table metadata, and SQL query operations in a single HTTP request with optional transactional changesets
- 🔐 Azure Identity: Built-in authentication using Azure Identity credential providers with comprehensive support
- 🛡️ Error Handling: Structured exception hierarchy with detailed error context and retry guidance
- 📋 HTTP Diagnostics Logging: Opt-in file-based logging of all HTTP requests and responses with automatic redaction of sensitive headers (e.g. `Authorization`)
- Python 3.10+ (3.10, 3.11, 3.12, 3.13 supported)
- Microsoft Dataverse environment with appropriate permissions
- OAuth authentication configured for your application
Install the PowerPlatform Dataverse Client using pip:
# Install the latest stable release
pip install PowerPlatform-Dataverse-Client

(Optional) Install Claude Skill globally with the Client:

pip install PowerPlatform-Dataverse-Client && dataverse-install-claude-skill

This installs two Claude Skills that enable Claude Code to:
- dataverse-sdk-use: Apply SDK best practices for using the SDK in your applications
- dataverse-sdk-dev: Provide guidance for developing/contributing to the SDK itself
The skills work with both the Claude Code CLI and VSCode extension. Once installed, Claude will automatically use the appropriate skill when working with Dataverse operations. For more information on Claude Skill see https://platform.claude.com/docs/en/agents-and-tools/agent-skills/overview. See skill definitions here: .claude/skills/dataverse-sdk-use/SKILL.md and .claude/skills/dataverse-sdk-dev/SKILL.md.
For development from source (Claude Skill auto loaded):
git clone https://github.com/microsoft/PowerPlatform-DataverseClient-Python.git
cd PowerPlatform-DataverseClient-Python
pip install -e .

The client requires any Azure Identity `TokenCredential` implementation for OAuth authentication with Dataverse:
from azure.identity import (
    InteractiveBrowserCredential,
    ClientSecretCredential,
    CertificateCredential,
    AzureCliCredential,
)
from PowerPlatform.Dataverse.client import DataverseClient
# Development options
credential = InteractiveBrowserCredential()  # Browser authentication
# credential = AzureCliCredential()  # If logged in via 'az login'
# Production options (service principal / app-only auth)
# credential = ClientSecretCredential(
#     tenant_id="...",  # ID of the service principal's tenant. Also called its "directory" ID.
#     client_id="...",  # The service principal's client ID
#     client_secret="...",  # Client secret value generated for the app (store in Key Vault / env var)
# )
# credential = CertificateCredential(tenant_id, client_id, cert_path)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)

Ref: https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity?view=azure-python
Set up service principal authentication: To use `ClientSecretCredential` or `CertificateCredential` you must first register an Azure AD app and grant it access to your Dataverse environment as an application user. See Use OAuth with Dataverse (covers app registration, obtaining `tenant_id`/`client_id`/`client_secret`, all credential types, and security configuration).
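The credential example above suggests keeping the client secret in Key Vault or an environment variable. A minimal sketch of the environment-variable route follows. The variable names `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET` follow the convention used by Azure Identity's `EnvironmentCredential`; the helper `load_service_principal_settings` is hypothetical and not part of this SDK.

```python
import os
from typing import Mapping

REQUIRED_VARS = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")


def load_service_principal_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect service principal settings, failing fast if any are missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "tenant_id": env["AZURE_TENANT_ID"],
        "client_id": env["AZURE_CLIENT_ID"],
        "client_secret": env["AZURE_CLIENT_SECRET"],
    }


# Demonstration with a fake environment so no real secret is needed.
demo_env = {
    "AZURE_TENANT_ID": "00000000-0000-0000-0000-000000000000",
    "AZURE_CLIENT_ID": "11111111-1111-1111-1111-111111111111",
    "AZURE_CLIENT_SECRET": "not-a-real-secret",
}
settings = load_service_principal_settings(demo_env)
print(sorted(settings))
```

The returned keys match the keyword arguments of `ClientSecretCredential`, so the dictionary could be unpacked as `ClientSecretCredential(**settings)`.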
The SDK provides a simple, pythonic interface for Dataverse operations:
| Concept | Description |
|---|---|
| DataverseClient | Main entry point; provides records, query, tables, files, and batch namespaces |
| Context Manager | Use with DataverseClient(...) as client: for automatic cleanup and HTTP connection pooling |
| Namespaces | Operations are organized into client.records (CRUD & OData queries), client.query (QueryBuilder & SQL), client.tables (metadata), client.files (file uploads), and client.batch (batch requests) |
| Records | Dataverse records represented as Python dictionaries with column schema names |
| Schema names | Use table schema names ("account", "new_MyTestTable") and column schema names ("name", "new_MyTestColumn"). See: Table definitions in Microsoft Dataverse |
| Bulk Operations | Efficient bulk processing for multiple records with automatic optimization |
| QueryBuilder | Preferred query API: client.query.builder() with composable where(col(...)) filters, formatted values, expand, and streaming; use records.list() only as a shortcut for simple filter+select |
| Paging | Automatic handling of large result sets with iterators |
| Structured Errors | Detailed exception hierarchy with retry guidance and diagnostic information |
| Customization prefix values | Custom tables and columns require a customization prefix value to be included for all operations (e.g., "new_MyTestTable", not "MyTestTable"). See: Table definitions in Microsoft Dataverse |
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Connect to Dataverse
credential = InteractiveBrowserCredential()
with DataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    # Create a contact
    contact_id = client.records.create("contact", {"firstname": "John", "lastname": "Doe"})
    # Read the contact back
    contact = client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
    print(f"Created: {contact['firstname']} {contact['lastname']}")
    # Clean up
    client.records.delete("contact", contact_id)
# Session closed, caches cleared automatically

# Create a record
account_id = client.records.create("account", {"name": "Contoso Ltd"})
# Read a record
account = client.records.retrieve("account", account_id)
print(account["name"])
# Read with expand — fetch a related record in the same HTTP request
account = client.records.retrieve(
"account", account_id,
select=["name"],
expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))
# Update a record
client.records.update("account", account_id, {"telephone1": "555-0199"})
# Delete a record
client.records.delete("account", account_id)

Deprecation note (migration from beta): `client.records.get()` is deprecated and emits a `DeprecationWarning`. Use `client.records.retrieve(table, record_id)` for single-record reads (returns `None` on 404 instead of raising) and `client.records.list(table, filter=...)` / `client.records.list_pages(...)` for multi-record queries. Return types differ from the beta `get()`, so the codemod flags these for manual review rather than rewriting them. Run `dataverse-migrate` (see Query data) to locate every call site.
# Bulk create
payloads = [
{"name": "Company A"},
{"name": "Company B"},
{"name": "Company C"}
]
ids = client.records.create("account", payloads)
# Bulk update (broadcast same change to all)
client.records.update("account", ids, {"exchangerate": 1})
# Bulk delete
client.records.delete("account", ids, use_bulk_delete=True)

Use `client.records.upsert()` to create or update records identified by alternate keys. When the key matches an existing record it is updated; otherwise the record is created. A single item uses a PATCH request; multiple items use the `UpsertMultiple` bulk action.
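To make the single-versus-bulk rule concrete, the sketch below shows how an alternate key can be rendered as a Web API key segment and how a caller might pick the request style. It is an illustration only: `alternate_key_segment` and `choose_upsert_request` are hypothetical helpers, the entity set name `accounts` is assumed, URL encoding is omitted, and the SDK's internal request building may differ.

```python
def alternate_key_segment(alternate_key: dict) -> str:
    """Render {"col": value} pairs as col='value',col2=123 for a key lookup."""
    parts = []
    for column, value in alternate_key.items():
        if isinstance(value, bool):
            parts.append(f"{column}={str(value).lower()}")
        elif isinstance(value, str):
            escaped = value.replace("'", "''")  # OData escapes ' by doubling it
            parts.append(f"{column}='{escaped}'")
        else:
            parts.append(f"{column}={value}")
    return ",".join(parts)


def choose_upsert_request(entity_set: str, items: list) -> tuple:
    """Return (HTTP method, relative URL) following the one-vs-many rule."""
    if not items:
        raise ValueError("at least one item is required")
    if len(items) == 1:
        key = alternate_key_segment(items[0]["alternate_key"])
        return ("PATCH", f"{entity_set}({key})")
    return ("POST", f"{entity_set}/Microsoft.Dynamics.CRM.UpsertMultiple")


single = choose_upsert_request("accounts", [
    {"alternate_key": {"accountnumber": "ACC-001"}, "record": {"name": "Contoso Ltd"}},
])
bulk = choose_upsert_request("accounts", [
    {"alternate_key": {"accountnumber": "ACC-001"}, "record": {"name": "Contoso Ltd"}},
    {"alternate_key": {"accountnumber": "ACC-002"}, "record": {"name": "Fabrikam Inc"}},
])
composite = alternate_key_segment({"accountnumber": "ACC-001", "address1_postalcode": "98052"})
print(single, bulk, composite)
```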
Prerequisite: The table must have an alternate key configured in Dataverse for the columns used in `alternate_key`. Alternate keys are defined in the table's metadata (Power Apps maker portal → Table → Keys, or via the Dataverse API). Without a configured alternate key, upsert requests will be rejected by Dataverse with a 400 error.
Set up the key once before running the upsert examples:
# One-time setup for the examples below: make accountnumber an alternate key on account
key = client.tables.create_alternate_key(
"account",
"account_accountnumber_ak",
["accountnumber"],
display_name="Account Number",
)
print(f"Created key {key.schema_name} ({key.metadata_id}), status={key.status}")
# Optional: check key status (useful right after creation; status transitions Pending -> Active)
for k in client.tables.get_alternate_keys("account"):
    if k.schema_name == "account_accountnumber_ak":
        print(f"{k.schema_name}: {k.status}")

Upsert usage
from PowerPlatform.Dataverse.models import UpsertItem
# Upsert a single record
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd", "telephone1": "555-0100"},
)
])
# Upsert multiple records (uses UpsertMultiple bulk action)
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd"},
),
UpsertItem(
alternate_key={"accountnumber": "ACC-002"},
record={"name": "Fabrikam Inc"},
),
])
# Composite alternate key (multiple columns identify the record)
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
record={"name": "Contoso Ltd"},
)
])
# Plain dict syntax (no import needed)
client.records.upsert("account", [
{
"alternate_key": {"accountnumber": "ACC-001"},
"record": {"name": "Contoso Ltd"},
}
])

The SDK provides pandas wrappers for all CRUD operations via the `client.dataframe` namespace, using DataFrames and Series for input and output.

Note: `client.dataframe.get()` is deprecated. Use the GA patterns shown below instead.
import pandas as pd
from PowerPlatform.Dataverse.models.filters import col
# Query records as a single DataFrame (GA builder pattern)
df = (client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.execute()
.to_dataframe())
print(f"Found {len(df)} accounts")
# Limit results with top for large tables
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()
# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
{"name": "Contoso", "telephone1": "555-0100"},
{"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)
# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")
# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": new_accounts["accountid"].iloc[0], "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)
# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])
# SQL query directly to DataFrame (supports JOINs, aggregates, GROUP BY)
df = client.dataframe.sql(
"SELECT a.name, COUNT(c.contactid) as contacts "
"FROM account a "
"JOIN contact c ON a.accountid = c.parentcustomerid "
"GROUP BY a.name"
)

The QueryBuilder is the recommended way to query records. It provides a fluent, type-safe interface that generates correct OData queries automatically, so there is no need to remember OData filter syntax.
# Fluent query builder (recommended)
from PowerPlatform.Dataverse.models.filters import col
for record in (client.query.builder("account")
               .select("name", "revenue")
               .where(col("statecode") == 0)
               .where(col("revenue") > 1000000)
               .order_by("revenue", descending=True)
               .top(100)
               .page_size(50)
               .execute()):
    print(f"{record['name']}: {record['revenue']}")

The QueryBuilder handles value formatting, column name casing, and OData syntax automatically. Filter expressions are built with `col()` and standard Python operators:
# Get results as a pandas DataFrame (consolidates all pages)
df = (client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.top(100)
.execute()
.to_dataframe())
print(f"Got {len(df)} accounts")

# Comparison filters using col() expressions
query = (client.query.builder("contact")
         .where(col("email").contains("outlook.com"))  # contains(email, 'outlook.com')
         .where(col("creditlimit").between(10000, 50000))  # creditlimit ge 10000 and creditlimit le 50000
         .where(col("telephone1").is_null())  # telephone1 eq null
         )

For complex logic (OR, NOT, grouping), compose expressions with `&`, `|`, `~`:
from PowerPlatform.Dataverse.models import col
# OR conditions: (statecode = 0 OR statecode = 1) AND revenue > 100k
for record in (client.query.builder("account")
               .select("name", "revenue")
               .where(((col("statecode") == 0) | (col("statecode") == 1))
                      & (col("revenue") > 100000))
               .execute()):
    print(record["name"])
# NOT, between, and in operators
for record in (client.query.builder("account")
               .where(col("statecode") != 2)  # NOT inactive
               .where(col("revenue").between(100000, 500000))  # revenue in range
               .execute()):
    print(record["name"])

Formatted values and annotations -- request localized labels, currency symbols, and display names:
# Get formatted values (choice labels, currency, lookup names) — via query builder
for record in (client.query.builder("account")
               .select("name", "statecode", "revenue")
               .include_formatted_values()
               .execute()):
    status = record["statecode@OData.Community.Display.V1.FormattedValue"]
    print(f"{record['name']}: {status}")
# Get formatted values via the include_annotations param of records.list() / records.retrieve()
result = client.records.list(
    "account",
    select=["name", "statecode"],
    include_annotations="OData.Community.Display.V1.FormattedValue",
)
for record in result:
    label = record.get("statecode@OData.Community.Display.V1.FormattedValue")
    print(f"{record['name']}: {label}")
record = client.records.retrieve(
    "account", account_id,
    select=["name", "statuscode"],
    include_annotations="OData.Community.Display.V1.FormattedValue",
)
if record:
    print(record.get("statuscode@OData.Community.Display.V1.FormattedValue"))

Nested expand with options -- expand navigation properties with `$select`, `$filter`, `$orderby`, and `$top`:
from PowerPlatform.Dataverse.models import ExpandOption
# Expand related tasks with filtering and sorting
for record in (client.query.builder("account")
               .select("name")
               .expand(ExpandOption("Account_Tasks")
                       .select("subject", "createdon")
                       .filter("contains(subject,'Task')")
                       .order_by("createdon", descending=True)
                       .top(5))
               .execute()):
    print(record["name"], record.get("Account_Tasks"))

Paging -- use `execute_pages()` for streaming large result sets with full builder options (filtering, sorting, formatted values). `records.list()` and `records.list_pages()` are simpler shortcuts for string-based OData filter queries:
# Preferred: query.builder().execute_pages() — stream one page at a time, memory stays flat
# Supports composable filters, sorting, formatted values, and expand with nested selects
for page_num, page in enumerate(
    client.query.builder("account")
    .select("accountid", "name", "revenue")
    .where(col("statecode") == 0)
    .order_by("name")
    .page_size(500)  # optional: override Dataverse default (~5000/page)
    .execute_pages()
):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(f" {record['name']}")
# Simple shortcut: records.list() — automatic paging, all records in memory
# Use for basic filter+select queries; string OData filter only (no composable expressions)
result = client.records.list(
"account",
filter="statecode eq 0",
select=["name", "revenue"],
orderby=["name asc"], # optional sort
top=500, # bounds total records returned and number of HTTP round-trips
page_size=200, # optional: page-size hint that overrides the Dataverse default
)
for record in result:
    print(record["name"])
# Simple streaming shortcut: records.list_pages() takes the same params as records.list() and yields one page at a time
for page_num, page in enumerate(
    client.records.list_pages("account", filter="statecode eq 0", select=["name"], orderby=["name asc"])
):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(record["name"])

Deprecation note: `execute(by_page=True)` and `execute(by_page=False)` are deprecated and emit a `UserWarning`. Replace with `execute_pages()` (streaming) or plain `execute()` (eager). `QueryBuilder.to_dataframe()` is also deprecated; use `.execute().to_dataframe()` instead. The migration tool rewrites all of these automatically. Install it with `pip install PowerPlatform-Dataverse-Client[migration]` and run `dataverse-migrate path/to/your/scripts/` (or `python -m PowerPlatform.Dataverse.migration.migrate_v0_to_v1` for development checkouts).
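The streaming behaviour of the paging examples can be pictured as a generator that follows the Web API's `@odata.nextLink` until none is returned. The sketch below replaces real HTTP calls with a hypothetical `fetch` callable and canned responses; `iter_pages` is an illustrative name and this is not the SDK's implementation.

```python
from typing import Callable, Iterator, Optional


def iter_pages(fetch: Callable[[str], dict], first_url: str) -> Iterator[list]:
    """Yield one page of records at a time, requesting the next page lazily."""
    url: Optional[str] = first_url
    while url:
        body = fetch(url)
        yield body.get("value", [])
        url = body.get("@odata.nextLink")  # absent on the last page


# Canned responses standing in for two HTTP round-trips.
fake_responses = {
    "accounts?page=1": {
        "value": [{"name": "A"}, {"name": "B"}],
        "@odata.nextLink": "accounts?page=2",
    },
    "accounts?page=2": {"value": [{"name": "C"}]},
}
calls = []


def fake_fetch(url: str) -> dict:
    calls.append(url)
    return fake_responses[url]


pages = iter_pages(fake_fetch, "accounts?page=1")
first_page = next(pages)           # only the first request has been made here
requests_after_first = len(calls)
remaining = [record["name"] for page in pages for record in page]
print(first_page, requests_after_first, remaining)
```

Because the generator only requests a page when the caller asks for it, memory use stays proportional to one page rather than the whole result set.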
Record count -- include $count=true in the request:
# Via query builder
results = (client.query.builder("account")
.where(col("statecode") == 0)
.count()
.execute())
print(len(results)) # QueryResult is sized — use len() to get the count
# Via records.list() — count=True adds $count=true to the OData request
results = client.records.list("account", filter="statecode eq 0", count=True)
print(len(results))

Accessing the count: `QueryResult` is iterable and sized, so call `len(results)` to get the number of records. There is no separate `.count` or `.total_count` attribute. Because the client auto-paginates, `len(results)` reflects every matching row fetched; the server's raw `@odata.count` annotation is not surfaced as a standalone field.
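The composable `col()` filters used in the builder examples in this section rely on Python operator overloading. The toy classes below (`Expr` and `Column`, both hypothetical and far simpler than anything in the SDK) sketch the general idea of turning `==`, `>`, `&`, `|`, and `~` into an OData `$filter` string. Value formatting is deliberately limited to booleans, numbers, and strings.

```python
class Expr:
    """A piece of OData filter text that can be combined with &, | and ~."""

    def __init__(self, text: str):
        self.text = text

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text} and {other.text})")

    def __or__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text} or {other.text})")

    def __invert__(self) -> "Expr":
        return Expr(f"not ({self.text})")


def _literal(value) -> str:
    """Format a Python value as an OData literal (toy subset)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


class Column:
    """Comparison operators on a column return Expr objects instead of booleans."""

    def __init__(self, name: str):
        self.name = name

    def __eq__(self, value):
        return Expr(f"{self.name} eq {_literal(value)}")

    def __ne__(self, value):
        return Expr(f"{self.name} ne {_literal(value)}")

    def __gt__(self, value):
        return Expr(f"{self.name} gt {_literal(value)}")

    def between(self, low, high) -> Expr:
        return Expr(f"({self.name} ge {_literal(low)} and {self.name} le {_literal(high)})")


combined = ((Column("statecode") == 0) | (Column("statecode") == 1)) & (Column("revenue") > 100000)
negated = ~(Column("statecode") == 2)
ranged = Column("creditlimit").between(10000, 50000)
print(combined.text)
print(negated.text)
print(ranged.text)
```

The parentheses around each comparison matter in Python: `&` and `|` bind more tightly than `==` and `>`, which is why the builder examples wrap every `col(...)` comparison before combining them.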
FetchXML queries -- client.query.fetchxml() returns an inert FetchXmlQuery object; no HTTP request is made until you call .execute() or .execute_pages():
xml = """
<fetch>
<entity name="account">
<attribute name="name"/>
<attribute name="revenue"/>
<filter><condition attribute="statecode" operator="eq" value="0"/></filter>
</entity>
</fetch>
"""
# .execute() — blocking, fetches all pages and returns a single QueryResult
result = client.query.fetchxml(xml).execute()
df = result.to_dataframe()
# .execute_pages() — streaming, yields one QueryResult per HTTP page
# Use count="N" in the FetchXML <fetch> element to set page size
for page_num, page in enumerate(client.query.fetchxml(xml).execute_pages()):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(record["name"])

SQL queries provide an alternative read-only query syntax with support for JOINs, aggregates, GROUP BY, DISTINCT, and OFFSET FETCH pagination:
# Basic query
results = client.query.sql(
"SELECT TOP 10 accountid, name FROM account WHERE statecode = 0"
)
# JOINs and aggregates work
results = client.query.sql(
"SELECT a.name, COUNT(c.contactid) as cnt "
"FROM account a "
"JOIN contact c ON a.accountid = c.parentcustomerid "
"GROUP BY a.name"
)
# SQL results directly as a DataFrame
df = client.dataframe.sql(
"SELECT name, revenue FROM account ORDER BY revenue DESC"
)
# Discover columns from metadata (schema-discovery helper, kept at GA)
cols_meta = client.query.sql_columns("account")
col_names = [c["LogicalName"] for c in cols_meta]
# Build queries using the discovered column names
sql = f"SELECT TOP 10 {', '.join(col_names[:5])} FROM account"
df = client.dataframe.sql(sql)

Simple list shortcut -- `records.list()` accepts a raw OData filter string for basic queries. For anything beyond simple filter+select, prefer `client.query.builder()` (composable filters, formatted values, nested expand):
# records.list() shortcut — raw OData filter string, all records loaded into memory
# Column names in filter must be lowercase logical names
for record in client.records.list(
    "account",
    select=["name"],
    filter="statecode eq 0",
    top=100,
):
    print(record["name"])
# Discover navigation property names for $expand (metadata-discovery helper, kept at GA)
nav_props = client.query.odata_expands("account") # → list of navigation property metadata
# Expand navigation properties using the query builder
from PowerPlatform.Dataverse.models.query_builder import ExpandOption
for record in (client.query.builder("contact")
               .select("fullname")
               .expand(ExpandOption("parentcustomerid_account").select("name"))
               .execute()):
    acct = record.get("parentcustomerid_account") or {}
    print(f"{record['fullname']} -> {acct.get('name')}")
# Build @odata.bind for lookup fields (deprecated helper, still functional with DeprecationWarning)
bind = client.query.odata_bind("contact", "account", account_id)
# Returns: {"parentcustomerid_account@odata.bind": "/accounts(guid)"}
client.records.create("contact", {"firstname": "Jane", **bind})

# Create a custom table, including the customization prefix value in the schema names for the table and columns.
table_info = client.tables.create("new_Product", {
"new_Code": "string",
"new_Description": "memo",
"new_Price": "decimal",
"new_Active": "bool"
})
# Create with custom primary column name and solution assignment
table_info = client.tables.create(
"new_Product",
columns={
"new_Code": "string",
"new_Price": "decimal"
},
solution="MyPublisher", # Optional: add to specific solution
primary_column="new_ProductName", # Optional: custom primary column (default is "{customization prefix value}_Name")
)
# Get table information
info = client.tables.get("new_Product")
print(f"Logical name: {info['table_logical_name']}")
print(f"Entity set: {info['entity_set_name']}")
# List all tables
tables = client.tables.list()
for table in tables:
    print(table)
# Add columns to existing table (columns must include customization prefix value)
client.tables.add_columns("new_Product", {"new_Category": "string"})
# Remove columns
client.tables.remove_columns("new_Product", ["new_Category"])
# List all columns (attributes) for a table to discover schema
columns = client.tables.list_columns("account")
for column in columns:
    print(f"{column['LogicalName']} ({column.get('AttributeType')})")
# List only specific properties
columns = client.tables.list_columns(
"account",
select=["LogicalName", "SchemaName", "AttributeType"],
filter="AttributeType eq 'String'",
)
# Clean up
client.tables.delete("new_Product")

Important: All custom column names must include the customization prefix value (e.g., `"new_"`). This ensures explicit, predictable naming and aligns with Dataverse metadata requirements.
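A small pre-flight check can catch unprefixed names before any request is sent. The sketch below assumes a prefix is a short alphanumeric token that starts with a letter and is followed by an underscore; the authoritative rules are defined by Dataverse, and `missing_prefix` is a hypothetical helper rather than an SDK function.

```python
import re

# Assumed shape: letter, then 1-7 more letters/digits, then "_" and the rest of the name.
PREFIXED_NAME = re.compile(r"^[A-Za-z][A-Za-z0-9]{1,7}_[A-Za-z0-9_]+$")


def missing_prefix(column_names):
    """Return the names that do not look like '<prefix>_<Name>'."""
    return [name for name in column_names if not PREFIXED_NAME.match(name)]


columns = {"new_Code": "string", "Price": "decimal", "new_Active": "bool"}
problems = missing_prefix(columns)
print(problems)
```

Running a check like this over the keys of a column dictionary before creating a table keeps naming mistakes on the client side, where they are cheaper to fix.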
Create relationships between tables using the relationship API. For a complete working example, see examples/advanced/relationships.py.
from PowerPlatform.Dataverse.models import (
CascadeConfiguration,
Label,
LocalizedLabel,
LookupAttributeMetadata,
ManyToManyRelationshipMetadata,
OneToManyRelationshipMetadata,
)
# Create a one-to-many relationship: Department (1) -> Employee (N)
# This adds a "Department" lookup field to the Employee table
lookup = LookupAttributeMetadata(
schema_name="new_DepartmentId",
display_name=Label(localized_labels=[LocalizedLabel(label="Department", language_code=1033)]),
)
relationship = OneToManyRelationshipMetadata(
schema_name="new_Department_Employee",
referenced_entity="new_department", # Parent table (the "one" side)
referencing_entity="new_employee", # Child table (the "many" side)
referenced_attribute="new_departmentid",
)
result = client.tables.create_one_to_many_relationship(lookup, relationship)
print(f"Created lookup field: {result.lookup_schema_name}")
# Create a many-to-many relationship: Employee (N) <-> Project (N)
# Employees work on multiple projects; projects have multiple team members
m2m_relationship = ManyToManyRelationshipMetadata(
schema_name="new_employee_project",
entity1_logical_name="new_employee",
entity2_logical_name="new_project",
)
result = client.tables.create_many_to_many_relationship(m2m_relationship)
print(f"Created M:N relationship: {result.relationship_schema_name}")
# Query relationship metadata
rel = client.tables.get_relationship("new_Department_Employee")
if rel:
    print(f"Found: {rel.relationship_schema_name}")
# List all relationships
rels = client.tables.list_relationships()
for rel in rels:
    print(f"{rel['SchemaName']} ({rel.get('RelationshipType')})")
# List relationships for a specific table (one-to-many + many-to-one + many-to-many)
account_rels = client.tables.list_table_relationships("account")
for rel in account_rels:
    print(f"{rel['SchemaName']} -> {rel.get('RelationshipType')}")
# Delete a relationship
client.tables.delete_relationship(result.relationship_id)

For simpler scenarios, use the convenience method:
# Quick way to create a lookup field with sensible defaults
result = client.tables.create_lookup_field(
referencing_table="contact", # Child table gets the lookup field
lookup_field_name="new_AccountId",
referenced_table="account", # Parent table being referenced
display_name="Account",
)

# Upload a file to a record
client.files.upload(
"account",
account_id,
"new_Document", # If the file column doesn't exist, it will be created automatically
"/path/to/document.pdf",
)

Use `client.batch` to send multiple operations in one HTTP request. The batch namespace mirrors `client.records`, `client.tables`, and `client.query`.
# Build a batch request and add operations
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.create("account", [{"name": "Fabrikam"}, {"name": "Woodgrove"}])
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.delete("account", old_id)
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"]) # single record with expand
batch.records.list( # multi-record, single page
"account",
filter="statecode eq 0",
select=["name"],
orderby=["name asc"],
top=50,
)
result = batch.execute()
for item in result.responses:
    if item.is_success:
        print(f"[OK] {item.status_code} entity_id={item.entity_id}")
    else:
        print(f"[ERR] {item.status_code}: {item.error_message}")

Transactional changeset (all operations in a changeset succeed or roll back together):
batch = client.batch.new()
with batch.changeset() as cs:
    lead_ref = cs.records.create("lead", {"firstname": "Ada"})
    contact_ref = cs.records.create("contact", {"firstname": "Ada"})
    cs.records.create("account", {
        "name": "Babbage & Co.",
        "originatingleadid@odata.bind": lead_ref,
        "primarycontactid@odata.bind": contact_ref,
    })
result = batch.execute()
print(f"Created {len(result.entity_ids)} records atomically")

Table metadata and SQL queries in a batch:
batch = client.batch.new()
batch.tables.create("new_Product", {"new_Price": "decimal", "new_InStock": "bool"})
batch.tables.add_columns("new_Product", {"new_Rating": "int"})
batch.tables.get("new_Product")
batch.query.sql("SELECT TOP 5 name FROM account")
result = batch.execute()

Continue on error (attempt all operations even when one fails):
result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
for item in result.failed:
    print(f"[ERR] {item.status_code}: {item.error_message}")

`continue_on_error=True` only affects how Dataverse handles per-operation failures on the server. Client-side errors raised before the batch is sent, such as `ValidationError` (e.g. exceeding the 1000-operation limit) or `MetadataError` from metadata pre-resolution (`tables.delete`, `tables.add_columns`, `tables.remove_columns`), are still raised as exceptions and must be handled with `try`/`except`.
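One way to structure this is to wrap the send in `try`/`except` for client-side errors and then inspect per-operation results separately. The sketch below uses local stand-in classes so it runs without a Dataverse environment: `FakeBatch`, its result dictionaries, and the `run` helper are all hypothetical, and the two exception classes are placeholders for the ones in `PowerPlatform.Dataverse.core.errors`.

```python
class DataverseError(Exception):
    """Local stand-in for the SDK base error."""


class ValidationError(DataverseError):
    """Local stand-in for a client-side validation failure."""


class FakeBatch:
    """Hypothetical batch: validates locally, then reports per-item status."""

    LIMIT = 1000  # operation limit quoted in the note above

    def __init__(self, operations):
        self.operations = list(operations)

    def execute(self, continue_on_error=False):
        if len(self.operations) > self.LIMIT:
            # Raised before anything is "sent", so continue_on_error cannot help.
            raise ValidationError("too many operations in one batch")
        # Pretend the server rejected every operation named "bad".
        return [{"name": op, "ok": op != "bad"} for op in self.operations]


def run(batch):
    """Separate client-side errors from per-operation server failures."""
    try:
        responses = batch.execute(continue_on_error=True)
    except ValidationError as err:
        return {"sent": False, "error": str(err), "failed": []}
    failed = [item["name"] for item in responses if not item["ok"]]
    return {"sent": True, "error": None, "failed": failed}


server_side = run(FakeBatch(["a", "bad", "c"]))
client_side = run(FakeBatch(["x"] * 1001))
print(server_side)
print(client_side)
```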
DataFrame integration -- feed pandas DataFrames directly into a batch:
import pandas as pd
batch = client.batch.new()
# Create records from a DataFrame
df = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
batch.dataframe.create("account", df)
# Update records from a DataFrame
updates = pd.DataFrame([
{"accountid": id1, "telephone1": "555-0100"},
{"accountid": id2, "telephone1": "555-0200"},
])
batch.dataframe.update("account", updates, id_column="accountid")
# Delete records from a Series
batch.dataframe.delete("account", pd.Series([id1, id2]))
result = batch.execute()

For a complete example see examples/advanced/batch.py.
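When a job has more work than fits in one batch, the work can be split into groups first. The sketch below assumes the 1000-operation limit mentioned earlier and that each payload becomes exactly one operation; `chunk_operations` is a hypothetical helper, not an SDK function.

```python
from typing import Iterator, List, Sequence

MAX_OPERATIONS_PER_BATCH = 1000  # assumption taken from the limit quoted above


def chunk_operations(operations: Sequence, size: int = MAX_OPERATIONS_PER_BATCH) -> Iterator[List]:
    """Yield consecutive slices of at most `size` operations."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(operations), size):
        yield list(operations[start:start + size])


payloads = [{"name": f"Company {i}"} for i in range(2500)]
chunk_sizes = [len(chunk) for chunk in chunk_operations(payloads)]
print(chunk_sizes)
```

Each chunk would then be added to its own batch and executed in turn, so a failure in one group does not force the others to be rebuilt.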
The SDK ships a full async client, AsyncDataverseClient, for use in async applications. It mirrors every operation of the sync client — the same namespaces (records, query, tables, files, batch), the same method signatures, and the same return types.
ⓘ Async snippets below are fragments. Every example after Quick start assumes it is nested inside an `async def main(): ...` body, with `client` and `credential` already constructed as shown in Quick start. Copying a fragment into a top-level `.py` file will raise `SyntaxError: 'await' outside function`. See examples/aio/ for full runnable scripts.
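The wrapping that the note describes looks roughly like the sketch below. `FakeAsyncClient` is a hypothetical stand-in that makes no network calls, used only so the pattern can be run as-is; with the real client, the body of `main()` is where a fragment from this section would go.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async client; performs no I/O."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions

    async def create(self, table: str, record: dict) -> str:
        await asyncio.sleep(0)  # yield to the event loop like a real request would
        return f"{table}-1"


async def main() -> str:
    async with FakeAsyncClient() as client:
        # A fragment that uses `await` is legal here because it sits inside `async def`.
        return await client.create("account", {"name": "Contoso Ltd"})


result = asyncio.run(main())
print(result)
```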
The async client requires aiohttp, which is an optional extra:
pip install "PowerPlatform-Dataverse-Client[async]"

import asyncio
from azure.identity.aio import DefaultAzureCredential
from PowerPlatform.Dataverse.aio import AsyncDataverseClient

async def main():
    # Connect to Dataverse (the async credential is closed when the block exits)
    async with DefaultAzureCredential() as credential:
        async with AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
            # Create a contact
            contact_id = await client.records.create("contact", {"firstname": "John", "lastname": "Doe"})
            # Read it back
            contact = await client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
            print(f"Created: {contact['firstname']} {contact['lastname']}")
            # Clean up
            await client.records.delete("contact", contact_id)

asyncio.run(main())

# given: credential constructed as in Quick start (e.g. DefaultAzureCredential())
client = AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential)
try:
    account_id = await client.records.create("account", {"name": "Contoso Ltd"})
finally:
    await client.aclose()

The async query builder API is identical to the sync one:
# given: client is an open AsyncDataverseClient
from PowerPlatform.Dataverse.models.filters import col
# Execute and collect all results
result = await (
client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.top(10)
.execute()
)
for record in result:
    print(record["name"])
# Lazy page-by-page iteration (memory-efficient for large sets)
async for page in (
    client.query.builder("account")
    .select("name")
    .page_size(500)
    .execute_pages()
):
    for record in page:
        print(record["name"])

# given: client is open; account_id is the GUID returned by an earlier records.create
batch = client.batch.new()
batch.records.create("account", {"name": "Alpha"})
batch.records.create("account", {"name": "Beta"})
result = await batch.execute()
print(f"Created {len(result.entity_ids)} records")
# Atomic changeset
batch = client.batch.new()
async with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
result = await batch.execute()

See examples/aio/ for async equivalents of all sync examples.
Explore our comprehensive examples in the examples/ directory:
🌱 Getting Started:
- Installation & Setup - Validate installation and basic usage patterns
- Functional Testing - Test core functionality in your environment
🚀 Advanced Usage:
- Complete Walkthrough - Full feature demonstration with production patterns
- Relationship Management - Create and manage table relationships
- File Upload - Upload files to Dataverse file columns
- Batch Operations - Send multiple operations in a single request with changesets
📖 See the examples README for detailed guidance and learning progression.
For comprehensive information on Microsoft Dataverse and related technologies:
| Resource | Description |
|---|---|
| Dataverse Developer Guide | Complete developer documentation for Microsoft Dataverse |
| Dataverse Web API Reference | Detailed Web API reference and examples |
| Azure Identity for Python | Authentication library documentation and credential types |
| Power Platform Developer Center | Broader Power Platform development resources |
| Dataverse SDK for .NET | Official .NET SDK for Microsoft Dataverse |
The SDK raises structured exceptions that all inherit from a common base, DataverseError. Catching the base class is the safest fallback; catch the specific subclasses when you need to react differently to validation, metadata, SQL, or HTTP failures.
Exception
└── DataverseError          # Base class for every SDK-raised error
    ├── ValidationError     # Client-side input validation failed
    ├── MetadataError       # Table/column/relationship metadata problem
    ├── SQLParseError       # SQL query could not be parsed
    └── HttpError           # Dataverse Web API returned a non-success status
All classes are importable from PowerPlatform.Dataverse.core.errors (or re-exported from PowerPlatform.Dataverse.core).
| Exception | When it is raised | Typical examples |
|---|---|---|
| DataverseError | Base class. Catch it to handle any SDK-originated failure in one block. | Fallback except clause. |
| ValidationError | Client-side argument validation fails before a request is sent. | Empty/None table name, missing primary key, non-string SQL, invalid batch payload, unsupported column type in create_table. |
| MetadataError | A metadata lookup or definition operation fails, usually because of an unknown or invalid table, column, or relationship. | Unknown logical name passed to batch.create/update/delete, tables.create_column, relationships.create_*, or tables.delete. |
| SQLParseError | A SQL string passed to client.query.sql(...) cannot be parsed into a valid SELECT. | Unsupported SQL syntax, write statements (INSERT/UPDATE/DELETE), malformed queries. |
| HttpError | The Dataverse Web API responded with a non-2xx status. Exposes status_code, service_error_code, correlation_id, service_request_id, retry_after, and is_transient (set for 408/429/503/504). | 401 (auth), 403 (permissions), 404 (record/table not found), 412 (concurrency/ETag), 429 (throttling), 5xx (server). |
Note on timeouts and network errors. Low-level network failures from the underlying httpx client are not wrapped by the SDK and surface as their original httpx exceptions: most commonly httpx.ReadTimeout, httpx.ConnectTimeout, and httpx.TimeoutException (their common base) on slow endpoints such as relationships.list() or large queries, and httpx.ConnectError / httpx.NetworkError for connectivity issues. Catch httpx.HTTPError to cover all of them, or httpx.TimeoutException for timeouts specifically. The async client (PowerPlatform.Dataverse.aio) surfaces aiohttp.ClientError and asyncio.TimeoutError analogously.
import httpx
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import (
    DataverseError,
    HttpError,
    MetadataError,
    SQLParseError,
    ValidationError,
)

# given: client is an open DataverseClient
try:
    client.records.retrieve("account", "invalid-id")
except ValidationError as e:
    print(f"Validation error: {e.message} (subcode={e.subcode})")
except MetadataError as e:
    print(f"Metadata error: {e.message} (subcode={e.subcode})")
except SQLParseError as e:
    print(f"SQL parse error: {e.message}")
except HttpError as e:
    print(f"HTTP {e.status_code}: {e.message}")
    print(f"Code: {e.code} Subcode: {e.subcode}")
    print(f"Service request id: {e.details.get('service_request_id')}")
    if e.is_transient:
        print(f"Transient: retry after {e.details.get('retry_after')}s")
except httpx.TimeoutException as e:
    # ReadTimeout / ConnectTimeout / WriteTimeout from the underlying transport
    print(f"Request timed out: {e}")
except DataverseError as e:
    # Catch-all for any other SDK-raised error
    print(f"Dataverse error [{e.code}]: {e.message}")

Common fixes:
- Verify environment URL format: https://yourorg.crm.dynamics.com (no trailing slash)
- Ensure Azure Identity credentials have proper Dataverse permissions
- Check app registration permissions are granted and admin-consented
For optimal performance in production environments:
| Best Practice | Description |
|---|---|
| Prefer QueryBuilder for queries | Use client.query.builder() for filtering, sorting, expansion, or formatted values; use records.list() only as a shortcut for simple filter+select |
| Bulk Operations | Pass lists to records.create() and records.update() for automatic bulk processing; for records.delete(), also set use_bulk_delete when passing a list so the bulk operation is used |
| Select Fields | Specify select parameter to limit returned columns and reduce payload size |
| Page Size Control | Use top and page_size parameters to control memory usage; use execute_pages() for large result sets |
| Connection Reuse | Reuse DataverseClient instances across operations |
| Production Credentials | Use ClientSecretCredential or CertificateCredential for unattended operations |
| Error Handling | Implement retry logic for transient errors (e.is_transient) |
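The retry advice in the last row can be sketched as a small wrapper. This is an illustration only, not part of the SDK: `call_with_retry` is a hypothetical helper, and it assumes the raised error exposes `is_transient` and, optionally, `retry_after` as attributes (if your version only provides `retry_after` through `e.details`, read it from there instead).

```python
import time


def call_with_retry(operation, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run operation(), retrying only errors that flag themselves as transient.

    Hypothetical helper: it inspects the attributes is_transient and
    retry_after on the raised exception (assumed names, see the note above).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            transient = getattr(exc, "is_transient", False)
            if not transient or attempt == max_attempts:
                # Non-transient errors and the final failure propagate unchanged.
                raise
            # Prefer a server-suggested wait; otherwise back off exponentially.
            retry_after = getattr(exc, "retry_after", None)
            delay = retry_after if retry_after else base_delay * 2 ** (attempt - 1)
            sleep(delay)
```

A call site might look like `call_with_retry(lambda: client.records.retrieve("account", account_id))`. Keeping the wrapper separate from the client makes it easy to cap the number of attempts per operation.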
Enable file-based HTTP logging to capture all requests and responses for debugging. Sensitive headers (e.g. Authorization) are automatically redacted.
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core import DataverseConfig, LogConfig
log_cfg = LogConfig(
    log_folder="./my_logs",       # Directory for log files (created if missing)
    log_file_prefix="crm_debug",  # Filename prefix; timestamp appended automatically
    max_body_bytes=4096,          # Bytes of body to capture per entry; 0 (default) disables body capture
)
config = DataverseConfig(log_config=log_cfg)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential, config=config)

Each log file is timestamped and rotated automatically (default 10 MB per file, 5 backups). Sample output:
[2026-04-11T15:27:31-0700] DEBUG >>> REQUEST POST https://yourorg.crm.dynamics.com/api/data/v9.2/accounts
Authorization: [REDACTED]
Accept: application/json
Content-Type: application/json
OData-MaxVersion: 4.0
OData-Version: 4.0
User-Agent: DataverseSvcPythonClient:0.1.0b8
x-ms-client-request-id: 7050c4d0-6bcc-48e3-a310-b4e8fa18ac69
x-ms-correlation-id: 4cace77d-e4ee-4419-8c65-fc62beed6e71
Body: {"name":"Contoso Ltd"}
[2026-04-11T15:27:31-0700] DEBUG <<< RESPONSE 204 POST https://yourorg.crm.dynamics.com/api/data/v9.2/accounts (78.0ms)
Content-Type: application/json; odata.metadata=minimal
OData-Version: 4.0
x-ms-service-request-id: a6d0b6c4-5dd1-47cb-83eb-b6fccf754216
x-ms-ratelimit-burst-remaining-xrm-requests: 7998
Security note: This feature is intended for development and debugging only. Log files are plaintext and may contain PII, sensitive business data, and Dataverse record IDs. Even with max_body_bytes=0 (the default), request URLs can include filter values and record identifiers.
- Never enable in production. If required for production diagnostics, keep max_body_bytes=0 and treat log files as regulated data under your organization's data handling policy.
- Restrict access. Set file system permissions so only the process user can read log files. Use an encrypted volume or folder in sensitive environments.
- Control retention. Log rotation keeps up to 5 files by default (backup_count). Delete logs after the debugging session; use secure deletion for regulated data.
- Prevent source control leaks. Add the log folder to .gitignore immediately.
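Two of the points above (restricting access and keeping logs out of source control) can be handled before the client is created. The sketch below uses only the standard library; `prepare_log_folder` is a hypothetical helper, not an SDK function, and the owner-only permission step applies to POSIX systems only.

```python
import os
from pathlib import Path


def prepare_log_folder(log_folder="./my_logs", gitignore_path=".gitignore"):
    """Create the log folder, restrict it to the current user, and git-ignore it."""
    folder = Path(log_folder)
    folder.mkdir(parents=True, exist_ok=True)
    if os.name == "posix":
        # Owner-only read/write/execute; on Windows, use ACLs instead.
        folder.chmod(0o700)

    entry = f"{folder.name}/"
    gitignore = Path(gitignore_path)
    text = gitignore.read_text() if gitignore.exists() else ""
    if entry not in text.splitlines():
        # Keep existing content and make sure the new entry starts on its own line.
        separator = "" if text == "" or text.endswith("\n") else "\n"
        gitignore.write_text(f"{text}{separator}{entry}\n")
    return folder
```

Pass the same path to `LogConfig(log_folder=...)` afterwards. The helper is idempotent, so calling it on every start-up does not duplicate the `.gitignore` entry.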
The client applies sensible per-method HTTP timeouts and automatically retries
transient network errors. You can tune both via DataverseConfig.
| Setting | Default | Applies to |
|---|---|---|
| http_timeout | per-method (see below) | every request; overrides the per-method defaults when set |
| http_retries | 5 | maximum attempts per request on network errors (requests.exceptions.RequestException) |
| http_backoff | 0.5 | base delay in seconds between retries; doubles each attempt (0.5s, 1s, 2s, 4s, …) |
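To get a feel for how long a request can spend waiting between retries, the doubling schedule from the table can be computed directly. This is a sketch of the arithmetic only, not the SDK's retry loop; `backoff_delays` is a hypothetical name, and it assumes one wait between each pair of consecutive attempts.

```python
def backoff_delays(http_backoff=0.5, http_retries=5):
    """Return the waits (in seconds) between attempts for a doubling backoff.

    Assumption: http_retries counts total attempts, so there are
    http_retries - 1 waits in the worst case.
    """
    return [http_backoff * 2 ** i for i in range(max(http_retries - 1, 0))]


delays = backoff_delays()
print(delays)       # [0.5, 1.0, 2.0, 4.0]
print(sum(delays))  # 7.5 seconds of waiting, on top of the per-request timeouts
```

Under these assumptions the defaults add at most 7.5 seconds of sleep, so the per-request timeout usually dominates the worst-case latency.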
When http_timeout is not set, the client uses:
- 10 seconds for GET (and any non-write method)
- 120 seconds for POST, PATCH, DELETE
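The selection rule described above can be written as a few lines of Python. This is a minimal sketch of the documented behaviour, not the SDK's actual code; `resolve_timeout` is a hypothetical name.

```python
WRITE_METHODS = {"POST", "PATCH", "DELETE"}


def resolve_timeout(method, http_timeout=None):
    """Pick the timeout in seconds for one request, following the rule above."""
    if http_timeout is not None:
        # An explicit http_timeout replaces both per-method defaults.
        return float(http_timeout)
    return 120.0 if method.upper() in WRITE_METHODS else 10.0


print(resolve_timeout("GET"))       # 10.0
print(resolve_timeout("PATCH"))     # 120.0
print(resolve_timeout("GET", 120))  # 120.0
```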
The 10s read default is comfortable for routine data queries but can be too tight
for large metadata reads (e.g. client.tables.list_relationships(),
client.tables.list_columns()) on orgs with many tables/relationships, or on the
first call after an org wakes from idle. If you see ReadTimeout errors from
those endpoints, raise the ceiling:
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core import DataverseConfig
config = DataverseConfig(
    http_timeout=120,  # seconds; applies to every request
    http_retries=3,    # cap retries on slow metadata calls
    http_backoff=1.0,
)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential, config=config)

Note: Setting http_timeout overrides the per-method defaults for all requests, not just metadata calls. Pick a value large enough for the slowest operation you expect (typically metadata listing or bulk writes).
- SQL queries are read-only and support a limited subset of SQL syntax
- Create Table supports the following column types: string, memo, int, decimal, float, bool, datetime, file, and picklist (Enum subclass)
- File uploads are limited by Dataverse file size restrictions (default 128MB per file)
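Because an oversized file is only rejected once the upload reaches the service, a local size check can fail faster. The sketch below is a hypothetical pre-flight helper, not an SDK function; the default limit simply reuses the 128 MB figure quoted above (interpreted here as 128 × 1024 × 1024 bytes) and should be adjusted to the limit configured on your file column.

```python
from pathlib import Path

# Assumption: the 128 MB default quoted above, expressed in bytes.
DEFAULT_LIMIT_BYTES = 128 * 1024 * 1024


def check_upload_size(path, limit_bytes=DEFAULT_LIMIT_BYTES):
    """Return the file size in bytes, or raise ValueError if it exceeds the limit."""
    size = Path(path).stat().st_size
    if size > limit_bytes:
        raise ValueError(
            f"{path} is {size} bytes, above the {limit_bytes}-byte upload limit"
        )
    return size
```

Call it just before the upload and let the `ValueError` propagate, or catch it to show a friendlier message.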
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit Contributor License Agreements.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
When contributing new features to this SDK, please follow these guidelines:
- Public methods in operation namespaces - New public methods go in the appropriate namespace module under operations/. Public types and constants live in their own modules (e.g., models/metadata.py, common/constants.py)
- Add README example for public methods - Add usage examples to this README for public API methods
- Document public APIs - Include Sphinx-style docstrings with parameter descriptions and examples for all public methods
- Update documentation when adding features - Keep README and SKILL files (note that each skill has 2 copies) in sync
- Internal vs public naming - Modules, files, and functions not meant to be part of the public API must use a _ prefix (e.g., _odata.py, _relationships.py). Files without the prefix (e.g., constants.py, metadata.py) are public and importable by SDK consumers
This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.