From 288714ce05d57a801b3b609e8b58d34ed31a282d Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 22 May 2026 08:30:18 -0400 Subject: [PATCH 1/2] TwoStageDiD Wave E.3 parity: always-treated drop preserves full-domain survey design MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mechanical transfer of the Wave E.3 SpilloverDiD invariant (PR #482, merge 24de9062) to TwoStageDiD. When the always-treated handler drops units from the OLS sample, the resolved survey design retains its FULL-DOMAIN n_psu / n_strata / df_survey / strata / fpc / psu arrays instead of being subsetted via replace(...). Per-cluster scores aggregate at fit-length then zero-pad onto the full-domain unique-PSU list via two new optional kwargs on _compute_gmm_variance: score_pad_mask and cluster_ids_full. PSUs containing only always-treated rows get zero score rows but still count toward G_full for n_psu / df_survey accounting. Documented synthesis (library-convention adoption): adopts the canonical R survey::svyrecvar(subset()) convention (Lumley 2010 §2.5), already established at imputation.py:2175-2183 (PreTrendsImputation), prep.py:1401-1432 (DCDH cell variance), and spillover.py (PR #482). Implementation: - diff_diff/two_stage.py: delete L1485-1525 design-subset block; promote keep_mask to fit()-level scope (always defined; defaults all-True); cluster injection sources cluster_ids_raw from FULL-DOMAIN data[cluster] (not post-drop df[cluster]) so _inject_cluster_as_psu's zip against resolved_survey.strata stays length-aligned; df["_survey_cluster"] aligned to post-drop length via resolved_survey.psu[keep_mask.values]; _compute_gmm_variance + 3 inner _stage2_* methods gain score_pad_mask / cluster_ids_full kwargs; zero-pad expansion after per-cluster aggregation; strata/fpc obs_idx lookups use cluster_ids_full under padding; G < 2 unidentified gate fires on G_full when padding active. - diff_diff/two_stage.py: _refit_ts callback subsets each replicate weight w_r via keep_mask.values before threading into _fit_untreated_model and _stage2_*, matching the keep_mask-subsetting applied to survey_weights in the main fit (otherwise solve_ols rejects the length mismatch and compute_replicate_refit_variance swallows the ValueError so replicate inference NaNs out). - Always-treated warning text updated to reflect the new contract: weights are subsetted for OLS, design retained for variance. - Replicate variance + always-treated: existing path now works correctly (score_pad_mask_arg stays None on _uses_replicate_ts paths; per-replicate refit handles resampling separately). Tests (tests/test_two_stage.py): - New TestTwoStageDiDWaveE3ParityAlwaysTreated class with 8 tests: (a) no-always-treated baseline, (b) full-domain df_survey preservation under drop, (c) full-domain n_psu reporting, (d) per-cluster zero-pad mock-spy on _compute_stratified_meat_from_psu_scores, (e) subpopulation + always-treated composition, (f) cluster-as-PSU + always-treated, (g) no-survey path unchanged, (h) PSU entirely-always-treated. Tests (tests/test_replicate_weight_expansion.py): - Strengthened test_two_stage_always_treated to assert finite overall_se / overall_p_value / overall_conf_int (was only asserting finite ATT, missing the replicate-SE regression class). - New test_two_stage_always_treated_event_study_and_group_replicate exercising the event-study + group replicate refit branches end-to-end under always-treated drop with aggregate="all"; asserts finite se + p_value + conf_int on non-reference horizons and cohorts. Docs: - docs/methodology/REGISTRY.md: TwoStageDiD section gains "documented synthesis — Wave E.3 parity" note; SpilloverDiD Wave E.3 follow-up note updated to mark TwoStageDiD parity as shipped. - CHANGELOG.md: Unreleased Added entry leading with documented-synthesis framing. - TODO.md: drop parity follow-up row. Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.md | 1 + TODO.md | 1 - diff_diff/two_stage.py | 290 +++++++++++++---- docs/methodology/REGISTRY.md | 3 +- tests/test_replicate_weight_expansion.py | 93 +++++- tests/test_two_stage.py | 384 ++++++++++++++++++++++- 6 files changed, 694 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8feaf634..ff5e4c7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **TwoStageDiD: parity with SpilloverDiD Wave E.3 — always-treated unit drop preserves full-domain survey design via zero-padded scores.** Closes the parity follow-up tracked at `TODO.md` after PR #482 (SpilloverDiD Wave E.3, merge `24de9062`). When TwoStageDiD detects always-treated units (`first_treat <= min_time`) and removes them from the OLS sample, the resolved survey design retains its FULL-DOMAIN `n_psu` / `n_strata` / `df_survey` / `strata` / `fpc` / `psu` arrays instead of being subsetted via `replace(resolved_survey, ...)`. Per-cluster stage-1 / stage-2 score aggregates are computed at the post-drop fit-sample length and then zero-padded onto the full-domain unique-PSU list before stratified-meat dispatch via two new optional kwargs on `_compute_gmm_variance`: `score_pad_mask` (full-domain boolean keep mask) and `cluster_ids_full` (full-domain post-injection PSU labels). PSUs containing only always-treated rows get zero score rows but still count toward `G_full` for `n_psu` / `df_survey` accounting. **Documented synthesis (library-convention adoption, NOT new methodology):** adopts the canonical "zero-pad scores + retain full-design resolved survey" convention from R `survey::svyrecvar(subset())` (Lumley 2010 §2.5) already established in `diff_diff/imputation.py:2175-2183` (PreTrendsImputation), `diff_diff/prep.py:1401-1432` (DCDH cell variance), and `diff_diff/spillover.py` (PR #482 Wave E.3). **Mechanical realization:** `two_stage.py:1485-1525` design-subset block deleted (the `replace(resolved_survey, ...)` subset + `n_psu` / `n_strata` recompute + post-drop `compute_survey_metadata` call); `keep_mask` promoted to `fit()`-level scope (always defined, all-True when no always-treated drop); `survey_weights = survey_weights[keep_mask.values]` retained for stage-1 / stage-2 OLS arithmetic; cluster injection block updated to source `cluster_ids_raw` from FULL-DOMAIN `data[cluster_var].values` (not post-drop `df[cluster_var].values`) so `_inject_cluster_as_psu`'s zip against `resolved_survey.strata` (full-domain) stays length-aligned; `df["_survey_cluster"]` aligned to post-drop length via `resolved_survey.psu[keep_mask.values]`; post-injection `compute_survey_metadata` uses full-domain `raw_w` from `data[survey_design.weights]`. `_compute_gmm_variance` adds the zero-pad expansion after the per-cluster aggregation (mapping fit-sample `unique_clusters` into `unique_clusters_full` positions via `np.searchsorted`) and updates the strata/fpc `obs_idx` lookups to use `cluster_ids_for_lookup = cluster_ids_full` when padding is active. The three inner stage-2 methods (`_stage2_static`, `_stage2_event_study`, `_stage2_group`) thread the new kwargs through; bootstrap-resample call sites keep default `None` (no behavior change on bootstrap path). **Always-treated warning text updated:** "Associated survey weights subsetted for stage-1 / stage-2 OLS; full-domain survey design retained for variance estimation (Wave E.3 parity)." replaces the prior "and design arrays adjusted" claim. **No-survey path unchanged:** when `resolved_survey is None`, both `score_pad_mask` and `cluster_ids_full` default to `None` and the existing post-drop scoring path runs bit-identically. **Replicate variance + always-treated drop:** existing path unchanged (replicate refit handles resampling at the survey-design level; `score_pad_mask_arg` is `None` on `_uses_replicate_ts` paths). **Tests:** new `TestTwoStageDiDWaveE3ParityAlwaysTreated` class in `tests/test_two_stage.py` (8 tests: no-always-treated baseline, full-domain `df_survey` preservation under drop, full-domain `n_psu` reporting, per-cluster zero-pad mock-spy on `_compute_stratified_meat_from_psu_scores`, subpopulation + always-treated composition, cluster-as-PSU + always-treated, no-survey path unchanged, PSU entirely-always-treated). REGISTRY.md TwoStageDiD section gains a "documented synthesis — Wave E.3 parity" note; SpilloverDiD Wave E.3 section updated to mark the TwoStageDiD parity follow-up as shipped. - **WooldridgeDiD `vcov_type` parameter, OLS path (Phase 1b PR 3/8).** `WooldridgeDiD(vcov_type=...)` now accepts `{"classical","hc1","hc2","hc2_bm"}` on `method="ols"` (defaults to `"hc1"`, preserves prior behavior at machine precision — the WLS-CR1 sandwich is algebraically invariant between the prior within-transform path and the new branched path, differing only by float64 multiplication ordering at sub-ULP scale; the full 106-test `tests/test_wooldridge.py` baseline still passes unchanged). `hc2_bm` auto-routes to a full-dummy saturated design (`[intercept, X_design, unit_dummies, time_dummies]`) + clubSandwich WLS-CR2 algebra (PR #475) — matches `clubSandwich::vcovCR(lm(...), type="CR2") + coef_test()$df_Satt` at `atol=1e-10` on the new `benchmarks/data/wooldridge_golden.json` fixture. `classical`/`hc2` supported via full-dummy + auto-drop of the unit auto-cluster (one-way families); explicit `cluster="X"` + one-way family raises at the linalg validator. Per-cell + aggregate p-values/CIs on `classical`/`hc2` paths use the residual DOF `n - rank(X)` (matches R `lm()` / `coef_test()` t-distribution), not normal-theory. **Bell-McCaffrey Satterthwaite DOF is threaded across ALL hc2_bm user-facing inference surfaces**: (1) per-cell `group_time_effects[(g, t)]` use `coef_test()$df_Satt` (matches R at atol=1e-6 from CI inversion); (2) overall ATT uses the post-period-aggregation contrast DOF from `_compute_cr2_bm_contrast_dof` (matches R `Wald_test(test="HTZ")$df_denom` at atol=1e-10); (3) `.aggregate("group" | "calendar" | "event")` recomputes contrast-specific BM DOFs lazily from BM artifacts stored on the Results object — the REDUCED kept-column design (`X_red`), cluster_ids, reduced bread matrix, and reduced-space coef-index map (using the reduced kept-column design after rank-deficient drops keeps the bread non-singular and matches the subspace `solve_ols` actually estimated in). Fail-closed (all-NaN inference) when BM DOF unavailable, mirrors PR #475 R7 and PR #479 R3. `method ∈ {"logit","poisson"}` + `vcov_type != "hc1"` raises `NotImplementedError` at `__init__` (GLM CR2-BM-on-pseudo-residuals composition needs derivation; deferred to follow-up TODO row). `SurveyDesign` + `vcov_type != "hc1"` raises `NotImplementedError` at `fit()` (survey TSL overrides analytical sandwich). `n_bootstrap > 0` + one-way (`hc2`/`classical`) raises at `fit()` regardless of `cluster=` setting (multiplier bootstrap is intrinsically clustered, but one-way vcov_type does not compose with cluster_ids — either the auto-cluster is dropped when `cluster=None` leaving the bootstrap with no cluster to draw at, or the linalg validator rejects one-way + cluster_ids when `cluster=X`). `conley` rejected at `__init__` with a deferral pointer. `vcov_type`, `cluster_name`, `n_clusters` added to `WooldridgeDiDResults` for downstream introspection (per `feedback_results_vcov_label_cluster_metadata`). Third PR of the Phase 1b standalone-estimator threading initiative (5 PRs to follow: CallawaySantAnna, ImputationDiD, TripleDifference, TwoStageDiD, EfficientDiD). - **`SpilloverDiD(survey_design=SurveyDesign.subpopulation(...))` full-design retention via zero-pad scores (Wave E.3).** Closes the Wave E.1/E.2/follow-up documented limitation at `REGISTRY.md:3249`: `SurveyDesign.subpopulation()`-derived designs AND warn-and-drop fits now preserve the full-domain resolved survey design — `n_psu` / `n_strata` / `df_survey` / Binder TSL per-stratum centering reflect the FULL domain rather than the post-`finite_mask` fit sample. **Documented synthesis (library-convention adoption, NOT new methodology):** Wave E.3 adopts the canonical "zero-pad scores to full panel + retain full-design resolved survey" pattern from R `survey::svyrecvar(subset())` (Lumley 2010 §2.5) already established in `diff_diff/imputation.py:2175-2183` (PreTrendsImputation lead regression — Omega_0 scores zero-padded back to full panel length) and `diff_diff/prep.py:1401-1432` (DCDH cell variance — IF zero-padded outside the cell). Wave E.3 propagates the same convention to SpilloverDiD's Wave E.1 Binder TSL × Wave D Gardner GMM × Wave E.2/follow-up stratified-Conley + serial Bartlett meat. **Mechanical realization (one new `_compute_gmm_corrected_meat` kwarg):** the gamma_hat / Psi build stays on SURVEY-FINITE-MASK inputs (`X_1_sparse_fit`, `X_10_sparse_fit`, `eps_10_fit` built on `survey_finite_mask = finite_mask & survey_weights > 0`; `X_2_kept_gamma`, `eps_2_fit_gamma`, `survey_weights_fit_gamma` projected from the fit-sample frame down to survey_finite_mask) so the drop-first stage-1 FE column space is bit-identical to the pre-E.3 path. `_compute_gmm_corrected_meat` gains a new optional kwarg `score_pad_mask: Optional[np.ndarray] = None`: when supplied, the helper zero-pads the fit-sample `Psi` to full panel length AFTER construction but BEFORE kernel dispatch via `Psi_padded[score_pad_mask] = Psi`. Kernel-dispatch arrays (`cluster_ids`, `conley_coords`, `conley_time`, `conley_unit`, `resolved_survey`) are passed at FULL length so the meat helpers (Binder TSL / stratified-Conley / serial Bartlett) see the full-domain PSU / strata / centroid / time geometry. The `_validate_conley_kwargs` call inside the helper reads `n_for_conley = len(score_pad_mask)` when the kwarg is set so the Conley shape checks see the full-length geometry. **`gamma_hat` invariance:** the gamma_hat solve operates on fit-sample inputs throughout — bit-identical to the pre-E.3 path (critical for the case where `_build_butts_fe_design_csr`'s `pd.factorize` re-compaction would drop a different unit's column under a full-length FE build than under a fit-length one). **Bread invariance:** `A_22 = X_2_kept' W X_2_kept` at `spillover.py:3187-3214` still uses fit-length `X_2_kept` because `A_22_full = X_2_full' W_full X_2_full` equals `A_22_kept` when zero-weight rows contribute zero. **A2 invariant:** warn-and-drop and `SurveyDesign.subpopulation()` drops are treated identically — both apply the zero-pad mechanism. The "both mechanisms compose cleanly" case (subpop-excluded row that is ALSO warn-and-dropped) produces `Psi = 0` from either cause; the PSU still counts toward `n_psu_full`. Hand-computation methodology anchor at `_scratch/wave_e3_smoke.py` codifies the A2 invariant on 4 PSU × 4 period × 3 obs synthetic. **Subpopulation parity vs upstream-subset:** `df_survey` matches the full domain regardless of how many rows the subpopulation mask excludes (mirrors R `svyglm(design=subset(d, mask))` vs `svyglm(design=svydesign(data=data[mask], ...))`). SE may differ by design — subpopulation retains zero-padded PSU geometry; upstream-subset drops PSUs entirely. **Pre-E.3 baseline parity:** when `finite_mask.all() == True` AND all weights `> 0`, the Wave E.3 zero-pad is a no-op — ATT + SE + n_psu + df_survey match pre-E.3 baseline values via FIXED GOLDEN values at `test_c` (`rtol=1e-12, atol=1e-12`). **Cross-surface n_psu consistency:** top-level `res.n_psu` reads from `len(resolved_survey_fit.weights)` on the implicit-PSU branch (was `int(finite_mask.sum())` pre-codex-R1-P2-fix); this keeps `res.n_psu == res.survey_metadata.n_psu` on weights-only / strata-only survey designs under warn-and-drop. Regression at `test_c2`. **Restrictions inherited:** replicate-weight variance + subpopulation continues to raise `NotImplementedError` at the Wave E.1 gate. TwoStageDiD's analogous `finite_mask + design-subset` pattern at `two_stage.py:567-601` is NOT yet adopted to Wave E.3 — separate parity follow-up tracked in `TODO.md` (an expected-divergence test was attempted but TwoStageDiD's always-treated handling at `two_stage.py:294-336` differs from SpilloverDiD's per-unit Omega_0 check, so the divergence didn't materialize on the standard fixture; the parity follow-up should add its own targeted regression). **Implementation:** `spillover.py:2845-2896` design-subset block deleted; `survey_weights_fit = survey_weights[finite_mask]` retained for the stage-2 OLS solve which still operates on the fit sample; `cluster_ids_full[finite_mask]` subset dropped on the survey path. `_compute_gmm_corrected_meat` call at `spillover.py:3163` now receives FIT-LENGTH gamma_hat-construction inputs (unchanged) plus FULL-LENGTH kernel-dispatch arrays (`cluster_ids_for_meat`, `conley_*_for_meat`, `resolved_survey_fit`) plus the new `score_pad_mask=survey_finite_mask` kwarg; no-survey path passes `score_pad_mask=None` and uses fit-length variables throughout (bit-identical to pre-E.3). `_compute_gmm_corrected_meat` at `two_stage.py:62-80` adds one new optional kwarg `score_pad_mask: Optional[np.ndarray] = None` and one post-Psi-construction zero-pad block; the `_validate_conley_kwargs` call uses `n_for_conley = len(score_pad_mask)` when the kwarg is set. Within-unit-constancy validator at `spillover.py:2913` updated to operate on full-length unit array. Second `compute_survey_metadata` recompute at `spillover.py:2954-2959` uses full-length `raw_w`. No `_compute_stratified_meat_from_psu_scores` / `_compute_stratified_conley_meat` / `_compute_stratified_serial_bartlett_meat` signature changes. **Tests:** new `TestSpilloverDiDWaveE3SubpopulationFullDesign` and `TestSpilloverDiDWaveE3SubpopulationFullDesignEventStudy` classes in `tests/test_spillover.py` (19 tests: pre-E.3 baseline parity via pinned goldens, n_psu cross-surface consistency on implicit-PSU branch, A2 invariant (zero-pad mechanics via mock-spy), subpopulation × explicit-PSU parity, conley + lag>0 + subpopulation × explicit-PSU / cluster-injection / weights-only branches, cluster-as-PSU + subpopulation parity, unit with BOTH zero weight AND no Omega_0 support, gamma_hat-build sample excludes zero-weight rows, n_obs / n_treated / n_control / n_far_away_obs reflect count_mask, warn-drop SE drift golden, ATT bit-equality under PSU-last-sort exclusion, exact event-study n_obs propagation, event-study on both is_staggered branches with analytical + conley+lag variants). Pre-existing Wave E.1 `test_p2_finite_mask_forces_drop_under_survey` assertion flipped from `n_psu=8` (subset) to `n_psu=10` (full domain) to reflect the new contract. - **ChaisemartinDHaultfoeuille (DCDH) methodology-review-tracker promotion.** Tracker row flipped **In Progress** → **Complete** with full Verified Components / Test Coverage / Corrections Made / Deviations / Outstanding Concerns structure mirroring the HAD precedent (PR #473) and ContinuousDiD precedent (PR #476). REGISTRY `## ChaisemartinDHaultfoeuille` gains a formal `### Deviations from the paper / from R / library extensions` block consolidating 7 documented deviations into a single AI-review-recognized labeled surface (per CLAUDE.md "Documenting Deviations (AI Review Compatibility)"): (D1) equal-cell weighting (deviation from BOTH AER 2020 Equation 3 AND R `DIDmultiplegtDYN`); (D2) period-based vs cohort-based stable controls; (D3) balanced-baseline panel + interior-gap drops + terminal-missingness retention + cell-period-allocator targeted `ValueError`; (D4) SE normalization `N_l` vs R `G` (~4% smaller analytical SE); (D5) singleton-cohort degeneracy → NaN with `UserWarning`; (D6) `<50%` switcher warning at far horizons (library extension citing Favara-Imbs application, footnote 14 of NBER WP 29873); (D7) Phase 3 `DID^X` covariate first-stage equal-cell weights. R cross-language coverage holds at documented tolerance bands in `tests/test_chaisemartin_dhaultfoeuille_parity.py` (`POINT_RTOL = 1e-4` on pure-direction point estimates, `MIXED_POINT_RTOL = 0.025` on mixed-direction, `PURE_DIRECTION_SE_RTOL = 0.05` on pure-direction SE, `SE_RTOL = 0.10` on multi-horizon SE, `se_rtol=0.15` on the long-panel `L_max=5` joiners-only scenario where cell-count-weighting compounds). No source code changes, no new tests, no new docstrings — consolidation only against the existing 12 methodology tests (`tests/test_methodology_chaisemartin_dhaultfoeuille.py`), 26 R-parity tests (`tests/test_chaisemartin_dhaultfoeuille_parity.py`), 352 unit tests (`tests/test_chaisemartin_dhaultfoeuille.py`), survey suites (`tests/test_survey_dcdh.py`, `tests/test_survey_dcdh_replicate_psu.py`, three cell-period coverage suites), and two primary-source DCDH paper reviews on disk (2020 AER + 2022/2023 NBER WP 29873 via PR #478; the `dechaisemartin-2026-review.md` on disk is HAD's primary source, not DCDH's, and is referenced as adjacent context only). The REGISTRY Deviations block uses semantic section-name anchors (rather than fragile line numbers) for back-references to other parts of the DCDH section — an intentional divergence from the PR #476 ContinuousDiD precedent reflecting PR-A wording-drift CI feedback that flagged line-number cross-references as drift-prone in long sections. `METHODOLOGY_REVIEW.md` DCDH row promoted **In Progress** → **Complete**; L27 In Progress example paragraph re-pointed to WooldridgeDiD; L1289 priority-order queue item #6 (DCDH) removed and items #7-#11 renumbered to #6-#10. diff --git a/TODO.md b/TODO.md index a52f1587..8f221f59 100644 --- a/TODO.md +++ b/TODO.md @@ -147,7 +147,6 @@ Deferred items from PR reviews that were not addressed before merge. | Conley + survey weights / `survey_design`. Score-reweighted meat `s_i = w_i · X_i · ε_i` is mechanical, but PSU clustering interaction with the spatial kernel and replicate-weights variance under spatial correlation are non-trivial (Bertanha-Imbens 2014 covers cluster-sample but not the explicit Conley case). Phase 5 of the spillover-conley initiative; paper review prerequisite. Currently raises `NotImplementedError` at the linalg validator. | `linalg.py::_validate_vcov_args` | Phase 5 (spillover-conley) | Medium | | `SyntheticDiD(vcov_type="conley")` support. Currently raises `TypeError` at `__init__` because SyntheticDiD uses `variance_method ∈ {bootstrap, jackknife, placebo}` rather than the analytical sandwich that Conley plugs into. Wiring would require either reimplementing an analytical sandwich path for SyntheticDiD or designing a spatial-block bootstrap (new methodology, Politis-Romano 1994 territory). | `synthetic_did.py::SyntheticDiD` | follow-up (spillover-conley) | Low | | `SpilloverDiD(survey_design=...)` replicate-weight variance (BRR / Fay / JK1 / JKn / SDR). Wave E.1 ships Taylor-linearization only. Per Gerber (2026) Appendix A, the IF-reweighting shortcut does NOT apply to TwoStageDiD-class estimators because `gamma_hat` is weight-sensitive; correct support requires per-replicate full re-fit of stage 1 and stage 2 (200+ LoC of test surface beyond E.1). | `spillover.py::SpilloverDiD.fit`, `survey.py::compute_replicate_refit_variance` | follow-up | Low | -| `TwoStageDiD(survey_design=SurveyDesign.subpopulation(...))` full-design retention (Wave E.3 parity follow-up). SpilloverDiD shipped Wave E.3's zero-pad convention (R `survey::svyrecvar(subset())` form) but TwoStageDiD's analogous `finite_mask + design-subset + recompute n_psu/n_strata` pattern at `two_stage.py:567-601` was NOT updated in the same PR (scope discipline). Fix template: apply the same Wave E.3 surgical edits to TwoStageDiD (delete design-subset block, zero-pad Psi inputs at the meat call site, gate fit-length sparse FE behind `if resolved_survey is None:`). Parity follow-up should ALSO add a targeted regression on a fixture that actually exercises the TwoStageDiD finite-mask subset path under subpopulation (e.g. a DGP where TwoStageDiD's always-treated handling at `two_stage.py:294-336` triggers warn-and-drop AND `SurveyDesign.subpopulation()` is used) — Wave E.3 attempted such a test on a SpilloverDiD warn-and-drop fixture but found the two estimators have different warn-and-drop semantics so the divergence didn't materialize, so the parity-anchor test was deferred to this follow-up. | `two_stage.py::TwoStageDiD.fit` | follow-up (Wave E.3 parity) | Low | | `compute_survey_metadata(resolved_survey, raw_w_for_meta)` helper extraction. Wave E.1/E.3 contain two near-duplicate `raw_w_for_meta` constructions (upstream + post-cluster-injection) that differ only in which point of the resolution pipeline they fire at. Factor out a shared helper that takes `(survey_design, data, [finite_mask])` and returns `(resolved_survey, raw_w_for_meta)` to reduce drift risk between the two paths. Cosmetic; behaviour unchanged. | `spillover.py::SpilloverDiD.fit` | follow-up | Low | | Serial Bartlett kernel logic duplicated between `diff_diff/two_stage.py::_compute_stratified_serial_bartlett_meat` (survey path) and `diff_diff/conley.py::_compute_conley_meat` panel-block branch (no-survey path). Both compute `K[t,s] = (1 - |t-s|/(L+1)) * 1{|t-s| <= L, t != s}` over dense panel-period codes. Factor out a shared `_serial_bartlett_kernel_matrix(t_codes, L)` helper and a shared post-meat finite + PSD-warning guard so the survey and no-survey paths can't drift on diagnostics or kernel weights. Cosmetic; refactor doesn't change behavior. | `two_stage.py::_compute_stratified_serial_bartlett_meat`, `conley.py::_compute_conley_meat` | follow-up | Low | | `SpilloverDiD(vcov_type="conley", conley_lag_cutoff > 0, survey_design=...)` no-effective-PSU serial Bartlett HAC. Wave E.2 follow-up ships the panel-block composition when an effective PSU exists (explicit `survey_design.psu` OR injected via `cluster=` per `_inject_cluster_as_psu`). Weights-only / strata-only survey designs WITHOUT a cluster fallback raise `NotImplementedError` at `SpilloverDiD.fit` post-resolution because under the pseudo-PSU = obs-index fallback each pseudo-PSU appears in exactly one period — the per-PSU serial cross-period loop would silently contribute zero. Fix would either derive a unit-level serial fallback for no-PSU designs (mixes IF allocators with the pseudo-PSU spatial term — needs methodology work) or route the serial loop through `conley_unit` with explicit documentation of the IF-allocator asymmetry. Regression goldens vs the effective-PSU shipped path. | `spillover.py::SpilloverDiD.fit`, `two_stage.py::_compute_stratified_serial_bartlett_meat` | follow-up (Wave E.2 follow-up tail) | Low | diff --git a/diff_diff/two_stage.py b/diff_diff/two_stage.py index 82f67eba..21893734 100644 --- a/diff_diff/two_stage.py +++ b/diff_diff/two_stage.py @@ -22,7 +22,6 @@ """ import warnings -from dataclasses import replace from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple import numpy as np @@ -1462,12 +1461,30 @@ def fit( always_treated_mask = (~df["_never_treated"]) & (df[first_treat] <= min_time) always_treated_units = df.loc[always_treated_mask, unit].unique() n_always_treated = len(always_treated_units) + # `keep_mask` is always defined (defaults to all-True over `data`) so + # downstream survey-path code can subset full-domain arrays uniformly + # whether or not the always-treated branch fires (Wave E.3 parity). + keep_mask = pd.Series(np.ones(len(data), dtype=bool), index=data.index) if n_always_treated > 0: unit_list = ", ".join(str(u) for u in always_treated_units[:10]) suffix = f" (and {n_always_treated - 10} more)" if n_always_treated > 10 else "" survey_note = "" if survey_weights is not None or resolved_survey is not None: - survey_note = " Associated survey weights and design arrays " "adjusted to match." + # Wave E.3 parity (PR #482 SpilloverDiD precedent): under the + # always-treated drop we subset `survey_weights` for stage-1 / + # stage-2 OLS arithmetic but retain the full-domain + # `resolved_survey` for variance estimation. The Binder TSL + # meat consumes zero-padded per-cluster scores keyed against + # the full-domain PSU/strata layout, matching R + # `survey::svyrecvar(subset())` (Lumley 2010 §2.5) and the + # in-library convention used at `imputation.py:2175-2183` + # (PreTrendsImputation) and `prep.py:1401-1432` (DCDH cell + # variance). + survey_note = ( + " Associated survey weights subsetted for stage-1 / " + "stage-2 OLS; full-domain survey design retained for " + "variance estimation (Wave E.3 parity)." + ) warnings.warn( f"{n_always_treated} unit(s) are treated in all observed periods " f"(first_treat <= {min_time}): [{unit_list}{suffix}]. " @@ -1478,55 +1495,21 @@ def fit( ) df = df[~df[unit].isin(always_treated_units)].copy() - # Subset survey arrays to match filtered df + # Stage-1 / stage-2 OLS sample = post-drop `df`; survey_weights + # must be aligned for OLS arithmetic. The active-sample mask + # `keep_mask` is FULL-DOMAIN length and identifies kept rows. + keep_mask = ~data[unit].isin(always_treated_units) if survey_weights is not None: - keep_mask = ~data[unit].isin(always_treated_units) survey_weights = survey_weights[keep_mask.values] - if resolved_survey is not None: - keep_mask = ~data[unit].isin(always_treated_units) - resolved_survey = replace( - resolved_survey, - weights=resolved_survey.weights[keep_mask.values], - strata=( - resolved_survey.strata[keep_mask.values] - if resolved_survey.strata is not None - else None - ), - psu=( - resolved_survey.psu[keep_mask.values] - if resolved_survey.psu is not None - else None - ), - fpc=( - resolved_survey.fpc[keep_mask.values] - if resolved_survey.fpc is not None - else None - ), - replicate_weights=( - resolved_survey.replicate_weights[keep_mask.values] - if resolved_survey.replicate_weights is not None - else None - ), - ) - # Recompute n_psu/n_strata after subsetting - new_n_psu = ( - len(np.unique(resolved_survey.psu)) if resolved_survey.psu is not None else 0 - ) - new_n_strata = ( - len(np.unique(resolved_survey.strata)) - if resolved_survey.strata is not None - else 0 - ) - resolved_survey = replace(resolved_survey, n_psu=new_n_psu, n_strata=new_n_strata) - # Recompute survey_metadata since it depends on these counts - from diff_diff.survey import compute_survey_metadata - - raw_w = ( - df[survey_design.weights].values.astype(np.float64) - if survey_design.weights - else np.ones(len(df), dtype=np.float64) - ) - survey_metadata = compute_survey_metadata(resolved_survey, raw_w) + # NOTE (Wave E.3 parity): `resolved_survey` is intentionally NOT + # subsetted here. Full-domain `n_psu` / `n_strata` / `df_survey` / + # `strata` / `fpc` / `psu` arrays propagate downstream; the + # always-treated rows contribute zero score to the Binder TSL + # meat (see `_compute_gmm_variance` for the zero-pad mechanics). + # `survey_metadata` is computed once on the full-domain design by + # `_resolve_survey_for_fit` upstream and (when cluster injection + # fires) recomputed once post-injection in the block below; no + # post-always-treated recompute is needed. # Treatment indicator with anticipation effective_treat = df[first_treat] - self.anticipation @@ -1569,9 +1552,20 @@ def fit( f"Available columns: {list(df.columns)}" ) - # Resolve effective cluster and inject cluster-as-PSU for survey variance + # Resolve effective cluster and inject cluster-as-PSU for survey variance. + # Wave E.3 parity: under always-treated drop, `resolved_survey` retains + # full-domain arrays — so the cluster_ids fed into + # `_resolve_effective_cluster` / `_inject_cluster_as_psu` MUST be the + # FULL-DOMAIN cluster column (sourced from `data`, not `df` which is + # post-drop). Otherwise the zip in `_inject_cluster_as_psu` between + # `resolved.strata` (full-domain) and `cluster_ids` (post-drop) would + # truncate silently. Mirrors `spillover.py:2879` (PR #482 Wave E.3 + # `cluster_ids_full` invariant). + cluster_ids_full: Optional[np.ndarray] = None if resolved_survey is not None: - cluster_ids_raw = df[cluster_var].values if cluster_var in df.columns else None + cluster_ids_raw = ( + np.asarray(data[cluster_var].values) if cluster_var in data.columns else None + ) effective_cluster_ids = _resolve_effective_cluster( resolved_survey, cluster_ids_raw, @@ -1579,18 +1573,31 @@ def fit( ) resolved_survey = _inject_cluster_as_psu(resolved_survey, effective_cluster_ids) # When survey PSU is present, use it as the effective cluster for - # GMM variance (PSU overrides unit-level clustering) + # GMM variance (PSU overrides unit-level clustering). Both + # `df["_survey_cluster"]` (post-drop) and `cluster_ids_full` + # (full-domain) reference the post-injection PSU labels: the OLS + # aggregation downstream uses `df["_survey_cluster"]` (df-aligned); + # `_compute_gmm_variance` receives `cluster_ids_full` (full-domain) + # so the zero-padded per-PSU meat sees the full-domain PSU list. if resolved_survey.psu is not None: - df["_survey_cluster"] = resolved_survey.psu + df["_survey_cluster"] = resolved_survey.psu[keep_mask.values] cluster_var = "_survey_cluster" - # Recompute metadata after PSU injection + cluster_ids_full = np.asarray(resolved_survey.psu) + elif effective_cluster_ids is not None: + # No PSU injected (e.g., user-only cluster=, no survey.psu); + # full-domain cluster_ids match effective_cluster_ids. + cluster_ids_full = np.asarray(effective_cluster_ids) + # Recompute metadata after PSU injection. `raw_w` is taken from the + # FULL-DOMAIN `data` so the design-effect / Kish-effective-n + # diagnostics reflect the full domain (matches the retained + # full-domain `resolved_survey` arrays per Wave E.3 R6 lesson). if resolved_survey.psu is not None and survey_metadata is not None: from diff_diff.survey import compute_survey_metadata raw_w = ( - df[survey_design.weights].values.astype(np.float64) + np.asarray(data[survey_design.weights].values, dtype=np.float64) if survey_design.weights - else np.ones(len(df), dtype=np.float64) + else np.ones(len(data), dtype=np.float64) ) survey_metadata = compute_survey_metadata(resolved_survey, raw_w) @@ -1657,6 +1664,23 @@ def fit( if _uses_replicate_ts and _survey_df is None: _survey_df = 0 + # Wave E.3 parity (PR #482 SpilloverDiD precedent): under the survey + # path, `score_pad_mask_arg` is the FULL-DOMAIN keep_mask identifying + # rows present in the stage-1 / stage-2 OLS sample after the + # always-treated drop. `cluster_ids_full` is the FULL-DOMAIN + # post-injection PSU labels. Both are passed through to + # `_compute_gmm_variance`, which zero-pads per-cluster scores onto + # the full-domain PSU list before stratified-meat dispatch so + # `n_psu` / `n_strata` / `df_survey` reflect the full design. + # Replicate-variance fits do NOT pad — Replicate refits per-replicate + # already handle the resampling at the survey-design level. + score_pad_mask_arg = ( + keep_mask.values if resolved_survey is not None and not _uses_replicate_ts else None + ) + cluster_ids_full_arg = ( + cluster_ids_full if resolved_survey is not None and not _uses_replicate_ts else None + ) + # Always compute overall ATT (static specification) overall_att, overall_se = self._stage2_static( df=df, @@ -1675,6 +1699,8 @@ def fit( survey_weights=survey_weights, survey_weight_type=survey_weight_type, resolved_survey=(resolved_survey if not _uses_replicate_ts else None), + score_pad_mask=score_pad_mask_arg, + cluster_ids_full=cluster_ids_full_arg, ) # Compute overall ATT inference (may be overridden by replicate below) @@ -1708,6 +1734,8 @@ def fit( survey_weight_type=survey_weight_type, survey_df=_survey_df, resolved_survey=(resolved_survey if not _uses_replicate_ts else None), + score_pad_mask=score_pad_mask_arg, + cluster_ids_full=cluster_ids_full_arg, ) if aggregate in ("group", "all"): @@ -1730,6 +1758,8 @@ def fit( survey_weight_type=survey_weight_type, survey_df=_survey_df, resolved_survey=(resolved_survey if not _uses_replicate_ts else None), + score_pad_mask=score_pad_mask_arg, + cluster_ids_full=cluster_ids_full_arg, ) # Replicate variance override: derive keys from actual outputs, then refit @@ -1756,6 +1786,19 @@ def fit( _full_est_ts.extend([group_effects[g]["effect"] for g in _sorted_groups_ts]) def _refit_ts(w_r): + # Wave E.3 parity (PR #482 SpilloverDiD precedent): the main fit + # path keeps `resolved_survey` at full-domain length but subsets + # `survey_weights` for stage-1 / stage-2 OLS arithmetic via + # `keep_mask` (always-treated drop). The replicate refit + # callback receives a FULL-DOMAIN replicate weight `w_r` + # (sourced from `resolved_survey.replicate_weights` which is + # also full-domain) and must apply the SAME `keep_mask` + # subsetting before threading through stage-1 / stage-2, + # otherwise `solve_ols` rejects the length mismatch + # (full-domain w_r vs post-drop df) and the ValueError is + # swallowed by `compute_replicate_refit_variance` → + # NaN replicate inference. + w_r_fit = np.asarray(w_r)[keep_mask.values] ufe_r, tfe_r, gm_r, delta_r, kcm_r = self._fit_untreated_model( df, outcome, @@ -1763,7 +1806,7 @@ def _refit_ts(w_r): time, covariates, omega_0_mask, - weights=w_r, + weights=w_r_fit, ) y_tilde_r = self._residualize( df, @@ -1794,7 +1837,7 @@ def _refit_ts(w_r): delta_hat=delta_r, cluster_var=cluster_var, kept_cov_mask=kcm_r, - survey_weights=w_r, + survey_weights=w_r_fit, survey_weight_type="pweight", ) results.append(att_r) @@ -1817,7 +1860,7 @@ def _refit_ts(w_r): ref_period=ref_period, balance_e=balance_e, kept_cov_mask=kcm_r, - survey_weights=w_r, + survey_weights=w_r_fit, survey_weight_type="pweight", survey_df=None, ) @@ -1840,7 +1883,7 @@ def _refit_ts(w_r): cluster_var=cluster_var, treatment_groups=treatment_groups, kept_cov_mask=kcm_r, - survey_weights=w_r, + survey_weights=w_r_fit, survey_weight_type="pweight", survey_df=None, ) @@ -2296,6 +2339,8 @@ def _stage2_static( survey_weights: Optional[np.ndarray] = None, survey_weight_type: str = "pweight", resolved_survey=None, + score_pad_mask: Optional[np.ndarray] = None, + cluster_ids_full: Optional[np.ndarray] = None, ) -> Tuple[float, float]: """ Static (simple ATT) Stage 2: OLS of y_tilde on D_it. @@ -2344,6 +2389,8 @@ def _stage2_static( cluster_ids=df[cluster_var].values, survey_weights=survey_weights, resolved_survey=resolved_survey, + score_pad_mask=score_pad_mask, + cluster_ids_full=cluster_ids_full, ) se = float(np.sqrt(max(V[0, 0], 0.0))) @@ -2371,6 +2418,8 @@ def _stage2_event_study( survey_weight_type: str = "pweight", survey_df: Optional[int] = None, resolved_survey=None, + score_pad_mask: Optional[np.ndarray] = None, + cluster_ids_full: Optional[np.ndarray] = None, ) -> Dict[int, Dict[str, Any]]: """Event study Stage 2: OLS of y_tilde on relative-time dummies.""" y_tilde = df["_y_tilde"].values.copy() @@ -2512,6 +2561,8 @@ def _stage2_event_study( cluster_ids=df[cluster_var].values, survey_weights=survey_weights, resolved_survey=resolved_survey, + score_pad_mask=score_pad_mask, + cluster_ids_full=cluster_ids_full, ) # Build results dict @@ -2589,6 +2640,8 @@ def _stage2_group( survey_weight_type: str = "pweight", survey_df: Optional[int] = None, resolved_survey=None, + score_pad_mask: Optional[np.ndarray] = None, + cluster_ids_full: Optional[np.ndarray] = None, ) -> Dict[Any, Dict[str, Any]]: """Group (cohort) Stage 2: OLS of y_tilde on cohort dummies.""" y_tilde = df["_y_tilde"].values.copy() @@ -2634,6 +2687,8 @@ def _stage2_group( cluster_ids=df[cluster_var].values, survey_weights=survey_weights, resolved_survey=resolved_survey, + score_pad_mask=score_pad_mask, + cluster_ids_full=cluster_ids_full, ) group_effects: Dict[Any, Dict[str, Any]] = {} @@ -2723,6 +2778,8 @@ def _compute_gmm_variance( cluster_ids: np.ndarray, survey_weights: Optional[np.ndarray] = None, resolved_survey=None, + score_pad_mask: Optional[np.ndarray] = None, + cluster_ids_full: Optional[np.ndarray] = None, ) -> np.ndarray: """ Compute GMM sandwich variance (Butts & Gardner 2022). @@ -2754,10 +2811,37 @@ def _compute_gmm_variance( eps_2 : np.ndarray, shape (n,) Stage 2 residuals. cluster_ids : np.ndarray, shape (n,) - Cluster identifiers. + Cluster identifiers, fit-sample length. Used for the per-cluster + stage-1 / stage-2 score aggregation (OLS path). survey_weights : np.ndarray, optional Survey weights of shape (n,). When None, unweighted (identical to current code). + resolved_survey : ResolvedSurveyDesign, optional + Resolved survey design. Under Wave E.3 parity (PR #482 SpilloverDiD + precedent) the design retains full-domain `n_psu` / `n_strata` / + `df_survey` / `strata` / `fpc` / `psu` arrays even when the + always-treated drop removes rows from the OLS sample. The + zero-padded per-cluster scores expand onto the full-domain PSU + list before stratified-meat dispatch. R `survey::svyrecvar(subset())` + convention (Lumley 2010 §2.5); mirrors `imputation.py:2175-2183` + (PreTrendsImputation) and `prep.py:1401-1432` (DCDH cell variance). + score_pad_mask : np.ndarray of shape (n_full,), bool, optional + Wave E.3 parity zero-pad mask. When supplied, indicates which + FULL-DOMAIN rows are present in the fit sample (True = kept + for OLS). Requires `n == int(np.sum(score_pad_mask))`. Co-supplied + with `cluster_ids_full`. Per-cluster stage-1 / stage-2 score + aggregates computed at fit-length are expanded onto the + full-domain unique-PSU list; PSUs absent from the fit sample + (e.g. PSUs containing only always-treated rows) get zero score + rows but still count toward `G_full` for `n_psu` / `df_survey`. + None (default) → no padding, exact pre-PR behavior. + cluster_ids_full : np.ndarray of shape (n_full,), optional + Full-domain PSU labels. Co-supplied with `score_pad_mask`. Must + share the same length. Provides the full-domain unique-PSU list + used both for score zero-pad expansion and for downstream + strata/FPC `obs_idx` lookups against the full-domain + `resolved_survey.strata` / `.fpc` arrays. None (default) → no + padding, exact pre-PR behavior. Returns ------- @@ -2871,6 +2955,78 @@ def _compute_gmm_variance( for j_col in range(k): np.add.at(s2_by_cluster[:, j_col], cluster_indices, weighted_X2[:, j_col]) + # Wave E.3 parity (PR #482 SpilloverDiD precedent): when the caller + # supplies `score_pad_mask` + `cluster_ids_full`, expand per-cluster + # stage-1 / stage-2 score aggregates onto the FULL-DOMAIN unique-PSU + # list. PSUs absent from the fit sample (those containing only + # always-treated rows) get zero score rows but still count toward + # `G_full` for `n_psu` / `df_survey` accounting. Mirrors R + # `survey::svyrecvar(subset())` (Lumley 2010 §2.5) and the in-library + # convention at `imputation.py:2175-2183` (PreTrendsImputation) and + # `prep.py:1401-1432` (DCDH cell variance). Downstream strata / FPC + # lookups use `cluster_ids_for_lookup` so the obs_idx applies to the + # full-domain `resolved_survey.strata` / `.fpc` arrays. + if score_pad_mask is not None: + if cluster_ids_full is None: + raise ValueError( + "_compute_gmm_variance: score_pad_mask requires " + "cluster_ids_full to be co-supplied (Wave E.3 parity " + "contract — score zero-pad expansion needs the " + "full-domain PSU labels to align with resolved_survey)." + ) + if resolved_survey is None: + raise ValueError( + "_compute_gmm_variance: score_pad_mask requires " + "resolved_survey to be co-supplied (Wave E.3 parity " + "contract — zero-pad only meaningful under a survey " + "design that retains full-domain dimensions)." + ) + n_full = int(len(score_pad_mask)) + if int(len(cluster_ids_full)) != n_full: + raise ValueError( + "_compute_gmm_variance: score_pad_mask and " + "cluster_ids_full must share the FULL-DOMAIN length; " + f"got len(score_pad_mask)={n_full}, " + f"len(cluster_ids_full)={int(len(cluster_ids_full))}." + ) + if int(np.sum(score_pad_mask)) != n: + raise ValueError( + "_compute_gmm_variance: int(np.sum(score_pad_mask)) " + f"({int(np.sum(score_pad_mask))}) must equal the " + f"fit-sample length n ({n}) so the score expansion " + "is well-defined." + ) + unique_clusters_full = np.unique(cluster_ids_full) + G_full = int(len(unique_clusters_full)) + # Map fit-sample unique_clusters into positions in + # unique_clusters_full via searchsorted (both arrays sorted by + # np.unique). Verify the mapping is exact — otherwise the fit + # sample contains PSU labels absent from the full domain (a + # contract violation that should never occur under the upstream + # `_inject_cluster_as_psu` invariant). + fit_to_full_idx = np.searchsorted(unique_clusters_full, unique_clusters) + if not np.array_equal( + unique_clusters_full[fit_to_full_idx], np.asarray(unique_clusters) + ): + raise ValueError( + "_compute_gmm_variance: fit-sample unique cluster " + "labels are not a subset of full-domain cluster labels " + "(Wave E.3 parity invariant violated). This should be " + "impossible under `_inject_cluster_as_psu` — please " + "file an issue with a minimal reproducer." + ) + c_by_cluster_full = np.zeros((G_full, p)) + s2_by_cluster_full = np.zeros((G_full, k)) + c_by_cluster_full[fit_to_full_idx] = c_by_cluster + s2_by_cluster_full[fit_to_full_idx] = s2_by_cluster + c_by_cluster = c_by_cluster_full + s2_by_cluster = s2_by_cluster_full + unique_clusters = unique_clusters_full + G = G_full + cluster_ids_for_lookup = np.asarray(cluster_ids_full) + else: + cluster_ids_for_lookup = cluster_ids + # 4. S_g = gamma_hat' c_g - X'_{2g} eps_{2g} S = self._compute_gmm_scores(c_by_cluster, gamma_hat, s2_by_cluster) @@ -2882,15 +3038,17 @@ def _compute_gmm_variance( from diff_diff.survey import _compute_stratified_meat_from_psu_scores # Build PSU→stratum and PSU→FPC mappings from observation-level arrays. - # cluster_ids used here match resolved_survey.psu (via _inject_cluster_as_psu). - # unique_clusters is already computed at line above (np.unique(cluster_ids)). + # cluster_ids_for_lookup is full-domain length under Wave E.3 parity + # (score_pad_mask path) and fit-sample length otherwise; either way it + # aligns with `resolved_survey.strata` / `resolved_survey.fpc` so the + # obs_idx lookup resolves to the correct stratum / FPC value. G_meat = len(unique_clusters) # Strata: synthesize single stratum when strata is None (unstratified FPC) if resolved_survey.strata is not None: psu_strata = np.empty(G_meat, dtype=resolved_survey.strata.dtype) for idx, c in enumerate(unique_clusters): - obs_idx = np.where(cluster_ids == c)[0][0] + obs_idx = np.where(cluster_ids_for_lookup == c)[0][0] psu_strata[idx] = resolved_survey.strata[obs_idx] else: psu_strata = np.zeros(G_meat, dtype=int) @@ -2900,12 +3058,14 @@ def _compute_gmm_variance( if resolved_survey.fpc is not None: psu_fpc = np.empty(G_meat, dtype=np.float64) for idx, c in enumerate(unique_clusters): - obs_idx = np.where(cluster_ids == c)[0][0] + obs_idx = np.where(cluster_ids_for_lookup == c)[0][0] psu_fpc[idx] = resolved_survey.fpc[obs_idx] # Unstratified single-PSU: variance is unidentified (matches # _compute_stratified_psu_meat at survey.py:1225 which returns # zero meat with no variance_computed flag for n_psu < 2). + # Under Wave E.3 parity, G_meat = G_full (post zero-pad), so the + # gate fires on the full-domain PSU count, not the fit-sample. if resolved_survey.strata is None and G_meat < 2: return np.full((k, k), np.nan) diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index e67dc135..ec7e57ff 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -1314,6 +1314,7 @@ Our implementation uses multiplier bootstrap on the GMM influence function: clus - **Zero-observation horizons after filtering:** When `balance_e` or NaN `y_tilde` filtering results in zero observations for some non-Prop-5 event study horizons, those horizons produce NaN for all inference fields (effect, SE, t-stat, p-value, CI) with n_obs=0. - **Zero-observation cohorts in group effects:** If all treated observations for a cohort have NaN `y_tilde` (excluded from estimation), that cohort's group effect is NaN with n_obs=0. - **Note:** Survey weights in TwoStageDiD GMM sandwich via weighted cross-products: bread uses (X'_2 W X_2)^{-1}, gamma_hat uses (X'_{10} W X_{10})^{-1}(X'_1 W X_2), per-cluster scores multiply by survey weights. PSU clustering, stratification, and FPC are fully supported in the meat matrix via `_compute_stratified_meat_from_psu_scores()`. When strata or FPC are present, the meat computation replaces `S' S` with the stratified formula `sum_h (1 - f_h) * (n_h/(n_h-1)) * centered_h' centered_h`. Strata also enters survey df (n_PSU - n_strata) for t-distribution inference. Bootstrap + survey supported (Phase 6) via PSU-level multiplier weights. +- **Note (documented synthesis — Wave E.3 parity, full-domain survey design under always-treated drop):** when the always-treated handler drops units that lack untreated observations, TwoStageDiD preserves the FULL-DOMAIN resolved survey design (`n_psu`, `n_strata`, `df_survey`, `strata`, `fpc`, `psu`) for variance estimation. Per-cluster stage-1 / stage-2 score aggregates are computed at the post-drop fit-sample length and then zero-padded onto the full-domain unique-PSU list via `score_pad_mask` + `cluster_ids_full` kwargs on `_compute_gmm_variance`; PSUs that contain only always-treated rows get zero score rows but still count toward `G_full` for `n_psu` / `df_survey` accounting. Stage-1 / stage-2 OLS solve continues to operate on the post-drop sample (`survey_weights` subsetted for OLS arithmetic; bread `(X'_2 W X_2)^{-1}` unchanged because dropped rows contribute zero score under zero-padded weights). Mirrors SpilloverDiD Wave E.3 (PR #482, merge 24de9062) and adopts the canonical "zero-pad scores to full panel + retain full-design resolved survey" convention from R `survey::svyrecvar(subset())` (Lumley 2010 §2.5 "Domains and subpopulations") and the in-library precedents at `imputation.py:2175-2183` (PreTrendsImputation) and `prep.py:1401-1432` (DCDH cell variance). Cluster-injection (`_inject_cluster_as_psu`) operates on the FULL-DOMAIN cluster column (sourced from `data` pre-drop, not the post-drop `df`) so `resolved_survey.strata` and the injected `psu` array stay length-aligned. Pre-PR, the always-treated drop physically subsetted `resolved_survey.weights / strata / psu / fpc / replicate_weights` via `replace(resolved_survey, ...)` and recomputed `n_psu` / `n_strata` on the post-drop sample, producing artificially-deflated `df_survey` when a PSU contained only always-treated rows; tests at `tests/test_two_stage.py::TestTwoStageDiDWaveE3ParityAlwaysTreated` lock the parity contract. - **Note:** Both the iterative FE solver (`_iterative_fe`, Stage 1) and the iterative alternating-projection demeaning helper (`_iterative_demean`, used in covariate residualization) emit `UserWarning` when `max_iter` exhausts without reaching `tol`, via `diff_diff.utils.warn_if_not_converged`. Silent return of the current iterate was classified as a silent failure under the Phase 2 audit and replaced with an explicit signal to match the logistic/Poisson IRLS pattern in `linalg.py`. - **Note:** When the Stage-2 bread `X'_2 W X_2` is singular, both the analytical TSL variance (`two_stage.py`) and the multiplier-bootstrap bread (`two_stage_bootstrap.py`) now emit a `UserWarning` before falling back to `np.linalg.lstsq`. Previously this fallback was silent. Sibling of axis-A finding #17 in the Phase 2 silent-failures audit; surfaced by the repo-wide lstsq-fallback pattern grep that accompanied the StaggeredTripleDifference fix. - **Note:** The GMM sandwich and bootstrap paths both use `scipy.sparse.linalg.factorized` for the Stage 1 normal-equations solve `(X'_{10} W X_{10}) gamma = X'_1 W X_2` and fall back to dense `lstsq` when the sparse factorization raises `RuntimeError` on a near-singular matrix. Both fallback sites emit a `UserWarning` (silent-failure audit axis C) so callers know SE estimates came from the degraded path rather than the fast sparse path. @@ -3357,7 +3358,7 @@ Degrees of freedom for the t-distribution lookup use `ResolvedSurveyDesign.df_su - Subpopulation parity vs upstream-subset: `df_survey` matches the full domain (`n_psu_full - n_strata_full`) regardless of how many rows the subpopulation mask excludes. SE may differ from `fit(data[mask], ..., survey_design=plain_design)` by design (subpopulation retains zero-padded PSU geometry; subset drops PSUs entirely). - **Restrictions / unchanged from Wave E.1/E.2/follow-up:** - Replicate-weight variance (BRR / Fay / JK1 / JKn / SDR) raises `NotImplementedError` (Wave E.1 gate at `spillover.py:2400` inherited; per-replicate refit + subpopulation is a separate follow-up). - - TwoStageDiD's analogous `finite_mask + design-subset + recompute n_psu/n_strata` pattern at `two_stage.py:567-601` is NOT yet adopted to Wave E.3's full-design retention — separate parity follow-up tracked in TODO.md. (No expected-divergence test ships in this PR: TwoStageDiD's always-treated handling at `two_stage.py:294-336` differs from SpilloverDiD's per-unit Omega_0 rank-deficiency check, so the divergence does not materialize on the standard SpilloverDiD warn-and-drop fixtures. The parity follow-up should add a targeted regression on a fixture that actually exercises the TwoStageDiD finite-mask path.) + - TwoStageDiD's analogous always-treated design-subset pattern was shipped in a follow-up PR (TwoStageDiD Wave E.3 parity) — see "Note (documented synthesis — Wave E.3 parity, full-domain survey design under always-treated drop)" in the TwoStageDiD section above. The two estimators now share the full-design retention contract; the trigger differs (TwoStageDiD: always-treated unit detection; SpilloverDiD: `finite_mask` warn-and-drop / subpopulation zero-weight rows) but the invariant is identical (zero-pad scores at the meat-helper boundary; retain full-domain `resolved_survey.psu / strata / fpc`). - `df_for_inference` at `spillover.py:3229-3234` reads `resolved_survey_fit.df_survey` which is the full-domain value under Wave E.3 (since `resolved_survey_fit = resolved_survey`). All four `safe_inference` call sites (aggregate `tau_total`, per-ring `delta_j`, event-study per-event-time `tau_k` / `delta_jk`, scalar `att` lincom) inherit the full-domain df. **Implementation:** `spillover.py:2845-2896` design-subset block deleted; `survey_weights_fit = survey_weights[finite_mask]` retained for the stage-2 OLS solve (which still operates on the fit sample); `cluster_ids_full[finite_mask]` subset dropped on the survey path (`cluster_ids_fit = cluster_ids_full` under survey). The `_compute_gmm_corrected_meat` call at `spillover.py:3163` is rewired to receive SURVEY-FINITE-MASK gamma_hat-construction inputs (`X_1_sparse_fit`, `X_10_sparse_fit`, `eps_10_fit` built on `survey_finite_mask`; `X_2_kept_gamma`, `eps_2_fit_gamma`, `survey_weights_fit_gamma` projected from the fit-sample frame down to survey_finite_mask) plus FULL-LENGTH kernel-dispatch arrays (`cluster_ids_for_meat`, `conley_coords_for_meat`, `conley_time_for_meat`, `conley_unit_for_meat`, `resolved_survey_fit`) plus the new `score_pad_mask=survey_finite_mask` kwarg; no-survey path passes `score_pad_mask=None` and uses fit-length variables throughout (bit-identical to pre-E.3). `_compute_gmm_corrected_meat` at `two_stage.py` adds one new optional kwarg `score_pad_mask: Optional[np.ndarray] = None` and one post-Psi-construction zero-pad block; the `_validate_conley_kwargs` call uses `n_for_conley = len(score_pad_mask)` when the kwarg is set. Within-unit-constancy validator at `spillover.py:2913` updated to operate on full-length unit array (matches full-length cluster array under Wave E.3). Second `compute_survey_metadata` recompute at `spillover.py:2954-2959` now uses full-length `raw_w` (no `[finite_mask]` subset). Tests: `TestSpilloverDiDWaveE3SubpopulationFullDesign` + `TestSpilloverDiDWaveE3SubpopulationFullDesignEventStudy` at `tests/test_spillover.py`. diff --git a/tests/test_replicate_weight_expansion.py b/tests/test_replicate_weight_expansion.py index 3f6fa410..11f1fcaf 100644 --- a/tests/test_replicate_weight_expansion.py +++ b/tests/test_replicate_weight_expansion.py @@ -378,7 +378,18 @@ def test_two_stage_event_study_replicate(self): assert np.isfinite(eff["se"]) and eff["se"] > 0, f"period {e}: SE not finite" def test_two_stage_always_treated(self): - """Replicate weights should be subsetted when always-treated units are excluded.""" + """Replicate weights subsetted to post-always-treated-drop sample. + + Wave E.3 parity: the main fit retains full-domain `resolved_survey` + but subsets `survey_weights` to the post-drop OLS sample. The + replicate refit callback receives FULL-DOMAIN replicate weights and + must apply the SAME `keep_mask` subsetting before threading into + stage-1 / stage-2. Without the subset, `solve_ols` rejects the + length mismatch and `compute_replicate_refit_variance` swallows the + ValueError so replicate inference NaNs out. This test exercises the + full replicate variance pipeline (not just the point estimate) under + the always-treated drop to lock the parity contract end-to-end. + """ data = _make_staggered_panel() # Add always-treated units (first_treat <= min time) for i in range(50, 55): @@ -394,7 +405,87 @@ def test_two_stage_always_treated(self): result = TwoStageDiD(n_bootstrap=0).fit( data, "outcome", "unit", "time", "first_treat", survey_design=sd, ) + # ATT comes from the main fit (always finite once always-treated drop runs) assert np.isfinite(result.overall_att) + # SE comes from the replicate refit variance: requires the refit + # callback to align replicate weights with the post-drop sample. + # Pre-Wave-E.3-parity-fix, replicate refits raised ValueError on + # length mismatch, `compute_replicate_refit_variance` swallowed + # them, and `overall_se` came out NaN with all replicate-based + # inference fields NaN. + assert np.isfinite(result.overall_se), ( + "Replicate SE must be finite under always-treated drop. " + "If NaN, the replicate refit callback is failing to align " + "weights with the post-drop sample — Wave E.3 parity bug." + ) + assert np.isfinite(result.overall_p_value) + assert result.overall_conf_int is not None + assert np.all(np.isfinite(result.overall_conf_int)) + + def test_two_stage_always_treated_event_study_and_group_replicate(self): + """Replicate refit covers event-study + group surfaces under + always-treated drop. Companion to ``test_two_stage_always_treated`` + which asserts the overall ATT surface only; the Wave E.3 parity + fix to ``_refit_ts`` aligns ``w_r`` with ``keep_mask`` for ALL + three stage-2 surfaces (``_stage2_static`` / ``_stage2_event_study`` + / ``_stage2_group``), so this test exercises the event-study + + group replicate refit branches end-to-end with the same + always-treated fixture.""" + data = _make_staggered_panel() + for i in range(50, 55): + for t in range(1, 9): + data = pd.concat([data, pd.DataFrame([{ + "unit": i, "time": t, "first_treat": 1, + "outcome": 12.0 + np.random.normal(0, 0.3), + "weight": 1.5, "treated": 1, "post": 1, + }])], ignore_index=True) + rep_cols = _add_jk1_replicates(data, n_rep=10, unit_col="unit") + sd = SurveyDesign(weights="weight", replicate_weights=rep_cols, replicate_method="JK1") + result = TwoStageDiD(n_bootstrap=0).fit( + data, "outcome", "unit", "time", "first_treat", + aggregate="all", survey_design=sd, + ) + # Event-study surface: at least one non-reference horizon must have + # replicate-derived finite SE / p_value / conf_int (the replicate + # override path at `two_stage.py` updates SE + t_stat + p_value + + # conf_int separately for each non-reference horizon via the same + # _refit_ts callback, so all four fields must be locked). + assert result.event_study_effects is not None + non_ref_es = { + e: eff for e, eff in result.event_study_effects.items() + if eff["effect"] != 0.0 and np.isfinite(eff["effect"]) + } + assert len(non_ref_es) > 0, "no non-reference event-study effects" + for e, eff in non_ref_es.items(): + assert np.isfinite(eff["se"]) and eff["se"] > 0, ( + f"event-study horizon {e}: replicate SE must be finite " + f"under always-treated drop" + ) + assert np.isfinite(eff["p_value"]), ( + f"event-study horizon {e}: replicate p_value must be finite" + ) + assert eff["conf_int"] is not None and np.all(np.isfinite(eff["conf_int"])), ( + f"event-study horizon {e}: replicate conf_int bounds must be finite" + ) + # Group surface: at least one cohort with finite replicate SE / + # p_value / conf_int (same override path applies to group effects). + assert result.group_effects is not None + finite_groups = { + g: eff for g, eff in result.group_effects.items() + if np.isfinite(eff["effect"]) + } + assert len(finite_groups) > 0, "no finite group effects" + for g, eff in finite_groups.items(): + assert np.isfinite(eff["se"]) and eff["se"] > 0, ( + f"cohort {g}: replicate SE must be finite under " + f"always-treated drop" + ) + assert np.isfinite(eff["p_value"]), ( + f"cohort {g}: replicate p_value must be finite" + ) + assert eff["conf_int"] is not None and np.all(np.isfinite(eff["conf_int"])), ( + f"cohort {g}: replicate conf_int bounds must be finite" + ) def test_two_stage_bootstrap_rejected(self): data = _make_staggered_panel() diff --git a/tests/test_two_stage.py b/tests/test_two_stage.py index 56f0d44b..ae714a3b 100644 --- a/tests/test_two_stage.py +++ b/tests/test_two_stage.py @@ -507,7 +507,9 @@ def test_sparse_factorized_dense_fallback_emits_warning(self): "diff_diff.two_stage.sparse_factorized", side_effect=RuntimeError("test failure"), ): - with pytest.warns(UserWarning, match="sparse factorization.*falling back to dense lstsq"): + with pytest.warns( + UserWarning, match="sparse factorization.*falling back to dense lstsq" + ): results = TwoStageDiD().fit( data, outcome="outcome", @@ -534,7 +536,9 @@ def test_sparse_factorized_bootstrap_dense_fallback_emits_warning(self): "diff_diff.two_stage_bootstrap.sparse_factorized", side_effect=RuntimeError("test failure"), ): - with pytest.warns(UserWarning, match="sparse factorization.*falling back to dense lstsq"): + with pytest.warns( + UserWarning, match="sparse factorization.*falling back to dense lstsq" + ): results = TwoStageDiD(n_bootstrap=4, seed=42).fit( data, outcome="outcome", @@ -1488,10 +1492,7 @@ def raise_for_square_eye(a, b): time="time", first_treat="first_treat", ) - fallback = [ - w for w in caught - if "TwoStageDiD TSL variance" in str(w.message) - ] + fallback = [w for w in caught if "TwoStageDiD TSL variance" in str(w.message)] assert len(fallback) >= 1, ( "Expected TSL-variance bread fallback warning when np.linalg.solve " f"was forced to raise; got warnings: " @@ -1534,10 +1535,7 @@ def raise_for_square_eye(a, b): time="time", first_treat="first_treat", ) - fallback = [ - w for w in caught - if "TwoStageDiD multiplier bootstrap bread" in str(w.message) - ] + fallback = [w for w in caught if "TwoStageDiD multiplier bootstrap bread" in str(w.message)] assert len(fallback) >= 1, ( "Expected bootstrap-bread fallback warning when np.linalg.solve " f"was forced to raise; got warnings: " @@ -1546,3 +1544,369 @@ def raise_for_square_eye(a, b): msg = str(fallback[0].message) assert "np.linalg.lstsq" in msg assert "X_2'WX_2" in msg + + +# ============================================================================= +# TestTwoStageDiDWaveE3ParityAlwaysTreated +# ============================================================================= + + +def _build_parity_panel( + sharp_psu0: bool = False, + include_always_treated: bool = True, + seed: int = 17, +) -> pd.DataFrame: + """Build a 6-PSU x 4-period staggered panel for Wave E.3 parity tests. + + PSU layout: + - PSUs 0, 1, 2 in stratum 0; PSUs 3, 4, 5 in stratum 1. + - Each PSU contains 2 units by default; each unit has 4 observations + (one per period). + - Unit 0 (PSU 0): always-treated (first_treat=1) when + ``include_always_treated=True``; never-treated otherwise. + - Other units: alternating never-treated / staggered-onset. + + When ``sharp_psu0=True``, drop unit 1 (PSU 0) so the always-treated + unit is the sole occupant of PSU 0. Under pre-PR TwoStageDiD this + triggers the design-subset bug: dropping unit 0 from PSU 0 also drops + PSU 0 from `resolved_survey.psu` so the reported `n_psu` falls from 6 + to 5. Wave E.3 parity contract: `n_psu` remains 6. + """ + rng = np.random.default_rng(seed) + rows = [] + unit_id = 0 + for psu in range(6): + stratum = 0 if psu < 3 else 1 + N_h = 100 # FPC: hypothetical stratum size + for _ in range(2): + if unit_id == 0: + first_treat = 1 if include_always_treated else 0 + elif unit_id % 2 == 0: + first_treat = 0 # never-treated + else: + # Staggered onsets at period 2 or 3 (within observed periods 1-4) + first_treat = 2 + (unit_id % 2) + for t in range(1, 5): + y = ( + 1.5 * (unit_id % 4) + + 0.3 * t + + (1.0 if first_treat > 0 and t >= first_treat else 0.0) + + rng.normal(0, 0.5) + ) + rows.append( + dict( + id=unit_id, + t=t, + y=y, + g=first_treat if first_treat > 0 else 0, + psu=psu, + stratum=stratum, + N_h=N_h, + w=1.0, + ) + ) + unit_id += 1 + df = pd.DataFrame(rows) + if sharp_psu0: + # Drop unit 1 (also in PSU 0) so PSU 0's only resident is the + # always-treated unit 0. Post-drop fit sample loses PSU 0 entirely. + df = df[~((df["psu"] == 0) & (df["id"] == 1))].copy() + return df + + +class TestTwoStageDiDWaveE3ParityAlwaysTreated: + """Wave E.3 parity contract: TwoStageDiD's always-treated drop retains + the FULL-DOMAIN survey design (n_psu, n_strata, df_survey, strata, fpc). + + Mirrors PR #482 SpilloverDiD Wave E.3 (merge 24de9062) which established + the same invariant for SpilloverDiD's finite_mask / subpopulation drops. + Adopts the R `survey::svyrecvar(subset())` convention (Lumley 2010 §2.5) + and the in-library precedents at `imputation.py:2175-2183` + (PreTrendsImputation) and `prep.py:1401-1432` (DCDH cell variance). + + Scope: this PR tests only `vcov_type` paths reachable from TwoStageDiD's + public API — stratified-PSU meat via `_compute_stratified_meat_from_psu_scores` + (with or without FPC) and unstratified `S.T @ S`. TwoStageDiD does NOT + currently expose `vcov_type="conley"`; that follow-up is tracked + separately at TODO.md. + """ + + def test_a_no_always_treated_baseline_survey_path(self): + """Sanity: no-always-treated fit reports n_psu reflecting the data's + full PSU set (no artificial reduction). Locks the zero-pad-of-all-True + mask = no-op invariant; the parity code path runs but is a no-op.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=False, include_always_treated=False) + design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + assert result.survey_metadata is not None + # Full-domain PSU count from data + n_psu_data = int(data["psu"].nunique()) + n_strata_data = int(data["stratum"].nunique()) + assert result.survey_metadata.n_psu == n_psu_data + assert result.survey_metadata.df_survey == n_psu_data - n_strata_data + # ATT + SE finite + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + + def test_b_full_domain_df_survey_under_always_treated_drop(self): + """Wave E.3 parity contract: when the always-treated drop removes a + PSU entirely from the fit sample, reported df_survey reflects the + FULL-DOMAIN n_psu - n_strata, NOT the post-drop count.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + n_psu_full = int(data["psu"].nunique()) # = 6 (PSU 0 still in data via unit 0) + n_strata_full = int(data["stratum"].nunique()) # = 2 + + design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + assert result.survey_metadata is not None + # Wave E.3 parity contract: post-drop fit sample is missing PSU 0 + # (always-treated unit was its sole occupant), but full-domain count + # is retained. + assert result.survey_metadata.df_survey == n_psu_full - n_strata_full, ( + f"Wave E.3 parity: df_survey should reflect full domain " + f"({n_psu_full - n_strata_full}); got {result.survey_metadata.df_survey}" + ) + # Defensive: gate that the always-treated drop did not remove all + # treated units (front-door check per + # `feedback_front_door_gate_active_sample_mirror`). + assert result.n_treated_obs > 0 + + def test_c_full_domain_n_psu_reporting(self): + """Companion to (b): reported n_psu reflects the FULL-DOMAIN count + even when the always-treated drop empties a PSU from the fit sample.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + n_psu_full = int(data["psu"].nunique()) # = 6 + + design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + assert result.survey_metadata is not None + assert result.survey_metadata.n_psu == n_psu_full, ( + f"Wave E.3 parity: n_psu should reflect full domain ({n_psu_full}); " + f"got {result.survey_metadata.n_psu}" + ) + + def test_d_zero_pad_psu_score_spy(self): + """Mock-spy on `_compute_stratified_meat_from_psu_scores`: capture the + per-PSU score matrix and assert the row corresponding to the + drop-only PSU (PSU 0) is exactly zero (zero-padded by the parity path). + Locks the score-zero-pad invariant directly at the meat boundary.""" + from unittest.mock import patch + + import diff_diff.survey as survey_mod + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + + captured = {} + real_helper = survey_mod._compute_stratified_meat_from_psu_scores + + def spy(*, psu_scores, psu_strata, fpc_per_psu, lonely_psu): + captured["psu_scores"] = np.asarray(psu_scores).copy() + captured["psu_strata"] = np.asarray(psu_strata).copy() + return real_helper( + psu_scores=psu_scores, + psu_strata=psu_strata, + fpc_per_psu=fpc_per_psu, + lonely_psu=lonely_psu, + ) + + with patch.object(survey_mod, "_compute_stratified_meat_from_psu_scores", side_effect=spy): + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + + psu_scores = captured["psu_scores"] + # Per-PSU score matrix has shape (G_full, k); G_full = 6 (full domain), + # not 5 (post-drop fit sample). + assert psu_scores.shape[0] == 6, ( + f"Wave E.3 parity: stratified meat should receive full-domain " + f"G_full=6 per-PSU scores; got {psu_scores.shape[0]}" + ) + # The score row for PSU 0 (the drop-only PSU) is exactly zero — its + # only resident was the always-treated unit, dropped from stage-1/2. + # PSU labels are sorted by np.unique → PSU 0 is row 0. + assert np.allclose(psu_scores[0], 0.0), ( + "Wave E.3 parity: drop-only PSU row should be zero-padded; " + f"got psu_scores[0]={psu_scores[0]}" + ) + + def test_e_subpopulation_plus_always_treated_composition(self): + """Two zero-pad mechanisms compose cleanly: (i) SurveyDesign.subpopulation() + excludes some rows via zero weights, (ii) always-treated drop removes + the unit physically. Both should preserve full-domain n_psu / df_survey.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + n_psu_full = int(data["psu"].nunique()) + n_strata_full = int(data["stratum"].nunique()) + + base_design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + # Subpopulation: exclude PSU 5's two units (mask=False there); + # always-treated unit 0 is INSIDE the subpopulation domain. + subpop_mask = data["psu"] != 5 + subpop_design, data_subpop = base_design.subpopulation(data, subpop_mask) + + est = TwoStageDiD() + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + result = est.fit( + data_subpop, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=subpop_design, + ) + + # Always-treated warning must still fire (composition does not silence it). + at_warnings = [w for w in caught if "treated in all observed periods" in str(w.message)] + assert len(at_warnings) >= 1, ( + f"Expected always-treated warning under subpop+always-treated; " + f"got: {[str(w.message) for w in caught]}" + ) + + assert result.survey_metadata is not None + # Full-domain n_psu / df_survey retained (subpopulation = zero-weight + # padding, doesn't reduce design dimension). + assert result.survey_metadata.n_psu == n_psu_full + assert result.survey_metadata.df_survey == n_psu_full - n_strata_full + + def test_f_cluster_as_psu_plus_always_treated(self): + """Cluster-injection path (user-specified `cluster=` without explicit + survey_design.psu): cluster column is injected as effective PSU. + Wave E.3 parity must preserve full-domain n_psu count even when + always-treated drop removes a cluster from the fit sample.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + n_psu_full = int(data["psu"].nunique()) # PSU column used as cluster + n_strata_full = int(data["stratum"].nunique()) + + # Survey design has strata + fpc but NO explicit psu — cluster= + # injects "psu" column as effective PSU. + design = SurveyDesign(weights="w", strata="stratum", fpc="N_h") + est = TwoStageDiD(cluster="psu") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + assert result.survey_metadata is not None + # Cluster-as-PSU injection produces full-domain n_psu (post-injection + # count includes drop-only PSUs). + assert result.survey_metadata.n_psu == n_psu_full, ( + f"Wave E.3 parity (cluster-injection): n_psu should reflect " + f"full domain ({n_psu_full}); got {result.survey_metadata.n_psu}" + ) + assert result.survey_metadata.df_survey == n_psu_full - n_strata_full + + def test_g_no_survey_path_unchanged_under_always_treated(self): + """Pure unweighted path: always-treated drop with `survey_design=None` + must produce IDENTICAL results to a fit on data that excludes the + always-treated unit upstream (the parity zero-pad path is gated on + `resolved_survey is not None` so the unweighted path is unaffected).""" + data_with_at = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + data_without_at = data_with_at[data_with_at["id"] != 0].copy() + + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result_with_at = est.fit( + data_with_at, + outcome="y", + unit="id", + time="t", + first_treat="g", + ) + result_without_at = est.fit( + data_without_at, + outcome="y", + unit="id", + time="t", + first_treat="g", + ) + # Always-treated drop equivalence on the unweighted path: fitting + # with the always-treated unit (which gets dropped internally) is + # equivalent to fitting on the pre-filtered dataset. + np.testing.assert_allclose( + result_with_at.overall_att, result_without_at.overall_att, rtol=1e-10 + ) + np.testing.assert_allclose( + result_with_at.overall_se, result_without_at.overall_se, rtol=1e-10 + ) + + def test_h_psu_entirely_always_treated_unidentified_gate(self): + """Optional sharper case: a PSU containing ONLY always-treated units + is dropped entirely from the fit sample. Verify (i) the variance + computation proceeds (zero-padded PSU 0 + 5 active PSUs gives G=6, + sufficient for stratified-PSU variance identification; the meat row + for PSU 0 is zero but its existence preserves the n_psu count), + (ii) reported n_psu = full domain.""" + from diff_diff.survey import SurveyDesign + + data = _build_parity_panel(sharp_psu0=True, include_always_treated=True) + design = SurveyDesign(weights="w", strata="stratum", psu="psu", fpc="N_h") + est = TwoStageDiD() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = est.fit( + data, + outcome="y", + unit="id", + time="t", + first_treat="g", + survey_design=design, + ) + assert np.isfinite(result.overall_se) + assert result.survey_metadata is not None + assert result.survey_metadata.n_psu == 6 From db3f7fea1bec4cbeed0046ca8b6c73c1c1e9cc9b Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 22 May 2026 08:56:48 -0400 Subject: [PATCH 2/2] Address CI codex P3s: gate zero-pad on actual drop + seed test RNGs CI codex review P3 #1 (performance): _compute_gmm_variance was taking the zero-pad branch (np.unique + np.searchsorted + full-size c_by_cluster / s2_by_cluster copies) on EVERY analytical survey fit because keep_mask defaults to all-True. Gate on `n_always_treated > 0` so baseline survey fits with no drop pass `None / None` and take the bit-identical pre-PR path. CI codex review P3 #2 (test determinism): seed local np.random.default_rng in the two new always-treated replicate-weight tests so always-treated outcomes are deterministic across runs (was using global np.random.normal which is non-deterministic at import time). Co-Authored-By: Claude Opus 4.7 --- diff_diff/two_stage.py | 15 ++++++++++----- tests/test_replicate_weight_expansion.py | 9 +++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/diff_diff/two_stage.py b/diff_diff/two_stage.py index 21893734..ceadb3ff 100644 --- a/diff_diff/two_stage.py +++ b/diff_diff/two_stage.py @@ -1674,12 +1674,17 @@ def fit( # `n_psu` / `n_strata` / `df_survey` reflect the full design. # Replicate-variance fits do NOT pad — Replicate refits per-replicate # already handle the resampling at the survey-design level. - score_pad_mask_arg = ( - keep_mask.values if resolved_survey is not None and not _uses_replicate_ts else None - ) - cluster_ids_full_arg = ( - cluster_ids_full if resolved_survey is not None and not _uses_replicate_ts else None + # We gate the padding kwargs on `n_always_treated > 0` so the + # zero-pad branch in `_compute_gmm_variance` (np.unique + + # np.searchsorted + full-size c_by_cluster / s2_by_cluster copies) + # only fires when always-treated rows were actually dropped — + # baseline survey fits with no drop pass `None / None` and take + # the bit-identical pre-PR path. + _wave_e3_pad_active = ( + resolved_survey is not None and not _uses_replicate_ts and n_always_treated > 0 ) + score_pad_mask_arg = keep_mask.values if _wave_e3_pad_active else None + cluster_ids_full_arg = cluster_ids_full if _wave_e3_pad_active else None # Always compute overall ATT (static specification) overall_att, overall_se = self._stage2_static( diff --git a/tests/test_replicate_weight_expansion.py b/tests/test_replicate_weight_expansion.py index 11f1fcaf..4f42e3d2 100644 --- a/tests/test_replicate_weight_expansion.py +++ b/tests/test_replicate_weight_expansion.py @@ -390,13 +390,16 @@ def test_two_stage_always_treated(self): full replicate variance pipeline (not just the point estimate) under the always-treated drop to lock the parity contract end-to-end. """ + # Seeded RNG for deterministic always-treated outcomes (regression + # tests should not depend on numpy global RNG state at import time). + rng = np.random.default_rng(101) data = _make_staggered_panel() # Add always-treated units (first_treat <= min time) for i in range(50, 55): for t in range(1, 9): data = pd.concat([data, pd.DataFrame([{ "unit": i, "time": t, "first_treat": 1, - "outcome": 12.0 + np.random.normal(0, 0.3), + "outcome": 12.0 + rng.normal(0, 0.3), "weight": 1.5, "treated": 1, "post": 1, }])], ignore_index=True) rep_cols = _add_jk1_replicates(data, n_rep=10, unit_col="unit") @@ -431,12 +434,14 @@ def test_two_stage_always_treated_event_study_and_group_replicate(self): / ``_stage2_group``), so this test exercises the event-study + group replicate refit branches end-to-end with the same always-treated fixture.""" + # Seeded RNG for deterministic always-treated outcomes. + rng = np.random.default_rng(202) data = _make_staggered_panel() for i in range(50, 55): for t in range(1, 9): data = pd.concat([data, pd.DataFrame([{ "unit": i, "time": t, "first_treat": 1, - "outcome": 12.0 + np.random.normal(0, 0.3), + "outcome": 12.0 + rng.normal(0, 0.3), "weight": 1.5, "treated": 1, "post": 1, }])], ignore_index=True) rep_cols = _add_jk1_replicates(data, n_rep=10, unit_col="unit")