Skip to content

microsoft/PowerPlatform-DataverseClient-Python

PowerPlatform Dataverse Client for Python

PyPI version Python License: MIT

A Python client library for Microsoft Dataverse that provides a unified interface for CRUD operations, SQL queries, table metadata management, and file uploads through the Dataverse Web API.

Source code | Package (PyPI) | API reference documentation | Product documentation | Samples

Table of contents

Key features

  • 🔄 CRUD Operations: Create, read, update, and delete records with support for bulk operations and automatic retry
  • ⚡ True Bulk Operations: Automatically uses Dataverse's native CreateMultiple, UpdateMultiple, UpsertMultiple, and BulkDelete Web API operations for maximum performance and transactional integrity
  • 🔍 Fluent QueryBuilder: Type-safe query construction with method chaining, composable filter expressions, and automatic OData generation
  • 📊 SQL Queries: Execute read-only SQL queries via the Dataverse Web API ?sql= parameter
  • 🏗️ Table Management: Create, inspect, and delete custom tables and columns programmatically
  • 🔗 Relationship Management: Create one-to-many and many-to-many relationships between tables with full metadata control
  • 🐼 DataFrame Support: Pandas wrappers for all CRUD operations, returning DataFrames and Series
  • 📎 File Operations: Upload files to Dataverse file columns with automatic chunking for large files
  • 📦 Batch Operations: Send multiple CRUD, table metadata, and SQL query operations in a single HTTP request with optional transactional changesets
  • 🔐 Azure Identity: Built-in authentication using Azure Identity credential providers with comprehensive support
  • 🛡️ Error Handling: Structured exception hierarchy with detailed error context and retry guidance
  • 📋 HTTP Diagnostics Logging: Opt-in file-based logging of all HTTP requests and responses with automatic redaction of sensitive headers (e.g. Authorization)

Getting started

Prerequisites

  • Python 3.10+ (3.10, 3.11, 3.12, 3.13 supported)
  • Microsoft Dataverse environment with appropriate permissions
  • OAuth authentication configured for your application

Install the package

Install the PowerPlatform Dataverse Client using pip:

# Install the latest stable release
pip install PowerPlatform-Dataverse-Client

(Optional) Install Claude Skill globally with the Client:

pip install PowerPlatform-Dataverse-Client && dataverse-install-claude-skill

This installs two Claude Skills that enable Claude Code to:

  • dataverse-sdk-use: Apply SDK best practices for using the SDK in your applications
  • dataverse-sdk-dev: Provide guidance for developing/contributing to the SDK itself

The skills work with both the Claude Code CLI and VSCode extension. Once installed, Claude will automatically use the appropriate skill when working with Dataverse operations. For more information on Claude Skill see https://platform.claude.com/docs/en/agents-and-tools/agent-skills/overview. See skill definitions here: .claude/skills/dataverse-sdk-use/SKILL.md and .claude/skills/dataverse-sdk-dev/SKILL.md.

For development from source (Claude Skill auto loaded):

git clone https://github.com/microsoft/PowerPlatform-DataverseClient-Python.git
cd PowerPlatform-DataverseClient-Python
pip install -e .

Authenticate the client

The client requires any Azure Identity TokenCredential implementation for OAuth authentication with Dataverse:

from azure.identity import (
    InteractiveBrowserCredential,
    ClientSecretCredential,
    CertificateCredential,
    AzureCliCredential
)
from PowerPlatform.Dataverse.client import DataverseClient

# Development options
credential = InteractiveBrowserCredential()  # Browser authentication
# credential = AzureCliCredential()          # If logged in via 'az login'

# For Production options (service principal / app-only auth)
# credential = ClientSecretCredential(
#     tenant_id="...",      # ID of the service principal's tenant. Also called its "directory" ID.
#     client_id="...",      # The service principal's client ID
#     client_secret="...",  # Client secret value generated for the app (store in Key Vault / env var)
# )
# credential = CertificateCredential(tenant_id, client_id, cert_path)

client = DataverseClient("https://yourorg.crm.dynamics.com", credential)

Ref: https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity?view=azure-python

Set up service principal authentication: To use ClientSecretCredential or CertificateCredential you must first register an Azure AD app and grant it access to your Dataverse environment as an application user. See Use OAuth with Dataverse (covers app registration, obtaining tenant_id / client_id / client_secret, all credential types, and security configuration).

Key concepts

The SDK provides a simple, pythonic interface for Dataverse operations:

