From 7f2d891c2a308e4cdaacaa4cd162c8a2fe01209f Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 23 Feb 2026 15:31:00 -0500 Subject: [PATCH 01/11] Reconcile SS sub-components after PUF imputation When PUF imputation replaces social_security values, the sub-components (retirement, disability, survivors, dependents) were left unchanged, creating a mismatch. This caused a base-year discontinuity where projected years had ~3x more SS recipients than the base year, producing artificial 9-point Gini swings. The new reconcile_ss_subcomponents() function rescales sub-components proportionally after PUF imputation. New recipients (CPS had zero SS but PUF imputed positive) default to retirement. Fixes #551 Co-Authored-By: Claude Opus 4.6 --- .../calibration/puf_impute.py | 66 ++++++++ policyengine_us_data/tests/test_puf_impute.py | 154 ++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 policyengine_us_data/tests/test_puf_impute.py diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index 81df6209..a58196cb 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -102,6 +102,13 @@ "self_employment_income_would_be_qualified", ] +SS_SUBCOMPONENTS = [ + "social_security_retirement", + "social_security_disability", + "social_security_survivors", + "social_security_dependents", +] + OVERRIDDEN_IMPUTED_VARIABLES = [ "partnership_s_corp_income", "interest_deduction", @@ -155,6 +162,62 @@ ] +def reconcile_ss_subcomponents( + data: Dict[str, Dict[int, np.ndarray]], + n_cps: int, + time_period: int, +) -> None: + """Rescale SS sub-components so they sum to social_security. + + After PUF imputation replaces social_security on the PUF half, + the sub-components (retirement, disability, survivors, dependents) + still hold the original CPS values. This function rescales them + proportionally so they match the new total. + + For records where CPS had zero SS but PUF imputes a positive + value (new recipients), the full amount is assigned to + social_security_retirement. + + Modifies ``data`` in place. Only the PUF half (indices + n_cps .. 2*n_cps) is changed. + + Args: + data: Dataset dict {variable: {time_period: array}}. + n_cps: Number of records in the CPS half. + time_period: Tax year key into data dicts. + """ + if "social_security" not in data: + return + + ss = data["social_security"][time_period] + cps_ss = ss[:n_cps] + puf_ss = ss[n_cps:] + + for sub in SS_SUBCOMPONENTS: + if sub not in data: + continue + arr = data[sub][time_period] + cps_sub = arr[:n_cps] + + with np.errstate(divide="ignore", invalid="ignore"): + ratio = np.where(cps_ss > 0, cps_sub / cps_ss, 0.0) + + new_puf = ratio * puf_ss + + # New recipients: CPS had 0 SS but PUF imputed positive. + new_recip = (cps_ss == 0) & (puf_ss > 0) + if sub == "social_security_retirement": + new_puf = np.where(new_recip, puf_ss, new_puf) + else: + new_puf = np.where(new_recip, 0.0, new_puf) + + # PUF imputed zero -> clear sub-component + new_puf = np.where(puf_ss == 0, 0.0, new_puf) + + arr[n_cps:] = new_puf.astype(arr.dtype) + data[sub][time_period] = arr + + def puf_clone_dataset( data: Dict[str, Dict[int, np.ndarray]], state_fips: np.ndarray, @@ -271,6 +334,9 @@ def _map_to_entity(pred_values, variable_name): if cps_sim is not None: del cps_sim + # Ensure SS sub-components match the (possibly imputed) total. + reconcile_ss_subcomponents(new_data, person_count, time_period) + logger.info( "PUF clone complete: %d -> %d households", n_households, diff --git a/policyengine_us_data/tests/test_puf_impute.py b/policyengine_us_data/tests/test_puf_impute.py new file mode 100644 index 00000000..d29842f6 --- /dev/null +++ b/policyengine_us_data/tests/test_puf_impute.py @@ -0,0 +1,154 @@ +"""Tests for PUF imputation, specifically SS sub-component reconciliation. + +When PUF imputation replaces social_security values, the sub-components +(retirement, disability, survivors, dependents) must be rescaled to match +the new total. See: https://github.com/PolicyEngine/policyengine-us-data/issues/551 +""" + +import numpy as np +import pytest + +from policyengine_us_data.calibration.puf_impute import ( + reconcile_ss_subcomponents, +) + +SS_SUBS = [ + "social_security_retirement", + "social_security_disability", + "social_security_survivors", + "social_security_dependents", +] + + +def _make_data(orig_ss, orig_ret, orig_dis, orig_surv, orig_dep, imputed_ss): + """Build a doubled data dict (CPS half + PUF half). + + CPS half keeps original values; PUF half has imputed + social_security but original (stale) sub-components. + """ + n = len(orig_ss) + tp = 2024 + return ( + { + "social_security": { + tp: np.concatenate([orig_ss, imputed_ss]).astype(np.float32) + }, + "social_security_retirement": { + tp: np.concatenate([orig_ret, orig_ret]).astype(np.float32) + }, + "social_security_disability": { + tp: np.concatenate([orig_dis, orig_dis]).astype(np.float32) + }, + "social_security_survivors": { + tp: np.concatenate([orig_surv, orig_surv]).astype(np.float32) + }, + "social_security_dependents": { + tp: np.concatenate([orig_dep, orig_dep]).astype(np.float32) + }, + }, + n, + tp, + ) + + +class TestReconcileSsSubcomponents: + """Sub-components must sum to social_security after reconciliation.""" + + def test_proportional_rescaling(self): + """When CPS has a split, PUF subs should scale proportionally.""" + data, n, tp = _make_data( + orig_ss=np.array([20000, 15000]), + orig_ret=np.array([14000, 0]), + orig_dis=np.array([6000, 15000]), + orig_surv=np.array([0, 0]), + orig_dep=np.array([0, 0]), + imputed_ss=np.array([30000, 10000]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ss = data["social_security"][tp][puf] + total_subs = sum(data[sub][tp][puf] for sub in SS_SUBS) + np.testing.assert_allclose(total_subs, ss, rtol=1e-5) + + # Check proportions preserved + # Person 0: ret=14/20=70%, dis=6/20=30% + ret = data["social_security_retirement"][tp][puf] + np.testing.assert_allclose(ret[0], 30000 * 0.7, rtol=1e-5) + dis = data["social_security_disability"][tp][puf] + np.testing.assert_allclose(dis[0], 30000 * 0.3, rtol=1e-5) + + def test_new_recipients_default_to_retirement(self): + """When CPS had zero SS but PUF imputes positive, assign to retirement.""" + data, n, tp = _make_data( + orig_ss=np.array([0, 10000]), + orig_ret=np.array([0, 10000]), + orig_dis=np.array([0, 0]), + orig_surv=np.array([0, 0]), + orig_dep=np.array([0, 0]), + imputed_ss=np.array([25000, 12000]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ret = data["social_security_retirement"][tp][puf] + dis = data["social_security_disability"][tp][puf] + + # Person 0: new recipient -> all goes to retirement + assert ret[0] == pytest.approx(25000, rel=1e-5) + assert dis[0] == pytest.approx(0) + + def test_imputed_zero_clears_subs(self): + """When PUF imputes zero SS, all sub-components should be zero.""" + data, n, tp = _make_data( + orig_ss=np.array([20000]), + orig_ret=np.array([20000]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([0]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + for sub in SS_SUBS: + assert data[sub][tp][puf][0] == pytest.approx(0) + + def test_cps_half_unchanged(self): + """CPS half should not be modified.""" + orig_ret = np.array([14000, 0]) + data, n, tp = _make_data( + orig_ss=np.array([20000, 15000]), + orig_ret=orig_ret, + orig_dis=np.array([6000, 15000]), + orig_surv=np.array([0, 0]), + orig_dep=np.array([0, 0]), + imputed_ss=np.array([30000, 10000]), + ) + reconcile_ss_subcomponents(data, n, tp) + + cps = slice(0, n) + np.testing.assert_array_equal( + data["social_security_retirement"][tp][cps], + orig_ret, + ) + + def test_missing_subcomponent_is_skipped(self): + """If a sub-component is absent from data, no error.""" + data, n, tp = _make_data( + orig_ss=np.array([10000]), + orig_ret=np.array([10000]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([15000]), + ) + del data["social_security_survivors"] + del data["social_security_dependents"] + # Should not raise + reconcile_ss_subcomponents(data, n, tp) + + def test_no_social_security_is_noop(self): + """If social_security is absent, function is a no-op.""" + data = {"some_var": {2024: np.array([1, 2])}} + reconcile_ss_subcomponents(data, 1, 2024) From 3b195a88eb80c9f1908cf314432104dcd4938102 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 23 Feb 2026 15:41:04 -0500 Subject: [PATCH 02/11] Use age heuristic for new SS recipients in reconciliation New PUF recipients (CPS had zero SS) now get assigned based on age: >= 62 -> retirement, < 62 -> disability. Matches the CPS fallback logic. Falls back to retirement if age is unavailable. Co-Authored-By: Claude Opus 4.6 --- .../calibration/puf_impute.py | 42 ++++- policyengine_us_data/tests/test_puf_impute.py | 158 ++++++++++++++---- 2 files changed, 158 insertions(+), 42 deletions(-) diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index a58196cb..8f8b6e04 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -162,6 +162,9 @@ ] +MINIMUM_RETIREMENT_AGE = 62 + + def reconcile_ss_subcomponents( data: Dict[str, Dict[int, np.ndarray]], n_cps: int, @@ -175,8 +178,8 @@ def reconcile_ss_subcomponents( proportionally so they match the new total. For records where CPS had zero SS but PUF imputes a positive - value (new recipients), the full amount is assigned to - social_security_retirement. + value (new recipients), the age heuristic from CPS construction + is used: age >= 62 -> retirement, age < 62 -> disability. Modifies ``data`` in place. Only the PUF half (indices n_cps .. 2*n_cps) is changed. @@ -193,6 +196,13 @@ def reconcile_ss_subcomponents( cps_ss = ss[:n_cps] puf_ss = ss[n_cps:] + # Age for the CPS half (PUF half has same demographics). + age = None + if "age" in data: + age = data["age"][time_period][:n_cps] + + new_recip = (cps_ss == 0) & (puf_ss > 0) + for sub in SS_SUBCOMPONENTS: if sub not in data: continue @@ -205,11 +215,29 @@ def reconcile_ss_subcomponents( new_puf = ratio * puf_ss # New recipients: CPS had 0 SS but PUF imputed positive. - new_recip = (cps_ss == 0) & (puf_ss > 0) - if sub == "social_security_retirement": - new_puf = np.where(new_recip, puf_ss, new_puf) - else: - new_puf = np.where(new_recip, 0.0, new_puf) + # Use age heuristic: >= 62 -> retirement, < 62 -> disability. + if new_recip.any(): + if age is not None: + if sub == "social_security_retirement": + new_puf = np.where( + new_recip & (age >= MINIMUM_RETIREMENT_AGE), + puf_ss, + new_puf, + ) + elif sub == "social_security_disability": + new_puf = np.where( + new_recip & (age < MINIMUM_RETIREMENT_AGE), + puf_ss, + new_puf, + ) + else: + new_puf = np.where(new_recip, 0.0, new_puf) + else: + # No age data; fall back to retirement. + if sub == "social_security_retirement": + new_puf = np.where(new_recip, puf_ss, new_puf) + else: + new_puf = np.where(new_recip, 0.0, new_puf) # PUF imputed zero -> clear sub-component new_puf = np.where(puf_ss == 0, 0.0, new_puf) diff --git a/policyengine_us_data/tests/test_puf_impute.py b/policyengine_us_data/tests/test_puf_impute.py index d29842f6..bd8e87cb 100644 --- a/policyengine_us_data/tests/test_puf_impute.py +++ b/policyengine_us_data/tests/test_puf_impute.py @@ -9,6 +9,7 @@ import pytest from policyengine_us_data.calibration.puf_impute import ( + MINIMUM_RETIREMENT_AGE, reconcile_ss_subcomponents, ) @@ -20,7 +21,15 @@ ] -def _make_data(orig_ss, orig_ret, orig_dis, orig_surv, orig_dep, imputed_ss): +def _make_data( + orig_ss, + orig_ret, + orig_dis, + orig_surv, + orig_dep, + imputed_ss, + age=None, +): """Build a doubled data dict (CPS half + PUF half). CPS half keeps original values; PUF half has imputed @@ -28,34 +37,33 @@ def _make_data(orig_ss, orig_ret, orig_dis, orig_surv, orig_dep, imputed_ss): """ n = len(orig_ss) tp = 2024 - return ( - { - "social_security": { - tp: np.concatenate([orig_ss, imputed_ss]).astype(np.float32) - }, - "social_security_retirement": { - tp: np.concatenate([orig_ret, orig_ret]).astype(np.float32) - }, - "social_security_disability": { - tp: np.concatenate([orig_dis, orig_dis]).astype(np.float32) - }, - "social_security_survivors": { - tp: np.concatenate([orig_surv, orig_surv]).astype(np.float32) - }, - "social_security_dependents": { - tp: np.concatenate([orig_dep, orig_dep]).astype(np.float32) - }, + data = { + "social_security": { + tp: np.concatenate([orig_ss, imputed_ss]).astype(np.float32) }, - n, - tp, - ) + "social_security_retirement": { + tp: np.concatenate([orig_ret, orig_ret]).astype(np.float32) + }, + "social_security_disability": { + tp: np.concatenate([orig_dis, orig_dis]).astype(np.float32) + }, + "social_security_survivors": { + tp: np.concatenate([orig_surv, orig_surv]).astype(np.float32) + }, + "social_security_dependents": { + tp: np.concatenate([orig_dep, orig_dep]).astype(np.float32) + }, + } + if age is not None: + data["age"] = {tp: np.concatenate([age, age]).astype(np.float32)} + return data, n, tp class TestReconcileSsSubcomponents: """Sub-components must sum to social_security after reconciliation.""" def test_proportional_rescaling(self): - """When CPS has a split, PUF subs should scale proportionally.""" + """When CPS has a split, PUF subs scale proportionally.""" data, n, tp = _make_data( orig_ss=np.array([20000, 15000]), orig_ret=np.array([14000, 0]), @@ -71,35 +79,94 @@ def test_proportional_rescaling(self): total_subs = sum(data[sub][tp][puf] for sub in SS_SUBS) np.testing.assert_allclose(total_subs, ss, rtol=1e-5) - # Check proportions preserved # Person 0: ret=14/20=70%, dis=6/20=30% ret = data["social_security_retirement"][tp][puf] np.testing.assert_allclose(ret[0], 30000 * 0.7, rtol=1e-5) dis = data["social_security_disability"][tp][puf] np.testing.assert_allclose(dis[0], 30000 * 0.3, rtol=1e-5) - def test_new_recipients_default_to_retirement(self): - """When CPS had zero SS but PUF imputes positive, assign to retirement.""" + def test_new_recipient_elderly_gets_retirement(self): + """New recipient aged >= 62 -> retirement.""" data, n, tp = _make_data( - orig_ss=np.array([0, 10000]), - orig_ret=np.array([0, 10000]), - orig_dis=np.array([0, 0]), - orig_surv=np.array([0, 0]), - orig_dep=np.array([0, 0]), - imputed_ss=np.array([25000, 12000]), + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([25000]), + age=np.array([70]), ) reconcile_ss_subcomponents(data, n, tp) puf = slice(n, 2 * n) ret = data["social_security_retirement"][tp][puf] dis = data["social_security_disability"][tp][puf] - - # Person 0: new recipient -> all goes to retirement assert ret[0] == pytest.approx(25000, rel=1e-5) assert dis[0] == pytest.approx(0) + def test_new_recipient_young_gets_disability(self): + """New recipient aged < 62 -> disability.""" + data, n, tp = _make_data( + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([18000]), + age=np.array([45]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ret = data["social_security_retirement"][tp][puf] + dis = data["social_security_disability"][tp][puf] + assert ret[0] == pytest.approx(0) + assert dis[0] == pytest.approx(18000, rel=1e-5) + + def test_new_recipient_mixed_ages(self): + """Multiple new recipients split by age threshold.""" + data, n, tp = _make_data( + orig_ss=np.array([0, 0, 10000]), + orig_ret=np.array([0, 0, 10000]), + orig_dis=np.array([0, 0, 0]), + orig_surv=np.array([0, 0, 0]), + orig_dep=np.array([0, 0, 0]), + imputed_ss=np.array([20000, 15000, 12000]), + age=np.array([30, 65, 50]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ret = data["social_security_retirement"][tp][puf] + dis = data["social_security_disability"][tp][puf] + + # Person 0: age 30, new -> disability + assert ret[0] == pytest.approx(0) + assert dis[0] == pytest.approx(20000, rel=1e-5) + # Person 1: age 65, new -> retirement + assert ret[1] == pytest.approx(15000, rel=1e-5) + assert dis[1] == pytest.approx(0) + # Person 2: existing CPS recipient, just rescaled + assert ret[2] == pytest.approx(12000, rel=1e-5) + + def test_new_recipient_no_age_defaults_to_retirement(self): + """Without age data, new recipients default to retirement.""" + data, n, tp = _make_data( + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([25000]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ret = data["social_security_retirement"][tp][puf] + assert ret[0] == pytest.approx(25000, rel=1e-5) + def test_imputed_zero_clears_subs(self): - """When PUF imputes zero SS, all sub-components should be zero.""" + """When PUF imputes zero SS, all subs should be zero.""" data, n, tp = _make_data( orig_ss=np.array([20000]), orig_ret=np.array([20000]), @@ -145,10 +212,31 @@ def test_missing_subcomponent_is_skipped(self): ) del data["social_security_survivors"] del data["social_security_dependents"] - # Should not raise reconcile_ss_subcomponents(data, n, tp) def test_no_social_security_is_noop(self): """If social_security is absent, function is a no-op.""" data = {"some_var": {2024: np.array([1, 2])}} reconcile_ss_subcomponents(data, 1, 2024) + + def test_subs_sum_to_total_all_cases(self): + """Comprehensive: subs must sum to total across all case types.""" + data, n, tp = _make_data( + # Person 0: existing recipient (rescale) + # Person 1: new recipient, old (retirement) + # Person 2: new recipient, young (disability) + # Person 3: PUF zeroed out (clear) + orig_ss=np.array([20000, 0, 0, 15000]), + orig_ret=np.array([20000, 0, 0, 15000]), + orig_dis=np.array([0, 0, 0, 0]), + orig_surv=np.array([0, 0, 0, 0]), + orig_dep=np.array([0, 0, 0, 0]), + imputed_ss=np.array([25000, 18000, 12000, 0]), + age=np.array([70, 68, 40, 75]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) + ss = data["social_security"][tp][puf] + total_subs = sum(data[sub][tp][puf] for sub in SS_SUBS) + np.testing.assert_allclose(total_subs, ss, rtol=1e-5) From 37b6ec8fc24d1b7872924a2f9efc12c2a8cde5c4 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 23 Feb 2026 15:50:43 -0500 Subject: [PATCH 03/11] Use QRF to predict SS sub-component split for new recipients Instead of the simple age >= 62 heuristic, train a QRF on CPS records (where the reason-code split is known) to predict shares for new PUF recipients. Uses age, gender, marital status, and other demographics. Falls back to age heuristic when microimpute is unavailable or training data is insufficient (< 100 records). 14 tests covering: - Proportional rescaling (existing recipients) - Age heuristic fallback (4 tests) - QRF share prediction (4 tests including sum-to-one and elderly-predicted-as-retirement) - Edge cases (zero imputation, missing subs, no SS) Co-Authored-By: Claude Opus 4.6 --- .../calibration/puf_impute.py | 203 ++++++++++-- policyengine_us_data/tests/test_puf_impute.py | 307 +++++++++++++----- 2 files changed, 389 insertions(+), 121 deletions(-) diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index 8f8b6e04..a50e6dbb 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -164,6 +164,153 @@ MINIMUM_RETIREMENT_AGE = 62 +SS_SPLIT_PREDICTORS = [ + "age", + "is_male", + "tax_unit_is_joint", + "is_tax_unit_head", + "is_tax_unit_dependent", +] + +MIN_QRF_TRAINING_RECORDS = 100 + + +def _qrf_ss_shares( + data: Dict[str, Dict[int, np.ndarray]], + n_cps: int, + time_period: int, + new_recip: np.ndarray, +) -> Optional[Dict[str, np.ndarray]]: + """Predict SS sub-component shares for new recipients via QRF. + + Trains on CPS records that have SS > 0 (where the reason-code + split is known), then predicts shares for records where CPS had + zero SS but PUF imputed a positive value. + + Args: + data: Dataset dict. + n_cps: Records in CPS half. + time_period: Tax year. + new_recip: Boolean mask (length n_cps) for new recipients. + + Returns: + Dict mapping sub-component name to predicted share arrays + (length = new_recip.sum()), or None if QRF is unavailable + or training data is insufficient. + """ + try: + from microimpute.models.qrf import QRF + except ImportError: + return None + + cps_ss = data["social_security"][time_period][:n_cps] + has_ss = cps_ss > 0 + + if has_ss.sum() < MIN_QRF_TRAINING_RECORDS: + return None + + # Build training features from available predictors. + predictors = [] + train_cols = {} + test_cols = {} + for pred in SS_SPLIT_PREDICTORS: + if pred not in data: + continue + vals = data[pred][time_period][:n_cps] + train_cols[pred] = vals[has_ss].astype(np.float32) + test_cols[pred] = vals[new_recip].astype(np.float32) + predictors.append(pred) + + if not predictors: + return None + + X_train = pd.DataFrame(train_cols) + X_test = pd.DataFrame(test_cols) + + # Training targets: share going to each sub-component (0 or 1). + share_vars = [] + with np.errstate(divide="ignore", invalid="ignore"): + for sub in SS_SUBCOMPONENTS: + if sub not in data: + continue + sub_vals = data[sub][time_period][:n_cps][has_ss] + share_name = sub + "_share" + X_train[share_name] = np.where( + cps_ss[has_ss] > 0, + sub_vals / cps_ss[has_ss], + 0.0, + ) + share_vars.append(share_name) + + if not share_vars: + return None + + qrf = QRF(log_level="WARNING", memory_efficient=True) + try: + fitted = qrf.fit( + X_train=X_train[predictors + share_vars], + predictors=predictors, + imputed_variables=share_vars, + n_jobs=1, + ) + predictions = fitted.predict(X_test=X_test) + except Exception: + logger.warning("QRF SS split failed, falling back to heuristic") + return None + + # Clip to [0, 1] and normalise so shares sum to 1. + shares = {} + total = np.zeros(len(X_test)) + for sub in SS_SUBCOMPONENTS: + key = sub + "_share" + if key in predictions.columns: + s = np.clip(predictions[key].values, 0, 1) + shares[sub] = s + total += s + + for sub in shares: + shares[sub] = np.where(total > 0, shares[sub] / total, 0.0) + + del fitted, predictions + gc.collect() + + logger.info( + "QRF SS split: predicted shares for %d new recipients", + new_recip.sum(), + ) + return shares + + +def _age_heuristic_ss_shares( + data: Dict[str, Dict[int, np.ndarray]], + n_cps: int, + time_period: int, + new_recip: np.ndarray, +) -> Dict[str, np.ndarray]: + """Fallback: assign new recipients by age threshold. + + Age >= 62 -> retirement, < 62 -> disability. + If age is unavailable, all go to retirement. + """ + n_new = new_recip.sum() + shares = {sub: np.zeros(n_new) for sub in SS_SUBCOMPONENTS} + + age = None + if "age" in data: + age = data["age"][time_period][:n_cps][new_recip] + + if age is not None: + is_old = age >= MINIMUM_RETIREMENT_AGE + if "social_security_retirement" in shares: + shares["social_security_retirement"] = is_old.astype(np.float64) + if "social_security_disability" in shares: + shares["social_security_disability"] = (~is_old).astype(np.float64) + else: + if "social_security_retirement" in shares: + shares["social_security_retirement"] = np.ones(n_new) + + return shares + def reconcile_ss_subcomponents( data: Dict[str, Dict[int, np.ndarray]], @@ -177,9 +324,10 @@ def reconcile_ss_subcomponents( still hold the original CPS values. This function rescales them proportionally so they match the new total. - For records where CPS had zero SS but PUF imputes a positive - value (new recipients), the age heuristic from CPS construction - is used: age >= 62 -> retirement, age < 62 -> disability. + For new recipients (CPS had zero SS, PUF imputed positive), a + QRF is trained on CPS records to predict the sub-component split + from demographics. Falls back to an age heuristic (>= 62 -> + retirement, < 62 -> disability) when QRF is unavailable. Modifies ``data`` in place. Only the PUF half (indices n_cps .. 2*n_cps) is changed. @@ -196,50 +344,35 @@ def reconcile_ss_subcomponents( cps_ss = ss[:n_cps] puf_ss = ss[n_cps:] - # Age for the CPS half (PUF half has same demographics). - age = None - if "age" in data: - age = data["age"][time_period][:n_cps] - new_recip = (cps_ss == 0) & (puf_ss > 0) + # Predict shares for new recipients (QRF with age fallback). + new_shares = None + if new_recip.any(): + new_shares = _qrf_ss_shares(data, n_cps, time_period, new_recip) + if new_shares is None: + new_shares = _age_heuristic_ss_shares( + data, n_cps, time_period, new_recip + ) + for sub in SS_SUBCOMPONENTS: if sub not in data: continue arr = data[sub][time_period] cps_sub = arr[:n_cps] + # Existing recipients: proportional rescaling. with np.errstate(divide="ignore", invalid="ignore"): ratio = np.where(cps_ss > 0, cps_sub / cps_ss, 0.0) - new_puf = ratio * puf_ss - # New recipients: CPS had 0 SS but PUF imputed positive. - # Use age heuristic: >= 62 -> retirement, < 62 -> disability. - if new_recip.any(): - if age is not None: - if sub == "social_security_retirement": - new_puf = np.where( - new_recip & (age >= MINIMUM_RETIREMENT_AGE), - puf_ss, - new_puf, - ) - elif sub == "social_security_disability": - new_puf = np.where( - new_recip & (age < MINIMUM_RETIREMENT_AGE), - puf_ss, - new_puf, - ) - else: - new_puf = np.where(new_recip, 0.0, new_puf) - else: - # No age data; fall back to retirement. - if sub == "social_security_retirement": - new_puf = np.where(new_recip, puf_ss, new_puf) - else: - new_puf = np.where(new_recip, 0.0, new_puf) - - # PUF imputed zero -> clear sub-component + # New recipients: apply predicted shares. + if new_recip.any() and new_shares is not None: + share = new_shares.get(sub, np.zeros(new_recip.sum())) + puf_new_vals = puf_ss[new_recip] * share + new_puf[new_recip] = puf_new_vals + + # PUF imputed zero -> clear sub-component. new_puf = np.where(puf_ss == 0, 0.0, new_puf) arr[n_cps:] = new_puf.astype(arr.dtype) diff --git a/policyengine_us_data/tests/test_puf_impute.py b/policyengine_us_data/tests/test_puf_impute.py index bd8e87cb..030f2754 100644 --- a/policyengine_us_data/tests/test_puf_impute.py +++ b/policyengine_us_data/tests/test_puf_impute.py @@ -5,11 +5,15 @@ the new total. See: https://github.com/PolicyEngine/policyengine-us-data/issues/551 """ +from unittest.mock import patch + import numpy as np import pytest from policyengine_us_data.calibration.puf_impute import ( MINIMUM_RETIREMENT_AGE, + _age_heuristic_ss_shares, + _qrf_ss_shares, reconcile_ss_subcomponents, ) @@ -29,12 +33,9 @@ def _make_data( orig_dep, imputed_ss, age=None, + is_male=None, ): - """Build a doubled data dict (CPS half + PUF half). - - CPS half keeps original values; PUF half has imputed - social_security but original (stale) sub-components. - """ + """Build a doubled data dict (CPS half + PUF half).""" n = len(orig_ss) tp = 2024 data = { @@ -56,6 +57,10 @@ def _make_data( } if age is not None: data["age"] = {tp: np.concatenate([age, age]).astype(np.float32)} + if is_male is not None: + data["is_male"] = { + tp: np.concatenate([is_male, is_male]).astype(np.float32) + } return data, n, tp @@ -85,86 +90,6 @@ def test_proportional_rescaling(self): dis = data["social_security_disability"][tp][puf] np.testing.assert_allclose(dis[0], 30000 * 0.3, rtol=1e-5) - def test_new_recipient_elderly_gets_retirement(self): - """New recipient aged >= 62 -> retirement.""" - data, n, tp = _make_data( - orig_ss=np.array([0]), - orig_ret=np.array([0]), - orig_dis=np.array([0]), - orig_surv=np.array([0]), - orig_dep=np.array([0]), - imputed_ss=np.array([25000]), - age=np.array([70]), - ) - reconcile_ss_subcomponents(data, n, tp) - - puf = slice(n, 2 * n) - ret = data["social_security_retirement"][tp][puf] - dis = data["social_security_disability"][tp][puf] - assert ret[0] == pytest.approx(25000, rel=1e-5) - assert dis[0] == pytest.approx(0) - - def test_new_recipient_young_gets_disability(self): - """New recipient aged < 62 -> disability.""" - data, n, tp = _make_data( - orig_ss=np.array([0]), - orig_ret=np.array([0]), - orig_dis=np.array([0]), - orig_surv=np.array([0]), - orig_dep=np.array([0]), - imputed_ss=np.array([18000]), - age=np.array([45]), - ) - reconcile_ss_subcomponents(data, n, tp) - - puf = slice(n, 2 * n) - ret = data["social_security_retirement"][tp][puf] - dis = data["social_security_disability"][tp][puf] - assert ret[0] == pytest.approx(0) - assert dis[0] == pytest.approx(18000, rel=1e-5) - - def test_new_recipient_mixed_ages(self): - """Multiple new recipients split by age threshold.""" - data, n, tp = _make_data( - orig_ss=np.array([0, 0, 10000]), - orig_ret=np.array([0, 0, 10000]), - orig_dis=np.array([0, 0, 0]), - orig_surv=np.array([0, 0, 0]), - orig_dep=np.array([0, 0, 0]), - imputed_ss=np.array([20000, 15000, 12000]), - age=np.array([30, 65, 50]), - ) - reconcile_ss_subcomponents(data, n, tp) - - puf = slice(n, 2 * n) - ret = data["social_security_retirement"][tp][puf] - dis = data["social_security_disability"][tp][puf] - - # Person 0: age 30, new -> disability - assert ret[0] == pytest.approx(0) - assert dis[0] == pytest.approx(20000, rel=1e-5) - # Person 1: age 65, new -> retirement - assert ret[1] == pytest.approx(15000, rel=1e-5) - assert dis[1] == pytest.approx(0) - # Person 2: existing CPS recipient, just rescaled - assert ret[2] == pytest.approx(12000, rel=1e-5) - - def test_new_recipient_no_age_defaults_to_retirement(self): - """Without age data, new recipients default to retirement.""" - data, n, tp = _make_data( - orig_ss=np.array([0]), - orig_ret=np.array([0]), - orig_dis=np.array([0]), - orig_surv=np.array([0]), - orig_dep=np.array([0]), - imputed_ss=np.array([25000]), - ) - reconcile_ss_subcomponents(data, n, tp) - - puf = slice(n, 2 * n) - ret = data["social_security_retirement"][tp][puf] - assert ret[0] == pytest.approx(25000, rel=1e-5) - def test_imputed_zero_clears_subs(self): """When PUF imputes zero SS, all subs should be zero.""" data, n, tp = _make_data( @@ -220,7 +145,7 @@ def test_no_social_security_is_noop(self): reconcile_ss_subcomponents(data, 1, 2024) def test_subs_sum_to_total_all_cases(self): - """Comprehensive: subs must sum to total across all case types.""" + """Comprehensive: subs must sum to total across all cases.""" data, n, tp = _make_data( # Person 0: existing recipient (rescale) # Person 1: new recipient, old (retirement) @@ -240,3 +165,213 @@ def test_subs_sum_to_total_all_cases(self): ss = data["social_security"][tp][puf] total_subs = sum(data[sub][tp][puf] for sub in SS_SUBS) np.testing.assert_allclose(total_subs, ss, rtol=1e-5) + + +class TestAgeHeuristicSsShares: + """Age-based fallback for new SS recipient classification.""" + + def test_elderly_gets_retirement(self): + """Age >= 62 -> retirement.""" + data, n, tp = _make_data( + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([25000]), + age=np.array([70]), + ) + new_recip = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + + assert shares["social_security_retirement"][0] == pytest.approx(1.0) + assert shares["social_security_disability"][0] == pytest.approx(0.0) + + def test_young_gets_disability(self): + """Age < 62 -> disability.""" + data, n, tp = _make_data( + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([18000]), + age=np.array([45]), + ) + new_recip = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + + assert shares["social_security_retirement"][0] == pytest.approx(0.0) + assert shares["social_security_disability"][0] == pytest.approx(1.0) + + def test_no_age_defaults_to_retirement(self): + """Without age data, default to retirement.""" + data, n, tp = _make_data( + orig_ss=np.array([0]), + orig_ret=np.array([0]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([25000]), + ) + new_recip = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + + assert shares["social_security_retirement"][0] == pytest.approx(1.0) + + def test_mixed_ages(self): + """Multiple people split correctly by age threshold.""" + data, n, tp = _make_data( + orig_ss=np.array([0, 0, 0]), + orig_ret=np.array([0, 0, 0]), + orig_dis=np.array([0, 0, 0]), + orig_surv=np.array([0, 0, 0]), + orig_dep=np.array([0, 0, 0]), + imputed_ss=np.array([1, 1, 1]), + age=np.array([30, 62, 80]), + ) + new_recip = np.array([True, True, True]) + shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + + # age 30 -> disability + assert shares["social_security_retirement"][0] == 0 + assert shares["social_security_disability"][0] == 1 + # age 62 -> retirement (>= threshold) + assert shares["social_security_retirement"][1] == 1 + assert shares["social_security_disability"][1] == 0 + # age 80 -> retirement + assert shares["social_security_retirement"][2] == 1 + + +class TestQrfSsShares: + """QRF-based SS sub-component share prediction.""" + + def test_returns_none_without_microimpute(self): + """Returns None when microimpute is not importable.""" + data, n, tp = _make_data( + orig_ss=np.array([10000]), + orig_ret=np.array([10000]), + orig_dis=np.array([0]), + orig_surv=np.array([0]), + orig_dep=np.array([0]), + imputed_ss=np.array([15000]), + age=np.array([70]), + ) + new_recip = np.array([False]) + with patch.dict("sys.modules", {"microimpute": None}): + result = _qrf_ss_shares(data, n, tp, new_recip) + # No new recipients -> no call needed, but also shouldn't error + # Actually test with import blocked: + assert result is None or isinstance(result, dict) + + def test_returns_none_with_few_training_records(self): + """Returns None when < MIN_QRF_TRAINING_RECORDS have SS.""" + # Only 2 training records with SS > 0. + data, n, tp = _make_data( + orig_ss=np.array([10000, 5000, 0]), + orig_ret=np.array([10000, 0, 0]), + orig_dis=np.array([0, 5000, 0]), + orig_surv=np.array([0, 0, 0]), + orig_dep=np.array([0, 0, 0]), + imputed_ss=np.array([0, 0, 20000]), + age=np.array([70, 45, 55]), + ) + new_recip = np.array([False, False, True]) + result = _qrf_ss_shares(data, n, tp, new_recip) + assert result is None + + def test_shares_sum_to_one(self): + """QRF-predicted shares should sum to ~1 after normalisation.""" + pytest.importorskip("microimpute") + rng = np.random.default_rng(42) + n = 500 + + # Synthetic training: age > 62 mostly retirement, + # age < 62 mostly disability. + ages = rng.integers(20, 90, size=n).astype(np.float32) + is_male = rng.integers(0, 2, size=n).astype(np.float32) + ss_vals = rng.uniform(5000, 40000, size=n).astype(np.float32) + ret = np.where(ages >= 62, ss_vals, 0).astype(np.float32) + dis = np.where(ages < 62, ss_vals, 0).astype(np.float32) + surv = np.zeros(n, dtype=np.float32) + dep = np.zeros(n, dtype=np.float32) + + # 20 new recipients (CPS had 0 SS). + n_new = 20 + new_ages = rng.integers(20, 90, size=n_new).astype(np.float32) + new_is_male = rng.integers(0, 2, size=n_new).astype(np.float32) + new_ss = rng.uniform(10000, 30000, size=n_new).astype(np.float32) + + all_ss = np.concatenate([ss_vals, np.zeros(n_new)]) + all_ret = np.concatenate([ret, np.zeros(n_new)]) + all_dis = np.concatenate([dis, np.zeros(n_new)]) + all_surv = np.concatenate([surv, np.zeros(n_new)]) + all_dep = np.concatenate([dep, np.zeros(n_new)]) + all_ages = np.concatenate([ages, new_ages]) + all_is_male = np.concatenate([is_male, new_is_male]) + n_total = n + n_new + + data, nn, tp = _make_data( + orig_ss=all_ss, + orig_ret=all_ret, + orig_dis=all_dis, + orig_surv=all_surv, + orig_dep=all_dep, + imputed_ss=np.concatenate([ss_vals, new_ss]), + age=all_ages, + is_male=all_is_male, + ) + + new_recip = np.concatenate( + [np.zeros(n, dtype=bool), np.ones(n_new, dtype=bool)] + ) + shares = _qrf_ss_shares(data, nn, tp, new_recip) + + assert shares is not None + total = sum(shares.get(sub, 0) for sub in SS_SUBS) + np.testing.assert_allclose(total, 1.0, atol=0.01) + + def test_elderly_predicted_as_retirement(self): + """QRF should mostly predict retirement for age >= 62.""" + pytest.importorskip("microimpute") + rng = np.random.default_rng(123) + n = 500 + + ages = rng.integers(20, 90, size=n).astype(np.float32) + ss_vals = rng.uniform(5000, 40000, size=n).astype(np.float32) + ret = np.where(ages >= 62, ss_vals, 0).astype(np.float32) + dis = np.where(ages < 62, ss_vals, 0).astype(np.float32) + surv = np.zeros(n, dtype=np.float32) + dep = np.zeros(n, dtype=np.float32) + + # 10 new elderly recipients. + n_new = 10 + new_ages = np.full(n_new, 70, dtype=np.float32) + new_ss = np.full(n_new, 20000, dtype=np.float32) + + all_ss = np.concatenate([ss_vals, np.zeros(n_new)]) + all_ret = np.concatenate([ret, np.zeros(n_new)]) + all_dis = np.concatenate([dis, np.zeros(n_new)]) + all_surv = np.concatenate([surv, np.zeros(n_new)]) + all_dep = np.concatenate([dep, np.zeros(n_new)]) + all_ages = np.concatenate([ages, new_ages]) + + data, nn, tp = _make_data( + orig_ss=all_ss, + orig_ret=all_ret, + orig_dis=all_dis, + orig_surv=all_surv, + orig_dep=all_dep, + imputed_ss=np.concatenate([ss_vals, new_ss]), + age=all_ages, + ) + + new_recip = np.concatenate( + [np.zeros(n, dtype=bool), np.ones(n_new, dtype=bool)] + ) + shares = _qrf_ss_shares(data, nn, tp, new_recip) + + assert shares is not None + ret_share = shares["social_security_retirement"] + # All elderly -> retirement share should dominate. + assert np.mean(ret_share) > 0.7 From e7f5933f6afab52fefaf68a8a26c1cb9e2463a64 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 23 Feb 2026 16:10:40 -0500 Subject: [PATCH 04/11] Use QRF for all PUF SS records, not just new recipients CPS-PUF link is statistical (not identity-based), so the paired CPS record's sub-component split is just one noisy draw. A QRF trained on all CPS SS recipients gives a better expected prediction. Also removes unnecessary try/except ImportError guard for microimpute. Co-Authored-By: Claude Opus 4.6 --- .../calibration/puf_impute.py | 97 +++++----- policyengine_us_data/tests/test_puf_impute.py | 173 ++++++++---------- 2 files changed, 123 insertions(+), 147 deletions(-) diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index a50e6dbb..d5c73785 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -179,29 +179,29 @@ def _qrf_ss_shares( data: Dict[str, Dict[int, np.ndarray]], n_cps: int, time_period: int, - new_recip: np.ndarray, + puf_has_ss: np.ndarray, ) -> Optional[Dict[str, np.ndarray]]: - """Predict SS sub-component shares for new recipients via QRF. + """Predict SS sub-component shares via QRF. Trains on CPS records that have SS > 0 (where the reason-code - split is known), then predicts shares for records where CPS had - zero SS but PUF imputed a positive value. + split is known), then predicts shares for all PUF records with + positive SS. The CPS-PUF link is statistical (not identity-based), + so the QRF gives a better expected prediction than using the + paired CPS record's split. Args: data: Dataset dict. n_cps: Records in CPS half. time_period: Tax year. - new_recip: Boolean mask (length n_cps) for new recipients. + puf_has_ss: Boolean mask (length n_cps) — True where the + PUF half has positive social_security. Returns: Dict mapping sub-component name to predicted share arrays - (length = new_recip.sum()), or None if QRF is unavailable - or training data is insufficient. + (length = puf_has_ss.sum()), or None if training data is + insufficient. """ - try: - from microimpute.models.qrf import QRF - except ImportError: - return None + from microimpute.models.qrf import QRF cps_ss = data["social_security"][time_period][:n_cps] has_ss = cps_ss > 0 @@ -218,7 +218,7 @@ def _qrf_ss_shares( continue vals = data[pred][time_period][:n_cps] train_cols[pred] = vals[has_ss].astype(np.float32) - test_cols[pred] = vals[new_recip].astype(np.float32) + test_cols[pred] = vals[puf_has_ss].astype(np.float32) predictors.append(pred) if not predictors: @@ -275,8 +275,8 @@ def _qrf_ss_shares( gc.collect() logger.info( - "QRF SS split: predicted shares for %d new recipients", - new_recip.sum(), + "QRF SS split: predicted shares for %d PUF records", + puf_has_ss.sum(), ) return shares @@ -285,19 +285,19 @@ def _age_heuristic_ss_shares( data: Dict[str, Dict[int, np.ndarray]], n_cps: int, time_period: int, - new_recip: np.ndarray, + puf_has_ss: np.ndarray, ) -> Dict[str, np.ndarray]: - """Fallback: assign new recipients by age threshold. + """Fallback: assign SS type by age threshold. Age >= 62 -> retirement, < 62 -> disability. If age is unavailable, all go to retirement. """ - n_new = new_recip.sum() - shares = {sub: np.zeros(n_new) for sub in SS_SUBCOMPONENTS} + n_pred = puf_has_ss.sum() + shares = {sub: np.zeros(n_pred) for sub in SS_SUBCOMPONENTS} age = None if "age" in data: - age = data["age"][time_period][:n_cps][new_recip] + age = data["age"][time_period][:n_cps][puf_has_ss] if age is not None: is_old = age >= MINIMUM_RETIREMENT_AGE @@ -307,7 +307,7 @@ def _age_heuristic_ss_shares( shares["social_security_disability"] = (~is_old).astype(np.float64) else: if "social_security_retirement" in shares: - shares["social_security_retirement"] = np.ones(n_new) + shares["social_security_retirement"] = np.ones(n_pred) return shares @@ -317,17 +317,17 @@ def reconcile_ss_subcomponents( n_cps: int, time_period: int, ) -> None: - """Rescale SS sub-components so they sum to social_security. + """Predict SS sub-components for PUF half from demographics. - After PUF imputation replaces social_security on the PUF half, - the sub-components (retirement, disability, survivors, dependents) - still hold the original CPS values. This function rescales them - proportionally so they match the new total. + The CPS-PUF link is statistical (not identity-based), so the + paired CPS record's sub-component split is just one noisy draw. + A QRF trained on all CPS SS recipients gives a better expected + prediction by pooling across the full training set. - For new recipients (CPS had zero SS, PUF imputed positive), a - QRF is trained on CPS records to predict the sub-component split - from demographics. Falls back to an age heuristic (>= 62 -> - retirement, < 62 -> disability) when QRF is unavailable. + For all PUF records with positive social_security, this function + predicts shares via QRF (falling back to an age heuristic) and + scales them to match the imputed total. PUF records with zero + SS get all sub-components cleared to zero. Modifies ``data`` in place. Only the PUF half (indices n_cps .. 2*n_cps) is changed. @@ -340,40 +340,27 @@ def reconcile_ss_subcomponents( if "social_security" not in data: return - ss = data["social_security"][time_period] - cps_ss = ss[:n_cps] - puf_ss = ss[n_cps:] + puf_ss = data["social_security"][time_period][n_cps:] + puf_has_ss = puf_ss > 0 - new_recip = (cps_ss == 0) & (puf_ss > 0) - - # Predict shares for new recipients (QRF with age fallback). - new_shares = None - if new_recip.any(): - new_shares = _qrf_ss_shares(data, n_cps, time_period, new_recip) - if new_shares is None: - new_shares = _age_heuristic_ss_shares( - data, n_cps, time_period, new_recip + # Predict shares for all PUF records with SS > 0. + shares = None + if puf_has_ss.any(): + shares = _qrf_ss_shares(data, n_cps, time_period, puf_has_ss) + if shares is None: + shares = _age_heuristic_ss_shares( + data, n_cps, time_period, puf_has_ss ) for sub in SS_SUBCOMPONENTS: if sub not in data: continue arr = data[sub][time_period] - cps_sub = arr[:n_cps] - - # Existing recipients: proportional rescaling. - with np.errstate(divide="ignore", invalid="ignore"): - ratio = np.where(cps_ss > 0, cps_sub / cps_ss, 0.0) - new_puf = ratio * puf_ss - - # New recipients: apply predicted shares. - if new_recip.any() and new_shares is not None: - share = new_shares.get(sub, np.zeros(new_recip.sum())) - puf_new_vals = puf_ss[new_recip] * share - new_puf[new_recip] = puf_new_vals - # PUF imputed zero -> clear sub-component. - new_puf = np.where(puf_ss == 0, 0.0, new_puf) + new_puf = np.zeros(n_cps) + if puf_has_ss.any() and shares is not None: + share = shares.get(sub, np.zeros(puf_has_ss.sum())) + new_puf[puf_has_ss] = puf_ss[puf_has_ss] * share arr[n_cps:] = new_puf.astype(arr.dtype) data[sub][time_period] = arr diff --git a/policyengine_us_data/tests/test_puf_impute.py b/policyengine_us_data/tests/test_puf_impute.py index 030f2754..fcdcf763 100644 --- a/policyengine_us_data/tests/test_puf_impute.py +++ b/policyengine_us_data/tests/test_puf_impute.py @@ -1,12 +1,11 @@ """Tests for PUF imputation, specifically SS sub-component reconciliation. When PUF imputation replaces social_security values, the sub-components -(retirement, disability, survivors, dependents) must be rescaled to match -the new total. See: https://github.com/PolicyEngine/policyengine-us-data/issues/551 +(retirement, disability, survivors, dependents) must be predicted from +demographics (not copied from the statistically-linked CPS record). +See: https://github.com/PolicyEngine/policyengine-us-data/issues/551 """ -from unittest.mock import patch - import numpy as np import pytest @@ -67,8 +66,8 @@ def _make_data( class TestReconcileSsSubcomponents: """Sub-components must sum to social_security after reconciliation.""" - def test_proportional_rescaling(self): - """When CPS has a split, PUF subs scale proportionally.""" + def test_subs_sum_to_total(self): + """PUF subs must sum to imputed SS total.""" data, n, tp = _make_data( orig_ss=np.array([20000, 15000]), orig_ret=np.array([14000, 0]), @@ -76,6 +75,7 @@ def test_proportional_rescaling(self): orig_surv=np.array([0, 0]), orig_dep=np.array([0, 0]), imputed_ss=np.array([30000, 10000]), + age=np.array([70, 45]), ) reconcile_ss_subcomponents(data, n, tp) @@ -84,11 +84,28 @@ def test_proportional_rescaling(self): total_subs = sum(data[sub][tp][puf] for sub in SS_SUBS) np.testing.assert_allclose(total_subs, ss, rtol=1e-5) - # Person 0: ret=14/20=70%, dis=6/20=30% + def test_age_determines_split(self): + """Age heuristic: >= 62 -> retirement, < 62 -> disability.""" + data, n, tp = _make_data( + orig_ss=np.array([20000, 15000]), + orig_ret=np.array([14000, 0]), + orig_dis=np.array([6000, 15000]), + orig_surv=np.array([0, 0]), + orig_dep=np.array([0, 0]), + imputed_ss=np.array([30000, 10000]), + age=np.array([70, 45]), + ) + reconcile_ss_subcomponents(data, n, tp) + + puf = slice(n, 2 * n) ret = data["social_security_retirement"][tp][puf] - np.testing.assert_allclose(ret[0], 30000 * 0.7, rtol=1e-5) dis = data["social_security_disability"][tp][puf] - np.testing.assert_allclose(dis[0], 30000 * 0.3, rtol=1e-5) + # Person 0 is 70 -> retirement + assert ret[0] == pytest.approx(30000) + assert dis[0] == pytest.approx(0) + # Person 1 is 45 -> disability + assert ret[1] == pytest.approx(0) + assert dis[1] == pytest.approx(10000) def test_imputed_zero_clears_subs(self): """When PUF imputes zero SS, all subs should be zero.""" @@ -144,13 +161,13 @@ def test_no_social_security_is_noop(self): data = {"some_var": {2024: np.array([1, 2])}} reconcile_ss_subcomponents(data, 1, 2024) - def test_subs_sum_to_total_all_cases(self): - """Comprehensive: subs must sum to total across all cases.""" + def test_subs_sum_to_total_mixed_cases(self): + """Comprehensive: subs sum to total across mixed cases.""" data, n, tp = _make_data( - # Person 0: existing recipient (rescale) - # Person 1: new recipient, old (retirement) - # Person 2: new recipient, young (disability) - # Person 3: PUF zeroed out (clear) + # Person 0: has CPS SS, has PUF SS (old) + # Person 1: no CPS SS, has PUF SS (old) + # Person 2: no CPS SS, has PUF SS (young) + # Person 3: has CPS SS, PUF zeroed out orig_ss=np.array([20000, 0, 0, 15000]), orig_ret=np.array([20000, 0, 0, 15000]), orig_dis=np.array([0, 0, 0, 0]), @@ -168,7 +185,7 @@ def test_subs_sum_to_total_all_cases(self): class TestAgeHeuristicSsShares: - """Age-based fallback for new SS recipient classification.""" + """Age-based fallback for SS type classification.""" def test_elderly_gets_retirement(self): """Age >= 62 -> retirement.""" @@ -181,8 +198,8 @@ def test_elderly_gets_retirement(self): imputed_ss=np.array([25000]), age=np.array([70]), ) - new_recip = np.array([True]) - shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + puf_has_ss = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, puf_has_ss) assert shares["social_security_retirement"][0] == pytest.approx(1.0) assert shares["social_security_disability"][0] == pytest.approx(0.0) @@ -198,8 +215,8 @@ def test_young_gets_disability(self): imputed_ss=np.array([18000]), age=np.array([45]), ) - new_recip = np.array([True]) - shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + puf_has_ss = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, puf_has_ss) assert shares["social_security_retirement"][0] == pytest.approx(0.0) assert shares["social_security_disability"][0] == pytest.approx(1.0) @@ -214,8 +231,8 @@ def test_no_age_defaults_to_retirement(self): orig_dep=np.array([0]), imputed_ss=np.array([25000]), ) - new_recip = np.array([True]) - shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + puf_has_ss = np.array([True]) + shares = _age_heuristic_ss_shares(data, n, tp, puf_has_ss) assert shares["social_security_retirement"][0] == pytest.approx(1.0) @@ -230,8 +247,8 @@ def test_mixed_ages(self): imputed_ss=np.array([1, 1, 1]), age=np.array([30, 62, 80]), ) - new_recip = np.array([True, True, True]) - shares = _age_heuristic_ss_shares(data, n, tp, new_recip) + puf_has_ss = np.array([True, True, True]) + shares = _age_heuristic_ss_shares(data, n, tp, puf_has_ss) # age 30 -> disability assert shares["social_security_retirement"][0] == 0 @@ -246,24 +263,6 @@ def test_mixed_ages(self): class TestQrfSsShares: """QRF-based SS sub-component share prediction.""" - def test_returns_none_without_microimpute(self): - """Returns None when microimpute is not importable.""" - data, n, tp = _make_data( - orig_ss=np.array([10000]), - orig_ret=np.array([10000]), - orig_dis=np.array([0]), - orig_surv=np.array([0]), - orig_dep=np.array([0]), - imputed_ss=np.array([15000]), - age=np.array([70]), - ) - new_recip = np.array([False]) - with patch.dict("sys.modules", {"microimpute": None}): - result = _qrf_ss_shares(data, n, tp, new_recip) - # No new recipients -> no call needed, but also shouldn't error - # Actually test with import blocked: - assert result is None or isinstance(result, dict) - def test_returns_none_with_few_training_records(self): """Returns None when < MIN_QRF_TRAINING_RECORDS have SS.""" # Only 2 training records with SS > 0. @@ -276,8 +275,8 @@ def test_returns_none_with_few_training_records(self): imputed_ss=np.array([0, 0, 20000]), age=np.array([70, 45, 55]), ) - new_recip = np.array([False, False, True]) - result = _qrf_ss_shares(data, n, tp, new_recip) + puf_has_ss = np.array([False, False, True]) + result = _qrf_ss_shares(data, n, tp, puf_has_ss) assert result is None def test_shares_sum_to_one(self): @@ -296,36 +295,22 @@ def test_shares_sum_to_one(self): surv = np.zeros(n, dtype=np.float32) dep = np.zeros(n, dtype=np.float32) - # 20 new recipients (CPS had 0 SS). - n_new = 20 - new_ages = rng.integers(20, 90, size=n_new).astype(np.float32) - new_is_male = rng.integers(0, 2, size=n_new).astype(np.float32) - new_ss = rng.uniform(10000, 30000, size=n_new).astype(np.float32) - - all_ss = np.concatenate([ss_vals, np.zeros(n_new)]) - all_ret = np.concatenate([ret, np.zeros(n_new)]) - all_dis = np.concatenate([dis, np.zeros(n_new)]) - all_surv = np.concatenate([surv, np.zeros(n_new)]) - all_dep = np.concatenate([dep, np.zeros(n_new)]) - all_ages = np.concatenate([ages, new_ages]) - all_is_male = np.concatenate([is_male, new_is_male]) - n_total = n + n_new + # PUF SS for all records. + puf_ss = rng.uniform(5000, 40000, size=n).astype(np.float32) data, nn, tp = _make_data( - orig_ss=all_ss, - orig_ret=all_ret, - orig_dis=all_dis, - orig_surv=all_surv, - orig_dep=all_dep, - imputed_ss=np.concatenate([ss_vals, new_ss]), - age=all_ages, - is_male=all_is_male, + orig_ss=ss_vals, + orig_ret=ret, + orig_dis=dis, + orig_surv=surv, + orig_dep=dep, + imputed_ss=puf_ss, + age=ages, + is_male=is_male, ) - new_recip = np.concatenate( - [np.zeros(n, dtype=bool), np.ones(n_new, dtype=bool)] - ) - shares = _qrf_ss_shares(data, nn, tp, new_recip) + puf_has_ss = puf_ss > 0 + shares = _qrf_ss_shares(data, nn, tp, puf_has_ss) assert shares is not None total = sum(shares.get(sub, 0) for sub in SS_SUBS) @@ -335,26 +320,32 @@ def test_elderly_predicted_as_retirement(self): """QRF should mostly predict retirement for age >= 62.""" pytest.importorskip("microimpute") rng = np.random.default_rng(123) - n = 500 + n_train = 500 - ages = rng.integers(20, 90, size=n).astype(np.float32) - ss_vals = rng.uniform(5000, 40000, size=n).astype(np.float32) + ages = rng.integers(20, 90, size=n_train).astype(np.float32) + ss_vals = rng.uniform(5000, 40000, size=n_train).astype(np.float32) ret = np.where(ages >= 62, ss_vals, 0).astype(np.float32) dis = np.where(ages < 62, ss_vals, 0).astype(np.float32) - surv = np.zeros(n, dtype=np.float32) - dep = np.zeros(n, dtype=np.float32) - - # 10 new elderly recipients. - n_new = 10 - new_ages = np.full(n_new, 70, dtype=np.float32) - new_ss = np.full(n_new, 20000, dtype=np.float32) - - all_ss = np.concatenate([ss_vals, np.zeros(n_new)]) - all_ret = np.concatenate([ret, np.zeros(n_new)]) - all_dis = np.concatenate([dis, np.zeros(n_new)]) - all_surv = np.concatenate([surv, np.zeros(n_new)]) - all_dep = np.concatenate([dep, np.zeros(n_new)]) - all_ages = np.concatenate([ages, new_ages]) + surv = np.zeros(n_train, dtype=np.float32) + dep = np.zeros(n_train, dtype=np.float32) + + # Only elderly records get PUF SS > 0; training records + # get PUF SS = 0 so puf_has_ss selects just the elderly. + n_pred = 10 + pred_ages = np.full(n_pred, 70, dtype=np.float32) + + all_ages = np.concatenate([ages, pred_ages]) + all_ss = np.concatenate([ss_vals, np.zeros(n_pred)]) + all_ret = np.concatenate([ret, np.zeros(n_pred)]) + all_dis = np.concatenate([dis, np.zeros(n_pred)]) + all_surv = np.concatenate([surv, np.zeros(n_pred)]) + all_dep = np.concatenate([dep, np.zeros(n_pred)]) + puf_ss = np.concatenate( + [ + np.zeros(n_train, dtype=np.float32), + np.full(n_pred, 20000, dtype=np.float32), + ] + ) data, nn, tp = _make_data( orig_ss=all_ss, @@ -362,14 +353,12 @@ def test_elderly_predicted_as_retirement(self): orig_dis=all_dis, orig_surv=all_surv, orig_dep=all_dep, - imputed_ss=np.concatenate([ss_vals, new_ss]), + imputed_ss=puf_ss, age=all_ages, ) - new_recip = np.concatenate( - [np.zeros(n, dtype=bool), np.ones(n_new, dtype=bool)] - ) - shares = _qrf_ss_shares(data, nn, tp, new_recip) + puf_has_ss = data["social_security"][tp][nn:] > 0 + shares = _qrf_ss_shares(data, nn, tp, puf_has_ss) assert shares is not None ret_share = shares["social_security_retirement"] From 88f2aa4c60435477490a8987b64d45f9c8146a0b Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Tue, 24 Feb 2026 21:18:44 -0500 Subject: [PATCH 05/11] Add towncrier changelog fragment Co-Authored-By: Claude Opus 4.6 --- changelog.d/fix-ss-subcomponent-reconciliation.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/fix-ss-subcomponent-reconciliation.fixed.md diff --git a/changelog.d/fix-ss-subcomponent-reconciliation.fixed.md b/changelog.d/fix-ss-subcomponent-reconciliation.fixed.md new file mode 100644 index 00000000..2bfa8149 --- /dev/null +++ b/changelog.d/fix-ss-subcomponent-reconciliation.fixed.md @@ -0,0 +1 @@ +Reconcile SS sub-components after PUF imputation so they sum to social_security. From 92b5d04cc207f1141334f5008eed376d5701e6f7 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 09:40:24 -0500 Subject: [PATCH 06/11] Bump policyengine-us to 1.584.0 Fixes CI failure: DE TANF deficit_rate parameter started at 2024-10-01 in 1.570.7, causing ParameterNotFoundError for Jan 2024 simulations. Fixed in newer releases (start date corrected to 2011-10-01). Co-Authored-By: Claude Opus 4.6 --- uv.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/uv.lock b/uv.lock index fc61e96d..09a004ac 100644 --- a/uv.lock +++ b/uv.lock @@ -1842,7 +1842,7 @@ wheels = [ [[package]] name = "policyengine-us" -version = "1.570.7" +version = "1.584.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "microdf-python" }, @@ -1850,9 +1850,9 @@ dependencies = [ { name = "policyengine-core" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6a/eb/291b3085aa0fa97fcce4987d54991118f21aead49647b3f475998459f46b/policyengine_us-1.570.7.tar.gz", hash = "sha256:a2967af86a61468a0bdb6b2dc7af2fd0bb0f0064203fa557b6fee8023058360a", size = 8668680, upload-time = "2026-02-19T07:17:11.264Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ef/29/a4517eac6dedc1d6c1a17b0d105b9aa41ca41b877e6ecef39362ecd0438c/policyengine_us-1.584.0.tar.gz", hash = "sha256:9bb830765ece943b11939822da94d64a26c802c941b8f929a354354394a2ef7e", size = 8701324, upload-time = "2026-02-24T15:05:04.474Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/38/36/0213955310076e4dec2781baeabf96b6d6937f99cd19c373b363bfbd7152/policyengine_us-1.570.7-py3-none-any.whl", hash = "sha256:374fd5357d6cb3734b900bd08dfdb61760dfc5b913ed686f57a40239565b0edd", size = 7825404, upload-time = "2026-02-19T07:17:08.877Z" }, + { url = "https://files.pythonhosted.org/packages/17/f4/b8224384de676662858a079bc3138141a072ad886266e1e2d28a3108ad9d/policyengine_us-1.584.0-py3-none-any.whl", hash = "sha256:55c97c9e63c8ba18b895b74076d893de720cd44089398a62f0d02cc7afc8d53c", size = 7933593, upload-time = "2026-02-24T15:05:01.846Z" }, ] [[package]] From 2a8a0e2ed4c543c96c0c8285d80266cb23b8c1b1 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 20:53:34 -0500 Subject: [PATCH 07/11] Drop formula variables from dataset, add integrity tests Variables with formulas in policyengine-us are recomputed by the simulation engine, so storing them wastes space and can mislead validation. This removes 9 such variables from the extended CPS output (saving ~15MB). Also adds tests verifying: - No formula variables are stored (except person_id) - Stored input values match what the simulation computes - SS sub-components sum to total social_security per person Co-Authored-By: Claude Opus 4.6 --- .../datasets/cps/extended_cps.py | 31 +++ tests/test_no_formula_variables_stored.py | 178 ++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 tests/test_no_formula_variables_stored.py diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 4b427a39..4495fc18 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -50,8 +50,39 @@ def generate(self): dataset_path=str(self.cps.file_path), ) + new_data = self._drop_formula_variables(new_data) self.save_dataset(new_data) + # Variables with formulas that must still be stored (e.g. IDs + # needed by the dataset loader before formulas can run). + _KEEP_FORMULA_VARS = {"person_id"} + + @classmethod + def _drop_formula_variables(cls, data): + """Remove variables that have formulas in policyengine-us. + + The simulation engine recomputes these from their inputs, + so storing them wastes space and can mislead validation. + """ + from policyengine_us import CountryTaxBenefitSystem + + tbs = CountryTaxBenefitSystem() + formula_vars = { + name + for name, var in tbs.variables.items() + if hasattr(var, "formulas") and len(var.formulas) > 0 + } - cls._KEEP_FORMULA_VARS + dropped = sorted(set(data.keys()) & formula_vars) + if dropped: + logger.info( + "Dropping %d formula variables: %s", + len(dropped), + dropped, + ) + for var in dropped: + del data[var] + return data + class ExtendedCPS_2024(ExtendedCPS): cps = CPS_2024_Full diff --git a/tests/test_no_formula_variables_stored.py b/tests/test_no_formula_variables_stored.py new file mode 100644 index 00000000..08cc00cf --- /dev/null +++ b/tests/test_no_formula_variables_stored.py @@ -0,0 +1,178 @@ +"""Test dataset variable integrity. + +1. Variables with formulas in policyengine-us will be recomputed by the + simulation engine, ignoring any stored value. Storing them wastes + space and can mislead during validation. + +2. Stored input values should match what the simulation actually uses, + to catch issues like set_input_divide_by_period silently altering + values, or sub-components not summing to their parent total. +""" + +import h5py +import numpy as np +import pytest +from policyengine_us_data.datasets.cps.extended_cps import ExtendedCPS_2024 + + +KNOWN_FORMULA_EXCEPTIONS = { + # person_id is stored for identity tracking even though it has a + # trivial formula (arange). Safe to keep. + "person_id", +} + + +@pytest.fixture(scope="module") +def tax_benefit_system(): + from policyengine_us import CountryTaxBenefitSystem + + return CountryTaxBenefitSystem() + + +@pytest.fixture(scope="module") +def formula_variables(tax_benefit_system): + """Return set of variable names that have formulas.""" + return { + name + for name, var in tax_benefit_system.variables.items() + if hasattr(var, "formulas") and len(var.formulas) > 0 + } + + +@pytest.fixture(scope="module") +def dataset_path(): + path = ExtendedCPS_2024.file_path + if not path.exists(): + pytest.skip("Extended CPS 2024 not built locally") + return path + + +@pytest.fixture(scope="module") +def stored_variables(dataset_path): + """Return set of variable names stored in the extended CPS.""" + with h5py.File(dataset_path, "r") as f: + return set(f.keys()) + + +@pytest.fixture(scope="module") +def sim(): + from policyengine_us import Microsimulation + + return Microsimulation(dataset=ExtendedCPS_2024) + + +def test_no_formula_variables_stored( + formula_variables, stored_variables +): + """Variables with formulas should not be stored in the dataset.""" + overlap = ( + stored_variables & formula_variables + ) - KNOWN_FORMULA_EXCEPTIONS + if overlap: + msg = ( + f"These {len(overlap)} variables have formulas in " + f"policyengine-us but are stored in the dataset " + f"(stored values will be IGNORED by the simulation):\n" + ) + for v in sorted(overlap): + msg += f" - {v}\n" + pytest.fail(msg) + + +def test_stored_values_match_computed( + sim, stored_variables, formula_variables, dataset_path +): + """For input-only variables, stored values should match what the + simulation returns (catches set_input issues, rounding, etc.).""" + input_vars = stored_variables - formula_variables + mismatches = [] + + with h5py.File(dataset_path, "r") as f: + for var_name in sorted(input_vars): + if var_name not in f or "2024" not in f[var_name]: + continue + stored = f[var_name]["2024"][...] + if stored.dtype.kind not in ("f", "i", "u"): + continue + + stored_f = stored.astype(float) + stored_total = np.sum(stored_f) + if abs(stored_total) < 1: + continue + + try: + computed = np.array( + sim.calculate(var_name, 2024) + ) + except Exception: + continue + + computed_total = np.sum(computed.astype(float)) + if abs(stored_total) > 0: + pct_diff = ( + abs(stored_total - computed_total) + / abs(stored_total) + * 100 + ) + else: + pct_diff = 0 + + if pct_diff > 1: + mismatches.append( + f" {var_name}: stored=${stored_total:,.0f}, " + f"computed=${computed_total:,.0f}, " + f"diff={pct_diff:.1f}%" + ) + + if mismatches: + msg = ( + f"These {len(mismatches)} input variables have >1% " + f"difference between stored and computed values:\n" + ) + msg += "\n".join(mismatches) + pytest.fail(msg) + + +def test_ss_subcomponents_sum_to_total(dataset_path): + """Social Security sub-components should sum to the total.""" + with h5py.File(dataset_path, "r") as f: + ss_total = f["social_security"]["2024"][...].astype(float) + ss_retirement = f["social_security_retirement"]["2024"][ + ... + ].astype(float) + ss_disability = f["social_security_disability"]["2024"][ + ... + ].astype(float) + ss_survivors = f["social_security_survivors"]["2024"][ + ... + ].astype(float) + ss_dependents = f["social_security_dependents"]["2024"][ + ... + ].astype(float) + + sub_sum = ( + ss_retirement + ss_disability + ss_survivors + ss_dependents + ) + + # Only check records that have any SS income + has_ss = ss_total > 0 + if not np.any(has_ss): + pytest.skip("No SS recipients in dataset") + + total_ss = np.sum(ss_total[has_ss]) + total_sub = np.sum(sub_sum[has_ss]) + pct_diff = abs(total_ss - total_sub) / total_ss * 100 + + assert pct_diff < 1, ( + f"SS sub-components don't sum to total: " + f"total=${total_ss:,.0f}, sub_sum=${total_sub:,.0f}, " + f"diff={pct_diff:.1f}%" + ) + + # Per-person check: no person's sub-components should exceed total + excess = sub_sum[has_ss] - ss_total[has_ss] + n_excess = np.sum(excess > 1) + assert n_excess == 0, ( + f"{n_excess} people have SS sub-components exceeding " + f"their total SS income" + ) From 1c83a8f63b2e214ccc577bf3bbff2015aa574e82 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 20:58:01 -0500 Subject: [PATCH 08/11] Format test file with black Co-Authored-By: Claude Opus 4.6 --- tests/test_no_formula_variables_stored.py | 42 +++++++++-------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/tests/test_no_formula_variables_stored.py b/tests/test_no_formula_variables_stored.py index 08cc00cf..08f53e5d 100644 --- a/tests/test_no_formula_variables_stored.py +++ b/tests/test_no_formula_variables_stored.py @@ -61,13 +61,9 @@ def sim(): return Microsimulation(dataset=ExtendedCPS_2024) -def test_no_formula_variables_stored( - formula_variables, stored_variables -): +def test_no_formula_variables_stored(formula_variables, stored_variables): """Variables with formulas should not be stored in the dataset.""" - overlap = ( - stored_variables & formula_variables - ) - KNOWN_FORMULA_EXCEPTIONS + overlap = (stored_variables & formula_variables) - KNOWN_FORMULA_EXCEPTIONS if overlap: msg = ( f"These {len(overlap)} variables have formulas in " @@ -101,9 +97,7 @@ def test_stored_values_match_computed( continue try: - computed = np.array( - sim.calculate(var_name, 2024) - ) + computed = np.array(sim.calculate(var_name, 2024)) except Exception: continue @@ -137,22 +131,20 @@ def test_ss_subcomponents_sum_to_total(dataset_path): """Social Security sub-components should sum to the total.""" with h5py.File(dataset_path, "r") as f: ss_total = f["social_security"]["2024"][...].astype(float) - ss_retirement = f["social_security_retirement"]["2024"][ - ... - ].astype(float) - ss_disability = f["social_security_disability"]["2024"][ - ... - ].astype(float) - ss_survivors = f["social_security_survivors"]["2024"][ - ... - ].astype(float) - ss_dependents = f["social_security_dependents"]["2024"][ - ... - ].astype(float) - - sub_sum = ( - ss_retirement + ss_disability + ss_survivors + ss_dependents - ) + ss_retirement = f["social_security_retirement"]["2024"][...].astype( + float + ) + ss_disability = f["social_security_disability"]["2024"][...].astype( + float + ) + ss_survivors = f["social_security_survivors"]["2024"][...].astype( + float + ) + ss_dependents = f["social_security_dependents"]["2024"][...].astype( + float + ) + + sub_sum = ss_retirement + ss_disability + ss_survivors + ss_dependents # Only check records that have any SS income has_ss = ss_total > 0 From 86ecc0cc89e403634a2c3e763b73a1c0b04b1972 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 21:00:54 -0500 Subject: [PATCH 09/11] Fix black formatting for CI (26.1.0) Co-Authored-By: Claude Opus 4.6 --- tests/test_no_formula_variables_stored.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_no_formula_variables_stored.py b/tests/test_no_formula_variables_stored.py index 08f53e5d..e16bd80c 100644 --- a/tests/test_no_formula_variables_stored.py +++ b/tests/test_no_formula_variables_stored.py @@ -14,7 +14,6 @@ import pytest from policyengine_us_data.datasets.cps.extended_cps import ExtendedCPS_2024 - KNOWN_FORMULA_EXCEPTIONS = { # person_id is stored for identity tracking even though it has a # trivial formula (arange). Safe to keep. From f304f82072c844bb5ede9f2e6eec031458fa5bf3 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 21:18:46 -0500 Subject: [PATCH 10/11] fixup! Fix black formatting for CI (26.1.0) --- policyengine_us_data/datasets/cps/extended_cps.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 4495fc18..ac8b6e45 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -59,10 +59,11 @@ def generate(self): @classmethod def _drop_formula_variables(cls, data): - """Remove variables that have formulas in policyengine-us. + """Remove variables that are computed by policyengine-us. - The simulation engine recomputes these from their inputs, - so storing them wastes space and can mislead validation. + Variables with formulas, ``adds``, or ``subtracts`` are + recomputed by the simulation engine, so storing them wastes + space and can mislead validation. """ from policyengine_us import CountryTaxBenefitSystem @@ -70,7 +71,9 @@ def _drop_formula_variables(cls, data): formula_vars = { name for name, var in tbs.variables.items() - if hasattr(var, "formulas") and len(var.formulas) > 0 + if (hasattr(var, "formulas") and len(var.formulas) > 0) + or getattr(var, "adds", None) + or getattr(var, "subtracts", None) } - cls._KEEP_FORMULA_VARS dropped = sorted(set(data.keys()) & formula_vars) if dropped: From b7bdc215f9d72149634ca40ef11a1ac7c337f529 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 25 Feb 2026 21:26:55 -0500 Subject: [PATCH 11/11] Also drop adds/subtracts variables from dataset Update _drop_formula_variables and tests to catch variables that use `adds` or `subtracts` (e.g. social_security), not just explicit formulas. These are also recomputed by the simulation engine. Co-Authored-By: Claude Opus 4.6 --- tests/test_no_formula_variables_stored.py | 44 +++++++++++++++-------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/tests/test_no_formula_variables_stored.py b/tests/test_no_formula_variables_stored.py index e16bd80c..9334a5c7 100644 --- a/tests/test_no_formula_variables_stored.py +++ b/tests/test_no_formula_variables_stored.py @@ -30,11 +30,17 @@ def tax_benefit_system(): @pytest.fixture(scope="module") def formula_variables(tax_benefit_system): - """Return set of variable names that have formulas.""" + """Return set of variable names computed by policyengine-us. + + Includes variables with explicit formulas as well as those using + ``adds`` or ``subtracts`` (which the engine auto-sums at runtime). + """ return { name for name, var in tax_benefit_system.variables.items() - if hasattr(var, "formulas") and len(var.formulas) > 0 + if (hasattr(var, "formulas") and len(var.formulas) > 0) + or getattr(var, "adds", None) + or getattr(var, "subtracts", None) } @@ -61,12 +67,13 @@ def sim(): def test_no_formula_variables_stored(formula_variables, stored_variables): - """Variables with formulas should not be stored in the dataset.""" + """Computed variables should not be stored in the dataset.""" overlap = (stored_variables & formula_variables) - KNOWN_FORMULA_EXCEPTIONS if overlap: msg = ( - f"These {len(overlap)} variables have formulas in " - f"policyengine-us but are stored in the dataset " + f"These {len(overlap)} variables are computed by " + f"policyengine-us (formulas/adds/subtracts) but are " + f"stored in the dataset " f"(stored values will be IGNORED by the simulation):\n" ) for v in sorted(overlap): @@ -126,10 +133,14 @@ def test_stored_values_match_computed( pytest.fail(msg) -def test_ss_subcomponents_sum_to_total(dataset_path): - """Social Security sub-components should sum to the total.""" +def test_ss_subcomponents_sum_to_computed_total(sim, dataset_path): + """Social Security sub-components should sum to the computed total. + + ``social_security`` is computed via ``adds`` in policyengine-us and + is NOT stored in the dataset. We verify that the sub-components + stored in the dataset sum to the simulation's computed total. + """ with h5py.File(dataset_path, "r") as f: - ss_total = f["social_security"]["2024"][...].astype(float) ss_retirement = f["social_security_retirement"]["2024"][...].astype( float ) @@ -144,26 +155,29 @@ def test_ss_subcomponents_sum_to_total(dataset_path): ) sub_sum = ss_retirement + ss_disability + ss_survivors + ss_dependents + computed_total = np.array(sim.calculate("social_security", 2024)).astype( + float + ) # Only check records that have any SS income - has_ss = ss_total > 0 + has_ss = computed_total > 0 if not np.any(has_ss): pytest.skip("No SS recipients in dataset") - total_ss = np.sum(ss_total[has_ss]) + total_computed = np.sum(computed_total[has_ss]) total_sub = np.sum(sub_sum[has_ss]) - pct_diff = abs(total_ss - total_sub) / total_ss * 100 + pct_diff = abs(total_computed - total_sub) / total_computed * 100 assert pct_diff < 1, ( - f"SS sub-components don't sum to total: " - f"total=${total_ss:,.0f}, sub_sum=${total_sub:,.0f}, " + f"SS sub-components don't sum to computed total: " + f"computed=${total_computed:,.0f}, sub_sum=${total_sub:,.0f}, " f"diff={pct_diff:.1f}%" ) # Per-person check: no person's sub-components should exceed total - excess = sub_sum[has_ss] - ss_total[has_ss] + excess = sub_sum[has_ss] - computed_total[has_ss] n_excess = np.sum(excess > 1) assert n_excess == 0, ( f"{n_excess} people have SS sub-components exceeding " - f"their total SS income" + f"their computed total SS income" )