Cross-pipeline integration-test framework for GFW data pipelines (pipe-gaps, anchorages_pipeline/port-visits, pipe-events/fishing).
The dit CLI drives Python workflow files that orchestrate phases (pipeline invocations) across modes (trigger patterns: bf / bfd / bftruncate / mutate-recover) and assert equivalence on the resulting BQ tables via table-check summary.
The framework is intentionally thin — a small library plus per-pipeline workflow files. Capabilities, in scope today:
- Pipeline-agnostic runners (`dit.runners.docker`, `dit.runners.dataflow`). The docker runner invokes a published or locally-built image; the dataflow runner submits an in-process Beam pipeline with lock-split submit/wait. Both are used by Phase 1; the docker runner also covers "Beam-in-container submits to Dataflow" workflows (port-visits' shape).
- Comparison shim (`dit.compare.compare_tables`): a thin wrapper over `table-check summary` from the `table_identical_checks` repo. Per-column tolerances, `view_suffix` for SCD-2 last-versions vs. truncate-shape, `keys` for the comparison join.
- BQ + date utilities (`dit.bq`, `dit.dates`): drop tables by prefix, query for restricted ssvids, snapshot a table or whole dataset for source-data pinning across cross-version runs, half-open date iteration. Used by workflows that need pre/post-run setup or computed inputs.
- Library-first: anything you can do via `dit run …` you can also do by importing `dit.*` from a pytest target or another Python script. The CLI is one consumer of the library, not the only one.
- Workflow file conventions: per-pipeline workflows live in `workflows/<pipeline>/<name>.py` and expose `main(argv) -> int`. Output tables are tagged with `<experiment_id>_<commit>_<uuid>` for provenance; `--experiment-id` / `DIT_EXPERIMENT_ID` clusters N runs (e.g. `pipe-gaps@main` vs `@pr-NNN`) under one BQ-prefix-scannable slug, defaulting to `solo_<6-hex>` when unset. (`--allow-dirty-tree` exists today as a transition aid but is scheduled for removal; see `docs/no-dirty-tree-pivot.md`. Going forward, dit auto-snapshots dirty trees to a pushed ref and runs reproducibly.)
- Three install modes per pipeline: editable (fast inner loop), specific-ref (`REF=<sha-or-branch>`), and snapshot (`git stash create` → anchored on a `dit-snapshot-<epoch>` branch). See Usage § Install modes below.
- Per-user infra knobs via `DIT_*` env vars: `DIT_DEST_DATASET`, `DIT_DATAFLOW_SA`, `DIT_DATAFLOW_REGION`, `DIT_DATAFLOW_TEMP_BUCKET`, `DIT_DATAFLOW_SUBNETWORK`, `DIT_BQ_TEMP_DATASET`. Plays cleanly with direnv via `.envrc.example`.
- Pipeline integration contract (`docs/pipeline-contract.md`): what a pipeline must expose to be cleanly testable by dit, with an adoption matrix tracking where each current pipeline stands.
- Cross-version experiments (`workflows/port_visits/cross_version_ais.py`): pin source data via BQ snapshots at a fixed timestamp, run a workflow at N pipeline-version bindings (git refs in the pipeline checkout), then diff corresponding output tables pairwise. Foundation for PR-validation comparisons (`pipe-anchorages@main` vs `@pr-NNN`).
- Content-addressable run cache (`dit.cache` + `world-fishing-827.tech_great_expectations.dit_runs`): every `execute_*` mode hashes `(pipeline_commit, worker_image_digest, workflow_file_sha1, params)` and looks up the run-cache table. On a hit (output tables still present), the workflow skips Dataflow entirely and uses the cached FQN for downstream comparisons; on a miss, it runs and inserts a row with output tables + provenance. The same table also serves as the registry and cleanup source for `make dit-cancel` (M5, in flight). Pipe-gaps was wired in on 2026-05-22; port-visits is next. Design: `docs/run-cache.md`; milestone tracker: `docs/run-cache-impl.md`.
- Centralised Dataflow job-name builder (`dit.job_names.make_job_name`): single source of truth for the `dit-<repo>-<step>-<exp>-<binding?>-<mode?>-<N?>-<M?>` shape, with 63-char truncation that preserves the load-bearing tail. Both workflows use it; the prefix lets a single label-filter find every dit-launched Dataflow job in the UI.
- Submitter-vs-worker guardrail (`dit.git_info.warn_if_worker_image_misses_dirty_tree`): Dataflow runs install pipeline code from two different sources (the submitter from `/workspace`, workers from the registry). When the submitter's tree is dirty but `--worker-image` is unchanged from the workflow's default, the workers won't see the changes; the helper logs a prominent banner pointing at the build+push fix. (Scheduled for removal in the no-dirty-tree pivot: the warn-helper exists only because dirty submitters exist. The underlying submitter-vs-worker split stays relevant, since workers always install from a published image and the submitter from local source, but with all submitters running committed code post-pivot, the warning is no longer the right vehicle.)
- Cloud Build ad-hoc runtime (`cloudbuild-dit.yaml`, `make dit-cloud`, `make publish-ditbox`): submit a workflow run to Cloud Build with one command. The pipeline checkout flows through as the build source; dit is cloned fresh per run at a configurable ref. ditbox builds via kaniko with a registry-backed layer cache; per-run pipeline installs use `uv` (~10× faster than pip). The `_BEAM_VERSION` substitution pins apache-beam to match the worker image's SDK (per-pipeline defaults in the Makefile: `pipe-gaps=2.71.0`, `anchorages_pipeline=2.69.0`). Validated end-to-end 2026-05-22: it caught a real `pipe-gaps:v0.9.6` mode-equivalence bug and verified the fix against a custom-published worker image. PR-trigger integration in each pipeline repo comes as a follow-up.
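The job-name builder above is described only by its output shape and its 63-character truncation rule. The sketch below is a hypothetical re-implementation of that rule, not dit's code: the name `make_job_name_sketch`, the slugging (lowercase, runs of characters outside `[a-z0-9-]` collapsed to `-`), and the choice to trim the repo/step/experiment part from its end are all assumptions. It keeps the `dit-` prefix and the whole optional tail (binding, mode, N, M).

```python
import re

MAX_LEN = 63  # truncation limit quoted for dit's job names


def _slug(part: str) -> str:
    """Lowercase; collapse runs of characters outside [a-z0-9-] into '-'."""
    return re.sub(r"[^a-z0-9-]+", "-", part.lower()).strip("-")


def make_job_name_sketch(repo: str, step: str, exp: str,
                         binding=None, mode=None, n=None, m=None) -> str:
    """Hypothetical builder for dit-<repo>-<step>-<exp>-<binding?>-<mode?>-<N?>-<M?>."""
    head = [s for s in (_slug(p) for p in ("dit", repo, step, exp)) if s]
    tail = [s for s in (_slug(str(p)) for p in (binding, mode, n, m) if p is not None) if s]
    name = "-".join(head + tail)
    if len(name) <= MAX_LEN:
        return name
    tail_str = "-".join(tail)
    # Keep the whole tail; trim the head (from its end) to whatever room is left.
    room = MAX_LEN - len(tail_str) - 1 if tail_str else MAX_LEN
    head_str = "-".join(head)[:room].rstrip("-")
    return f"{head_str}-{tail_str}" if tail_str else head_str


print(make_job_name_sketch("pipe-gaps", "gaps", "solo_ab12cd", mode="bfd"))
# dit-pipe-gaps-gaps-solo-ab12cd-bfd
```

Trimming the middle rather than the end is what "load-bearing tail" implies: the tail is what distinguishes sibling jobs of one experiment (per binding, per mode), so cutting it would make their names collide.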
Pipeline-shape primitives (Phase/Mode/Mutation/Oracle dataclasses, mutation library, phase-sharing via BQ COPY, golden-table regression mode) are deliberately not extracted yet — see Roadmap below for why and when.
- `docs/architecture.md`: visual reference covering repo ownership, run modes, workflow flows, the Cloud Build runtime, and the image namespace. Mermaid diagrams that render on GitHub.
- `docs/plan.md`: implementation plan, three-repo split, public API contracts, Phase 1 task breakdown.
- `docs/conventions.md`: prod-infra boundary, dit image namespace (`gcr.io/world-fishing-827/dit/*`), standard build-and-push workflow.
- `docs/pipeline-contract.md`: what a GFW pipeline must expose to be cleanly integration-testable; adoption matrix for the three current pipelines. Audience: pipeline maintainers.
- `docs/context.md`: background, source bugs the framework caught, branch state at handoff.
- `docs/framework-vision.md`: long-term shape (don't optimise for it; Phase 1 stays imperative).
- `CLAUDE.md`: working agreements and Plan changelog.
```bash
python3 -m venv venv && source venv/bin/activate
make install-pipe-gaps   # or install-port-visits / install-pipe-events / install-all
```

dit is pipeline-agnostic: workflow dependencies (pipe-gaps, anchorages_pipeline, pipe-events) are not in dit's base `requirements.txt`. By default the Makefile assumes sibling checkouts (`$(realpath ..)`). If yours live elsewhere, prepend `PROJECTS=/path` to any target, or copy `.envrc.example` → `.envrc` (gitignored; loaded by direnv).
For the framework alone (no workflow deps), `make install` is enough, but the dataflow runner won't load until a workflow install pulls in `apache-beam[gcp]` transitively.
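To tell which state your venv is in, a standard-library check for the Beam package is enough. This is a hypothetical helper, not part of dit; it only looks for the `apache_beam` import name and does not verify that the `[gcp]` extras are installed.

```python
import importlib.util


def beam_available() -> bool:
    """True if the apache_beam package is importable in this environment."""
    # find_spec returns None (rather than raising) for a missing top-level package.
    return importlib.util.find_spec("apache_beam") is not None


if __name__ == "__main__":
    if beam_available():
        print("apache-beam found: the dataflow runner should be able to load.")
    else:
        print("apache-beam missing: run a workflow install such as `make install-pipe-gaps`.")
```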
| When | Target | What happens |
|---|---|---|
| Active dev on a pipeline (fast inner loop) | `make install-<pipeline>` | `pip install -e <pipeline-dir>`: working-tree edits are picked up immediately. |
| Reproducible run against a specific committed ref | `make install-<pipeline>-ref REF=<sha-or-branch>` | `pip install --force-reinstall --no-deps git+file://...@<ref>`: non-editable, exactly that commit, ~5-10 s per ref. |
| Test what's currently in the working tree, reproducibly | `make snapshot-<pipeline>` | `git stash create` captures tracked changes (working tree untouched), anchors them on a `dit-snapshot-<epoch>` branch, and installs from that ref. |
| Pipeline's transitive deps changed in the target ref | add `FULLDEPS=1` | Drops `--no-deps` so pip reinstalls the full dep tree (slower; only needed when the target ref bumped or added a dep). |
| GC the temp snapshot branches | `make clean-snapshots` | Removes `dit-snapshot-*` branches from all three pipeline checkouts. |
Notes on snapshot mode: git stash create captures tracked modifications only. Run git add -A in the pipeline repo first if untracked source files need to be in the snapshot. The snapshot branch persists for traceability until make clean-snapshots.
```bash
dit run workflows/<pipeline>/<workflow>.py [workflow-args...]
```

The CLI loads the Python module at the given path and invokes its `main(argv) -> int` entry point. Workflows can also be imported directly from a pytest target: dit is library-first, CLI-second.
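The loading step just described can be sketched with `importlib`. This is an illustrative stand-in for what `dit run` is described as doing, not its source: `load_and_run` is a hypothetical name, and the `--runner` flag in the sample workflow is only there to show argument handling. Because a workflow is just a module exposing `main(argv) -> int`, a pytest target can call the same function, or import the module directly, without going through the CLI.

```python
import importlib.util
import sys
from pathlib import Path


def load_and_run(path: str, argv: list[str]) -> int:
    """Import the module at `path` and return its main(argv) exit code."""
    module_path = Path(path)
    spec = importlib.util.spec_from_file_location(module_path.stem, str(module_path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load workflow from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the workflow file's top level
    return module.main(argv)


# A minimal workflow file following the main(argv) -> int convention
# (the --runner flag is illustrative).
EXAMPLE_WORKFLOW = '''\
import argparse


def main(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("--runner", choices=["docker", "dataflow"], default="dataflow")
    args = parser.parse_args(argv)
    print(f"would run with runner={args.runner}")
    return 0
'''

if __name__ == "__main__":
    # e.g. python load_sketch.py workflows/<pipeline>/<workflow>.py --runner dataflow
    sys.exit(load_and_run(sys.argv[1], sys.argv[2:]))
```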
Example (Phase 2 AIS-staging, the verified mode-equivalence test for port-visits):
```bash
dit run workflows/port_visits/ais.py --runner dataflow --parallel --build-from-source
```

dit covers a small set of orthogonal axes, summarised first and then walked through as concrete scenarios. Best-practice paths are flagged ⭐.
| Axis | Choices |
|---|---|
| Where | local (dit run …) / Cloud Build (make dit-cloud …) |
| Pipeline ref | clean HEAD of pipeline checkout / auto-created snapshot under refs/dit-snapshots/<pipeline>/<commit-short-sha> (content-addressable; one ref per distinct working-tree state) / a specific committed ref (make install-<pipeline>-ref REF=… or make dit-cloud REF=…) / a PR head SHA (automated, future) |
| Worker image | published default (e.g. pipe-gaps:v0.9.6) / custom-built from a ref (for changes that touch worker code) |
| Single vs cross-version | one ref / N refs compared (workflows/port_visits/cross_version_ais.py) |
| Trigger | manual CLI / PR event (automated, future) / scheduled (future) |
Future-state note (see `docs/no-dirty-tree-pivot.md`): under the in-flight pivot, every dit run executes a committed git ref. If your working tree is dirty when you invoke dit, the workflow auto-snapshots to `refs/dit-snapshots/<pipeline>/<commit-short-sha>` (content-addressable: identical tree state always produces the same SHA, so repeat runs of unchanged uncommitted code hit the cache), auto-pushes, and uses that ref. There is no `--allow-dirty-tree` flag and no special-case logic, and every cache row is reproducible. The scenarios below describe the end state.
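Why identical tree state yields an identical snapshot SHA: in a default (SHA-1) git repository, a commit's ID is the SHA-1 of its serialized object, which contains the tree, the parent, author/committer identity and timestamps, and the message. Freeze everything except the tree and the commit ID becomes a pure function of the working-tree content and its parent. The sketch below recomputes git's object hash in Python to make that concrete; the identity, message and epoch are hypothetical placeholders, not the values dit uses. With the git CLI, a similar effect could come from `git write-tree` followed by `git commit-tree` with `GIT_AUTHOR_DATE` and `GIT_COMMITTER_DATE` pinned.

```python
import hashlib


def git_object_sha(obj_type: str, body: bytes) -> str:
    """SHA-1 of a git object: a '<type> <len>' header, a NUL byte, then the body."""
    header = f"{obj_type} {len(body)}\0".encode()
    return hashlib.sha1(header + body).hexdigest()


def snapshot_commit_sha(tree_sha: str, parent_sha: str,
                        message: str = "dit snapshot",
                        ident: str = "dit <dit@example.invalid>",
                        frozen_epoch: int = 0) -> str:
    """Commit SHA for a snapshot whose author/committer dates are frozen.

    Every field except tree and parent is fixed, so the same tree on the
    same parent always produces the same commit SHA (and hence the same ref).
    """
    stamp = f"{ident} {frozen_epoch} +0000"
    body = (
        f"tree {tree_sha}\n"
        f"parent {parent_sha}\n"
        f"author {stamp}\n"
        f"committer {stamp}\n"
        f"\n{message}\n"
    ).encode()
    return git_object_sha("commit", body)
```

Sanity check: `git_object_sha("blob", b"")` gives `e69de29bb2d1d6434b8b29ae775ad8c2e48c5391`, the same ID `git hash-object` reports for an empty file.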
You're working on a pipeline change; you commit it; push to a branch; open a PR. dit fires automatically and compares the PR against main's cached output.
```console
$ cd $PROJECTS/pipe-gaps
$ git checkout -b fix/PIPELINE-1465-tiebreaker
$ git commit -am "Add deterministic tiebreaker to message sort"
$ git push -u origin fix/PIPELINE-1465-tiebreaker
$ gh pr create --title "Fix PIPELINE-1465 tiebreaker" --body "..."
# ← dit triggers here, automatically
# ← Check Run appears on the PR with the verdict
```
When to use: default for any pipeline change you intend to land. No commands beyond your normal git workflow. dit hits the cache for main's side (it's already been computed), runs the PR side fresh, posts the diff as a Check Run.
You're mid-development; you want feedback before opening a PR. dit detects the dirty tree, auto-snapshots, auto-pushes, runs against the snapshot. Your working tree is untouched.
```console
$ cd $PROJECTS/pipe-gaps && vim src/...   # edits, not committed
$ cd $PROJECTS/data_integration_tests
$ make dit-cloud PIPELINE=pipe-gaps WORKFLOW=workflows/pipe_gaps/mode_equivalence.py
# auto-snapshots → refs/dit-snapshots/pipe-gaps/<commit-short-sha>
# auto-pushes
# runs against the snapshot
```
When to use: iterating on a fix that isn't PR-ready (e.g. making a pipeline idempotent across modes). Each edit produces a fresh snapshot (new cache key, new MISS, full Dataflow workload). Once you're happy, commit + push the same code to a real branch + open a PR (Scenario A).
Caveats printed at snapshot time:

- The first run against each new tree state is a cache MISS; repeat runs of an unchanged tree are a HIT (the snapshot SHA is content-addressable: it is derived from `git write-tree` with frozen author/committer dates, so identical tree → identical SHA → identical cache key).
- Only tracked modifications are captured; run `git add -A` first if you have new files.
- The cache row is tagged `unreviewed_code=TRUE` so PR-validation queries skip it.
- If your changes touch worker code, build and push a custom worker image too (`--worker-image=…`).
- Requires `git push` permission on the pipeline repo. If you're a read-only viewer, dit fails at push time with a clear error pointing at `make install-<pipeline>-ref REF=<committed-ref>`, or asking you to commit + push the changes via a normal branch first.
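The chain "identical tree → identical SHA → identical cache key" holds because the run-cache key is a deterministic hash of `(pipeline_commit, worker_image_digest, workflow_file_sha1, params)`, as listed under the run-cache capability. A minimal sketch, assuming SHA-256 over canonical JSON; dit's actual encoding and hash may differ (`docs/run-cache.md` is the authority), and `run_cache_key` / `workflow_file_sha1` are hypothetical names. Params must be JSON-serializable, e.g. dates as ISO strings.

```python
import hashlib
import json
from typing import Any


def run_cache_key(pipeline_commit: str, worker_image_digest: str,
                  workflow_file_sha1: str, params: dict[str, Any]) -> str:
    """Hash the four inputs that identify a run: same inputs -> same key."""
    payload = json.dumps(
        {
            "pipeline_commit": pipeline_commit,
            "worker_image_digest": worker_image_digest,
            "workflow_file_sha1": workflow_file_sha1,
            "params": params,
        },
        sort_keys=True,         # dict key order (at any depth) can't change the hash
        separators=(",", ":"),  # fixed whitespace, so formatting can't leak in
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def workflow_file_sha1(path: str) -> str:
    """SHA-1 of the workflow file's bytes, so an edited workflow busts the cache."""
    with open(path, "rb") as fh:
        return hashlib.sha1(fh.read()).hexdigest()
```

Any change to the snapshot commit, the worker image, the workflow file, or a single parameter yields a new key, which is why each edit in this scenario is a fresh MISS.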
You want to test a specific committed ref without changing what's installed.
```console
$ make dit-cloud PIPELINE=pipe-gaps REF=main WORKFLOW=workflows/pipe_gaps/mode_equivalence.py
# or:
$ make dit-cloud PIPELINE=pipe-gaps REF=fix/PIPELINE-1465-tiebreaker WORKFLOW=...
# or to install locally and run from your venv (e.g. for ad-hoc inspection, or to drive Dataflow without Cloud Build):
$ make install-pipe-gaps-ref REF=fix/PIPELINE-1465-tiebreaker
$ dit run workflows/pipe_gaps/mode_equivalence.py --runner dataflow ...
```
When to use: testing main as a regression baseline; reviewing a teammate's PR locally; reproducing a historical run from a dit_runs row's pipeline_commit.
You have a fix and want to verify it doesn't regress anywhere it shouldn't.
```console
$ dit run workflows/port_visits/cross_version_ais.py \
    --experiment-id pipeline-1465 \
    --pin-source-at 2026-05-15T10:00:00Z \
    --binding before=main \
    --binding after=fix/PIPELINE-1465-tiebreaker \
    --binding-worker-image after=gcr.io/world-fishing-827/dit/pipe-anchorages:fix-tiebreaker \
    --runner dataflow --parallel --build-from-source
```
When to use: fix-verification before PR (or as a separate verification run alongside Scenario A). Each binding runs against a frozen source snapshot at --pin-source-at; outputs are diffed pairwise.
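Per-binding worker images (`--binding-worker-image`) matter when the change touches worker code: Dataflow workers install pipeline code from the published worker image, not from the submitter's checkout, so a binding without its own image runs whatever code the default image contains (see the submitter-vs-worker guardrail in the capabilities list).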
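The `--binding NAME=REF` flags above map a label to a pipeline ref, and "diffed pairwise" means every pair of bindings gets compared. A hypothetical sketch of that bookkeeping with `argparse`, not the code in `cross_version_ais.py`: `parse_bindings` and `comparison_pairs` are illustrative names, and flags the sketch doesn't know about are passed through untouched. Two bindings give one comparison; N bindings give N·(N−1)/2.

```python
import argparse
from itertools import combinations


def parse_bindings(argv: list[str]) -> tuple[dict[str, str], dict[str, str]]:
    """Parse repeated --binding NAME=REF and --binding-worker-image NAME=IMAGE flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--binding", action="append", default=[])
    parser.add_argument("--binding-worker-image", action="append", default=[])
    args, _unknown = parser.parse_known_args(argv)  # other workflow flags ignored here

    def to_dict(pairs: list[str]) -> dict[str, str]:
        out: dict[str, str] = {}
        for pair in pairs:
            name, sep, value = pair.partition("=")  # split on the first '=' only
            if not sep or not name or not value:
                raise ValueError(f"expected NAME=VALUE, got {pair!r}")
            out[name] = value
        return out

    return to_dict(args.binding), to_dict(args.binding_worker_image)


def comparison_pairs(bindings: dict[str, str]) -> list[tuple[str, str]]:
    """Every unordered pair of binding names, in the order they were given."""
    return list(combinations(bindings, 2))
```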
A previous cache row says pipeline_commit=<sha>. You want to rerun against that exact code.
```console
$ bq query --use_legacy_sql=false --project_id=world-fishing-827 \
    'SELECT pipeline_commit, params_json FROM `world-fishing-827.tech_great_expectations.dit_runs`
     WHERE run_id = "<rid>"'
$ make dit-cloud PIPELINE=pipe-gaps REF=<sha> WORKFLOW=workflows/pipe_gaps/mode_equivalence.py ARGS="..."
```
When to use: investigating a past diff; auditing a result; sanity-checking that a snapshot ref is still fetchable. Because every dit run executes a pushed ref, this works for anyone with read access to the pipeline repo — the row is genuinely portable.
```console
$ make dit-cancel RUN_ID=<rid>
```
Looks up the run's Dataflow job IDs + output table FQNs in dit_runs, cancels the jobs, drops the tables, marks the row cancelled. Reads the same dit_run_id label dit stamps on every Dataflow job + BQ table from a run.
You can manually invoke the snapshot step before running dit. Equivalent to letting make dit-cloud auto-snapshot, but useful when you want to inspect the snapshot ref before the run, or when running multiple dit invocations against the same uncommitted code base (the second run hits cache against the first).
```console
$ make snapshot-pipe-gaps
# Prints: Created refs/dit-snapshots/pipe-gaps/<commit-short-sha>; pushed to origin
# (or: ref already exists on origin, skipping push — content-addressable).
$ make dit-cloud PIPELINE=pipe-gaps REF=refs/dit-snapshots/pipe-gaps/<that> ...
```
Snapshots live forever by design — bytes-scale storage on origin in a hidden ref namespace, no measurable cost. There's no periodic-cleanup target.
The one exception:
```console
$ make clean-snapshot REF=refs/dit-snapshots/pipe-gaps/<sha>
```
Deletes the specified snapshot ref locally and on origin in one step. Intended for secret-leak remediation, e.g. when you notice that a `.env` file or a credential was tracked in the working tree when you ran `make dit-cloud`. Surgical and user-invoked. The cache row in `dit_runs` retains `pipeline_commit_parent`, so removing the snapshot ref doesn't lose the reproduction context for the rest of the row's lifetime.
| Question | Answer | Scenario |
|---|---|---|
| Are you opening a PR? | Yes | A (best) |
| | Not yet, iterating | B (auto-snapshot) |
| | Testing an existing ref | C |
| Are you comparing two refs against pinned source? | Yes | D |
| Are you reproducing or auditing a past run? | Yes | E |
| Need to clean up a stuck run? | Yes | F (M5) |
Each phase below is a short summary of what's planned and where we are. The canonical detail lives in docs/plan.md; this section is the operational dashboard.
| Phase / capability | Scope | Status (2026-05-22) |
|---|---|---|
| 1: pipe-gaps port | Stand up the repo. Lift the four-mode mode-equivalence test from `pipe-gaps/tests/integration/mode_equivalence.py` onto `dit.*` helpers. Drop `--runner=local`. Replace the source file with a shim. | Code complete + verified on real BQ 2026-05-22 (caught a real `pipe-gaps:v0.9.6` non-determinism bug; a custom worker image with the fix collapsed all 3 pairwise diffs to zero). Pending only: Track 5 shim swap in the pipe-gaps repo. |
| 2: port-visits | Ship AIS-staging, VMS, and AIS-full workflows for `anchorages_pipeline`'s two-step port-visits (`thin_port_messages` → `port_visits`). First real test of the `dit.compare` abstraction on the truncate-shape (`view_suffix=""`, `keys=["visit_id"]`). | AIS-staging verified 2026-05-15 (3/3 pairwise green). VMS not started; AIS-full not started. |
| Cross-version experiments | BQ snapshot helpers (`dit.bq.snapshot_table` / `snapshot_dataset`), experiment-id linkage (`--experiment-id` / `DIT_EXPERIMENT_ID`), structured Dataflow job names + dynamic labels, and an end-to-end orchestrator (`workflows/port_visits/cross_version_ais.py`) for diffing pipeline outputs across versions against pinned input. | Landed 2026-05-15. Validated end-to-end via the PIPELINE-1465 cross-version test and the pipe-gaps `--worker-image` flow on 2026-05-22. |
| Runtime & CI (Cloud Build) | ditbox image + `cloudbuild-dit.yaml` + `make dit-cloud` / `make publish-ditbox` targets. Moves the orchestrator off the laptop; serves both ad-hoc `gcloud builds submit` runs and GitHub-webhook PR triggers. Tiered triggers (cheap AIS-staging on every PR, heavy AIS-full on label) come on top. | Ad-hoc path battle-tested 2026-05-22 (kaniko cache, `uv` per-run installs, `_BEAM_VERSION` pin, `dit.job_names` centralisation, `dit.git_info` warn). Per-pipeline PR triggers pending. The 30-concurrent-build slot cap is the next architectural ceiling; Cloud Run jobs are flagged as the migration target if dit graduates to per-PR matrix testing. |
| 3: pipe-events port | Port `pipe-events/integration_tests/staging-bf_bfd_bftruncate.sh` (bash, no comparisons) to `workflows/pipe_events/fishing.py`. Add automated comparisons. Then decide whether to extract Phase/Mode/Oracle dataclasses based on three-consumer evidence. | Not started. |
| 4: composer-dags param sync | `dit sync-params --from <composer-dags-checkout>` reads production DAGs and regenerates `params.yaml`. Triggered when a real prod-vs-test param drift bug shows up. | Not started. |
| 5: Mutation library | Promote pipe-gaps' `compute_restricted_ssvids` into `dit.mutations` along with `drop_messages`, `shift_timestamps`, `set_segment_flag`. Cap at ~5 mutations. | Not started; waits for a second consumer. |
| 6: Phase sharing | Hash `(image-tag, phase-config, mutation-set)`; a second invocation of an identical phase becomes a BQ COPY instead of a re-run. Cuts wall-clock for CI. | Not started; build only when duplicate-run cost matters operationally. |
| 7: Golden-table mode | Per-workflow reference `_1_bf` table keyed by `(image-tag, params-hash, date-range)`; future runs assert byte-equivalence vs. the golden table. Cheap PR-validation regression check. Implementable on top of the cross-version snapshot machinery. | Not started. |
Operational next steps (rolling, in priority order):
- No-dirty-tree pivot (`docs/no-dirty-tree-pivot.md`). Deprecate `--allow-dirty-tree`; auto-snapshot + push dirty trees; rename `pipeline_dirty` → `unreviewed_code`; collapse the accumulated special-case logic before M5 inherits any of it into port-visits.
- Run cache M5: port-visits integration + `make dit-cancel`. Wire `dit.cache` into `workflows/port_visits/ais.py` via a `_run_with_cache`-style wrapper (structurally identical to the pipe-gaps M4 work that just landed in `workflows/pipe_gaps/mode_equivalence.py`). Add `make dit-cancel RUN_ID=<id>`, reading the cache table to cancel Dataflow and drop output tables. Implement `cancel_run` in `dit.cache`.
- Run cache M6: SIGTERM trap inside `dit run` so Cloud Build cancellations cleanly tear down their own Dataflow + BQ artefacts via `cancel_run`.
- Per-pipeline PR triggers. Wire `pipe-gaps` (most-verified) → `anchorages_pipeline` → `pipe-events` to fire dit on PRs. Trigger config lives in each pipeline repo and references `cloudbuild-dit.yaml` from the dit checkout. Path-filter on pipeline-relevant files; `dit:run` label as the escape-hatch override.
- PR comment integration via Check Runs. `dit.report` module + GitHub Check Run API call from inside `dit run`. Typed contract end-to-end (no log parsing). Design doc TBD; separately, `docs/llm-pr-gating.md` covers the negative-signal-only LLM pre-filter that gates whether a PR triggers dit (a concern orthogonal to how the verdict is posted).
- Track 5: pipe-gaps repo shim, opportunistically (replace `pipe-gaps/tests/integration/mode_equivalence.py` with a thin re-export from `dit/workflows/pipe_gaps/...`).
- VMS port-visits workflow, then AIS-full (the latter is what motivates Cloud Run jobs hardest once concurrent runs scale up).
- PIPELINE-1465 cross-version full validation against newly-built pipe-anchorages images (parallel to the pipe-gaps validation just completed).
- The first dit release tag (`v0.1.0`) once per-pipeline PR triggers + caching land: that's the natural "framework is usable by outsiders" milestone.
The canonical detailed version of this list lives in docs/plan.md § Next steps.