Concept Description
DataverseClient Main entry point; provides records, query, tables, files, and batch namespaces
Context Manager Use with DataverseClient(...) as client: for automatic cleanup and HTTP connection pooling
Namespaces Operations are organized into client.records (CRUD & OData queries), client.query (QueryBuilder & SQL), client.tables (metadata), client.files (file uploads), and client.batch (batch requests)
Records Dataverse records represented as Python dictionaries with column schema names
Schema names Use table schema names ("account", "new_MyTestTable") and column schema names ("name", "new_MyTestColumn"). See: Table definitions in Microsoft Dataverse
Bulk Operations Efficient bulk processing for multiple records with automatic optimization
QueryBuilder Preferred query API: client.query.builder() with composable where(col(...)) filters, formatted values, expand, and streaming; use records.list() only as a shortcut for simple filter+select
Paging Automatic handling of large result sets with iterators
Structured Errors Detailed exception hierarchy with retry guidance and diagnostic information
Customization prefix values Custom tables and columns require a customization prefix value to be included for all operations (e.g., "new_MyTestTable", not "MyTestTable"). See: Table definitions in Microsoft Dataverse

Examples

Quick start

from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient

# Connect to Dataverse
credential = InteractiveBrowserCredential()

with DataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    # Create a contact
    contact_id = client.records.create("contact", {"firstname": "John", "lastname": "Doe"})

    # Read the contact back
    contact = client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
    print(f"Created: {contact['firstname']} {contact['lastname']}")

    # Clean up
    client.records.delete("contact", contact_id)
# Session closed, caches cleared automatically

Basic CRUD operations

# Create a record
account_id = client.records.create("account", {"name": "Contoso Ltd"})

# Read a record
account = client.records.retrieve("account", account_id)
print(account["name"])

# Read with expand — fetch a related record in the same HTTP request
account = client.records.retrieve(
    "account", account_id,
    select=["name"],
    expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))

# Update a record
client.records.update("account", account_id, {"telephone1": "555-0199"})

# Delete a record
client.records.delete("account", account_id)

Deprecation note (migration from beta): client.records.get() is deprecated and emits a DeprecationWarning. Use client.records.retrieve(table, record_id) for single-record reads (returns None on 404 instead of raising) and client.records.list(table, filter=...) / client.records.list_pages(...) for multi-record queries. Return types differ from the beta get(), so the codemod flags these for manual review rather than rewriting them — run dataverse-migrate (see Query data) to locate every call site.

Bulk operations

# Bulk create
payloads = [
    {"name": "Company A"},
    {"name": "Company B"},
    {"name": "Company C"}
]
ids = client.records.create("account", payloads)

# Bulk update (broadcast same change to all)
client.records.update("account", ids, {"exchangerate": 1})

# Bulk delete
client.records.delete("account", ids, use_bulk_delete=True)

Upsert operations

Use client.records.upsert() to create or update records identified by alternate keys. When the key matches an existing record it is updated; otherwise the record is created. A single item uses a PATCH request; multiple items use the UpsertMultiple bulk action.

Prerequisite: The table must have an alternate key configured in Dataverse for the columns used in alternate_key. Alternate keys are defined in the table's metadata (Power Apps maker portal → Table → Keys, or via the Dataverse API). Without a configured alternate key, upsert requests will be rejected by Dataverse with a 400 error.

Set up the key once before running the upsert examples:

# One-time setup for the examples below: make accountnumber an alternate key on account
key = client.tables.create_alternate_key(
    "account",
    "account_accountnumber_ak",
    ["accountnumber"],
    display_name="Account Number",
)
print(f"Created key {key.schema_name} ({key.metadata_id}), status={key.status}")

# Optional: check key status (useful right after creation; status transitions Pending -> Active)
for k in client.tables.get_alternate_keys("account"):
    if k.schema_name == "account_accountnumber_ak":
        print(f"{k.schema_name}: {k.status}")

Upsert usage

from PowerPlatform.Dataverse.models import UpsertItem

# Upsert a single record
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd", "telephone1": "555-0100"},
    )
])

# Upsert multiple records (uses UpsertMultiple bulk action)
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd"},
    ),
    UpsertItem(
        alternate_key={"accountnumber": "ACC-002"},
        record={"name": "Fabrikam Inc"},
    ),
])

# Composite alternate key (multiple columns identify the record)
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
        record={"name": "Contoso Ltd"},
    )
])

# Plain dict syntax (no import needed)
client.records.upsert("account", [
    {
        "alternate_key": {"accountnumber": "ACC-001"},
        "record": {"name": "Contoso Ltd"},
    }
])

DataFrame operations

The SDK provides pandas wrappers for all CRUD operations via the client.dataframe namespace, using DataFrames and Series for input and output.

Note: client.dataframe.get() is deprecated. Use the GA patterns shown below instead.

import pandas as pd
from PowerPlatform.Dataverse.models.filters import col

# Query records as a single DataFrame (GA builder pattern)
df = (client.query.builder("account")
      .select("name", "telephone1")
      .where(col("statecode") == 0)
      .execute()
      .to_dataframe())
