194 changes: 109 additions & 85 deletions mkdocs/docs/api.md

## Write to a table

PyIceberg supports several write modes: [append](#append), [overwrite](#overwrite), [delete](#delete), [dynamic partition overwrite](#dynamic-partition-overwrite), and [upsert](#upsert). All writes use [Apache Arrow](https://arrow.apache.org/) as the in-memory format. Writes can be issued directly on the `Table` object or grouped together using the [Transaction API](#transaction-api).

To set up a table for the examples below:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```

Next, create a table using the Arrow schema:

```python
tbl = catalog.create_table("default.cities", schema=df.schema)
```

### Append


Use `append` to add new rows to an existing table without modifying any existing data:

```python
tbl.append(df)
```

After the first append, `tbl.scan().to_arrow()` returns:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```

Each call to `append` produces a new [Parquet file](https://parquet.apache.org/). Calling `append` a second time adds another batch of rows:

```python
tbl.append(pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
))
```

The nested lists in `tbl.scan().to_arrow()` reflect the separate Arrow buffers from each write:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```

To avoid type inconsistencies, convert the Iceberg table schema to Arrow before writing:

```python
df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
    schema=tbl.schema().as_arrow(),
)

tbl.append(df)
```

Optionally, you can attach custom properties to the snapshot created by `append`, or target a specific branch:

```python
tbl.append(df, snapshot_properties={"owner": "etl-job", "run_id": "abc123"})

# Write to a branch instead of main
tbl.append(df, branch="staging")
```

### Overwrite

`overwrite` replaces data in the table with new data. When called without an `overwrite_filter`, it behaves like a full table replacement — existing data is deleted and the new data is written. When the table is empty, `overwrite` and `append` produce the same result.

```python
tbl.overwrite(df)
```

#### Partial overwrite with `overwrite_filter`

Pass an `overwrite_filter` to delete only the rows that match the predicate before appending the new data. This is useful for replacing a specific subset of rows.

For example, to replace the record for `Paris` with a record for `New York`:

```python
from pyiceberg.expressions import EqualTo

df_new = pa.Table.from_pylist(
    [{"city": "New York", "lat": 40.7128, "long": 74.0060}]
)
tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"))
```

After the overwrite, `tbl.scan().to_arrow()` yields:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
lat: [[40.7128],[52.371807,37.773972,53.11254]]
long: [[74.006],[4.896029,-122.431297,6.0989]]
```

The `overwrite_filter` accepts both expression objects (e.g., `EqualTo`, `GreaterThan`) and SQL-style string predicates (e.g., `"city == 'Paris'"`). Column names in the filter are resolved case-sensitively by default; pass `case_sensitive=False` to relax this.


Optionally, you can also set snapshot properties or target a branch:

```python
tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), snapshot_properties={"owner": "etl-job"})

# Write to a branch instead of main
tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), branch="staging")
```

### Delete

Use `delete` to remove rows matching a predicate without writing new data. PyIceberg uses Iceberg metadata to prune which Parquet files need to be opened, so only relevant files are read. Column names in the filter are resolved case-sensitively by default; pass `case_sensitive=False` to relax this.

<!-- prettier-ignore-start -->

You can overwrite the record of `Paris` with a record of `New York`:
!!! warning "Merge-on-read not yet supported"

    If the table property `write.delete.mode` is set to `merge-on-read`, PyIceberg will fall back to copy-on-write and emit a warning. All deletes currently rewrite Parquet files.

<!-- prettier-ignore-end -->

```python
tbl.delete(delete_filter="city == 'Paris'")
```

Any rows where `city` equals `Paris` are removed. Running `tbl.scan().to_arrow()` afterwards yields:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```

When the predicate matches all rows in a Parquet file (e.g., `tbl.delete(delete_filter="city == 'Groningen'")`), PyIceberg drops the entire file without scanning its contents.

### Dynamic Partition Overwrite

For partitioned tables, `dynamic_partition_overwrite` replaces only the partitions present in the provided Arrow table. The partitions to overwrite are detected automatically — you do not need to specify them explicitly.

First, create a partitioned table:

```python
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table(
    "default.cities",
    schema=schema,
    partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="city")),
)
```

Write some initial data:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": -48.864716, "long": -2.349014},  # incorrect coordinates
    ],
)
tbl.append(df)
```

To correct only the `Paris` partition:

```python
df_corrected = pa.Table.from_pylist([
    {"city": "Paris", "lat": 48.864716, "long": 2.349014}
])
tbl.dynamic_partition_overwrite(df_corrected)
```

Only the `Paris` partition is replaced. All other partitions remain unchanged. `tbl.scan().to_arrow()` now yields:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Paris"],["Amsterdam"],["Drachten"],["San Francisco"]]
lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
```

### Transaction API

All write operations can also be issued as part of a transaction, which lets you combine multiple mutations — including schema changes, property updates, and data writes — into a single atomic commit.

```python
with tbl.transaction() as txn:
    txn.append(df)
```

You can combine multiple write operations in one transaction:

```python
with tbl.transaction() as txn:
    txn.delete(delete_filter="city == 'Paris'")
    txn.append(pa.Table.from_pylist([{"city": "New York", "lat": 40.7128, "long": 74.0060}]))
```

You can also mix data writes with metadata changes in the same transaction:

```python
from pyiceberg.types import LongType

with tbl.transaction() as txn:
    txn.append(df)
    with txn.update_schema() as update_schema:
        update_schema.add_column("population", LongType())
    txn.set_properties(owner="data-team")
```

If an exception is raised inside the `with` block, no snapshot is committed to the catalog. Note that data files already written to object storage are not automatically cleaned up in that case.

### Upsert

PyIceberg supports upserts: merging an Arrow table into an Iceberg table. Rows are matched on the table's [identifier fields](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids); rows that already exist are updated, and rows that cannot be found are inserted.
```python
upd = tbl.upsert(df)

assert upd.rows_updated == 1
assert upd.rows_inserted == 1
# Paris was already up-to-date; PyIceberg skips it silently
```

PyIceberg will automatically detect which rows need to be updated, inserted or can simply be ignored.
```python
table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```


## Snapshot Management

Manage snapshots with operations through the `Table` API: