Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/fix-ss-subcomponent-reconciliation.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reconcile SS sub-components after PUF imputation so they sum to social_security.
214 changes: 214 additions & 0 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
"self_employment_income_would_be_qualified",
]

# Social Security sub-components (reason codes). After imputation these
# are reconciled so that, per record, they sum to `social_security`.
SS_SUBCOMPONENTS = [
    "social_security_retirement",
    "social_security_disability",
    "social_security_survivors",
    "social_security_dependents",
]

OVERRIDDEN_IMPUTED_VARIABLES = [
"partnership_s_corp_income",
"interest_deduction",
Expand Down Expand Up @@ -155,6 +162,210 @@
]


# Age threshold used by the heuristic fallback: recipients at or above
# this age are treated as retirement beneficiaries, younger ones as
# disability beneficiaries.
MINIMUM_RETIREMENT_AGE = 62

# Demographic predictors fed to the QRF when splitting total SS into
# sub-components. Predictors missing from the dataset are skipped.
SS_SPLIT_PREDICTORS = [
    "age",
    "is_male",
    "tax_unit_is_joint",
    "is_tax_unit_head",
    "is_tax_unit_dependent",
]

# Minimum number of CPS SS recipients required to train the QRF;
# below this, the age heuristic is used instead.
MIN_QRF_TRAINING_RECORDS = 100


def _qrf_ss_shares(
    data: Dict[str, Dict[int, np.ndarray]],
    n_cps: int,
    time_period: int,
    puf_has_ss: np.ndarray,
) -> Optional[Dict[str, np.ndarray]]:
    """Predict SS sub-component shares via QRF.

    Trains on CPS records that have SS > 0 (where the reason-code
    split is known), then predicts shares for all PUF records with
    positive SS. The CPS-PUF link is statistical (not identity-based),
    so the QRF gives a better expected prediction than using the
    paired CPS record's split.

    Args:
        data: Dataset dict.
        n_cps: Records in CPS half.
        time_period: Tax year.
        puf_has_ss: Boolean mask (length n_cps) — True where the
            PUF half has positive social_security.

    Returns:
        Dict mapping sub-component name to predicted share arrays
        (length = puf_has_ss.sum()), normalised to sum to 1 per
        record, or None if training data is insufficient.
    """
    from microimpute.models.qrf import QRF

    cps_ss = data["social_security"][time_period][:n_cps]
    has_ss = cps_ss > 0

    if has_ss.sum() < MIN_QRF_TRAINING_RECORDS:
        return None

    # Build training features from available predictors.
    predictors = []
    train_cols = {}
    test_cols = {}
    for pred in SS_SPLIT_PREDICTORS:
        if pred not in data:
            continue
        vals = data[pred][time_period][:n_cps]
        train_cols[pred] = vals[has_ss].astype(np.float32)
        test_cols[pred] = vals[puf_has_ss].astype(np.float32)
        predictors.append(pred)

    if not predictors:
        return None

    X_train = pd.DataFrame(train_cols)
    X_test = pd.DataFrame(test_cols)

    # Training targets: share going to each sub-component (0 or 1).
    share_vars = []
    with np.errstate(divide="ignore", invalid="ignore"):
        for sub in SS_SUBCOMPONENTS:
            if sub not in data:
                continue
            sub_vals = data[sub][time_period][:n_cps][has_ss]
            share_name = sub + "_share"
            X_train[share_name] = np.where(
                cps_ss[has_ss] > 0,
                sub_vals / cps_ss[has_ss],
                0.0,
            )
            share_vars.append(share_name)

    if not share_vars:
        return None

    qrf = QRF(log_level="WARNING", memory_efficient=True)
    try:
        fitted = qrf.fit(
            X_train=X_train[predictors + share_vars],
            predictors=predictors,
            imputed_variables=share_vars,
            n_jobs=1,
        )
        predictions = fitted.predict(X_test=X_test)
    except Exception:
        logger.warning("QRF SS split failed, falling back to heuristic")
        return None

    # Clip to [0, 1] and normalise so shares sum to 1.
    shares = {}
    total = np.zeros(len(X_test))
    for sub in SS_SUBCOMPONENTS:
        key = sub + "_share"
        if key in predictions.columns:
            s = np.clip(predictions[key].values, 0, 1)
            shares[sub] = s
            total += s

    for sub in shares:
        shares[sub] = np.where(total > 0, shares[sub] / total, 0.0)

    # BUG FIX: records whose predicted shares all clipped to zero
    # (total == 0) would otherwise get 0.0 for every sub-component,
    # so the sub-components would no longer sum to social_security.
    # Default such records to retirement — the dominant SS category
    # and the same convention the age-heuristic fallback uses when
    # age is unavailable.
    if "social_security_retirement" in shares:
        shares["social_security_retirement"] = np.where(
            total > 0, shares["social_security_retirement"], 1.0
        )

    del fitted, predictions
    gc.collect()

    logger.info(
        "QRF SS split: predicted shares for %d PUF records",
        puf_has_ss.sum(),
    )
    return shares