print(f"Found {len(df)} accounts")

# Limit results with top for large tables
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()

# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
    {"name": "Contoso", "telephone1": "555-0100"},
    {"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)

# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")

# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": new_accounts["accountid"].iloc[0], "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)

# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])

# SQL query directly to DataFrame (supports JOINs, aggregates, GROUP BY)
df = client.dataframe.sql(
    "SELECT a.name, COUNT(c.contactid) as contacts "
    "FROM account a "
    "JOIN contact c ON a.accountid = c.parentcustomerid "
    "GROUP BY a.name"
)

Query data

The QueryBuilder is the recommended way to query records. It provides a fluent, type-safe interface that generates correct OData queries automatically — no need to remember OData filter syntax.

# Fluent query builder (recommended)
from PowerPlatform.Dataverse.models.filters import col

for record in (client.query.builder("account")
               .select("name", "revenue")
               .where(col("statecode") == 0)
               .where(col("revenue") > 1000000)
               .order_by("revenue", descending=True)
               .top(100)
               .page_size(50)
               .execute()):
    print(f"{record['name']}: {record['revenue']}")

The QueryBuilder handles value formatting, column name casing, and OData syntax automatically. Filter expressions are built with col() and standard Python operators:

# Get results as a pandas DataFrame (consolidates all pages)
df = (client.query.builder("account")
      .select("name", "telephone1")
      .where(col("statecode") == 0)
      .top(100)
      .execute()
      .to_dataframe())
print(f"Got {len(df)} accounts")
# Comparison filters using col() expressions
query = (client.query.builder("contact")
         .where(col("email").contains("outlook.com"))       # contains(email from domain, 'outlook.com')
         .where(col("creditlimit").between(10000, 50000))   # credit limit ge 10000 and revenue le 50000
         .where(col("telephone1").is_null())                # telephone1 eq null
         )

For complex logic (OR, NOT, grouping), compose expressions with &, |, ~:

from PowerPlatform.Dataverse.models import col

# OR conditions: (statecode = 0 OR statecode = 1) AND revenue > 100k
for record in (client.query.builder("account")
               .select("name", "revenue")
               .where(((col("statecode") == 0) | (col("statecode") == 1))
                      & (col("revenue") > 100000))
               .execute()):
    print(record["name"])

# NOT, between, and in operators
for record in (client.query.builder("account")
               .where(col("statecode") != 2)                       # NOT inactive
               .where(col("revenue").between(100000, 500000))      # revenue in range
               .execute()):
    print(record["name"])

Formatted values and annotations -- request localized labels, currency symbols, and display names:

# Get formatted values (choice labels, currency, lookup names) — via query builder
for record in (client.query.builder("account")
               .select("name", "statecode", "revenue")
               .include_formatted_values()
               .execute()):
    status = record["statecode@OData.Community.Display.V1.FormattedValue"]
    print(f"{record['name']}: {status}")

# Get formatted values — via records.list() / records.retrieve() include_annotations param
result = client.records.list(
    "account",
    select=["name", "statecode"],
    include_annotations="OData.Community.Display.V1.FormattedValue",
)
for record in result:
    label = record.get("statecode@OData.Community.Display.V1.FormattedValue")
    print(f"{record['name']}: {label}")

record = client.records.retrieve(
    "account", account_id,
    select=["name", "statuscode"],
    include_annotations="OData.Community.Display.V1.FormattedValue",
)
if record:
    print(record.get("statuscode@OData.Community.Display.V1.FormattedValue"))

Nested expand with options -- expand navigation properties with $select, $filter, $orderby, and $top:

from PowerPlatform.Dataverse.models import ExpandOption

# Expand related tasks with filtering and sorting
for record in (client.query.builder("account")
               .select("name")
               .expand(ExpandOption("Account_Tasks")
                       .select("subject", "createdon")
                       .filter("contains(subject,'Task')")
                       .order_by("createdon", descending=True)
                       .top(5))
               .execute()):
    print(record["name"], record.get("Account_Tasks"))

Paging -- use execute_pages() for streaming large result sets with full builder options (filtering, sorting, formatted values). records.list() and records.list_pages() are simpler shortcuts for string-based OData filter queries:

# Preferred: query.builder().execute_pages() — stream one page at a time, memory stays flat
# Supports composable filters, sorting, formatted values, and expand with nested selects
for page_num, page in enumerate(
    client.query.builder("account")
    .select("accountid", "name", "revenue")
    .where(col("statecode") == 0)
    .order_by("name")
    .page_size(500)        # optional: override Dataverse default (~5000/page)
    .execute_pages()
):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(f"  {record['name']}")

# Simple shortcut: records.list() — automatic paging, all records in memory
# Use for basic filter+select queries; string OData filter only (no composable expressions)
result = client.records.list(
    "account",
    filter="statecode eq 0",
    select=["name", "revenue"],
    orderby=["name asc"],          # optional sort
    top=500,                       # bounds total records returned and number of HTTP round-trips
    page_size=200,                 # optional: hint Dataverse default page size
)
for record in result:
    print(record["name"])

# Simple streaming shortcut: records.list_pages() — same params as records.list(), yields one page at a time
for page_num, page in enumerate(
    client.records.list_pages("account", filter="statecode eq 0", select=["name"], orderby=["name asc"])
):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(record["name"])

Deprecation note: execute(by_page=True) and execute(by_page=False) are deprecated and emit a UserWarning. Replace with execute_pages() (streaming) or plain execute() (eager). QueryBuilder.to_dataframe() is also deprecated; use .execute().to_dataframe() instead. The migration tool rewrites all of these automatically — install with pip install PowerPlatform-Dataverse-Client[migration] and run dataverse-migrate path/to/your/scripts/ (or python -m PowerPlatform.Dataverse.migration.migrate_v0_to_v1 for development checkouts).

Record count -- include $count=true in the request:

# Via query builder
results = (client.query.builder("account")
           .where(col("statecode") == 0)
           .count()
           .execute())
print(len(results))   # QueryResult is sized — use len() to get the count

# Via records.list() — count=True adds $count=true to the OData request
results = client.records.list("account", filter="statecode eq 0", count=True)
print(len(results))

Accessing the count: QueryResult is iterable and sized — call len(results) to get the number of records. There is no separate .count or .total_count attribute. Because the client auto-paginates, len(results) reflects every matching row fetched; the server's raw @odata.count annotation is not surfaced as a standalone field.

FetchXML queries -- client.query.fetchxml() returns an inert FetchXmlQuery object; no HTTP request is made until you call .execute() or .execute_pages():

xml = """
<fetch>
  <entity name="account">
    <attribute name="name"/>
    <attribute name="revenue"/>
    <filter><condition attribute="statecode" operator="eq" value="0"/></filter>
  </entity>
</fetch>
"""

# .execute() — blocking, fetches all pages and returns a single QueryResult
result = client.query.fetchxml(xml).execute()
df = result.to_dataframe()

# .execute_pages() — streaming, yields one QueryResult per HTTP page
# Use count="N" in the FetchXML <fetch> element to set page size
for page_num, page in enumerate(client.query.fetchxml(xml).execute_pages()):
    print(f"Page {page_num + 1}: {len(page)} records")
    for record in page:
        print(record["name"])

SQL queries provide an alternative read-only query syntax with support for JOINs, aggregates, GROUP BY, DISTINCT, and OFFSET FETCH pagination:

# Basic query
results = client.query.sql(
    "SELECT TOP 10 accountid, name FROM account WHERE statecode = 0"
)

# JOINs and aggregates work
results = client.query.sql(
    "SELECT a.name, COUNT(c.contactid) as cnt "
    "FROM account a "
    "JOIN contact c ON a.accountid = c.parentcustomerid "
    "GROUP BY a.name"
)

# SQL results directly as a DataFrame
df = client.dataframe.sql(
    "SELECT name, revenue FROM account ORDER BY revenue DESC"
)

# Discover columns from metadata (schema-discovery helper, kept at GA)
cols_meta = client.query.sql_columns("account")
col_names = [c["LogicalName"] for c in cols_meta]

# Build queries using the discovered column names
sql = f"SELECT TOP 10 {', '.join(col_names[:5])} FROM account"
df = client.dataframe.sql(sql)

Simple list shortcut -- records.list() accepts a raw OData filter string for basic queries. For anything beyond simple filter+select, prefer client.query.builder() (composable filters, formatted values, nested expand):

# records.list() shortcut — raw OData filter string, all records loaded into memory
# Column names in filter must be lowercase logical names
for record in client.records.list(
    "account",
    select=["name"],
    filter="statecode eq 0",
    top=100,
):
    print(record["name"])

# Discover navigation property names for $expand (metadata-discovery helper, kept at GA)
nav_props = client.query.odata_expands("account")  # → list of navigation property metadata

# Expand navigation properties using the query builder
from PowerPlatform.Dataverse.models.query_builder import ExpandOption
for record in (client.query.builder("contact")
               .select("fullname")
               .expand(ExpandOption("parentcustomerid_account").select("name"))
               .execute()):
    acct = record.get("parentcustomerid_account") or {}
    print(f"{record['fullname']} -> {acct.get('name')}")

# Build @odata.bind for lookup fields (deprecated helper, still functional with DeprecationWarning)
bind = client.query.odata_bind("contact", "account", account_id)
# Returns: {"parentcustomerid_account@odata.bind": "/accounts(guid)"}
client.records.create("contact", {"firstname": "Jane", **bind})

Table management

# Create a custom table, including the customization prefix value in the schema names for the table and columns.
table_info = client.tables.create("new_Product", {
    "new_Code": "string",
    "new_Description": "memo",
    "new_Price": "decimal",
    "new_Active": "bool"
})

# Create with custom primary column name and solution assignment
table_info = client.tables.create(
    "new_Product",
    columns={
        "new_Code": "string",
        "new_Price": "decimal"
    },
    solution="MyPublisher",  # Optional: add to specific solution
    primary_column="new_ProductName",  # Optional: custom primary column (default is "{customization prefix value}_Name")
)

# Get table information
info = client.tables.get("new_Product")
print(f"Logical name: {info['table_logical_name']}")
print(f"Entity set: {info['entity_set_name']}")

# List all tables
tables = client.tables.list()
for table in tables:
    print(table)

# Add columns to existing table (columns must include customization prefix value)
client.tables.add_columns("new_Product", {"new_Category": "string"})

# Remove columns
client.tables.remove_columns("new_Product", ["new_Category"])

# List all columns (attributes) for a table to discover schema
columns = client.tables.list_columns("account")
for col in columns:
    print(f"{col['LogicalName']} ({col.get('AttributeType')})")

# List only specific properties
columns = client.tables.list_columns(
    "account",
    select=["LogicalName", "SchemaName", "AttributeType"],
    filter="AttributeType eq 'String'",
)

# Clean up
client.tables.delete("new_Product")

Important: All custom column names must include the customization prefix value (e.g., "new_"). This ensures explicit, predictable naming and aligns with Dataverse metadata requirements.

Relationship management

Create relationships between tables using the relationship API. For a complete working example, see examples/advanced/relationships.py.

from PowerPlatform.Dataverse.models import (
    CascadeConfiguration,
    Label,
    LocalizedLabel,
    LookupAttributeMetadata,
    ManyToManyRelationshipMetadata,
    OneToManyRelationshipMetadata,
)

# Create a one-to-many relationship: Department (1) -> Employee (N)
# This adds a "Department" lookup field to the Employee table
lookup = LookupAttributeMetadata(
    schema_name="new_DepartmentId",
    display_name=Label(localized_labels=[LocalizedLabel(label="Department", language_code=1033)]),
)

relationship = OneToManyRelationshipMetadata(
    schema_name="new_Department_Employee",
    referenced_entity="new_department",   # Parent table (the "one" side)
    referencing_entity="new_employee",    # Child table (the "many" side)
    referenced_attribute="new_departmentid",
)

result = client.tables.create_one_to_many_relationship(lookup, relationship)
print(f"Created lookup field: {result.lookup_schema_name}")

# Create a many-to-many relationship: Employee (N) <-> Project (N)
# Employees work on multiple projects; projects have multiple team members
m2m_relationship = ManyToManyRelationshipMetadata(
    schema_name="new_employee_project",
    entity1_logical_name="new_employee",
    entity2_logical_name="new_project",
)

result = client.tables.create_many_to_many_relationship(m2m_relationship)
print(f"Created M:N relationship: {result.relationship_schema_name}")

# Query relationship metadata
rel = client.tables.get_relationship("new_Department_Employee")
if rel:
    print(f"Found: {rel.relationship_schema_name}")

# List all relationships
rels = client.tables.list_relationships()
for rel in rels:
    print(f"{rel['SchemaName']} ({rel.get('RelationshipType')})")

# List relationships for a specific table (one-to-many + many-to-one + many-to-many)
account_rels = client.tables.list_table_relationships("account")
for rel in account_rels:
    print(f"{rel['SchemaName']} -> {rel.get('RelationshipType')}")

# Delete a relationship
client.tables.delete_relationship(result.relationship_id)

For simpler scenarios, use the convenience method:

# Quick way to create a lookup field with sensible defaults
result = client.tables.create_lookup_field(
    referencing_table="contact",       # Child table gets the lookup field
    lookup_field_name="new_AccountId",
    referenced_table="account",        # Parent table being referenced
    display_name="Account",
)

File operations

# Upload a file to a record
client.files.upload(
    "account",
    account_id,
    "new_Document",  # If the file column doesn't exist, it will be created automatically
    "/path/to/document.pdf",
)

Batch operations

Use client.batch to send multiple operations in one HTTP request. The batch namespace mirrors client.records, client.tables, and client.query.

# Build a batch request and add operations
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.create("account", [{"name": "Fabrikam"}, {"name": "Woodgrove"}])
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.delete("account", old_id)
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"])  # single record with expand
batch.records.list(                                                # multi-record, single page
    "account",
    filter="statecode eq 0",
    select=["name"],
    orderby=["name asc"],
    top=50,
)

result = batch.execute()
for item in result.responses:
    if item.is_success:
        print(f"[OK] {item.status_code} entity_id={item.entity_id}")
    else:
        print(f"[ERR] {item.status_code}: {item.error_message}")

Transactional changeset — all operations in a changeset succeed or roll back together:

batch = client.batch.new()
with batch.changeset() as cs:
    lead_ref = cs.records.create("lead", {"firstname": "Ada"})
    contact_ref = cs.records.create("contact", {"firstname": "Ada"})
    cs.records.create("account", {
        "name": "Babbage & Co.",
        "originatingleadid@odata.bind": lead_ref,
        "primarycontactid@odata.bind": contact_ref,
    })
result = batch.execute()
print(f"Created {len(result.entity_ids)} records atomically")

Table metadata and SQL queries in a batch:

batch = client.batch.new()
batch.tables.create("new_Product", {"new_Price": "decimal", "new_InStock": "bool"})
batch.tables.add_columns("new_Product", {"new_Rating": "int"})
batch.tables.get("new_Product")
batch.query.sql("SELECT TOP 5 name FROM account")

result = batch.execute()

Continue on error — attempt all operations even when one fails:

result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
for item in result.failed:
    print(f"[ERR] {item.status_code}: {item.error_message}")

continue_on_error=True only affects how Dataverse handles per-operation failures on the server. Client-side errors raised before the batch is sent — such as ValidationError (e.g. exceeding the 1000-operation limit) or MetadataError from metadata pre-resolution (tables.delete, tables.add_columns, tables.remove_columns) — are still raised as exceptions and must be handled with try/except.

DataFrame integration -- feed pandas DataFrames directly into a batch:

import pandas as pd

batch = client.batch.new()

# Create records from a DataFrame
df = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
batch.dataframe.create("account", df)

# Update records from a DataFrame
updates = pd.DataFrame([
    {"accountid": id1, "telephone1": "555-0100"},
    {"accountid": id2, "telephone1": "555-0200"},
])
batch.dataframe.update("account", updates, id_column="accountid")

# Delete records from a Series
batch.dataframe.delete("account", pd.Series([id1, id2]))

result = batch.execute()

For a complete example see examples/advanced/batch.py.

Async client

The SDK ships a full async client, AsyncDataverseClient, for use in async applications. It mirrors every operation of the sync client — the same namespaces (records, query, tables, files, batch), the same method signatures, and the same return types.

Async snippets below are fragments. Every example after ### Quick start assumes it is nested inside an async def main(): ... body, with client and credential already constructed as shown in Quick start. Copying a fragment into a top-level .py file will raise SyntaxError: 'await' outside function. See examples/aio/ for full runnable scripts.

Install

The async client requires aiohttp, which is an optional extra:

pip install "PowerPlatform-Dataverse-Client[async]"

Quick start

import asyncio
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.aio import AsyncDataverseClient

async def main():
    # Connect to Dataverse
    credential = InteractiveBrowserCredential()

    async with DefaultAzureCredential() as credential:
        async with AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
            # Create a contact
            contact_id = await client.records.create("contact", {"firstname": "John", "lastname": "Doe"})

            # Read it back
            contact = await client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
            print(f"Created: {contact['firstname']} {contact['lastname']}")

            # Clean up
            await client.records.delete("contact", contact_id)

asyncio.run(main())

Standalone usage (without async with)

# given: credential constructed as in Quick start (e.g. DefaultAzureCredential())
client = AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential)
try:
    account_id = await client.records.create("account", {"name": "Contoso Ltd"})