def _age_heuristic_ss_shares(
    data: Dict[str, Dict[int, np.ndarray]],
    n_cps: int,
    time_period: int,
    puf_has_ss: np.ndarray,
) -> Dict[str, np.ndarray]:
    """Fallback SS split using a simple age rule.

    Recipients aged >= MINIMUM_RETIREMENT_AGE are assigned entirely to
    retirement; younger recipients entirely to disability. When no age
    variable is present in ``data``, every recipient is assigned to
    retirement.
    """
    n_recipients = puf_has_ss.sum()
    shares = {name: np.zeros(n_recipients) for name in SS_SUBCOMPONENTS}

    if "age" not in data:
        # No demographics at all: treat everyone as a retiree.
        if "social_security_retirement" in shares:
            shares["social_security_retirement"] = np.ones(n_recipients)
        return shares

    ages = data["age"][time_period][:n_cps][puf_has_ss]
    retired = ages >= MINIMUM_RETIREMENT_AGE
    if "social_security_retirement" in shares:
        shares["social_security_retirement"] = retired.astype(np.float64)
    if "social_security_disability" in shares:
        shares["social_security_disability"] = (~retired).astype(np.float64)
    return shares


def reconcile_ss_subcomponents(
    data: Dict[str, Dict[int, np.ndarray]],
    n_cps: int,
    time_period: int,
) -> None:
    """Predict SS sub-components for the PUF half from demographics.

    Because the CPS-PUF link is statistical rather than identity-based,
    the paired CPS record's sub-component split is just one noisy draw;
    a QRF trained on all CPS SS recipients pools information across the
    whole training set and gives a better expected split.

    Every PUF record with positive ``social_security`` receives shares
    predicted by the QRF (or, if that fails, by an age heuristic),
    scaled so the sub-components match the imputed total. PUF records
    with zero SS have all sub-components zeroed out.

    Modifies ``data`` in place; only the PUF half (indices
    n_cps .. 2*n_cps) is touched.

    Args:
        data: Dataset dict {variable: {time_period: array}}.
        n_cps: Number of records in the CPS half.
        time_period: Tax year key into data dicts.
    """
    if "social_security" not in data:
        return

    total_ss = data["social_security"][time_period][n_cps:]
    recipients = total_ss > 0

    # Work out per-record shares only when there is someone to split.
    shares = None
    if recipients.any():
        shares = _qrf_ss_shares(data, n_cps, time_period, recipients)
        if shares is None:
            shares = _age_heuristic_ss_shares(
                data, n_cps, time_period, recipients
            )

    for name in SS_SUBCOMPONENTS:
        if name not in data:
            continue
        values = data[name][time_period]

        replacement = np.zeros(n_cps)
        if shares is not None and recipients.any():
            fraction = shares.get(name, np.zeros(recipients.sum()))
            replacement[recipients] = total_ss[recipients] * fraction

        values[n_cps:] = replacement.astype(values.dtype)
        data[name][time_period] = values


def puf_clone_dataset(
data: Dict[str, Dict[int, np.ndarray]],
state_fips: np.ndarray,
Expand Down Expand Up @@ -271,6 +482,9 @@ def _map_to_entity(pred_values, variable_name):
if cps_sim is not None:
del cps_sim

# Ensure SS sub-components match the (possibly imputed) total.
reconcile_ss_subcomponents(new_data, person_count, time_period)

logger.info(
"PUF clone complete: %d -> %d households",
n_households,
Expand Down
34 changes: 34 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,42 @@ def generate(self):
dataset_path=str(self.cps.file_path),
)

new_data = self._drop_formula_variables(new_data)
self.save_dataset(new_data)

# Variables with formulas that must still be stored (e.g. IDs
# needed by the dataset loader before formulas can run).
# NOTE(review): extend this set if other formula-backed variables
# turn out to be required before the simulation engine initialises.
_KEEP_FORMULA_VARS = {"person_id"}

@classmethod
def _drop_formula_variables(cls, data):
    """Remove variables that are computed by policyengine-us.

    Variables with formulas, ``adds``, or ``subtracts`` are
    recomputed by the simulation engine, so storing them wastes
    space and can mislead validation. Names in
    ``_KEEP_FORMULA_VARS`` are exempt.
    """
    from policyengine_us import CountryTaxBenefitSystem

    system = CountryTaxBenefitSystem()

    def engine_computed(variable):
        # Computed either via explicit formulas or via adds/subtracts
        # aggregation.
        if hasattr(variable, "formulas") and len(variable.formulas) > 0:
            return True
        return bool(
            getattr(variable, "adds", None)
            or getattr(variable, "subtracts", None)
        )

    computed_names = {
        name
        for name, variable in system.variables.items()
        if engine_computed(variable)
    }
    computed_names -= cls._KEEP_FORMULA_VARS

    to_drop = sorted(set(data.keys()) & computed_names)
    if to_drop:
        logger.info(
            "Dropping %d formula variables: %s",
            len(to_drop),
            to_drop,
        )
    for name in to_drop:
        del data[name]
    return data


class ExtendedCPS_2024(ExtendedCPS):
cps = CPS_2024_Full
Expand Down
Loading