finally:
    await client.aclose()

Query builder

The async query builder API is identical to the sync one:

# given: client is an open AsyncDataverseClient
from PowerPlatform.Dataverse.models.filters import col

# Execute and collect all results
result = await (
    client.query.builder("account")
    .select("name", "telephone1")
    .where(col("statecode") == 0)
    .top(10)
    .execute()
)
for record in result:
    print(record["name"])

# Lazy page-by-page iteration (memory-efficient for large sets)
async for page in (
    client.query.builder("account")
    .select("name")
    .page_size(500)
    .execute_pages()
):
    for record in page:
        print(record["name"])

Batch and changesets

# given: client is open; account_id is the GUID returned by an earlier records.create
batch = client.batch.new()
batch.records.create("account", {"name": "Alpha"})
batch.records.create("account", {"name": "Beta"})
result = await batch.execute()
print(f"Created {len(result.entity_ids)} records")

# Atomic changeset
batch = client.batch.new()
async with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
result = await batch.execute()

See examples/aio/ for async equivalents of all sync examples.

Next steps

More sample code

Explore our comprehensive examples in the examples/ directory:

🌱 Getting Started:

🚀 Advanced Usage:

📖 See the examples README for detailed guidance and learning progression.

Additional documentation

For comprehensive information on Microsoft Dataverse and related technologies:

Resource Description
Dataverse Developer Guide Complete developer documentation for Microsoft Dataverse
Dataverse Web API Reference Detailed Web API reference and examples
Azure Identity for Python Authentication library documentation and credential types
Power Platform Developer Center Broader Power Platform development resources
Dataverse SDK for .NET Official .NET SDK for Microsoft Dataverse

Troubleshooting

Exception hierarchy

The SDK raises structured exceptions that all inherit from a common base, DataverseError. Catching the base class is the safest fallback; catch the specific subclasses when you need to react differently to validation, metadata, SQL, or HTTP failures.

Exception
└── DataverseError              # Base class for every SDK-raised error
    ├── ValidationError         # Client-side input validation failed
    ├── MetadataError           # Table/column/relationship metadata problem
    ├── SQLParseError           # SQL query could not be parsed
    └── HttpError               # Dataverse Web API returned a non-success status

All classes are importable from PowerPlatform.Dataverse.core.errors (or re-exported from PowerPlatform.Dataverse.core).

Exception When it is raised Typical examples
DataverseError Base class. Catch it to handle any SDK-originated failure in one block. Fallback except clause.
ValidationError Client-side argument validation fails before a request is sent. Empty/None table name, missing primary key, non-string SQL, invalid batch payload, unsupported column type in create_table.
MetadataError A metadata lookup or definition operation fails — usually an unknown or invalid table, column, or relationship. Unknown logical name passed to batch.create/update/delete, tables.create_column, relationships.create_*, or tables.delete.
SQLParseError A SQL string passed to client.query.sql(...) cannot be parsed into a valid SELECT. Unsupported SQL syntax, write statements (INSERT/UPDATE/DELETE), malformed queries.
HttpError The Dataverse Web API responded with a non-2xx status. Exposes status_code, service_error_code, correlation_id, service_request_id, retry_after, and is_transient (set for 408/429/503/504). 401 (auth), 403 (permissions), 404 (record/table not found), 412 (concurrency/ETag), 429 (throttling), 5xx (server).

Note on timeouts and network errors. Low-level network failures from the underlying httpx client are not wrapped by the SDK and surface as their original httpx exceptions — most commonly httpx.ReadTimeout, httpx.ConnectTimeout, and httpx.TimeoutException (their common base) on slow endpoints such as relationships.list() or large queries, and httpx.ConnectError/httpx.NetworkError for connectivity issues. Catch httpx.HTTPError to cover all of them, or httpx.TimeoutException for timeouts specifically. The async client (PowerPlatform.Dataverse.aio) surfaces aiohttp.ClientError and asyncio.TimeoutError analogously.

General

import httpx
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import (
    DataverseError,
    HttpError,
    MetadataError,
    SQLParseError,
    ValidationError,
)

try:
    client.records.retrieve("account", "invalid-id")
except ValidationError as e:
    print(f"Validation error: {e.message} (subcode={e.subcode})")
except MetadataError as e:
    print(f"Metadata error: {e.message} (subcode={e.subcode})")
except SQLParseError as e:
    print(f"SQL parse error: {e.message}")
except HttpError as e:
    print(f"HTTP {e.status_code}: {e.message}")
    print(f"Code: {e.code}  Subcode: {e.subcode}")
    print(f"Service request id: {e.details.get('service_request_id')}")
    if e.is_transient:
        print(f"Transient — retry after {e.details.get('retry_after')}s")
except httpx.TimeoutException as e:
    # ReadTimeout / ConnectTimeout / WriteTimeout from the underlying transport
    print(f"Request timed out: {e}")
except DataverseError as e:
    # Catch-all for any other SDK-raised error
    print(f"Dataverse error [{e.code}]: {e.message}")

Authentication issues

Common fixes:

  • Verify environment URL format: https://yourorg.crm.dynamics.com (no trailing slash)
  • Ensure Azure Identity credentials have proper Dataverse permissions
  • Check app registration permissions are granted and admin-consented

Performance considerations

For optimal performance in production environments:

Best Practice Description
Prefer QueryBuilder for queries Use client.query.builder() for filtering, sorting, expansion, or formatted values; use records.list() only as a shortcut for simple filter+select
Bulk Operations Pass lists to records.create(), records.update() for automatic bulk processing, for records.delete(), set use_bulk_delete when passing lists to use bulk operation
Select Fields Specify select parameter to limit returned columns and reduce payload size
Page Size Control Use top and page_size parameters to control memory usage; use execute_pages() for large result sets
Connection Reuse Reuse DataverseClient instances across operations
Production Credentials Use ClientSecretCredential or CertificateCredential for unattended operations
Error Handling Implement retry logic for transient errors (e.is_transient)

HTTP diagnostics logging

Enable file-based HTTP logging to capture all requests and responses for debugging. Sensitive headers (e.g. Authorization) are automatically redacted.

from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core import DataverseConfig, LogConfig

log_cfg = LogConfig(
    log_folder="./my_logs",      # Directory for log files (created if missing)
    log_file_prefix="crm_debug", # Filename prefix; timestamp appended automatically
    max_body_bytes=4096,         # Bytes of body to capture per entry — 0 (default) disables body capture
)
config = DataverseConfig(log_config=log_cfg)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential, config=config)

Each log file is timestamped and rotated automatically (default 10 MB per file, 5 backups). Sample output:

[2026-04-11T15:27:31-0700] DEBUG >>> REQUEST  POST https://yourorg.crm.dynamics.com/api/data/v9.2/accounts
    Authorization: [REDACTED]
    Accept: application/json
    Content-Type: application/json
    OData-MaxVersion: 4.0
    OData-Version: 4.0
    User-Agent: DataverseSvcPythonClient:0.1.0b8
    x-ms-client-request-id: 7050c4d0-6bcc-48e3-a310-b4e8fa18ac69
    x-ms-correlation-id: 4cace77d-e4ee-4419-8c65-fc62beed6e71
    Body:    {"name":"Contoso Ltd"}
[2026-04-11T15:27:31-0700] DEBUG <<< RESPONSE 204 POST https://yourorg.crm.dynamics.com/api/data/v9.2/accounts (78.0ms)
    Content-Type: application/json; odata.metadata=minimal
    OData-Version: 4.0
    x-ms-service-request-id: a6d0b6c4-5dd1-47cb-83eb-b6fccf754216
    x-ms-ratelimit-burst-remaining-xrm-requests: 7998

Security note: This feature is intended for development and debugging only. Log files are plaintext and may contain PII, sensitive business data, and Dataverse record IDs — even with max_body_bytes=0 (the default), request URLs can include filter values and record identifiers.

  • Never enable in production. If required for production diagnostics, keep max_body_bytes=0 and treat log files as regulated data under your organization's data handling policy.
  • Restrict access. Set file system permissions so only the process user can read log files. Use an encrypted volume or folder in sensitive environments.
  • Control retention. Log rotation keeps up to 5 files by default (backup_count). Delete logs after the debugging session; use secure deletion for regulated data.
  • Prevent source control leaks. Add the log folder to .gitignore immediately.

HTTP timeouts and retries

The client applies sensible per-method HTTP timeouts and automatically retries transient network errors. You can tune both via DataverseConfig.

Setting Default Applies to
http_timeout per-method (see below) every request — overrides the per-method defaults when set
http_retries 5 maximum attempts per request on network errors (requests.exceptions.RequestException)
http_backoff 0.5 base delay in seconds between retries; doubles each attempt (0.5s, 1s, 2s, 4s, …)

When http_timeout is not set, the client uses:

  • 10 seconds for GET (and any non-write method)
  • 120 seconds for POST, PATCH, DELETE

The 10s read default is comfortable for routine data queries but can be too tight for large metadata reads (e.g. client.tables.list_relationships(), client.tables.list_columns()) on orgs with many tables/relationships, or on the first call after an org wakes from idle. If you see ReadTimeout errors from those endpoints, raise the ceiling:

from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core import DataverseConfig

config = DataverseConfig(
    http_timeout=120,   # seconds — applies to every request
    http_retries=3,     # cap retries on slow metadata calls
    http_backoff=1.0,
)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential, config=config)

Note: Setting http_timeout overrides the per-method defaults for all requests, not just metadata calls. Pick a value large enough for the slowest operation you expect (typically metadata listing or bulk writes).

Limitations

  • SQL queries are read-only and support a limited subset of SQL syntax
  • Create Table supports the following column types: string, memo, int, decimal, float, bool, datetime, file, and picklist (Enum subclass)
  • File uploads are limited by Dataverse file size restrictions (default 128MB per file)

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit Contributor License Agreements.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

API Design Guidelines

When contributing new features to this SDK, please follow these guidelines:

  1. Public methods in operation namespaces - New public methods go in the appropriate namespace module under operations/. Public types and constants live in their own modules (e.g., models/metadata.py, common/constants.py)
  2. Add README example for public methods - Add usage examples to this README for public API methods
  3. Document public APIs - Include Sphinx-style docstrings with parameter descriptions and examples for all public methods
  4. Update documentation when adding features - Keep README and SKILL files (note that each skill has 2 copies) in sync
  5. Internal vs public naming - Modules, files, and functions not meant to be part of the public API must use a _ prefix (e.g., _odata.py, _relationships.py). Files without the prefix (e.g., constants.py, metadata.py) are public and importable by SDK consumers

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.

About

No description, website, or topics provided.

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages