diff --git a/Pipfile b/Pipfile index 2e0a87d9..4534dd06 100644 --- a/Pipfile +++ b/Pipfile @@ -33,8 +33,10 @@ cfl-common = "==8.9.19" # TODO: remove codeforlife-portal = "==8.9.19" # TODO: remove rapid-router = "==7.6.18" # TODO: remove phonenumbers = "==8.12.12" # TODO: remove -google-auth = "==2.40.3" +google-auth = "==2.48.0" google-cloud-bigquery = "==3.38.0" +tink = {version = "==1.13.0", extras = ["gcpkms"]} +cachetools = "==6.2.6" [dev-packages] celery-types = "==0.23.0" @@ -57,6 +59,7 @@ django-stubs = {version = "==5.1.3", extras = ["compatible-mypy"]} djangorestframework-stubs = {version = "==3.15.3", extras = ["compatible-mypy"]} types-regex = "==2024.11.6.*" types-psutil = "==7.0.0.20250601" +types-cachetools = "==6.2.*" [requires] python_version = "3.12" diff --git a/Pipfile.lock b/Pipfile.lock index 84e2fb5c..18df9d2b 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e7f16b2d14b73ca8af83232206b594726dcda0856f51945cdf2410edc86f4472" + "sha256": "696ecd4e6a607df072aa76da1da2fb578391ee41d7cfa9902a0de680fd1e2cc4" }, "pipfile-spec": 6, "requires": { @@ -16,6 +16,14 @@ ] }, "default": { + "absl-py": { + "hashes": [ + "sha256:88476fd881ca8aab94ffa78b7b6c632a782ab3ba1cd19c9bd423abc4fb4cd28d", + "sha256:8c6af82722b35cf71e0f4d1d47dcaebfff286e27110a99fc359349b247dfb5d4" + ], + "markers": "python_version >= '3.10'", + "version": "==2.4.0" + }, "amqp": { "hashes": [ "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", @@ -40,6 +48,13 @@ "markers": "python_version >= '3.8'", "version": "==3.0.1" }, + "bazel-runfiles": { + "hashes": [ + "sha256:57a2cc04e0b924606e8dd70fc8b31d157db43680a300f546dc60207b5ce7ca82" + ], + "markers": "python_version >= '3.7'", + "version": "==1.8.3" + }, "billiard": { "hashes": [ "sha256:525b42bdec68d2b983347ac312f892db930858495db601b5836ac24e6477cde5", @@ -67,11 +82,12 @@ }, "cachetools": { "hashes": [ - "sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4", - "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a" + "sha256:16c33e1f276b9a9c0b49ab5782d901e3ad3de0dd6da9bf9bcd29ac5672f2f9e6", + "sha256:8c9717235b3c651603fff0076db52d6acbfd1b338b8ed50256092f7ce9c85bda" ], - "markers": "python_version >= '3.7'", - "version": "==5.5.2" + "index": "pypi", + "markers": "python_version >= '3.9'", + "version": "==6.2.6" }, "celery": { "extras": [ @@ -590,12 +606,12 @@ }, "google-auth": { "hashes": [ - "sha256:1370d4593e86213563547f97a92752fc658456fe4514c809544f330fed45a7ca", - "sha256:500c3a29adedeb36ea9cf24b8d10858e152f2412e3ca37829b3fa18e33d63b77" + "sha256:2e2a537873d449434252a9632c28bfc268b0adb1e53f9fb62afc5333a975903f", + "sha256:4f7e706b0cd3208a3d940a19a822c37a476ddba5450156c3e6624a71f7c841ce" ], "index": "pypi", - "markers": "python_version >= '3.7'", - "version": "==2.40.3" + "markers": "python_version >= '3.8'", + "version": "==2.48.0" }, "google-cloud-bigquery": { "hashes": [ @@ -614,6 +630,14 @@ "markers": "python_version >= '3.7'", "version": "==2.5.0" }, + "google-cloud-kms": { + "hashes": [ + "sha256:2060d56cebe856ecdff04b6ea6b477a498b620ed57577804e0aea2950a2edab1", + "sha256:adc9924ec7bafe1b01236f8f5aba706902b39d64abceef0b4f739725dc66d08b" + ], + "markers": "python_version >= '3.7'", + "version": "==3.10.0" + }, "google-crc32c": { "hashes": [ "sha256:014a7e68d623e9a4222d663931febc3033c5c7c9730785727de2a81f87d5bab8", @@ -662,6 +686,9 @@ "version": "==2.8.0" }, "googleapis-common-protos": { + "extras": [ + "grpc" + ], "hashes": [ "sha256:4299c5a82d5ae1a9702ada957347726b167f9f8d1fc352477702a1e851ff4038", "sha256:e55a601c1b32b52d7a3e65f43563e2aa61bcd737998ee672ac9b951cd49319f5" @@ -669,6 +696,14 @@ "markers": "python_version >= '3.7'", "version": "==1.72.0" }, + "grpc-google-iam-v1": { + "hashes": [ + "sha256:7a7f697e017a067206a3dfef44e4c634a34d3dee135fe7d7a4613fe3e59217e6", + "sha256:879ac4ef33136c5491a6300e27575a9ec760f6cdf9a2518798c1b8977a5dc389" + ], + "markers": "python_version >= '3.7'", + "version": "==0.14.3" + }, "grpcio": { "hashes": [ "sha256:035d90bc79eaa4bed83f524331d55e35820725c9fbb00ffa1904d5550ed7ede3", @@ -1239,11 +1274,11 @@ }, "proto-plus": { "hashes": [ - "sha256:1baa7f81cf0f8acb8bc1f6d085008ba4171eaf669629d1b6d1673b21ed1c0a82", - "sha256:873af56dd0d7e91836aee871e5799e1c6f1bda86ac9a983e0bb9f0c266a568c4" + "sha256:912a7460446625b792f6448bade9e55cd4e41e6ac10e27009ef71a7f317fa147", + "sha256:e4643061f3a4d0de092d62aa4ad09fa4756b2cbb89d4627f3985018216f9fefc" ], "markers": "python_version >= '3.7'", - "version": "==1.27.0" + "version": "==1.27.1" }, "protobuf": { "hashes": [ @@ -1769,6 +1804,36 @@ "markers": "python_version >= '3.9'", "version": "==3.7.0" }, + "tink": { + "extras": [ + "gcpkms" + ], + "hashes": [ + "sha256:011897f8d3c20b094bcf4b2cb2cbf09a8319bd24a19570014022f888c53248bb", + "sha256:0af46b6fe37831e99de16b5159842d53a46317775153258d4e591592abc07f99", + "sha256:0e40472ad3eb344ce822c10298a0ff7430e527a8705d956464ebe0fb93c3a0b0", + "sha256:2045c226c81e23193d3c5eb0f264918ceb06ee9f4de7933427e0f701e088a3c1", + "sha256:27281112714fd07310594a4fcda069d6be215a9eecbc31f86e4e4e01e0c8d860", + "sha256:2753a379fcf21f9055b157e0fc090aa0359b9cb70fe146ff37d8c15933941c38", + "sha256:32bbaee98f3f6065ef9d3e9b7f8ee69a254d35ecb37ca1fe4b3b5b501e4dabf0", + "sha256:4ede7d7ac3dcfd767f8082e6ba2c38164e57aa18dd4881050439367906723475", + "sha256:55bdc12d35d7a332ec6e7ffc2f0a887866ecaa5fdccf2194c1bb176282a59b90", + "sha256:5936a4bbcd9f42df50a371d9575124009e47c9666aec71322192e404992d1bc2", + "sha256:5b7ce1c56a208bf191db2386150bdf42e6cfc2bd3dad44054ed045d8ae6ec7f3", + "sha256:6e50a0ad53d80dedba0b1368977058fa6a6cdf6c05d06a1f88971753b7dcd3c7", + "sha256:71e71df02035f0682237f14ea48325533b3a1ffb7f33d3970f56a8cf56cdcf25", + "sha256:75dcf4bbd5047e94481904a3d52e911ac1d807dda02e406bbcc47b703bec56a7", + "sha256:9b9e5a1b5102c69623b65a57a16fe4bed17e6bda535b6fa7af3d09b40633436b", + "sha256:bf785ad6dc5d98343d23e2a38b26093cd4ef1a7a6228b58cc7789f1ca2f84977", + "sha256:c4c61925014e2b59125e8b6afa6c5ce6f12fbcd05a346fb9bf6de83db2cc2b39", + "sha256:dca4950ffe762367771db1bc79241e6589f889360f3d023b32ba29282c2e4579", + "sha256:f2d39c87a982a43f848807c779828012e888e527e6c5f4152df81cca0134abb5", + "sha256:f7e525198e2fe6d45e7507d3c7af44a8d518c003fd3b135ea9f64283dd8ff2b1", + "sha256:fed609dd558df2a37ed8395d485990e582f69083aa9d3c172c31da5c034cd74c" + ], + "markers": "python_version >= '3.9'", + "version": "==1.13.0" + }, "traitlets": { "hashes": [ "sha256:9ed0579d3502c94b4b3732ac120375cda96f923114522847de4b3bb98b96b6b7", @@ -1918,11 +1983,11 @@ }, "botocore-stubs": { "hashes": [ - "sha256:96cf63b6949264f078f5009fa72932c6fd06a88c7bde4e21ed96df6c37e8edac", - "sha256:e21dadac08ee134cc0713884545c01b8eb1552c77e0d0c55c05e9584590e2b28" + "sha256:3038388fb54db85ddccb1d2780cfe8fefed515a2c63bb98d877e6cbf338eb645", + "sha256:3ece9db3bfbf33152cbaff8f3360a791b936f3e55fd4b65f88bba4da2026ec09" ], "markers": "python_version >= '3.9'", - "version": "==1.42.38" + "version": "==1.42.40" }, "celery-types": { "hashes": [ @@ -2073,101 +2138,101 @@ "toml" ], "hashes": [ - "sha256:044c6951ec37146b72a50cc81ef02217d27d4c3640efd2640311393cbbf143d3", - "sha256:060ebf6f2c51aff5ba38e1f43a2095e087389b1c69d559fde6049a4b0001320e", - "sha256:060ee84f6a769d40c492711911a76811b4befb6fba50abb450371abb720f5bd6", - "sha256:10758e0586c134a0bafa28f2d37dd2cdb5e4a90de25c0fc0c77dabbad46eca28", - "sha256:13fe81ead04e34e105bf1b3c9f9cdf32ce31736ee5d90a8d2de02b9d3e1bcb82", - "sha256:14ae4146465f8e6e6253eba0cccd57423e598a4cb925958b240c805300918343", - "sha256:14f500232e521201cf031549fb1ebdfc0a40f401cf519157f76c397e586c3beb", - "sha256:1574983178b35b9af4db4a9f7328a18a14a0a0ce76ffaa1c1bacb4cc82089a7c", - "sha256:196bfeabdccc5a020a57d5a368c681e3a6ceb0447d153aeccc1ab4d70a5032ba", - "sha256:21dd57941804ae2ac7e921771a5e21bbf9aabec317a041d164853ad0a96ce31e", - "sha256:264657171406c114787b441484de620e03d8f7202f113d62fcd3d9688baa3e6f", - "sha256:27ba1ed6f66b0e2d61bfa78874dffd4f8c3a12f8e2b5410e515ab345ba7bc9c3", - "sha256:292250282cf9bcf206b543d7608bda17ca6fc151f4cbae949fc7e115112fbd41", - "sha256:2a47a4223d3361b91176aedd9d4e05844ca67d7188456227b6bf5e436630c9a1", - "sha256:2a5b567f0b635b592c917f96b9a9cb3dbd4c320d03f4bf94e9084e494f2e8894", - "sha256:2ee0e58cca0c17dd9c6c1cdde02bb705c7b3fbfa5f3b0b5afeda20d4ebff8ef4", - "sha256:36393bd2841fa0b59498f75466ee9bdec4f770d3254f031f23e8fd8e140ffdd2", - "sha256:387a825f43d680e7310e6f325b2167dd093bc8ffd933b83e9aa0983cf6e0a2ef", - "sha256:3973f353b2d70bd9796cc12f532a05945232ccae966456c8ed7034cb96bbfd6f", - "sha256:3bca209d001fd03ea2d978f8a4985093240a355c93078aee3f799852c23f561a", - "sha256:406821f37f864f968e29ac14c3fccae0fec9fdeba48327f0341decf4daf92d7c", - "sha256:40ce1ea1e25125556d8e76bd0b61500839a07944cc287ac21d5626f3e620cad5", - "sha256:49d49e9a5e9f4dc3d3dac95278a020afa6d6bdd41f63608a76fa05a719d5b66f", - "sha256:4a3158dc2dcce5200d91ec28cd315c999eebff355437d2765840555d765a6e5f", - "sha256:4f7b71757a3ab19f7ba286e04c181004c1d61be921795ee8ba6970fd0ec91da5", - "sha256:59562de3f797979e1ff07c587e2ac36ba60ca59d16c211eceaa579c266c5022f", - "sha256:5b20211c47a8abf4abc3319d8ce2464864fa9f30c5fcaf958a3eed92f4f1fef8", - "sha256:5bd447332ec4f45838c1ad42268ce21ca87c40deb86eabd59888859b66be22a5", - "sha256:6326e18e9a553e674d948536a04a80d850a5eeefe2aae2e6d7cf05d54046c01b", - "sha256:6873f0271b4a15a33e7590f338d823f6f66f91ed147a03938d7ce26efd04eee6", - "sha256:68c86173562ed4413345410c9480a8d64864ac5e54a5cda236748031e094229f", - "sha256:69269ab58783e090bfbf5b916ab3d188126e22d6070bbfc93098fdd474ef937c", - "sha256:69e526e14f3f854eda573d3cf40cffd29a1a91c684743d904c33dbdcd0e0f3e7", - "sha256:6ae99e4560963ad8e163e819e5d77d413d331fd00566c1e0856aa252303552c1", - "sha256:6b8092aa38d72f091db61ef83cb66076f18f02da3e1a75039a4f218629600e04", - "sha256:6e5bbb5018bf76a56aabdb64246b5288d5ae1b7d0dd4d0534fe86df2c2992d1c", - "sha256:76e06ccacd1fb6ada5d076ed98a8c6f66e2e6acd3df02819e2ee29fd637b76ad", - "sha256:78f45d21dc4d5d6bd29323f0320089ef7eae16e4bef712dff79d184fa7330af3", - "sha256:79f6506a678a59d4ded048dc72f1859ebede8ec2b9a2d509ebe161f01c2879d3", - "sha256:7be4d613638d678b2b3773b8f687537b284d7074695a43fe2fbbfc0e31ceaed1", - "sha256:7c79ad5c28a16a1277e1187cf83ea8dafdcc689a784228a7d390f19776db7c31", - "sha256:7de326f80e3451bd5cc7239ab46c73ddb658fe0b7649476bc7413572d36cd548", - "sha256:7f9405ab4f81d490811b1d91c7a20361135a2df4c170e7f0b747a794da5b7f23", - "sha256:838943bea48be0e2768b0cf7819544cdedc1bbb2f28427eabb6eb8c9eb2285d3", - "sha256:88a800258d83acb803c38175b4495d293656d5fac48659c953c18e5f539a274b", - "sha256:89567798404af067604246e01a49ef907d112edf2b75ef814b1364d5ce267031", - "sha256:8a0b33e9fd838220b007ce8f299114d406c1e8edb21336af4c97a26ecfd185aa", - "sha256:8be48da4d47cc68754ce643ea50b3234557cbefe47c2f120495e7bd0a2756f2b", - "sha256:9074896edd705a05769e3de0eac0a8388484b503b68863dd06d5e473f874fd47", - "sha256:93b57142f9621b0d12349c43fc7741fe578e4bc914c1e5a54142856cfc0bf421", - "sha256:93d1d25ec2b27e90bcfef7012992d1f5121b51161b8bffcda756a816cf13c2c3", - "sha256:9779310cb5a9778a60c899f075a8514c89fa6d10131445c2207fc893e0b14557", - "sha256:97e596de8fa9bada4d88fde64a3f4d37f1b6131e4faa32bad7808abc79887ddc", - "sha256:9b2f4714bb7d99ba3790ee095b3b4ac94767e1347fe424278a0b10acb3ff04fe", - "sha256:9c9bdea644e94fd66d75a6f7e9a97bb822371e1fe7eadae2cacd50fcbc28e4dc", - "sha256:9cc7573518b7e2186bd229b1a0fe24a807273798832c27032c4510f47ffdb896", - "sha256:9f93959ee0c604bccd8e0697be21de0887b1f73efcc3aa73a3ec0fd13feace92", - "sha256:a360a8baeb038928ceb996f5623a4cd508728f8f13e08d4e96ce161702f3dd99", - "sha256:a43d34ce714f4ca674c0d90beb760eb05aad906f2c47580ccee9da8fe8bfb417", - "sha256:a55516c68ef3e08e134e818d5e308ffa6b1337cc8b092b69b24287bf07d38e31", - "sha256:a7fc042ba3c7ce25b8a9f097eb0f32a5ce1ccdb639d9eec114e26def98e1f8a4", - "sha256:abaea04f1e7e34841d4a7b343904a3f59481f62f9df39e2cd399d69a187a9660", - "sha256:ae47d8dcd3ded0155afbb59c62bd8ab07ea0fd4902e1c40567439e6db9dcaf2f", - "sha256:b01899e82a04085b6561eb233fd688474f57455e8ad35cd82286463ba06332b7", - "sha256:b3becbea7f3ce9a2d4d430f223ec15888e4deb31395840a79e916368d6004cce", - "sha256:b780090d15fd58f07cf2011943e25a5f0c1c894384b13a216b6c86c8a8a7c508", - "sha256:b7fc50d2afd2e6b4f6f2f403b70103d280a8e0cb35320cbbe6debcda02a1030b", - "sha256:bff1b04cb9d4900ce5c56c4942f047dc7efe57e2608cb7c3c8936e9970ccdbee", - "sha256:c1ea8ca9db5e7469cd364552985e15911548ea5b69c48a17291f0cac70484b2e", - "sha256:c6cadac7b8ace1ba9144feb1ae3cb787a6065ba6d23ffc59a934b16406c26573", - "sha256:c6f141b468740197d6bd38f2b26ade124363228cc3f9858bd9924ab059e00059", - "sha256:ca9566769b69a5e216a4e176d54b9df88f29d750c5b78dbb899e379b4e14b30c", - "sha256:d0ba505e021557f7f8173ee8cd6b926373d8653e5ff7581ae2efce1b11ef4c27", - "sha256:d6d16b0f71120e365741bca2cb473ca6fe38930bc5431c5e850ba949f708f892", - "sha256:d7f63ce526a96acd0e16c4af8b50b64334239550402fb1607ce6a584a6d62ce9", - "sha256:e080afb413be106c95c4ee96b4fffdc9e2fa56a8bbf90b5c0918e5c4449412f5", - "sha256:e4121a90823a063d717a96e0a0529c727fb31ea889369a0ee3ec00ed99bf6859", - "sha256:e64fa5a1e41ce5df6b547cbc3d3699381c9e2c2c369c67837e716ed0f549d48e", - "sha256:e79a8c7d461820257d9aa43716c4efc55366d7b292e46b5b37165be1d377405d", - "sha256:ed2bce0e7bfa53f7b0b01c722da289ef6ad4c18ebd52b1f93704c21f116360c8", - "sha256:ed75de7d1217cf3b99365d110975f83af0528c849ef5180a12fd91b5064df9d6", - "sha256:ee68e5a4e3e5443623406b905db447dceddffee0dceb39f4e0cd9ec2a35004b5", - "sha256:eeea10169fac01549a7921d27a3e517194ae254b542102267bef7a93ed38c40e", - "sha256:f06799ae1bdfff7ccb8665d75f8291c69110ba9585253de254688aa8a1ccc6c5", - "sha256:f0d7fea9d8e5d778cd5a9e8fc38308ad688f02040e883cdc13311ef2748cb40f", - "sha256:f106b2af193f965d0d3234f3f83fc35278c7fb935dfbde56ae2da3dd2c03b84d", - "sha256:f4af3b01763909f477ea17c962e2cca8f39b350a4e46e3a30838b2c12e31b81b", - "sha256:f61d349f5b7cd95c34017f1927ee379bfbe9884300d74e07cf630ccf7a610c1b", - "sha256:f674f59712d67e841525b99e5e2b595250e39b529c3bda14764e4f625a3fa01f", - "sha256:f819c727a6e6eeb8711e4ce63d78c620f69630a2e9d53bc95ca5379f57b6ba94", - "sha256:f9ab1d5b86f8fbc97a5b3cd6280a3fd85fef3b028689d8a2c00918f0d82c728c", - "sha256:fae91dfecd816444c74531a9c3d6ded17a504767e97aa674d44f638107265b99" + "sha256:00d34b29a59d2076e6f318b30a00a69bf63687e30cd882984ed444e753990cc1", + "sha256:00dd3f02de6d5f5c9c3d95e3e036c3c2e2a669f8bf2d3ceb92505c4ce7838f67", + "sha256:01119735c690786b6966a1e9f098da4cd7ca9174c4cfe076d04e653105488395", + "sha256:03a6e5e1e50819d6d7436f5bc40c92ded7e484e400716886ac921e35c133149d", + "sha256:05dd25b21afffe545e808265897c35f32d3e4437663923e0d256d9ab5031fb14", + "sha256:06d316dbb3d9fd44cca05b2dbcfbef22948493d63a1f28e828d43e6cc505fed8", + "sha256:06e49c5897cb12e3f7ecdc111d44e97c4f6d0557b81a7a0204ed70a8b038f86f", + "sha256:0b4f345f7265cdbdb5ec2521ffff15fa49de6d6c39abf89fc7ad68aa9e3a55f0", + "sha256:0c2be202a83dde768937a61cdc5d06bf9fb204048ca199d93479488e6247656c", + "sha256:0f45e32ef383ce56e0ca099b2e02fcdf7950be4b1b56afaab27b4ad790befe5b", + "sha256:123ceaf2b9d8c614f01110f908a341e05b1b305d6b2ada98763b9a5a59756051", + "sha256:16d23d6579cf80a474ad160ca14d8b319abaa6db62759d6eef53b2fc979b58c8", + "sha256:2213a8d88ed35459bda71597599d4eec7c2ebad201c88f0bfc2c26fd9b0dd2ea", + "sha256:24db3959de8ee394eeeca89ccb8ba25305c2da9a668dd44173394cbd5aa0777f", + "sha256:284e06eadfe15ddfee2f4ee56631f164ef897a7d7d5a15bca5f0bb88889fc5ba", + "sha256:299d66e9218193f9dc6e4880629ed7c4cd23486005166247c283fb98531656c3", + "sha256:2d098709621d0819039f3f1e471ee554f55a0b2ac0d816883c765b14129b5627", + "sha256:2f5e731627a3d5ef11a2a35aa0c6f7c435867c7ccbc391268eb4f2ca5dbdcc10", + "sha256:303d38b19626c1981e1bb067a9928236d88eb0e4479b18a74812f05a82071508", + "sha256:318002f1fd819bdc1651c619268aa5bc853c35fa5cc6d1e8c96bd9cd6c828b75", + "sha256:318b2e4753cbf611061e01b6cc81477e1cdfeb69c36c4a14e6595e674caadb56", + "sha256:31b6e889c53d4e6687ca63706148049494aace140cffece1c4dc6acadb70a7b3", + "sha256:343aaeb5f8bb7bcd38620fd7bc56e6ee8207847d8c6103a1e7b72322d381ba4a", + "sha256:3d1aed4f4e837a832df2f3b4f68a690eede0de4560a2dbc214ea0bc55aabcdb4", + "sha256:3f379b02c18a64de78c4ccdddf1c81c2c5ae1956c72dacb9133d7dd7809794ab", + "sha256:44f14a62f5da2e9aedf9080e01d2cda61df39197d48e323538ec037336d68da8", + "sha256:46d29926349b5c4f1ea4fca95e8c892835515f3600995a383fa9a923b5739ea4", + "sha256:51c4c42c0e7d09a822b08b6cf79b3c4db8333fffde7450da946719ba0d45730f", + "sha256:53be4aab8ddef18beb6188f3a3fdbf4d1af2277d098d4e618be3a8e6c88e74be", + "sha256:562136b0d401992118d9b49fbee5454e16f95f85b120a4226a04d816e33fe024", + "sha256:5907605ee20e126eeee2abe14aae137043c2c8af2fa9b38d2ab3b7a6b8137f73", + "sha256:59224bfb2e9b37c1335ae35d00daa3a5b4e0b1a20f530be208fff1ecfa436f43", + "sha256:5b1ad2e0dc672625c44bc4fe34514602a9fd8b10d52ddc414dc585f74453516c", + "sha256:5badd7e596e6b0c89aa8ec6d37f4473e4357f982ce57f9a2942b0221cd9cf60c", + "sha256:5d67b9ed6f7b5527b209b24b3df9f2e5bf0198c1bbf99c6971b0e2dcb7e2a107", + "sha256:65436cde5ecabe26fb2f0bf598962f0a054d3f23ad529361326ac002c61a2a1e", + "sha256:6ed2e787249b922a93cd95c671cc9f4c9797a106e81b455c83a9ddb9d34590c0", + "sha256:71295f2d1d170b9977dc386d46a7a1b7cbb30e5405492529b4c930113a33f895", + "sha256:75b3c0300f3fa15809bd62d9ca8b170eb21fcf0100eb4b4154d6dc8b3a5bbd43", + "sha256:79f2670c7e772f4917895c3d89aad59e01f3dbe68a4ed2d0373b431fad1dcfba", + "sha256:7a482f2da9086971efb12daca1d6547007ede3674ea06e16d7663414445c683e", + "sha256:7bbb5aa9016c4c29e3432e087aa29ebee3f8fda089cfbfb4e6d64bd292dcd1c2", + "sha256:7df8759ee57b9f3f7b66799b7660c282f4375bef620ade1686d6a7b03699e75f", + "sha256:824bb95cd71604031ae9a48edb91fd6effde669522f960375668ed21b36e3ec4", + "sha256:853c3d3c79ff0db65797aad79dee6be020efd218ac4510f15a205f1e8d13ce25", + "sha256:87ff33b652b3556b05e204ae20793d1f872161b0fa5ec8a9ac76f8430e152ed6", + "sha256:8bb09e83c603f152d855f666d70a71765ca8e67332e5829e62cb9466c176af23", + "sha256:8f1010029a5b52dc427c8e2a8dbddb2303ddd180b806687d1acd1bb1d06649e7", + "sha256:8f2adf4bcffbbec41f366f2e6dffb9d24e8172d16e91da5799c9b7ed6b5716e6", + "sha256:90a8af9dba6429b2573199622d72e0ebf024d6276f16abce394ad4d181bb0910", + "sha256:94d2ac94bd0cc57c5626f52f8c2fffed1444b5ae8c9fc68320306cc2b255e155", + "sha256:96c3be8bae9d0333e403cc1a8eb078a7f928b5650bae94a18fb4820cc993fb9b", + "sha256:989aa158c0eb19d83c76c26f4ba00dbb272485c56e452010a3450bdbc9daafd9", + "sha256:99fee45adbb1caeb914da16f70e557fb7ff6ddc9e4b14de665bd41af631367ef", + "sha256:9db3a3285d91c0b70fab9f39f0a4aa37d375873677efe4e71e58d8321e8c5d39", + "sha256:9f9efbbaf79f935d5fbe3ad814825cbce4f6cdb3054384cb49f0c0f496125fa0", + "sha256:a2f7589c6132c44c53f6e705e1a6677e2b7821378c22f7703b2cf5388d0d4587", + "sha256:a88705500988c8acad8b8fd86c2a933d3aa96bec1ddc4bc5cb256360db7bbd00", + "sha256:ab6d72bffac9deb6e6cb0f61042e748de3f9f8e98afb0375a8e64b0b6e11746b", + "sha256:ae9306b5299e31e31e0d3b908c66bcb6e7e3ddca143dea0266e9ce6c667346d3", + "sha256:b2182129f4c101272ff5f2f18038d7b698db1bf8e7aa9e615cb48440899ad32e", + "sha256:b2beb64c145593a50d90db5c7178f55daeae129123b0d265bdb3cbec83e5194a", + "sha256:b607a40cba795cfac6d130220d25962931ce101f2f478a29822b19755377fb34", + "sha256:be14d0622125edef21b3a4d8cd2d138c4872bf6e38adc90fd92385e3312f406a", + "sha256:bfeee64ad8b4aae3233abb77eb6b52b51b05fa89da9645518671b9939a78732b", + "sha256:c5e9787cec750793a19a28df7edd85ac4e49d3fb91721afcdc3b86f6c08d9aa8", + "sha256:c672d4e2f0575a4ca2bf2aa0c5ced5188220ab806c1bb6d7179f70a11a017222", + "sha256:c6f6169bbdbdb85aab8ac0392d776948907267fcc91deeacf6f9d55f7a83ae3b", + "sha256:ca46e5c3be3b195098dd88711890b8011a9fa4feca942292bb84714ce5eab5d3", + "sha256:cc7fd0f726795420f3678ac82ff882c7fc33770bd0074463b5aef7293285ace9", + "sha256:cd5dee4fd7659d8306ffa79eeaaafd91fa30a302dac3af723b9b469e549247e0", + "sha256:d1a049b5c51b3b679928dd35e47c4a2235e0b6128b479a7596d0ef5b42fa6301", + "sha256:d358dc408edc28730aed5477a69338e444e62fba0b7e9e4a131c505fadad691e", + "sha256:d3a16d6398666510a6886f67f43d9537bfd0e13aca299688a19daa84f543122f", + "sha256:d401f0864a1d3198422816878e4e84ca89ec1c1bf166ecc0ae01380a39b888cd", + "sha256:d6f4a21328ea49d38565b55599e1c02834e76583a6953e5586d65cb1efebd8f8", + "sha256:db83b77f97129813dbd463a67e5335adc6a6a91db652cc085d60c2d512746f96", + "sha256:debf29e0b157769843dff0981cc76f79e0ed04e36bb773c6cac5f6029054bd8a", + "sha256:dfb428e41377e6b9ba1b0a32df6db5409cb089a0ed1d0a672dc4953ec110d84f", + "sha256:e129328ad1258e49cae0123a3b5fcb93d6c2fa90d540f0b4c7cdcdc019aaa3dc", + "sha256:e5b86db331c682fd0e4be7098e6acee5e8a293f824d41487c667a93705d415ca", + "sha256:ed48b4170caa2c4420e0cd27dc977caaffc7eecc317355751df8373dddcef595", + "sha256:edc7754932682d52cf6e7a71806e529ecd5ce660e630e8bd1d37109a2e5f63ba", + "sha256:f45c9bcb16bee25a798ccba8a2f6a1251b19de6a0d617bb365d7d2f386c4e20e", + "sha256:f75695e157c83d374f88dcc646a60cb94173304a9258b2e74ba5a66b7614a51a", + "sha256:f7f153d0184d45f3873b3ad3ad22694fd73aadcb8cdbc4337ab4b41ea6b4dff1", + "sha256:f7f6182d3dfb8802c1747eacbfe611b669455b69b7c037484bb1efbbb56711ac", + "sha256:f9bada7bc660d20b23d7d312ebe29e927b655cf414dadcdb6335a2075695bd86", + "sha256:fae6a21537519c2af00245e834e5bf2884699cc7c1055738fd0f9dc37a3644ad", + "sha256:fb25061a66802df9fc13a9ba1967d25faa4dae0418db469264fd9860a921dde4", + "sha256:fc970575799a9d17d5c3fafd83a0f6ccf5d5117cdc9ad6fbd791e9ead82418b0", + "sha256:fcda51c918c7a13ad93b5f89a58d56e3a072c9e0ba5c231b0ed81404bf2648fb" ], "markers": "python_version >= '3.10'", - "version": "==7.13.2" + "version": "==7.13.3" }, "dill": { "hashes": [ @@ -2551,6 +2616,15 @@ "markers": "python_version >= '3.8'", "version": "==0.31.1" }, + "types-cachetools": { + "hashes": [ + "sha256:698eb17b8f16b661b90624708b6915f33dbac2d185db499ed57e4997e7962cad", + "sha256:f1d3c736f0f741e89ec10f0e1b0138625023e21eb33603a930c149e0318c0cef" + ], + "index": "pypi", + "markers": "python_version >= '3.9'", + "version": "==6.2.0.20251022" + }, "types-psutil": { "hashes": [ "sha256:0c372e2d1b6529938a080a6ba4a9358e3dfc8526d82fabf40c1ef9325e4ca52e", diff --git a/codeforlife/caches.py b/codeforlife/caches.py deleted file mode 100644 index fba5c50c..00000000 --- a/codeforlife/caches.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -© Ocado Group -Created on 11/08/2025 at 11:07:45(+01:00). -""" - -import typing as t - -from django.core.cache import cache - -K = t.TypeVar("K") -V = t.TypeVar("V") - - -class BaseValueCache(t.Generic[V]): - """Base class which helps to get and set cache values.""" - - @classmethod - def get( - cls, - key: str, - default: t.Optional[V] = None, - version: t.Optional[int] = None, - ) -> t.Optional[V]: - """ - Fetch a given key from the cache. If the key does not exist, return - default, which itself defaults to None. - """ - return cache.get( - key=key, - default=default, - version=version, - ) - - @classmethod - def set( - cls, - key: str, - value: V, - timeout: t.Optional[int] = None, - version: t.Optional[int] = None, - ): - """ - Set a value in the cache. If timeout is given, use that timeout for the - key; otherwise use the default cache timeout. - """ - cache.set( - key=key, - value=value, - timeout=timeout, - version=version, - ) - - -class BaseFixedKeyValueCache(BaseValueCache[V], t.Generic[V]): - """Base class which helps to get and set cache values with a fixed key.""" - - key: str - - # pylint: disable=arguments-differ - - @classmethod - def get( # type: ignore[override] - cls, - default: t.Optional[V] = None, - version: t.Optional[int] = None, - ) -> t.Optional[V]: - return super().get( - key=cls.key, - default=default, - version=version, - ) - - @classmethod - def set( # type: ignore[override] - cls, - value: V, - timeout: t.Optional[int] = None, - version: t.Optional[int] = None, - ): - super().set( - key=cls.key, - value=value, - timeout=timeout, - version=version, - ) - - # pylint: enable=arguments-differ - - -class BaseDynamicKeyValueCache(BaseValueCache[V], t.Generic[K, V]): - """Base class which helps to get and set cache values with a dynamic key.""" - - @staticmethod - def make_key(key: K) -> str: - """Make the cache key from the key's data.""" - raise NotImplementedError() - - @classmethod - def get( # type: ignore[override] - cls, - key: K, - default: t.Optional[V] = None, - version: t.Optional[int] = None, - ) -> t.Optional[V]: - return super().get( - key=cls.make_key(key), - default=default, - version=version, - ) - - @classmethod - def set( # type: ignore[override] - cls, - key: K, - value: V, - timeout: t.Optional[int] = None, - version: t.Optional[int] = None, - ): - super().set( - key=cls.make_key(key), - value=value, - timeout=timeout, - version=version, - ) diff --git a/codeforlife/caches/__init__.py b/codeforlife/caches/__init__.py new file mode 100644 index 00000000..96bb76f3 --- /dev/null +++ b/codeforlife/caches/__init__.py @@ -0,0 +1,8 @@ +""" +© Ocado Group +Created on 28/01/2026 at 16:52:19(+00:00). +""" + +from .base import BaseCache +from .base_dynamic_key import BaseDynamicKeyCache +from .base_fixed_key import BaseFixedKeyCache diff --git a/codeforlife/caches/base.py b/codeforlife/caches/base.py new file mode 100644 index 00000000..9438fa9c --- /dev/null +++ b/codeforlife/caches/base.py @@ -0,0 +1,67 @@ +""" +© Ocado Group +Created on 28/01/2026 at 16:52:19(+00:00). +""" + +import typing as t + +from django.core.cache import cache + +V = t.TypeVar("V") + + +class BaseCache(t.Generic[V]): + """Base class which helps to get and set cache values.""" + + timeout: int | None = None + + @classmethod + def get( + cls, + key: str, + default: t.Optional[V] = None, + version: t.Optional[int] = None, + ) -> t.Optional[V]: + """ + Fetch a given key from the cache. If the key does not exist, return + default, which itself defaults to None. + """ + return cache.get( + key=key, + default=default, + version=version, + ) + + @classmethod + def set( + cls, + key: str, + value: V, + timeout: t.Optional[int] = None, + version: t.Optional[int] = None, + ): + """ + Set a value in the cache. If timeout is given, use that timeout for the + key; otherwise use the default cache timeout. + """ + cache.set( + key=key, + value=value, + timeout=timeout or cls.timeout, + version=version, + ) + + @classmethod + def delete( + cls, + key: str, + version: t.Optional[int] = None, + ): + """ + Delete a key from the cache and return whether it succeeded, failing + silently. + """ + cache.delete( + key=key, + version=version, + ) diff --git a/codeforlife/caches/base_dynamic_key.py b/codeforlife/caches/base_dynamic_key.py new file mode 100644 index 00000000..e3b48628 --- /dev/null +++ b/codeforlife/caches/base_dynamic_key.py @@ -0,0 +1,59 @@ +""" +© Ocado Group +Created on 28/01/2026 at 16:52:19(+00:00). +""" + +import typing as t + +from .base import BaseCache + +V = t.TypeVar("V") +K = t.TypeVar("K") + + +class BaseDynamicKeyCache(BaseCache[V], t.Generic[K, V]): + """Base class which helps to get and set cache values with a dynamic key.""" + + @staticmethod + def make_key(key: K) -> str: + """Make the cache key from the key's data.""" + raise NotImplementedError() + + @classmethod + def get( # type: ignore[override] + cls, + key: K, + default: t.Optional[V] = None, + version: t.Optional[int] = None, + ) -> t.Optional[V]: + return super().get( + key=cls.make_key(key), + default=default, + version=version, + ) + + @classmethod + def set( # type: ignore[override] + cls, + key: K, + value: V, + timeout: t.Optional[int] = None, + version: t.Optional[int] = None, + ): + super().set( + key=cls.make_key(key), + value=value, + timeout=timeout, + version=version, + ) + + @classmethod + def delete( # type: ignore[override] + cls, + key: K, + version: t.Optional[int] = None, + ): + super().delete( + key=cls.make_key(key), + version=version, + ) diff --git a/codeforlife/caches/base_fixed_key.py b/codeforlife/caches/base_fixed_key.py new file mode 100644 index 00000000..4fd1dda5 --- /dev/null +++ b/codeforlife/caches/base_fixed_key.py @@ -0,0 +1,56 @@ +""" +© Ocado Group +Created on 28/01/2026 at 16:52:19(+00:00). +""" + +import typing as t + +from .base import BaseCache + +V = t.TypeVar("V") + + +class BaseFixedKeyCache(BaseCache[V], t.Generic[V]): + """Base class which helps to get and set cache values with a fixed key.""" + + key: str + + # pylint: disable=arguments-differ + + @classmethod + def get( # type: ignore[override] + cls, + default: t.Optional[V] = None, + version: t.Optional[int] = None, + ) -> t.Optional[V]: + return super().get( + key=cls.key, + default=default, + version=version, + ) + + @classmethod + def set( # type: ignore[override] + cls, + value: V, + timeout: t.Optional[int] = None, + version: t.Optional[int] = None, + ): + super().set( + key=cls.key, + value=value, + timeout=timeout, + version=version, + ) + + @classmethod + def delete( # type: ignore[override] + cls, + version: t.Optional[int] = None, + ): + super().delete( + key=cls.key, + version=version, + ) + + # pylint: enable=arguments-differ diff --git a/codeforlife/encryption.py b/codeforlife/encryption.py new file mode 100644 index 00000000..b0a2ea4c --- /dev/null +++ b/codeforlife/encryption.py @@ -0,0 +1,168 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:55:44(+00:00). + +Here we provide a few utility functions that interact with Google Cloud KMS and +the `tink` cryptography library. + +To avoid a dependency on Google Cloud KMS during local development and in CI/CD +pipelines, we use fake (mock) implementations of the KMS client and its AEAD +primitive. A simple check for the environment (e.g., `settings.ENV == "local"`) +determines whether to use the real `GcpKmsClient` or a `FakeGcpKmsClient`. + +The fake client mimics the behavior of the real one. Instead of performing real +encryption, it simulates it by encoding the plaintext in base64 and adding a +prefix. This allows the application to run without needing cloud credentials +while still being able to distinguish between "encrypted" and plaintext data. + +While the `FakeAead` and `FakeGcpKmsClient` are sufficient for running a local +development server, they are not `unittest.mock.MagicMock` instances by default. +This is intentional, as we don't want the overhead of tracking function calls +during local development. + +For unit tests, where you might need to assert that encryption or decryption +methods were called, both fake classes provide an `as_mock()` class method. This +method returns a `MagicMock` instance of the class, allowing you to use mock +assertions like `assert_called_once()`. +""" + +import typing as t +from base64 import b64decode, b64encode +from dataclasses import dataclass +from io import BytesIO +from unittest.mock import MagicMock, create_autospec + +from django.conf import settings +from django.utils.crypto import get_random_string +from tink import ( # type: ignore[import-untyped] + BinaryKeysetReader, + BinaryKeysetWriter, + new_keyset_handle, + read_keyset_handle, +) +from tink.aead import Aead, aead_key_templates # type: ignore[import-untyped] +from tink.aead import register as aead_register # type: ignore[import-untyped] +from tink.integration import gcpkms # type: ignore[import-untyped] + +# Shortcut to the real GcpKmsClient class. +_GcpKmsClient = gcpkms.GcpKmsClient + + +@dataclass +class FakeAead: + """A fake AEAD primitive for local testing.""" + + @staticmethod + # pylint: disable-next=unused-argument + def encrypt(plaintext: bytes, associated_data: bytes = b""): + """Simulate ciphertext by wrapping in base64 and adding a prefix.""" + return b"fake_enc:" + b64encode(plaintext) + + @staticmethod + # pylint: disable-next=unused-argument + def decrypt(ciphertext: bytes, associated_data: bytes = b""): + """Simulate decryption by removing prefix and base64 decoding.""" + if not ciphertext.startswith(b"fake_enc:"): + raise ValueError("Invalid ciphertext for fake mock") + + return b64decode(ciphertext.replace(b"fake_enc:", b"")) + + @classmethod + def as_mock(cls): + """ + Returns the class as a functional MagicMock for testing. The mock tracks + calls while still performing the fake encryption and decryption by using + the original methods as side effects. We set `instance=True` to ensure + the mock behaves as an instance of the class, not the class itself. + """ + mock: MagicMock = create_autospec(Aead, instance=True) + mock.encrypt.side_effect = cls.encrypt + mock.decrypt.side_effect = cls.decrypt + + return mock + + +@dataclass +class FakeGcpKmsClient: + """A fake GcpKmsClient for local testing.""" + + key_uri: str + + @staticmethod + def register_client( + key_uri: t.Optional[str], credentials_path: t.Optional[str] + ): + """No-op for registering the fake client.""" + + # pylint: disable-next=unused-argument + def get_aead(self, key_uri: str) -> Aead: + """Return the fake AEAD primitive.""" + return FakeAead() + + @classmethod + def as_mock(cls): + """ + Returns the class as a functional MagicMock for testing. It returns a + mocked FakeAead instance that is also functional. We set `instance=True` + to ensure the mock behaves as an instance of the class, not the class + itself. + """ + mock: MagicMock = create_autospec(_GcpKmsClient, instance=True) + mock.get_aead.return_value = FakeAead.as_mock() + + return mock + + +def _get_kek_aead(): + """Get the AEAD primitive for the key encryption key (KEK).""" + return GcpKmsClient(key_uri=settings.GCP_KMS_KEY_URI).get_aead( + key_uri=settings.GCP_KMS_KEY_URI + ) + + +def create_dek(): + """ + Creates a new random AES-256-GCM data encryption key (DEK), wraps it with + Cloud KMS, and returns the binary blob for storage. + """ + # In local environment, return a fake encrypted DEK. + if settings.ENV == "local": + return FakeAead.encrypt(get_random_string(32).encode()) + + stream = BytesIO() + new_keyset_handle(key_template=aead_key_templates.AES256_GCM).write( + keyset_writer=BinaryKeysetWriter(stream), + master_key_primitive=_get_kek_aead(), + ) + + return stream.getvalue() + + +def get_dek_aead(dek: bytes) -> Aead: + """ + Takes an encrypted DEK, decrypts it with the KEK, and returns the AEAD + primitive for performing cryptographic operations. + """ + if not dek: + raise ValueError("The data encryption key (DEK) is missing.") + + # In local environment, return the fake AEAD primitive. + if settings.ENV == "local": + return FakeAead() + + return read_keyset_handle( + keyset_reader=BinaryKeysetReader(dek), + master_key_aead=_get_kek_aead(), + ).primitive(Aead) + + +# Ensure Tink AEAD is registered. +aead_register() + +# Use the fake client in 'local' environment, otherwise use the real one. +GcpKmsClient = FakeGcpKmsClient if settings.ENV == "local" else _GcpKmsClient + +# Register the GCP KMS client with Tink. +GcpKmsClient.register_client( + key_uri=settings.GCP_KMS_KEY_URI, credentials_path=None +) diff --git a/codeforlife/models/__init__.py b/codeforlife/models/__init__.py index 9d38a834..21934e1f 100644 --- a/codeforlife/models/__init__.py +++ b/codeforlife/models/__init__.py @@ -6,5 +6,7 @@ from .abstract_base_session import AbstractBaseSession from .abstract_base_user import AbstractBaseUser from .base import * +from .base_data_encryption_key import BaseDataEncryptionKeyModel from .base_session_store import BaseSessionStore -from .encrypted_char_field import EncryptedCharField +from .data_encryption_key import DataEncryptionKeyModel +from .encrypted import EncryptedModel diff --git a/codeforlife/models/base.py b/codeforlife/models/base.py index 0f974c6e..8da23226 100644 --- a/codeforlife/models/base.py +++ b/codeforlife/models/base.py @@ -7,8 +7,7 @@ import typing as t -from django.db.models import Manager -from django.db.models import Model as _Model +from django.db import models if t.TYPE_CHECKING: from django_stubs_ext.db.models import TypedModelMeta @@ -16,10 +15,10 @@ TypedModelMeta = object -class Model(_Model): +class Model(models.Model): """Base for all models.""" - objects: Manager[t.Self] + objects: models.Manager[t.Self] class Meta(TypedModelMeta): abstract = True diff --git a/codeforlife/models/base_data_encryption_key.py b/codeforlife/models/base_data_encryption_key.py new file mode 100644 index 00000000..600d5a2f --- /dev/null +++ b/codeforlife/models/base_data_encryption_key.py @@ -0,0 +1,95 @@ +""" +© Ocado Group +Created on 26/01/2026 at 10:32:18(+00:00). + +This abstract model brings the `EncryptedModel` and `DataEncryptionKeyField` +together. It also implements the `dek_aead` property, which retrieves and caches +the decrypted DEK's AEAD primitive for use in encryption/decryption operations. +""" + +import typing as t + +from cachetools import TTLCache +from django.core.exceptions import ValidationError + +from ..encryption import create_dek, get_dek_aead +from .encrypted import EncryptedModel + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta + + from .fields import DataEncryptionKeyField +else: + TypedModelMeta = object + + +class BaseDataEncryptionKeyModel(EncryptedModel): + """Model to store and manage a data encryption key.""" + + # Cache configuration for data encryption keys. + dek_aead_cache_maxsize: float = 1024 + dek_aead_cache_ttl: float = 900 # 15 minutes + + # In-memory cache for the decrypted DEK AEAD primitive. + DEK_AEAD_CACHE: TTLCache + + def __init_subclass__(cls): + super().__init_subclass__() + cls.DEK_AEAD_CACHE = TTLCache( + maxsize=cls.dek_aead_cache_maxsize, ttl=cls.dek_aead_cache_ttl + ) + + # A class-level reference to the DataEncryptionKeyField instance. + # This is set by the `contribute_to_class` method of the field. + DEK_FIELD: t.Optional["DataEncryptionKeyField"] = None + + class Meta(TypedModelMeta): + abstract = True + + @property + def dek_aead(self): + """ + Provides the AEAD primitive for the DEK, caching it for performance. + """ + # Ensure the instance is saved before accessing the DEK AEAD. + if self.pk is None: + raise ValidationError( + "Instance must be saved before accessing dek_aead.", + code="unsaved_instance", + ) + + # Return None if there is no DEK. + if self.DEK_FIELD is None: + return None + + # Check the cache for the DEK AEAD. + if self.pk in self.DEK_AEAD_CACHE: + return self.DEK_AEAD_CACHE[self.pk] + + # Get the AEAD primitive for the data encryption key. + dek_aead = get_dek_aead(self.DEK_FIELD) + + # Cache the DEK AEAD for future access. + self.DEK_AEAD_CACHE[self.pk] = dek_aead + + return dek_aead + + def save( + self, + *args, + force_insert=False, + force_update=False, + using=None, + update_fields=None, + ): + # Lazily create a new DEK for new instances. + if self.pk is None and self.__class__.DEK_FIELD is not None: + self.__dict__[self.__class__.DEK_FIELD.field.attname] = create_dek() + + return super().save( # type: ignore[misc] + *args, + force_insert=force_insert, + force_update=force_update, + using=using, + update_fields=update_fields, + ) diff --git a/codeforlife/models/base_data_encryption_key_test.py b/codeforlife/models/base_data_encryption_key_test.py new file mode 100644 index 00000000..565a1e3b --- /dev/null +++ b/codeforlife/models/base_data_encryption_key_test.py @@ -0,0 +1,116 @@ +""" +© Ocado Group +Created on 26/01/2026 at 13:44:31(+00:00). +""" + +import typing as t +from unittest.mock import MagicMock, patch + +from ..encryption import FakeAead, create_dek +from ..tests import ModelTestCase +from .base_data_encryption_key import BaseDataEncryptionKeyModel +from .fields import DataEncryptionKeyField + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta +else: + TypedModelMeta = object + +# pylint: disable=missing-class-docstring +# pylint: disable=too-few-public-methods + + +class TestDataEncryptionKeyModel(ModelTestCase[BaseDataEncryptionKeyModel]): + @classmethod + def get_model_class(cls): + """ + Dynamically create a subclass of BaseDataEncryptionKeyModel for testing. + """ + + class TestModel(BaseDataEncryptionKeyModel): + dek: DataEncryptionKeyField = DataEncryptionKeyField() + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + def set_dek_for_test(self): + """Sets the dek field for testing purposes.""" + self.__dict__[self.__class__.dek.field.attname] = create_dek() + + return TestModel + + def get_model_instance(self, *args, **kwargs): + return self.get_model_class()(*args, **kwargs) + + def test_dek_aead__none(self): + """Returns None when dek is None on a saved instance.""" + instance = self.get_model_instance(pk=1, dek=None) + assert instance.dek_aead is None + + def test_dek_aead__unsaved_instance(self): + """Cannot get dek before saving the instance.""" + instance = self.get_model_instance() + with self.assert_raises_validation_error(code="unsaved_instance"): + _ = instance.dek_aead + + @patch("codeforlife.models.base_data_encryption_key.get_dek_aead") + def test_dek_aead__not_cached(self, get_dek_aead_mock: MagicMock): + """Returns dek_aead and caches it when not cached.""" + # Create an instance with a primary key to mimic a saved instance. + instance = self.get_model_instance(pk=1) + instance.set_dek_for_test() + + # Setup the mock to return a FakeAead instance. + dek_aead_mock = FakeAead.as_mock() + get_dek_aead_mock.return_value = dek_aead_mock + + # Initially, the cache should not have the dek_aead. After accessing + # dek_aead, it should be cached. + assert instance.pk not in instance.DEK_AEAD_CACHE + assert instance.dek_aead is dek_aead_mock + assert instance.pk in instance.DEK_AEAD_CACHE + + # Ensure the get_dek_aead function was called with the correct dek. + get_dek_aead_mock.assert_called_once_with(instance.dek) + + @patch("codeforlife.models.base_data_encryption_key.get_dek_aead") + def test_dek_aead__cached(self, get_dek_aead_mock: MagicMock): + """Returns the cached dek_aead.""" + # Create an instance with a primary key to mimic a saved instance. + instance = self.get_model_instance(pk=1) + instance.set_dek_for_test() + + # Pre-populate the cache with a FakeAead instance. + dek_aead_mock = FakeAead.as_mock() + instance.DEK_AEAD_CACHE[instance.pk] = dek_aead_mock + + # Accessing dek_aead should return the cached value. + assert instance.dek_aead is dek_aead_mock + + # Ensure the get_dek_aead function was not called as its cached. + get_dek_aead_mock.assert_not_called() + + @patch("django.db.models.base.Model.save", autospec=True) + @patch( + "codeforlife.models.base_data_encryption_key.create_dek", + autospec=True, + ) + def test_save__creates_dek( + self, create_dek_mock: MagicMock, save_mock: MagicMock + ): + """Saves a new DEK when saving a new instance.""" + instance = self.get_model_instance() + assert instance.dek is None + instance.save() + + # Ensure create_dek was called and save was called correctly. + create_dek_mock.assert_called_once_with() + save_mock.assert_called_once_with( + instance, + force_insert=False, + force_update=False, + using=None, + update_fields=None, + ) + + assert instance.dek is not None diff --git a/codeforlife/models/data_encryption_key.py b/codeforlife/models/data_encryption_key.py new file mode 100644 index 00000000..301f6f16 --- /dev/null +++ b/codeforlife/models/data_encryption_key.py @@ -0,0 +1,28 @@ +""" +© Ocado Group +Created on 29/01/2026 at 14:03:09(+00:00). + +This model inherits from `BaseDataEncryptionKeyModel` and conveniently includes +the `dek` field by default. It serves as a ready-to-use base for any model that +needs to manage its own Data Encryption Key. +""" + +import typing as t + +from .base_data_encryption_key import BaseDataEncryptionKeyModel +from .fields import DataEncryptionKeyField + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta +else: + TypedModelMeta = object + + +class DataEncryptionKeyModel(BaseDataEncryptionKeyModel): + """A model that includes a data encryption key field.""" + + # Create a DataEncryptionKeyField to store the encrypted DEK. + dek: DataEncryptionKeyField = DataEncryptionKeyField() + + class Meta(TypedModelMeta): + abstract = True diff --git a/codeforlife/models/encrypted.py b/codeforlife/models/encrypted.py new file mode 100644 index 00000000..69c9f0b4 --- /dev/null +++ b/codeforlife/models/encrypted.py @@ -0,0 +1,199 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:56:25(+00:00). + +This is the base class for any model that will contain encrypted fields. Its +primary role is to override the default manager to disable bulk operations that +could otherwise bypass the field-level encryption logic, potentially leading to +data corruption or leaks. It also uses Django's `check` framework to validate +the model's configuration. + +A critical security measure in this architecture is the custom `Manager` within +`EncryptedModel`. Standard Django bulk operations like `bulk_create()` and +`update()` bypass the individual model's `save()` method and, by extension, our +custom field's `pre_save` logic. If these methods were allowed, it would be +possible to insert or update data without it being properly encrypted, or to +corrupt existing encrypted data. + +To prevent this, the `Manager` explicitly disables these operations by setting +them to `None`. The `update()` method is given a special implementation that +actively checks if any of the fields being updated are encrypted fields and +raises a `ValidationError` if they are. This forces developers to fetch model +instances and update their properties individually, ensuring the encryption +logic is always triggered. Furthermore, the model's `check` framework includes a +validation step to ensure that any subclass of `EncryptedModel` is using a +manager that inherits from `EncryptedModel.Manager`, guaranteeing these security +measures are always enforced. +""" + +import typing as t + +from django.apps import apps +from django.core import checks +from django.core.exceptions import ValidationError +from django.db import models + +from .base import Model + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta + from tink.aead import Aead # type: ignore[import] + + from .fields import BaseEncryptedField +else: + TypedModelMeta = object + + +AnyEncryptedModel = t.TypeVar("AnyEncryptedModel", bound="EncryptedModel") + + +class EncryptedModel(Model): + """Base for all models with encrypted fields.""" + + associated_data: str + + ENCRYPTED_FIELDS: t.List["BaseEncryptedField"] + + def __init__(self, *args, **kwargs): + # Each instance gets its own dict of decrypted values. + self.__decrypted_values__: t.Dict[str, t.Any] = {} + super().__init__(*args, **kwargs) + + def __init_subclass__(cls): + super().__init_subclass__() + # Each subclass gets its own list of encrypted fields. + cls.ENCRYPTED_FIELDS = [] + + # pylint: disable-next=too-few-public-methods + class Manager( + models.Manager[AnyEncryptedModel], t.Generic[AnyEncryptedModel] + ): + """ + Base manager for models with encrypted fields. Disables bulk operations + that would bypass field-level encryption. + """ + + def update(self, **kwargs): + """Ensure encrypted fields are not updated via 'update()'.""" + for name in kwargs: + if any( + field.name == name for field in self.model.ENCRYPTED_FIELDS + ): + raise ValidationError( + f"Cannot update encrypted field '{name}' via" + " 'update()'. Set the property on each instance" + " instead.", + code="cannot_update", + ) + + return super().update(**kwargs) + + # Disable bulk operations that would bypass field-level encryption. + aupdate: t.Never = None # type: ignore[assignment] + bulk_update: t.Never = None # type: ignore[assignment] + abulk_update: t.Never = None # type: ignore[assignment] + bulk_create: t.Never = None # type: ignore[assignment] + abulk_create: t.Never = None # type: ignore[assignment] + in_bulk: t.Never = None # type: ignore[assignment] + ain_bulk: t.Never = None # type: ignore[assignment] + + objects: Manager["EncryptedModel"] = Manager() # type: ignore[assignment] + + class Meta(TypedModelMeta): + abstract = True + + @classmethod + def _check_associated_data(cls, **kwargs): + """ + Check 'associated_data' values are unique across all EncryptedModel + subclasses. + """ + errors: t.List[checks.Error] = [] + + if cls._meta.abstract: + return errors + + # Ensure associated_data is defined. + if not hasattr(cls, "associated_data"): + errors.append( + checks.Error( + "Must define an associated_data attribute.", + hint=f"{cls.__module__}.{cls.__name__}", + obj=cls, + id="encrypted.E001", + ) + ) + # Ensure associated_data is a string. + elif not isinstance(cls.associated_data, str): + errors.append( + checks.Error( + "associated_data must be a string.", + hint=f"{cls.__module__}.{cls.__name__}", + obj=cls, + id="encrypted.E002", + ) + ) + # Ensure associated_data is not empty. + elif not cls.associated_data: + errors.append( + checks.Error( + "associated_data cannot be empty.", + hint=f"{cls.__module__}.{cls.__name__}", + obj=cls, + id="encrypted.E003", + ) + ) + # Ensure associated_data is unique. + else: + for model in apps.get_models(): + if ( + # pylint: disable-next=too-many-boolean-expressions + not model is cls + and not model._meta.abstract + and issubclass(model, EncryptedModel) + and hasattr(model, "associated_data") + and isinstance(model.associated_data, str) + and model.associated_data == cls.associated_data + ): + errors.append( + checks.Error( + "Duplicate 'associated_data' detected:" + f" '{cls.associated_data}'", + hint=( + f"{cls.__module__}.{cls.__name__}" + " shares this associated data with" + f" {model.__module__}.{model.__name__}." + ), + obj=cls, + id="encrypted.E004", + ) + ) + + return errors + + @classmethod + def check(cls, **kwargs): + """Run model checks, including custom checks for encrypted models.""" + errors = super().check(**kwargs) + errors.extend(cls._check_associated_data(**kwargs)) + + if not issubclass(cls.objects.__class__, EncryptedModel.Manager): + errors.append( + checks.Error( + "EncryptedModel subclasses must use the" + " EncryptedModel.Manager.", + hint=( + f"Set 'objects = EncryptedModel.Manager()' on" + f" {cls.__module__}.{cls.__name__}." + ), + obj=cls, + id="encrypted.E005", + ) + ) + + return errors + + @property + def dek_aead(self) -> "Aead": + """Gets the AEAD primitive for this model's DEK.""" + raise NotImplementedError() diff --git a/codeforlife/models/encrypted_char_field.py b/codeforlife/models/encrypted_char_field.py deleted file mode 100644 index 056e94e0..00000000 --- a/codeforlife/models/encrypted_char_field.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -© Ocado Group -Created on 12/08/2025 at 10:28:24(+01:00). -""" - -import typing as t - -from cryptography.fernet import Fernet -from django.conf import settings -from django.db import models - - -class EncryptedCharField(models.CharField): - """ - A custom CharField that encrypts data before saving and decrypts it when - retrieved. - """ - - _fernet = Fernet(settings.SECRET_KEY) - _prefix = "ENC:" - - def __init__(self, *args, **kwargs): - kwargs["max_length"] += len(self._prefix) - super().__init__(*args, **kwargs) - - # pylint: disable-next=unused-argument - def from_db_value(self, value: t.Optional[str], expression, connection): - """ - Converts a value as returned by the database to a Python object. It is - the reverse of get_prep_value(). - - https://docs.djangoproject.com/en/5.1/howto/custom-model-fields/#converting-values-to-python-objects - """ - if isinstance(value, str): - return self.decrypt_value(value) - return value - - def to_python(self, value: t.Optional[str]): - """ - Converts the value into the correct Python object. It acts as the - reverse of value_to_string(), and is also called in clean(). - - https://docs.djangoproject.com/en/5.1/howto/custom-model-fields/#converting-values-to-python-objects - """ - if isinstance(value, str): - return self.decrypt_value(value) - return value - - def get_prep_value(self, value: t.Optional[str]): - """ - 'value' is the current value of the model's attribute, and the method - should return data in a format that has been prepared for use as a - parameter in a query. - - https://docs.djangoproject.com/en/5.1/howto/custom-model-fields/#converting-python-objects-to-query-values - """ - if isinstance(value, str): - return self.encrypt_value(value) - return value - - def encrypt_value(self, value: str): - """Encrypt the value if it's not encrypted.""" - if not value.startswith(self._prefix): - return self._prefix + self._fernet.encrypt(value.encode()).decode() - return value - - def decrypt_value(self, value: str): - """Decrpyt the value if it's encrypted..""" - if value.startswith(self._prefix): - value = value[len(self._prefix) :] - return self._fernet.decrypt(value).decode() - return value diff --git a/codeforlife/models/encrypted_test.py b/codeforlife/models/encrypted_test.py new file mode 100644 index 00000000..35e346f6 --- /dev/null +++ b/codeforlife/models/encrypted_test.py @@ -0,0 +1,132 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:56:31(+00:00). +""" + +import typing as t + +from django.db import models + +from ..tests import ModelTestCase +from ..user.models import OtpBypassToken +from .encrypted import EncryptedModel +from .fields import EncryptedTextField + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta +else: + TypedModelMeta = object + +# pylint: disable=missing-class-docstring +# pylint: disable=too-few-public-methods + + +# pylint: disable-next=abstract-method +class Person(EncryptedModel): + associated_data = "person" + + name = EncryptedTextField(associated_data="name") + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + +class TestEncryptedModel(ModelTestCase[EncryptedModel]): + def test_objects___update__cannot_update(self): + """Cannot update encrypted field via objects.update().""" + with self.assert_raises_validation_error(code="cannot_update"): + Person.objects.update(name="Alice") + + def test_objects___aupdate(self): + """Cannot aupdate encrypted field via objects.aupdate().""" + assert Person.objects.aupdate is None + + def test_objects___bulk_update(self): + """Cannot bulk update encrypted field via objects.bulk_update().""" + assert Person.objects.bulk_update is None + + def test_objects___abulk_update(self): + """Cannot abulk_update encrypted field via objects.abulk_update().""" + assert Person.objects.abulk_update is None + + def test_objects___bulk_create(self): + """Cannot bulk create encrypted field via objects.bulk_create().""" + assert Person.objects.bulk_create is None + + def test_objects___abulk_create(self): + """Cannot abulk_create encrypted field via objects.abulk_create().""" + assert Person.objects.abulk_create is None + + def test_objects__in_bulk(self): + """Cannot in_bulk encrypted field via objects.in_bulk().""" + assert Person.objects.in_bulk is None + + def test_objects__ain_bulk(self): + """Cannot ain_bulk encrypted field via objects.ain_bulk().""" + assert Person.objects.ain_bulk is None + + def test_dek_aead(self): + """dek_aead raises NotImplementedError.""" + with self.assertRaises(NotImplementedError): + # pylint: disable-next=expression-not-assigned + Person().dek_aead + + def test_check__e001(self): + """Check for missing associated_data.""" + + # pylint: disable-next=abstract-method + class E001(EncryptedModel): + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + self.assert_check(error_id="encrypted.E001", model_class=E001) + + def test_check__e002(self): + """Check for string associated_data.""" + + # pylint: disable-next=abstract-method + class E002(EncryptedModel): + associated_data = 123 # type: ignore[assignment] + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + self.assert_check(error_id="encrypted.E002", model_class=E002) + + def test_check__e003(self): + """Check for non-empty associated_data.""" + + # pylint: disable-next=abstract-method + class E003(EncryptedModel): + associated_data = "" + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + self.assert_check(error_id="encrypted.E003", model_class=E003) + + def test_check__e004(self): + """Check for unique associated_data.""" + + # pylint: disable-next=abstract-method + class E004(EncryptedModel): + associated_data = OtpBypassToken.associated_data + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + self.assert_check(error_id="encrypted.E004", model_class=E004) + + def test_check__e005(self): + """Check manager subclasses EncryptedModel.Manager.""" + + # pylint: disable-next=abstract-method + class E005(EncryptedModel): + associated_data = "example" + + objects = models.Manager() # type: ignore[assignment] + + class Meta(TypedModelMeta): + app_label = "codeforlife.user" + + self.assert_check(error_id="encrypted.E005", model_class=E005) diff --git a/codeforlife/models/fields/__init__.py b/codeforlife/models/fields/__init__.py new file mode 100644 index 00000000..19cfac94 --- /dev/null +++ b/codeforlife/models/fields/__init__.py @@ -0,0 +1,9 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:56:44(+00:00). +""" + +from .base_encrypted import BaseEncryptedField +from .data_encryption_key import DataEncryptionKeyField +from .deferred_attribute import DeferredAttribute +from .encrypted_text import EncryptedTextField diff --git a/codeforlife/models/fields/base_encrypted.py b/codeforlife/models/fields/base_encrypted.py new file mode 100644 index 00000000..621ddf36 --- /dev/null +++ b/codeforlife/models/fields/base_encrypted.py @@ -0,0 +1,356 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:57:04(+00:00). + +This is where the core logic of transparent encryption and decryption happens. +`BaseEncryptedField` is a generic field that stores encrypted data as bytes. The +magic is in its descriptor, `EncryptedAttribute`. + +The descriptor intercepts get and set operations: + +- On `set`: The value is not immediately encrypted. It's wrapped in a + `_PendingEncryption` object and stored in the model instance's `__dict__`. + This is a performance optimization to avoid encrypting a value multiple + times if it's changed repeatedly before saving. Any previously cached + decrypted value is cleared. +- On `get`: The descriptor first checks if a decrypted value is already cached + on the model instance. If so, it returns it immediately. Otherwise, it checks + the value in `__dict__`. If it's ciphertext (bytes), it's decrypted + on-the-fly, cached on the instance, and then returned. If it's a pending + plaintext value, it's returned directly. +- On `save`: The field's `pre_save` method is called. It checks for a + `_PendingEncryption` object. If found, it encrypts the plaintext value using + the `dek_aead` and replaces it with the resulting ciphertext bytes, which are + then written to the database. + +A key challenge is differentiating between a value that has just been loaded +from the database (and is therefore encrypted ciphertext) and a new plaintext +value that a developer is setting. + +- When a developer sets `user.email = "new@example.com"`, this is **new + plaintext** that needs to be encrypted on save. +- When Django loads a `User` from the database, the `email` field contains + **existing ciphertext** (raw bytes). + +We solve this with two wrapper classes: + +1. `_PendingEncryption(value)`: When a developer sets a value on the field, the + `EncryptedAttribute` descriptor wraps it in this class. This marks the data + as "dirty" or "pending encryption." The `pre_save` method looks for this + wrapper to know what needs to be encrypted. +2. `_TrustedCiphertext(value)`: When Django loads data from the database, the + `from_db_value` method on the field is called. We wrap the raw bytes from the + database in this class. The `EncryptedAttribute` descriptor's `__set__` + method sees this wrapper and knows the value is already-encrypted ciphertext, + preventing it from being re-wrapped as `_PendingEncryption`. This avoids + unnecessary re-encryption of data that hasn't changed. + +This distinction allows the field to correctly handle both new data and existing +data without ambiguity. +""" + +import typing as t +from dataclasses import dataclass + +from django.core.exceptions import ValidationError +from django.db.models import BinaryField + +from ...types import Args, KwArgs +from ..encrypted import EncryptedModel +from .deferred_attribute import DeferredAttribute + +T = t.TypeVar("T") + + +@dataclass(frozen=True) +class _PendingEncryption(t.Generic[T]): + """A wrapper for plaintext that is pending encryption.""" + + value: T + + +@dataclass(frozen=True) +class _TrustedCiphertext: + """A wrapper for ciphertext that comes directly from the database.""" + + ciphertext: bytes + + +Value: t.TypeAlias = t.Union[_TrustedCiphertext, _PendingEncryption[T]] + +AnyBaseEncryptedField = t.TypeVar( + "AnyBaseEncryptedField", bound="BaseEncryptedField" +) + + +class EncryptedAttribute( + DeferredAttribute[AnyBaseEncryptedField, EncryptedModel, Value[T]], + t.Generic[AnyBaseEncryptedField, T], +): + """ + Descriptor that handles the get/set mechanics for encrypted fields. + """ + + def __get__(self, instance, cls=None): + # Get the internal value from the instance. + internal_value = super().__get__(instance, cls) + + # Return the descriptor itself when accessed on the class. + if internal_value is self: + return self + + # No data to decrypt. + if internal_value is None: + return None + + # The user just set this value, return it directly. + if isinstance(internal_value, _PendingEncryption): + return internal_value.value + + if isinstance(internal_value, _TrustedCiphertext): + # If we have a cached decrypted value, return it. + if self.field.attname in instance.__decrypted_values__: + return t.cast( + T, instance.__decrypted_values__[self.field.attname] + ) + + # Decrypt the value before returning it. + decrypted_value = t.cast( + T, self.field.decrypt_value(instance, internal_value.ciphertext) + ) + + # Cache the decrypted value on the instance. + instance.__decrypted_values__[self.field.attname] = decrypted_value + + return decrypted_value + + raise ValidationError( + "Unexpected internal value type for encrypted field.", + code="invalid_internal_value_type", + ) + + def __set__( + self, + instance, + value: t.Optional[ # type: ignore[override] + t.Union[memoryview, _TrustedCiphertext, T] + ], + ): + # Clear any cached decrypted value. + instance.__decrypted_values__.pop(self.field.attname, None) + + # Determine the internal value to set. + internal_value: t.Optional[Value[T]] + if value is None: + internal_value = None + # When Django loads data from a fixture (e.g., a JSON file), it + # provides binary data as a `memoryview` object. Our descriptor + # handles this by extracting the raw bytes from the `memoryview`. + elif isinstance(value, memoryview): + if not isinstance(value.obj, bytes): + raise ValidationError( + "Expected bytes in memoryview for encrypted field.", + code="invalid_memoryview_type", + ) + internal_value = _TrustedCiphertext(value.obj) + elif isinstance(value, _TrustedCiphertext): # From DB. + internal_value = value + else: # From user input. + internal_value = _PendingEncryption(value) + + # Set the internal value on the instance. + super().__set__(instance, internal_value) + + +class BaseEncryptedField(BinaryField, t.Generic[T]): + """Encrypted field base class.""" + + model: t.Type[EncryptedModel] + + descriptor_class = EncryptedAttribute + + # -------------------------------------------------------------------------- + # Construction & Deconstruction + # -------------------------------------------------------------------------- + + def set_init_kwargs(self, kwargs: KwArgs): + """Sets common init kwargs.""" + kwargs.setdefault("db_column", self.associated_data) + + def __init__( + self, + associated_data: str, + # Set type for default to match T. + default: t.Optional[t.Union[T, t.Callable[[], T]]] = None, + **kwargs, + ): + if not associated_data: + raise ValidationError( + "Associated data cannot be empty.", + code="no_associated_data", + ) + self.associated_data = associated_data + + self.set_init_kwargs(kwargs) + super().__init__(**kwargs, default=default) + + def deconstruct(self): + name, path, args, kwargs = t.cast( + t.Tuple[str, str, Args, KwArgs], super().deconstruct() + ) + + self.set_init_kwargs(kwargs) + kwargs["associated_data"] = self.associated_data + + return name, path, args, kwargs + + # -------------------------------------------------------------------------- + # Django Model Field Integration + # -------------------------------------------------------------------------- + + def contribute_to_class(self, cls, name, private_only=False): + """ + Called by Django when the field is added to a model. This method + performs critical validations and registers the field with the model. + """ + super().contribute_to_class(cls, name, private_only) + + # Skip fake models used for migrations. + if cls.__module__ == "__fake__": + return + + # Ensure the model subclasses EncryptedModel. + if not issubclass(cls, EncryptedModel): + raise ValidationError( + f"'{cls.__module__}.{cls.__name__}' must subclass" + f" '{EncryptedModel.__module__}.{EncryptedModel.__name__}'.", + code="invalid_model_base_class", + ) + + # Ensure no duplicate encrypted fields. + if self in cls.ENCRYPTED_FIELDS: + raise ValidationError( + "Encrypted field already registered.", + code="already_registered", + ) + + # Ensure no duplicate associated data. + for field in cls.ENCRYPTED_FIELDS: + if self.associated_data == field.associated_data: + raise ValidationError( + f"The encrypted fields '{self.name}' and '{field.name}'" + " have the same associated data.", + code="associated_data_already_used", + ) + + # Register this field as an encrypted field on the model. + cls.ENCRYPTED_FIELDS.append(self) + + # -------------------------------------------------------------------------- + # Descriptor Methods + # -------------------------------------------------------------------------- + + @t.overload # type: ignore[override] + def __get__( # Get the descriptor with the correct types. + self, instance: None, owner: t.Any + ) -> EncryptedAttribute[t.Self, T]: ... + + @t.overload + def __get__( # Get the internal value when accessed on an instance. + self, instance: EncryptedModel, owner: t.Any + ) -> t.Optional[T]: ... + + # Actual implementation of __get__. + def __get__(self, instance: t.Optional[EncryptedModel], owner: t.Any): + return t.cast( + t.Union[EncryptedAttribute[t.Self, T], t.Optional[T]], + # pylint: disable-next=no-member + super().__get__(instance, owner), + ) + + # Set the internal value when assigned on an instance. + def __set__(self, instance: EncryptedModel, value: t.Optional[T]): ... + + # pylint: disable-next=unused-argument + def from_db_value(self, value: t.Optional[bytes], expression, connection): + """ + Converts a value as returned by the database to a Python object. + We wrap the raw bytes in _TrustedCiphertext to signal that this is + existing ciphertext from the database, not new plaintext. + """ + if value is None: + return None + + # Wrap it so __set__ knows this is NOT new user input. + return _TrustedCiphertext(value) + + def pre_save( + self, model_instance: EncryptedModel, add # type: ignore[override] + ): + """Before saving, encrypt any pending values.""" + value: t.Optional[Value[T]] = model_instance.__dict__.get(self.attname) + + # No data to encrypt. + if value is None: + return None + + # Data needs encrypting. + if isinstance(value, _PendingEncryption): + return self.encrypt_value(model_instance, value.value) + + # Already encrypted data from DB, store as-is. + if isinstance(value, _TrustedCiphertext): + return value.ciphertext + + raise ValidationError( + f"Unexpected value type '{type(value)}' for encryption.", + code="invalid_value_type", + ) + + # -------------------------------------------------------------------------- + # Crypto Logic + # -------------------------------------------------------------------------- + + def bytes_to_value(self, data: bytes) -> T: + """ + Subclasses must implement this method to convert decrypted bytes back to + the field's value type. + """ + raise NotImplementedError() + + def value_to_bytes(self, value: T) -> bytes: + """ + Subclasses must implement this method to convert the field's value to + bytes before encryption. + """ + raise NotImplementedError() + + @property + def full_associated_data(self): + """Returns the fully qualified associated data for this field.""" + return f"{self.model.associated_data}:{self.associated_data}".encode() + + def decrypt_value( + self, instance: EncryptedModel, ciphertext: t.Optional[bytes] + ): + """Decrypts a single value using the DEK and associated data.""" + if ciphertext is None: + return None + + data = instance.dek_aead.decrypt( + ciphertext=ciphertext, + associated_data=self.full_associated_data, + ) + + return self.bytes_to_value(data) + + def encrypt_value(self, instance: EncryptedModel, plaintext: t.Optional[T]): + """Encrypts a single value using the DEK and associated data.""" + return ( + None + if plaintext is None + else instance.dek_aead.encrypt( + plaintext=self.value_to_bytes(plaintext), + associated_data=self.full_associated_data, + ) + ) diff --git a/codeforlife/models/fields/base_encrypted_test.py b/codeforlife/models/fields/base_encrypted_test.py new file mode 100644 index 00000000..4c206d1e --- /dev/null +++ b/codeforlife/models/fields/base_encrypted_test.py @@ -0,0 +1,506 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:56:57(+00:00). +""" + +import typing as t +from functools import cached_property +from unittest.mock import MagicMock + +from django.db import models + +from ...encryption import FakeAead +from ...tests import InterruptPipelineError, TestCase +from ..encrypted import EncryptedModel +from .base_encrypted import ( + BaseEncryptedField, + _PendingEncryption, + _TrustedCiphertext, +) + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta +else: + TypedModelMeta = object + +# pylint: disable=missing-class-docstring +# pylint: disable=too-few-public-methods +# pylint: disable=too-many-instance-attributes + + +class FakeEncryptedModel(EncryptedModel): + """A fake EncryptedModel without fields for testing.""" + + associated_data = "model" + + class Meta(TypedModelMeta): + abstract = True + + @cached_property + def dek_aead(self): + return FakeAead.as_mock() + + def get_stored_value(self, field: BaseEncryptedField): + """Gets the stored value for the given field.""" + assert field in self.ENCRYPTED_FIELDS + return self.__dict__[field.attname] + + def set_stored_value(self, field: BaseEncryptedField, value): + """Sets the stored value for the given field.""" + assert field in self.ENCRYPTED_FIELDS + self.__dict__[field.attname] = value + + def assert_value_is_pending_encryption( + self, field: BaseEncryptedField, value: str + ): + """ + Asserts the value for the given field is pending encryption. + """ + pending_encryption = self.get_stored_value(field) + assert isinstance(pending_encryption, _PendingEncryption) + assert pending_encryption.value == value + + +class FakeModelMeta(TypedModelMeta): + """A fake Meta class for testing.""" + + app_label = "codeforlife.user" + + +# pylint: disable-next=abstract-method +class FakeEncryptedField(BaseEncryptedField[str]): + """A fake BaseEncryptedField for testing.""" + + value_to_bytes: MagicMock + bytes_to_value: MagicMock + encrypt_value: MagicMock + decrypt_value: MagicMock + + @staticmethod + def _value_to_bytes(value: str): + return value.encode() + + @staticmethod + def _bytes_to_value(data: bytes): + return data.decode() + + def __init__(self, associated_data, default=None, **kwargs): + super().__init__(associated_data, default, **kwargs) + + self.value_to_bytes = MagicMock(side_effect=self._value_to_bytes) + self.bytes_to_value = MagicMock(side_effect=self._bytes_to_value) + self.encrypt_value = MagicMock(side_effect=super().encrypt_value) + self.decrypt_value = MagicMock(side_effect=super().decrypt_value) + + +# pylint: disable-next=too-many-public-methods +class TestBaseEncryptedField(TestCase): + # -------------------------------------------------------------------------- + # Test Helper Methods + # -------------------------------------------------------------------------- + + def _get_model_class(self): + """Dynamically creates a FakeEncryptedModel subclass with fields. + + This assigns self.field and self.field2 to the model. + """ + + class Model(FakeEncryptedModel): + """A fake EncryptedModel with fields for testing.""" + + field = self.field + field2 = self.field2 + + Meta = FakeModelMeta + + return Model + + def _get_model_instance(self, **kwargs): + """Gets an instance of the dynamically created model class.""" + return self._get_model_class()(**kwargs) + + def setUp(self): + # Set up the first field with a non-callable default for testing. + self.field_associated_data = "field" + self.field_default = "default" + self.field = FakeEncryptedField( + associated_data=self.field_associated_data, + default=self.field_default, + ) + + # Set up a second field with a callable default for testing. + self.field2_associated_data = "field2" + self.field2_default = "default2" + self.field2 = FakeEncryptedField( + associated_data=self.field2_associated_data, + default=lambda: self.field2_default, + ) + + # -------------------------------------------------------------------------- + # Construction & Deconstruction Tests + # -------------------------------------------------------------------------- + + def test_init__no_associated_data(self): + """Cannot create BaseEncryptedField with no associated data.""" + with self.assert_raises_validation_error(code="no_associated_data"): + BaseEncryptedField(associated_data="") + + def test_init(self): + """BaseEncryptedField is constructed correctly.""" + assert self.field.associated_data == self.field_associated_data + assert self.field.db_column == self.field_associated_data + + def test_deconstruct(self): + """BaseEncryptedField is deconstructed correctly.""" + _, _, _, kwargs = self.field.deconstruct() + + assert kwargs["associated_data"] == self.field_associated_data + assert kwargs["db_column"] == self.field_associated_data + + # -------------------------------------------------------------------------- + # Django Model Field Integration Tests + # -------------------------------------------------------------------------- + + def test_contribute_to_class__invalid_model_base_class(self): + """Cannot contribute BaseEncryptedField to invalid model base class.""" + with self.assert_raises_validation_error( + code="invalid_model_base_class" + ): + # pylint: disable-next=unused-variable + class Model(models.Model): + field = self.field + + Meta = FakeModelMeta + + def test_contribute_to_class__already_registered(self): + """Cannot contribute BaseEncryptedField that is already registered.""" + with self.assert_raises_validation_error(code="already_registered"): + # pylint: disable-next=unused-variable + class Model(FakeEncryptedModel): + field = self.field + field2 = self.field + + Meta = FakeModelMeta + + def test_contribute_to_class(self): + """BaseEncryptedField is contributed to class correctly.""" + Model = self._get_model_class() # Assign fields to model. + self.assertListEqual(Model.ENCRYPTED_FIELDS, [self.field, self.field2]) + + def test_contribute_to_class__associated_data_already_used(self): + """ + Cannot contribute BaseEncryptedField with duplicate associated data. + """ + with self.assert_raises_validation_error( + code="associated_data_already_used" + ): + # pylint: disable-next=unused-variable + class Model(FakeEncryptedModel): + field = self.field + field2 = FakeEncryptedField( + associated_data=self.field_associated_data + ) + + Meta = FakeModelMeta + + # -------------------------------------------------------------------------- + # Encryption & Decryption Tests + # -------------------------------------------------------------------------- + + def test_bytes_to_value(self): + """bytes_to_value raises NotImplementedError.""" + with self.assertRaises(NotImplementedError): + # pylint: disable-next=expression-not-assigned + BaseEncryptedField[str](associated_data="test").bytes_to_value( + b"data" + ) + + def test_value_to_bytes(self): + """value_to_bytes raises NotImplementedError.""" + with self.assertRaises(NotImplementedError): + # pylint: disable-next=expression-not-assigned + BaseEncryptedField[str](associated_data="test").value_to_bytes( + "value" + ) + + def test_full_associated_data(self): + """Returns fully qualified associated data.""" + Model = self._get_model_class() + assert ( + self.field.full_associated_data + == f"{Model.associated_data}:{self.field_associated_data}".encode() + ) + + def test_decrypt_value(self): + """decrypt_value decrypts the given ciphertext.""" + # Create instance and mock shorthands. + instance = self._get_model_instance() + decrypt_mock: MagicMock = instance.dek_aead.decrypt + bytes_to_value_mock: MagicMock = self.field.bytes_to_value + + # When ciphertext is None, no decryption occurs. + decrypted_value = self.field.decrypt_value(instance, ciphertext=None) + assert decrypted_value is None + decrypt_mock.assert_not_called() + bytes_to_value_mock.assert_not_called() + + # When ciphertext is provided, decryption occurs. + ciphertext = FakeAead.encrypt(b"value") + decrypted_value = self.field.decrypt_value(instance, ciphertext) + decrypt_kwargs = { + "ciphertext": ciphertext, + "associated_data": self.field.full_associated_data, + } + decrypt_mock.assert_called_once_with(**decrypt_kwargs) + decrypted_bytes = decrypt_mock.side_effect(**decrypt_kwargs) + bytes_to_value_mock.assert_called_once_with(decrypted_bytes) + assert decrypted_value == bytes_to_value_mock.side_effect( + decrypted_bytes + ) + + def test_encrypt_value(self): + """encrypt_value encrypts the given plaintext.""" + # Create instance and mock shorthands. + instance = self._get_model_instance() + encrypt_mock: MagicMock = instance.dek_aead.encrypt + value_to_bytes_mock: MagicMock = self.field.value_to_bytes + + # When plaintext is None, no encryption occurs. + encrypted_bytes = self.field.encrypt_value(instance, plaintext=None) + assert encrypted_bytes is None + value_to_bytes_mock.assert_not_called() + encrypt_mock.assert_not_called() + + # When plaintext is provided, encryption occurs. + plaintext = self.field_default + encrypted_bytes = self.field.encrypt_value(instance, plaintext) + value_to_bytes_mock.assert_called_once_with(plaintext) + decrypted_bytes = value_to_bytes_mock.side_effect(plaintext) + encrypt_kwargs = { + "plaintext": decrypted_bytes, + "associated_data": self.field.full_associated_data, + } + encrypt_mock.assert_called_once_with(**encrypt_kwargs) + assert encrypted_bytes == encrypt_mock.side_effect(**encrypt_kwargs) + + # -------------------------------------------------------------------------- + # Descriptor Methods Tests + # -------------------------------------------------------------------------- + + def test_set__default(self): + """Setting field to default value stores pending encryption.""" + # Field must have a non-callable default for this test. + assert self.field.default is not None and not callable( + self.field.default + ) + # Field2 must have a callable default for this test. + assert self.field2.default is not None and callable(self.field2.default) + + instance = self._get_model_instance() + instance.assert_value_is_pending_encryption( + self.field, self.field_default + ) + instance.assert_value_is_pending_encryption( + self.field2, self.field2_default + ) + + def test_set__init(self): + """Setting field to initial value stores pending encryption.""" + value = "initial_value" + instance = self._get_model_instance(field=value) + instance.assert_value_is_pending_encryption(self.field, value) + + def test_set__none(self): + """Setting field to None stores None.""" + assert self.field.default is not None + instance = self._get_model_instance(field=None) + assert instance.get_stored_value(self.field) is None + + def test_set__trusted_ciphertext(self): + """Setting field to _TrustedCiphertext stores ciphertext directly.""" + trusted_ciphertext = _TrustedCiphertext(b"encrypted_value") + instance = self._get_model_instance(field=trusted_ciphertext) + assert instance.get_stored_value(self.field) is trusted_ciphertext + + def test_set__memoryview(self): + """Setting field to memoryview stores bytes directly.""" + memoryview_value = memoryview(b"byte_value") + instance = self._get_model_instance(field=memoryview_value) + trusted_ciphertext = instance.get_stored_value(self.field) + assert isinstance(trusted_ciphertext, _TrustedCiphertext) + assert trusted_ciphertext.ciphertext == memoryview_value.obj + + def test_set__memoryview__invalid_memoryview_type(self): + """Setting field to invalid memoryview type raises ValidationError.""" + value = bytearray(b"Hello") + memoryview_value = memoryview(value) + with self.assert_raises_validation_error( + code="invalid_memoryview_type" + ): + self._get_model_instance(field=memoryview_value) + + def test_set__new_value(self): + """ + Setting field to new value stores pending encryption and clears cache. + """ + assert self.field.default is not None + + value = "new_value" + assert self.field.default != value + + instance = self._get_model_instance() + + # Cache the value on the instance. + instance.__decrypted_values__[self.field.attname] = value + + # Clear cache by setting to new value. + instance.field = value + instance.assert_value_is_pending_encryption(self.field, value) + + # Ensure cached value is cleared. + assert self.field.attname not in instance.__decrypted_values__ + + def test_get__invalid_internal_value_type(self): + """ + Getting field with invalid internal value type raises ValidationError. + """ + instance = self._get_model_instance() + instance.set_stored_value(self.field, b"data") # Invalid type. + + with self.assert_raises_validation_error( + code="invalid_internal_value_type" + ): + _ = instance.field + + def test_get__descriptor(self): + """Getting field from class returns the descriptor.""" + Model = self._get_model_class() + assert isinstance(Model.field, BaseEncryptedField.descriptor_class) + assert Model.field.field == self.field + + def test_get__cached(self): + """Getting field when cached returns cached value.""" + instance = self._get_model_instance() + instance.set_stored_value(self.field, _TrustedCiphertext(b"irrelevant")) + + value = "decrypted_value" + instance.__decrypted_values__[self.field.attname] = value + assert instance.field == value + + def test_get__none(self): + """Getting field when stored value is None returns None.""" + instance = self._get_model_instance() + instance.set_stored_value(self.field, None) + + assert instance.field is None + self.field.decrypt_value.assert_not_called() + + def test_get__pending_encryption(self): + """ + Getting field when stored value is pending encryption returns value. + """ + instance = self._get_model_instance() + value = "decrypted_value" + pending_encryption = _PendingEncryption(value) + instance.set_stored_value(self.field, pending_encryption) + + assert instance.field == value + self.field.decrypt_value.assert_not_called() + + def test_get__decrypted_value(self): + """Getting field when stored value is ciphertext returns decrypted.""" + plaintext = "decrypted_value" + ciphertext = FakeAead.encrypt(plaintext.encode()) + + # Create instance with stored ciphertext. + instance = self._get_model_instance() + instance.set_stored_value(self.field, _TrustedCiphertext(ciphertext)) + # Ensure cache is not set initially. + assert self.field.attname not in instance.__decrypted_values__ + + # Get the field value, which should decrypt the ciphertext. + assert instance.field == plaintext + self.field.decrypt_value.assert_called_once_with(instance, ciphertext) + + # Ensure decrypted value is cached on the instance. + assert instance.__decrypted_values__[self.field.attname] == plaintext + + # -------------------------------------------------------------------------- + # pre_save Tests + # -------------------------------------------------------------------------- + + def test_pre_save__pending_encryption(self): + """pre_save encrypts pending encryption before saving.""" + # Create instance with pending encryption. + instance = self._get_model_instance() + pending_encryption = instance.get_stored_value(self.field) + assert isinstance(pending_encryption, _PendingEncryption) + + # Assert the value is encrypted in pre_save. + def assert_pre_save(result): + self.field.encrypt_value.assert_called_once_with( + instance, pending_encryption.value + ) + assert result == self.field.encrypt_value.side_effect( + instance, pending_encryption.value + ) + + # Run the save pipeline, interrupting at pre_save. + InterruptPipelineError.run( + test_case=self, + step_target=self.field, + step_attribute="pre_save", + assert_step=assert_pre_save, + pipeline=instance.save, + ) + + def test_pre_save__none(self): + """pre_save with no value does nothing.""" + # Create instance with no stored value. + instance = self._get_model_instance() + instance.set_stored_value(self.field, None) + + # Assert pre_save does nothing. + def assert_pre_save(result): + assert result is None + self.field.encrypt_value.assert_not_called() + + # Run the save pipeline, interrupting at pre_save. + InterruptPipelineError.run( + test_case=self, + step_target=self.field, + step_attribute="pre_save", + assert_step=assert_pre_save, + pipeline=instance.save, + ) + + def test_pre_save__trusted_ciphertext(self): + """pre_save with trusted ciphertext does nothing.""" + # Create instance with trusted ciphertext. + ciphertext = b"encrypted_value" + trusted_ciphertext = _TrustedCiphertext(ciphertext) + instance = self._get_model_instance(field=trusted_ciphertext) + + # Assert pre_save returns the ciphertext directly. + def assert_pre_save(result): + assert result == ciphertext + self.field.encrypt_value.assert_not_called() + + # Run the save pipeline, interrupting at pre_save. + InterruptPipelineError.run( + test_case=self, + step_target=self.field, + step_attribute="pre_save", + assert_step=assert_pre_save, + pipeline=instance.save, + ) + + def test_pre_save__invalid_value_type(self): + """pre_save with invalid value type raises ValidationError.""" + # Create instance with invalid stored value. + instance = self._get_model_instance() + instance.set_stored_value(self.field, b"data") # Invalid type. + + # Run the save pipeline, interrupting at pre_save. + with self.assert_raises_validation_error(code="invalid_value_type"): + instance.save() diff --git a/codeforlife/models/fields/data_encryption_key.py b/codeforlife/models/fields/data_encryption_key.py new file mode 100644 index 00000000..f4e71610 --- /dev/null +++ b/codeforlife/models/fields/data_encryption_key.py @@ -0,0 +1,195 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:57:19(+00:00). + +This field is responsible for storing the encrypted Data Encryption Key (DEK) +for a model instance. The actual lifecycle of the DEK, including its lazy +creation for new model instances, is managed by the `save()` method on the +`BaseDataEncryptionKeyModel`. + +The `contribute_to_class` method on `DataEncryptionKeyField` performs crucial +validations when the field is added to a model. It ensures that the model +inherits from the correct base class (`BaseDataEncryptionKeyModel`) and, most +importantly, it guarantees that a model can only have one +`DataEncryptionKeyField`. It does this by checking a class-level `_dek` +attribute; if this attribute is already set, it means another DEK field has +already been processed, and a `ValidationError` is raised. This prevents +developers from accidentally creating multiple encryption keys for a single +model instance, which would lead to ambiguity and potential data loss. +""" + +import typing as t +from dataclasses import dataclass + +from django.core.exceptions import ValidationError +from django.db.models import BinaryField +from django.utils.translation import gettext_lazy as _ + +from ...types import KwArgs +from ..base_data_encryption_key import BaseDataEncryptionKeyModel +from .deferred_attribute import DeferredAttribute + +AnyDataEncryptionKeyField = t.TypeVar( + "AnyDataEncryptionKeyField", bound="DataEncryptionKeyField" +) + + +@dataclass(frozen=True) +class _TrustedDek: + """A wrapper for a DEK that comes directly from the database.""" + + dek: bytes + + +class DataEncryptionKeyAttribute( + DeferredAttribute[ + AnyDataEncryptionKeyField, BaseDataEncryptionKeyModel, bytes + ], + t.Generic[AnyDataEncryptionKeyField], +): + """ + Descriptor for DataEncryptionKeyField that handles data shredding. + """ + + def __set__( + self, + instance, + value: t.Optional[_TrustedDek], # type: ignore[override] + ): + # Clear any cached DEK AEAD. + if instance.pk is not None and instance.pk in instance.DEK_AEAD_CACHE: + instance.DEK_AEAD_CACHE.pop(instance.pk, None) + + if isinstance(value, _TrustedDek): # From DB. + internal_value = value.dek + elif value is None: # Data is being shredded. + internal_value = None + else: + raise ValidationError( + "DataEncryptionKeyField can only be set to None.", + code="cannot_set_value", + ) + + super().__set__(instance, internal_value) + + +class DataEncryptionKeyField(BinaryField): + """ + A custom BinaryField to store an encrypted data encryption key (DEK). + """ + + model: t.Type[BaseDataEncryptionKeyModel] + + descriptor_class = DataEncryptionKeyAttribute + + # -------------------------------------------------------------------------- + # Construction & Deconstruction + # -------------------------------------------------------------------------- + + default_verbose_name = "data encryption key" + default_help_text = ( + "The encrypted data encryption key (DEK) for this model." + ) + + def set_init_kwargs(self, kwargs: KwArgs): + """Sets common init kwargs.""" + kwargs["editable"] = False # DEK should not be editable in admin forms + kwargs["null"] = True # Allow null for data shredding + kwargs.setdefault("verbose_name", _(self.default_verbose_name)) + kwargs.setdefault("help_text", _(self.default_help_text)) + + def __init__(self, **kwargs): + if kwargs.get("editable", False): + raise ValidationError( + "DataEncryptionKeyField cannot be editable.", + code="editable_not_allowed", + ) + if "default" in kwargs: + raise ValidationError( + "DataEncryptionKeyField cannot have a default value.", + code="default_not_allowed", + ) + if not kwargs.get("null", True): + raise ValidationError( + "DataEncryptionKeyField must allow null to support data" + " shredding.", + code="null_not_allowed", + ) + + self.set_init_kwargs(kwargs) + super().__init__(**kwargs) + + def deconstruct(self): + name, path, args, kwargs = super().deconstruct() + self.set_init_kwargs(kwargs) + return name, path, args, kwargs + + # -------------------------------------------------------------------------- + # Django Model Field Integration + # -------------------------------------------------------------------------- + + def contribute_to_class(self, cls, name, private_only=False): + super().contribute_to_class(cls, name, private_only) + + # Skip fake models used for migrations. + if cls.__module__ == "__fake__": + return + + # Ensure the model subclasses BaseDataEncryptionKeyModel. + if not issubclass(cls, BaseDataEncryptionKeyModel): + raise ValidationError( + f"'{cls.__module__}.{cls.__name__}' must subclass" + f" '{BaseDataEncryptionKeyModel.__module__}." + f"{BaseDataEncryptionKeyModel.__name__}'.", + code="invalid_model_base_class", + ) + + # Ensure only one DEK field per model. + if cls.DEK_FIELD is not None: + raise ValidationError( + f"'{cls.__module__}.{cls.__name__}' already has a" + " DataEncryptionKeyField defined.", + code="multiple_dek_fields_not_allowed", + ) + + # Set the class DEK field reference. + cls.DEK_FIELD = getattr(cls, self.name) + + # -------------------------------------------------------------------------- + # Descriptor Methods + # -------------------------------------------------------------------------- + + @t.overload # type: ignore[override] + def __get__( # Get the descriptor. + self, instance: None, owner: t.Any + ) -> DataEncryptionKeyAttribute[t.Self]: ... + + @t.overload + def __get__( # Get the value. + self, instance: BaseDataEncryptionKeyModel, owner: t.Any + ) -> t.Optional[bytes]: ... + + def __get__( # Actual implementation of __get__. + self, instance: t.Optional[BaseDataEncryptionKeyModel], owner: t.Any + ): + return t.cast( + t.Union[DataEncryptionKeyAttribute[t.Self], t.Optional[bytes]], + # pylint: disable-next=no-member + super().__get__(instance, owner), + ) + + # Can only be set to None to allow data shredding. + def __set__(self, instance: BaseDataEncryptionKeyModel, value: None): ... + + # pylint: disable-next=unused-argument + def from_db_value(self, value: t.Optional[bytes], expression, connection): + """ + Converts a value as returned by the database to a Python object. + We wrap the raw bytes in _TrustedDek to signal that this is an + existing DEK from the database, not new plaintext. + """ + if value is None: + return None + + # Wrap it so __set__ knows this is NOT new user input. + return _TrustedDek(value) diff --git a/codeforlife/models/fields/data_encryption_key_test.py b/codeforlife/models/fields/data_encryption_key_test.py new file mode 100644 index 00000000..42d8c3b8 --- /dev/null +++ b/codeforlife/models/fields/data_encryption_key_test.py @@ -0,0 +1,181 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:57:10(+00:00). +""" + +import typing as t + +from django.db import models + +from ...encryption import create_dek +from ...tests import TestCase +from ..base_data_encryption_key import BaseDataEncryptionKeyModel +from .data_encryption_key import DataEncryptionKeyField, _TrustedDek + +if t.TYPE_CHECKING: + from django_stubs_ext.db.models import TypedModelMeta +else: + TypedModelMeta = object + +# pylint: disable=missing-class-docstring +# pylint: disable=too-few-public-methods + + +class FakeModelMeta(TypedModelMeta): + """A fake Meta class for testing.""" + + app_label = "codeforlife.user" + + +class TestDataEncryptionKeyField(TestCase): + # -------------------------------------------------------------------------- + # Test Helper Methods + # -------------------------------------------------------------------------- + + def _get_model_class(self): + """Dynamically creates a Model subclass with a DEK field. + + This assigns self.field to the 'dek' attribute of the model. + """ + + class DekModel(BaseDataEncryptionKeyModel): + """A fake Model with a DEK field for testing.""" + + dek = self.field + + Meta = FakeModelMeta + + def set_dek_for_test(self): + """Sets the dek field for testing purposes.""" + self.__dict__[self.__class__.dek.field.attname] = create_dek() + + return DekModel + + def _get_model_instance(self, **kwargs): + """Gets an instance of the dynamically created model class.""" + return self._get_model_class()(**kwargs) + + def setUp(self): + # Casting as the field is not deferred in a model. + self.field = t.cast(DataEncryptionKeyField, DataEncryptionKeyField()) + self.field2 = t.cast(DataEncryptionKeyField, DataEncryptionKeyField()) + + # -------------------------------------------------------------------------- + # Construction & Deconstruction Tests + # -------------------------------------------------------------------------- + + def test_init__editable_not_allowed(self): + """Cannot create DataEncryptionKeyField with editable=True.""" + with self.assert_raises_validation_error(code="editable_not_allowed"): + DataEncryptionKeyField(editable=True) + + def test_init__default_not_allowed(self): + """Cannot create DataEncryptionKeyField with default value.""" + with self.assert_raises_validation_error(code="default_not_allowed"): + DataEncryptionKeyField(default=b"default_value") + + def test_init__null_allowed(self): + """Cannot create DataEncryptionKeyField with null=True.""" + with self.assert_raises_validation_error(code="null_not_allowed"): + DataEncryptionKeyField(null=False) + + def test_init(self): + """DataEncryptionKeyField is constructed correctly.""" + assert self.field.editable is False + assert self.field.null is True + assert ( + self.field.verbose_name + == DataEncryptionKeyField.default_verbose_name + ) + assert self.field.help_text == DataEncryptionKeyField.default_help_text + + def test_deconstruct(self): + """DataEncryptionKeyField is deconstructed correctly.""" + _, _, _, kwargs = self.field.deconstruct() + + assert kwargs["editable"] is False + assert kwargs["null"] is True + assert ( + kwargs["verbose_name"] + == DataEncryptionKeyField.default_verbose_name + ) + assert kwargs["help_text"] == DataEncryptionKeyField.default_help_text + + # -------------------------------------------------------------------------- + # Django Model Field Integration Tests + # -------------------------------------------------------------------------- + + def test_contribute_to_class__invalid_model_base_class(self): + """ + Cannot contribute DataEncryptionKeyField to invalid model base class. + """ + with self.assert_raises_validation_error( + code="invalid_model_base_class" + ): + # pylint: disable-next=unused-variable + class Model(models.Model): + field = self.field + + Meta = FakeModelMeta + + def test_contribute_to_class__multiple_dek_fields_not_allowed(self): + """ + Cannot contribute multiple DataEncryptionKeyFields to a model. + """ + with self.assert_raises_validation_error( + code="multiple_dek_fields_not_allowed" + ): + # pylint: disable-next=unused-variable + class Model(BaseDataEncryptionKeyModel): + dek1 = self.field + dek2 = self.field2 + + Meta = FakeModelMeta + + def test_contribute_to_class(self): + """DataEncryptionKeyField is contributed to model correctly.""" + with self.subTest("Class attribute set correctly"): + Model = self._get_model_class() + assert Model.DEK_FIELD == Model.dek + + with self.subTest("Instance attribute set correctly"): + instance = Model() + assert instance.DEK_FIELD == instance.dek + + # -------------------------------------------------------------------------- + # Descriptor Methods Tests + # -------------------------------------------------------------------------- + + def test_get__descriptor(self): + """Getting field from class returns the descriptor.""" + Model = self._get_model_class() + assert isinstance(Model.dek, DataEncryptionKeyField.descriptor_class) + assert Model.dek.field == self.field + + def test_get__value(self): + """Getting field from instance returns the DEK bytes.""" + instance = self._get_model_instance() + instance.set_dek_for_test() + dek_value = instance.dek + assert isinstance(dek_value, bytes) + assert dek_value == instance.__dict__["dek"] + + def test_set__default(self): + """Setting field to _TrustedDek sets to DEK bytes.""" + instance = self._get_model_instance() + trusted_dek = _TrustedDek(b"dek") + instance.dek = trusted_dek + assert trusted_dek.dek == instance.__dict__["dek"] + + def test_set__none(self): + """Setting field to None sets to None.""" + instance = self._get_model_instance() + instance.set_dek_for_test() + instance.dek = None + assert instance.__dict__["dek"] is None + + def test_set__cannot_set_value(self): + """Setting field to any value other than None or _TrustedDek raises.""" + instance = self._get_model_instance() + with self.assert_raises_validation_error(code="cannot_set_value"): + instance.dek = b"some_value" diff --git a/codeforlife/models/fields/deferred_attribute.py b/codeforlife/models/fields/deferred_attribute.py new file mode 100644 index 00000000..f4d13286 --- /dev/null +++ b/codeforlife/models/fields/deferred_attribute.py @@ -0,0 +1,43 @@ +""" +© Ocado Group +Created on 22/01/2026 at 13:43:46(+00:00). +""" + +import typing as t + +from django.db.models import Field, Model +from django.db.models.query_utils import DeferredAttribute as _DeferredAttribute + +AnyModel = t.TypeVar("AnyModel", bound=Model) +AnyField = t.TypeVar("AnyField", bound=Field) +T = t.TypeVar("T") + + +# pylint: disable-next=too-few-public-methods +class DeferredAttribute(_DeferredAttribute, t.Generic[AnyField, AnyModel, T]): + """Custom DeferredAttribute with type hints ref to the field.""" + + _field: AnyField + + @property + def field(self): + """Helper to get the field with the correct type.""" + # Mypy tries to be helpful here but fails to infer the correct type. + # Hence the cast. + return t.cast(AnyField, self._field) + + @field.setter + def field(self, value: AnyField): + self._field = value + + def __get__( + self, instance: t.Optional[AnyModel], cls=None # type: ignore[override] + ): + return t.cast( + t.Optional[T], + super().__get__(instance, cls), # type: ignore[misc] + ) + + def __set__(self, instance: AnyModel, value: t.Optional[T]): + # Set the internal value on the instance. + instance.__dict__[self.field.attname] = value diff --git a/codeforlife/models/fields/encrypted_text.py b/codeforlife/models/fields/encrypted_text.py new file mode 100644 index 00000000..8f8f96f8 --- /dev/null +++ b/codeforlife/models/fields/encrypted_text.py @@ -0,0 +1,16 @@ +""" +© Ocado Group +Created on 12/01/2026 at 09:17:46(+00:00). +""" + +from .base_encrypted import BaseEncryptedField + + +class EncryptedTextField(BaseEncryptedField[str]): + """An encrypted text field.""" + + def bytes_to_value(self, data): + return data.decode() + + def value_to_bytes(self, value): + return value.encode() diff --git a/codeforlife/models/fields/encrypted_text_test.py b/codeforlife/models/fields/encrypted_text_test.py new file mode 100644 index 00000000..eb14f12c --- /dev/null +++ b/codeforlife/models/fields/encrypted_text_test.py @@ -0,0 +1,25 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:57:24(+00:00). +""" + +from ...tests import TestCase +from .encrypted_text import EncryptedTextField + + +# pylint: disable-next=missing-class-docstring +class EncryptedTextFieldTestCase(TestCase): + def setUp(self): + self.field = EncryptedTextField(associated_data="field") + + def test_bytes_to_value(self): + """bytes_to_value decodes bytes to string.""" + data = b"hello world" + value = self.field.bytes_to_value(data) + assert value == data.decode() + + def test_value_to_bytes(self): + """value_to_bytes encodes string to bytes.""" + value = "hello world" + data = self.field.value_to_bytes(value) + assert data == value.encode() diff --git a/codeforlife/settings/__init__.py b/codeforlife/settings/__init__.py index 3b0d4193..0166da8d 100644 --- a/codeforlife/settings/__init__.py +++ b/codeforlife/settings/__init__.py @@ -16,5 +16,6 @@ from .custom import * from .django import * +from .google import * from .otp import * from .third_party import * diff --git a/codeforlife/settings/custom.py b/codeforlife/settings/custom.py index 294083ff..586209a5 100644 --- a/codeforlife/settings/custom.py +++ b/codeforlife/settings/custom.py @@ -126,21 +126,3 @@ def get_redis_url(): # The URL to connect to the Redis cache. REDIS_URL = get_redis_url() - -# Our Google OAuth 2.0 client credentials -# https://console.cloud.google.com/auth/clients -GOOGLE_CLIENT_ID = os.getenv( - "GOOGLE_CLIENT_ID", - "354656325390-o5n12nbaivhi4do8lalkh29q403uu9u4.apps.googleusercontent.com", -) -GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "REPLACE_ME") - -# The ID of our GCP project. -GOOGLE_CLOUD_PROJECT_ID = os.getenv( - "GOOGLE_CLOUD_PROJECT_ID", "decent-digit-629" -) - -# The ID of our BigQuery dataset. -GOOGLE_CLOUD_BIGQUERY_DATASET_ID = os.getenv( - "GOOGLE_CLOUD_BIGQUERY_DATASET_ID", "REPLACE_ME" -) diff --git a/codeforlife/settings/google.py b/codeforlife/settings/google.py new file mode 100644 index 00000000..37e160e3 --- /dev/null +++ b/codeforlife/settings/google.py @@ -0,0 +1,41 @@ +""" +© Ocado Group +Created on 19/01/2026 at 09:54:59(+00:00). + +This file contains all the variables required by Google Cloud Platform (GCP). +""" + +import os + +# Our Google OAuth 2.0 client credentials +# https://console.cloud.google.com/auth/clients +GOOGLE_CLIENT_ID = os.getenv( + "GOOGLE_CLIENT_ID", + "354656325390-o5n12nbaivhi4do8lalkh29q403uu9u4.apps.googleusercontent.com", +) +GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "REPLACE_ME") + +# The ID of our GCP project. +GOOGLE_CLOUD_PROJECT_ID = os.getenv( + "GOOGLE_CLOUD_PROJECT_ID", "decent-digit-629" +) + +# The ID of our BigQuery dataset. +GOOGLE_CLOUD_BIGQUERY_DATASET_ID = os.getenv( + "GOOGLE_CLOUD_BIGQUERY_DATASET_ID", "REPLACE_ME" +) + +# Key management service (KMS) +# https://docs.cloud.google.com/python/docs/reference/cloudkms/latest/summary_overview + +GCP_KMS_KEY_RING_LOCATION = os.getenv("GCP_KMS_KEY_RING_LOCATION", "REPLACE_ME") +GCP_KMS_KEY_RING_NAME = os.getenv("GCP_KMS_KEY_RING_NAME", "REPLACE_ME") +GCP_KMS_KEY_NAME = os.getenv("GCP_KMS_KEY_NAME", "REPLACE_ME") +# The URI of the KMS key encryption key (KEK). +GCP_KMS_KEY_URI = ( + "gcp-kms://" + f"projects/{GOOGLE_CLOUD_PROJECT_ID}/" + f"locations/{GCP_KMS_KEY_RING_LOCATION}/" + f"keyRings/{GCP_KMS_KEY_RING_NAME}/" + f"cryptoKeys/{GCP_KMS_KEY_NAME}" +) diff --git a/codeforlife/tests/__init__.py b/codeforlife/tests/__init__.py index 4a4ba069..185ecf84 100644 --- a/codeforlife/tests/__init__.py +++ b/codeforlife/tests/__init__.py @@ -9,6 +9,7 @@ from .api_client import APIClient, BaseAPIClient from .api_request_factory import APIRequestFactory, BaseAPIRequestFactory from .celery import CeleryTestCase +from .exceptions import InterruptPipelineError from .model import ModelTestCase from .model_list_serializer import ( BaseModelListSerializerTestCase, diff --git a/codeforlife/tests/exceptions.py b/codeforlife/tests/exceptions.py new file mode 100644 index 00000000..a15d488b --- /dev/null +++ b/codeforlife/tests/exceptions.py @@ -0,0 +1,61 @@ +""" +© Ocado Group +Created on 22/01/2026 at 11:19:31(+00:00). +""" + +import typing as t +from unittest.mock import patch + +if t.TYPE_CHECKING: + from ..types import Args, KwArgs + from .test import TestCase + + +class InterruptPipelineError(Exception): + """Custom exception to support the Interruption Pattern in tests. + + The Interruption Pattern is a testing technique used to verify that a + specific step in a complex pipeline is reached and executed correctly, + without allowing the pipeline to finish. It is useful when the final steps + have side effects you want to avoid (like writing to a database, sending an + email, or charging a credit card). + """ + + @classmethod + # pylint: disable-next=too-many-arguments + def run( + cls, + test_case: "TestCase", + step_target, + step_attribute: str, + assert_step: t.Callable[[t.Any], None], + pipeline: t.Callable[..., t.Any], + pipeline_args: t.Optional["Args"] = None, + pipeline_kwargs: t.Optional["KwArgs"] = None, + ): + """Run a pipeline, interrupting at a specified step. + + Args: + test_case: The test case instance to use for assertions. + step_target: The object containing the step method to patch. + step_attribute: The name of the step method to patch. + assert_step: A callable that asserts the step was reached correctly. + pipeline: The pipeline function to run. + pipeline_args: Positional arguments to pass to the pipeline. + pipeline_kwargs: Keyword arguments to pass to the pipeline. + """ + + # Get the original step method. + step = getattr(step_target, step_attribute) + assert callable(step) + + def side_effect(*step_args, **step_kwargs): + result = step(*step_args, **step_kwargs) # Call the original step. + assert_step(result) # Assert the step was reached correctly. + raise cls() # Interrupt the pipeline. + + # Patch the step method to include the side effect. + with patch.object(step_target, step_attribute, side_effect=side_effect): + # Run the pipeline and expect the interruption. + with test_case.assertRaises(cls): + pipeline(*(pipeline_args or ()), **(pipeline_kwargs or {})) diff --git a/codeforlife/tests/model.py b/codeforlife/tests/model.py index 541e9a2d..e79e93e8 100644 --- a/codeforlife/tests/model.py +++ b/codeforlife/tests/model.py @@ -28,6 +28,14 @@ def get_model_class(cls) -> t.Type[AnyModel]: """ return get_arg(cls, 0) + def get_model_instance(self, *args, **kwargs) -> AnyModel: + """Get an instance of the model. + + Returns: + An instance of the model. + """ + return self.get_model_class()(*args, **kwargs) + def assert_raises_integrity_error(self, *args, **kwargs): """Assert the code block raises an integrity error. @@ -94,3 +102,23 @@ def assert_get_queryset( if ordered and not queryset.ordered: queryset = queryset.order_by("pk") self.assertQuerySetEqual(queryset, values, ordered=ordered) + + def assert_check( + self, + error_id: str, + model_class: t.Optional[t.Type[AnyModel]] = None, + **kwargs, + ): + """Assert that the model check returns an error with the given ID. + + https://docs.djangoproject.com/en/5.1/topics/checks/#field-model-manager-template-engine-and-database-checks + + Args: + error_id: The check error ID to assert. + model_class: The model class to check. If None, uses the test case's + model class. + **kwargs: Additional kwargs to pass to the model's check() method. + """ + model_class = model_class or self.get_model_class() + errors = model_class.check(**kwargs) + assert any(error.id == error_id for error in errors) diff --git a/codeforlife/user/caches/google_oauth2_token.py b/codeforlife/user/caches/google_oauth2_token.py index aaa19d99..969ca2a6 100644 --- a/codeforlife/user/caches/google_oauth2_token.py +++ b/codeforlife/user/caches/google_oauth2_token.py @@ -8,7 +8,7 @@ import requests from django.conf import settings -from ...caches import BaseDynamicKeyValueCache +from ...caches import BaseDynamicKeyCache from ...types import OAuth2TokenFromRefreshDict from ..models import GoogleUser @@ -24,9 +24,7 @@ class GoogleOAuth2TokenCacheValue(t.TypedDict): class GoogleOAuth2TokenCache( - BaseDynamicKeyValueCache[ - GoogleOAuth2TokenCacheKey, GoogleOAuth2TokenCacheValue - ] + BaseDynamicKeyCache[GoogleOAuth2TokenCacheKey, GoogleOAuth2TokenCacheValue] ): """ Authorization to a user's Google account. The key is the user's ID. The diff --git a/codeforlife/user/fixtures/school_2.json b/codeforlife/user/fixtures/school_2.json index 2fd9b02a..86f3cfb1 100644 --- a/codeforlife/user/fixtures/school_2.json +++ b/codeforlife/user/fixtures/school_2.json @@ -41,7 +41,7 @@ "pk": 1, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0DfBz0jtynO_NyzpsG_IClRWETapgBtPnZKR9GJnL7Yj68pG5mpfJwy9yn-fzfDYYL3Rxyv5Uzp6PPA-OoQyPN99A==" + "token": "ZmFrZV9lbmM6WVdGaFlXRmhZV0U9" } }, { @@ -49,7 +49,7 @@ "pk": 2, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0Dv2ZTvAbtOdiAvJg8c-Y7QgikQmZFuzKXPbjfIw-Zc1aDF3Xy8_25vgZpQCpvO8m3X55zy7fTCaI7gLMpkK2Vnig==" + "token": "ZmFrZV9lbmM6WW1KaVltSmlZbUk9" } }, { @@ -57,7 +57,7 @@ "pk": 3, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0D9eN0PiXVfqX820jjOo8NCjlQM4jjEwgOxkfGHNEGkTDoPN-MulsM7n70JdZ3Sh2vvbHGBZj3_DmFFeXxhmyOOtg==" + "token": "ZmFrZV9lbmM6WTJOalkyTmpZMk09" } }, { @@ -65,7 +65,7 @@ "pk": 4, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0EITA69-SWr69FPOg6KMpE_6_lERFr7cIEi7SoECzUVETiFmVEhCAe5yurBqB2wTUuAgZ_H0LnwSH8QoeM3pJ8eKw==" + "token": "ZmFrZV9lbmM6WkdSa1pHUmtaR1E9" } }, { @@ -73,7 +73,7 @@ "pk": 5, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0EThR1vJGD6uJ4hrtef0KY1RyfTB8x1dMvbxAOhW9L0Zm3LvhQfKmy-BnrOTddU9sYIFWTWj3ra65V8gKEG3lGHsA==" + "token": "ZmFrZV9lbmM6WldWbFpXVmxaV1U9" } }, { @@ -81,7 +81,7 @@ "pk": 6, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0EfaRvyIDSy_ZLDx5ap-SHG2udtvhqOF2VEfpXac4rKrYI5-8zvPlMOLFXdMbbcJrQlI1NBAf_1WLbg2wD4cS8Lpg==" + "token": "ZmFrZV9lbmM6Wm1abVptWm1abVk9" } }, { @@ -89,7 +89,7 @@ "pk": 7, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0EoS5Oz5XlpjpIlNyCtgA3QnclUu7fTH09fTP8HN7LtmRiPg6Ee7jxJIOPzO4pYuuZtYt7KBFjIfNwPcwCMbZS1VQ==" + "token": "ZmFrZV9lbmM6WjJkbloyZG5aMmM9" } }, { @@ -97,7 +97,7 @@ "pk": 8, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0EwNEvH1nIoW934uo0mJkwSLe2OjLKLAiJwt1dMsnqkoVwUSbX9Rju2HZ_xNJ-gfVspZOvVqh1BXxSLjowmPLlt5A==" + "token": "ZmFrZV9lbmM6YUdob2FHaG9hR2c9" } }, { @@ -105,7 +105,7 @@ "pk": 9, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0E8iBOFay2uIm5mV6MNmTYXSWHZuhUqx8cAeNtlhnoX9dQitrbZ-NiR2n8p-ZoTNzxOwAmrGr6l_9x1IAxl4KUv7g==" + "token": "ZmFrZV9lbmM6YVdscGFXbHBhV2s9" } }, { @@ -113,7 +113,7 @@ "pk": 10, "fields": { "user": 25, - "token": "ENC:gAAAAABnt0FI41vva7R8HDqec7XAYE5wP2o7tpnz7Qu_ZKpOtiMC2HUqOJNeUFtgJGLxF-6Iu_YZaAMxYzZ66xmlMNjgZ-HCzQ==" + "token": "ZmFrZV9lbmM6YW1wcWFtcHFhbW89" } }, { @@ -181,4 +181,4 @@ "teacher": 9 } } -] \ No newline at end of file +] diff --git a/codeforlife/user/migrations/0001_initial.py b/codeforlife/user/migrations/0001_initial.py index c48171a9..bba29e65 100644 --- a/codeforlife/user/migrations/0001_initial.py +++ b/codeforlife/user/migrations/0001_initial.py @@ -1,7 +1,15 @@ -# Generated by Django 5.1.10 on 2025-08-12 10:29 +# Generated by Django 5.1.15 on 2026-01-19 14:38 -import codeforlife.models.encrypted_char_field -import codeforlife.user.models.user +import codeforlife.models.fields.encrypted_text +import codeforlife.user.models.user.admin_school_teacher +import codeforlife.user.models.user.contactable +import codeforlife.user.models.user.google +import codeforlife.user.models.user.independent +import codeforlife.user.models.user.non_admin_school_teacher +import codeforlife.user.models.user.non_school_teacher +import codeforlife.user.models.user.school_teacher +import codeforlife.user.models.user.student +import codeforlife.user.models.user.teacher import django.contrib.auth.models import django.db.models.deletion from django.db import migrations, models @@ -13,7 +21,7 @@ class Migration(migrations.Migration): dependencies = [ ("auth", "0012_alter_user_first_name_max_length"), - ("common", "0057_teacher_teacher__is_admin"), + ("common", "0058_userprofile_google_refresh_token_and_more"), ] operations = [ @@ -107,9 +115,10 @@ class Migration(migrations.Migration): ), ( "token", - codeforlife.models.encrypted_char_field.EncryptedCharField( + codeforlife.models.fields.encrypted_text.EncryptedTextField( + associated_data="token", + db_column="token", help_text="The encrypted equivalent of the token.", - max_length=104, verbose_name="token", ), ), @@ -163,7 +172,10 @@ class Migration(migrations.Migration): }, bases=("user.user",), managers=[ - ("objects", codeforlife.user.models.user.ContactableUserManager()), + ( + "objects", + codeforlife.user.models.user.contactable.ContactableUserManager(), + ), ], ), migrations.CreateModel( @@ -176,7 +188,7 @@ class Migration(migrations.Migration): }, bases=("user.user",), managers=[ - ("objects", codeforlife.user.models.user.StudentUserManager()), + ("objects", codeforlife.user.models.user.student.StudentUserManager()), ], ), migrations.CreateModel( @@ -242,7 +254,7 @@ class Migration(migrations.Migration): }, bases=("user.contactableuser",), managers=[ - ("objects", codeforlife.user.models.user.GoogleUserManager()), + ("objects", codeforlife.user.models.user.google.GoogleUserManager()), ], ), migrations.CreateModel( @@ -255,7 +267,10 @@ class Migration(migrations.Migration): }, bases=("user.contactableuser",), managers=[ - ("objects", codeforlife.user.models.user.IndependentUserManager()), + ( + "objects", + codeforlife.user.models.user.independent.IndependentUserManager(), + ), ], ), migrations.CreateModel( @@ -268,7 +283,7 @@ class Migration(migrations.Migration): }, bases=("user.contactableuser",), managers=[ - ("objects", codeforlife.user.models.user.TeacherUserManager()), + ("objects", codeforlife.user.models.user.teacher.TeacherUserManager()), ], ), migrations.CreateModel( @@ -281,7 +296,10 @@ class Migration(migrations.Migration): }, bases=("user.teacheruser",), managers=[ - ("objects", codeforlife.user.models.user.NonSchoolTeacherUserManager()), + ( + "objects", + codeforlife.user.models.user.non_school_teacher.NonSchoolTeacherUserManager(), + ), ], ), migrations.CreateModel( @@ -294,7 +312,10 @@ class Migration(migrations.Migration): }, bases=("user.teacheruser",), managers=[ - ("objects", codeforlife.user.models.user.SchoolTeacherUserManager()), + ( + "objects", + codeforlife.user.models.user.school_teacher.SchoolTeacherUserManager(), + ), ], ), migrations.CreateModel( @@ -309,7 +330,7 @@ class Migration(migrations.Migration): managers=[ ( "objects", - codeforlife.user.models.user.AdminSchoolTeacherUserManager(), + codeforlife.user.models.user.admin_school_teacher.AdminSchoolTeacherUserManager(), ), ], ), @@ -325,7 +346,7 @@ class Migration(migrations.Migration): managers=[ ( "objects", - codeforlife.user.models.user.NonAdminSchoolTeacherUserManager(), + codeforlife.user.models.user.non_admin_school_teacher.NonAdminSchoolTeacherUserManager(), ), ], ), diff --git a/codeforlife/user/models/otp_bypass_token.py b/codeforlife/user/models/otp_bypass_token.py index db4a4438..504024f3 100644 --- a/codeforlife/user/models/otp_bypass_token.py +++ b/codeforlife/user/models/otp_bypass_token.py @@ -12,7 +12,8 @@ from django.utils.crypto import get_random_string from django.utils.translation import gettext_lazy as _ -from ...models import EncryptedCharField +from ...models import EncryptedModel +from ...models.fields import EncryptedTextField from ...types import Validators from ...validators import CharSetValidatorBuilder from .user import User @@ -23,9 +24,10 @@ TypedModelMeta = object -class OtpBypassToken(models.Model): +class OtpBypassToken(EncryptedModel): """A single use token to bypass a user's OTP authentication factor.""" + associated_data = "otp_bypass_token" length = 8 allowed_chars = string.ascii_lowercase max_count = 10 @@ -39,7 +41,7 @@ class OtpBypassToken(models.Model): ] # pylint: disable-next=missing-class-docstring,too-few-public-methods - class Manager(models.Manager["OtpBypassToken"]): + class Manager(EncryptedModel.Manager["OtpBypassToken"]): def bulk_create(self, user: User): # type: ignore[override] """Bulk create OTP-bypass tokens. @@ -60,11 +62,15 @@ def bulk_create(self, user: User): # type: ignore[override] user.otp_bypass_tokens.all().delete() - return super().bulk_create( - [OtpBypassToken(user=user, token=token) for token in tokens] - ) + otp_bypass_tokens = [ + OtpBypassToken(user=user, token=token) for token in tokens + ] + for otp_bypass_token in otp_bypass_tokens: + otp_bypass_token.save() - objects: Manager = Manager() + return otp_bypass_tokens + + objects: Manager = Manager() # type: ignore[assignment] user = models.ForeignKey( User, @@ -72,9 +78,9 @@ def bulk_create(self, user: User): # type: ignore[override] on_delete=models.CASCADE, ) - token = EncryptedCharField( - _("token"), - max_length=100, + token = EncryptedTextField( + associated_data="token", + verbose_name=_("token"), help_text=_("The encrypted equivalent of the token."), ) @@ -82,6 +88,10 @@ class Meta(TypedModelMeta): verbose_name = _("OTP bypass token") verbose_name_plural = _("OTP bypass tokens") + @property + def dek_aead(self): + return self.user.userprofile.dek_aead + def save(self, *args, **kwargs): raise IntegrityError("Cannot create or update a single instance.") diff --git a/codeforlife/user/models/otp_bypass_token_test.py b/codeforlife/user/models/otp_bypass_token_test.py index 75d006d4..a2d2d969 100644 --- a/codeforlife/user/models/otp_bypass_token_test.py +++ b/codeforlife/user/models/otp_bypass_token_test.py @@ -35,6 +35,7 @@ def test_objects__bulk_create(self): assert len(otp_bypass_tokens) == self.user.otp_bypass_tokens.count() for otp_bypass_token in otp_bypass_tokens: + assert otp_bypass_token.token is not None assert len(otp_bypass_token.token) == OtpBypassToken.length assert all( char in OtpBypassToken.allowed_chars diff --git a/docs/client-side-encryption.md b/docs/client-side-encryption.md new file mode 100644 index 00000000..144f2e8b --- /dev/null +++ b/docs/client-side-encryption.md @@ -0,0 +1,413 @@ +# Client-Side Encryption + +Client-Side Encryption with Per-User Keys and Django ORM Integration + +--- + +## 1. Executive Summary + +This architecture implements **Application-Layer Encryption** (often called **Client-Side Encryption** relative to the database) using a **Per-User Key** strategy. + +Instead of relying on a single global key (which is a single point of failure), every user in the system is assigned a unique **Data Encryption Key (DEK)**. This key wraps their specific data. These DEKs are themselves encrypted by a master **Key Encryption Key (KEK)** managed by **Google Cloud KMS**. + +**Security Guarantee:** The database (PostgreSQL) never sees plaintext data or the plaintext keys required to decrypt it. A database leak results in zero data compromise without also compromising the running application server and Google Cloud credentials. + +This document outlines an approach that seamlessly integrates this encryption strategy into the **Django ORM**, making the encryption and decryption of data transparent to the developer. + +--- + +## 2. Architecture Overview + +### The "Two-Key" Envelope System + +We utilize a hierarchy of keys to balance security and performance. + +1. **KEK (Key Encryption Key):** + * **Location:** Google Cloud KMS (Hardware Security Module). + * **Role:** The "Master Lock." It never leaves Google. It is used only to encrypt/decrypt the User Keys (DEKs). +2. **DEK (Data Encryption Key):** + * **Location:** Encrypted in the database (e.g., in the `users` table); Decrypted only in Application Memory (RAM). + * **Role:** The "Worker Bee." Unique to every user. Used to encrypt/decrypt the actual database fields (e.g., SSNs, names). + +--- + +## 3. Encryption/Decryption Utilities + +At the core of this system are a few utility functions that interact with Google Cloud KMS and the `tink` cryptography library. In addition, to avoid a dependency on Google Cloud KMS during local development and in CI/CD pipelines, we use fake (mock) implementations of the KMS client and its AEAD primitive. + +**[codeforlife/encryption.py](../codeforlife/encryption.py)** + +--- + +## 4. Django ORM Integration + +To make working with encrypted data seamless, we've integrated the encryption logic directly into Django's ORM. This is achieved through a combination of a base model class, custom model fields, and descriptors. + +### Associated Data for Integrity + +A core principle of this architecture is the use of **Associated Data** to ensure the integrity of encrypted values. Authenticated Encryption with Associated Data (AEAD) algorithms, like the AES-GCM we use, bind a piece of ciphertext to a specific context. This means that ciphertext encrypted in one context cannot be decrypted in another, which prevents certain attacks like swapping encrypted values between different database columns or rows. + +To achieve this, we enforce the use of an `associated_data` string at two levels: + +1. **Model-level:** Every `EncryptedModel` subclass must define a unique `associated_data` string. This scopes all encrypted fields within that model. +2. **Field-level:** Every `BaseEncryptedField` instance must be initialized with its own `associated_data` string, which must be unique within that model. + +These two strings are combined to create a fully qualified identifier that is passed to the encryption and decryption functions. This provides two critical layers of integrity: + +1. **Field-Level Integrity:** Consider a `Balance` model with two encrypted fields, `debit` and `credit`. The field-level AD ensures their values cannot be swapped. The AD for each would be `"balance:debit"` and `"balance:credit"`. An encrypted debit value cannot be moved to the credit column, as the decryption would fail due to the AD mismatch. + +2. **Model-Level Integrity:** Now consider two different models, `Debit` and `Credit`, each with an encrypted `balance` field. The model-level AD prevents swapping values between them. The AD would be `"debit:balance"` and `"credit:balance"`. An encrypted balance from a `Debit` instance cannot be moved to a `Credit` instance, again because the AD would not match during decryption. + +### Implementation + +The implementation details can be found in the docstring of these files. It's recommended you read them in the following order. + +1. **[codeforlife/models/encrypted.py](../codeforlife/models/encrypted.py):** This is the base class for any model that will contain encrypted fields. +1. **[codeforlife/models/fields/base_encrypted.py](../codeforlife/models/fields/base_encrypted.py):** This is where the core logic of transparent encryption and decryption happens. +1. **[codeforlife/models/fields/encrypted_text.py](../codeforlife/models/fields/encrypted_text.py):** A concrete encrypted text field which subclasses `BaseEncryptedField`. +1. **[codeforlife/models/base_data_encryption_key.py](../codeforlife/models/base_data_encryption_key.py):** This abstract model brings the `EncryptedModel` and `DataEncryptionKeyField` together. +1. **[codeforlife/models/data_encryption_key.py](../codeforlife/models/data_encryption_key.py):** This model inherits from `BaseDataEncryptionKeyModel` and conveniently includes the `dek` field by default. +1. **[codeforlife/models/fields/data_encryption_key.py](../codeforlife/models/fields/data_encryption_key.py):** This field is responsible for managing the lifecycle of a DEK for a model instance. + +--- + +## 5. Usage Patterns + +Here are two common patterns for using the encryption framework. + +### Pattern 1: Self-Contained Encrypted Model + +This is the simplest pattern. The model inherits from `DataEncryptionKeyModel`, which means it manages its own DEK and can have one or more encrypted fields. This is ideal for models that represent a primary entity, like a `User`. + +```python +class User(DataEncryptionKeyModel): + """ + A user model with an encrypted email. Because it inherits from + DataEncryptionKeyModel, it automatically gets a 'dek' field to manage + its own encryption key. + """ + associated_data = "user" # Required for EncryptedModel + + username = models.CharField(max_length=150, unique=True) + email = EncryptedTextField(associated_data="email") + + class Meta: + app_label = "auth" + +# --- Usage --- + +# Create a new user. A new DEK is automatically generated and stored in the +# 'dek' field. The 'email' field is encrypted using this key. +user = User.objects.create( + username="johndoe", + email="john.doe@example.com" +) + +# The 'dek' and 'email' fields are stored as encrypted bytes in the database. +# But when we access the 'email' attribute, it's decrypted automatically. +print(f"User's email: {user.email}") +# >>> User's email: john.doe@example.com + +# You can update the email as you would with a normal field. +user.email = "john.doe.new@example.com" +user.save() +``` + +### Pattern 2: Delegated Encryption Key + +Sometimes, you have a model whose data should be encrypted under another object's key. For example, a `Secret` that belongs to a `User`. The `Secret` itself doesn't need its own DEK; it should be encrypted with the `User`'s DEK. + +In this case, the model inherits directly from `EncryptedModel` and must implement the `dek_aead` property to point to the key provider (the `User` model in this case). + +```python +class Secret(EncryptedModel): + """ + A model that stores a secret value. It does not have its own DEK. + Instead, it relies on the related User's DEK for encryption. + """ + associated_data = "secret" # Required for EncryptedModel + + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="secrets") + secret_value = EncryptedTextField(associated_data="secret-value") + + class Meta: + app_label = "app" + + @property + def dek_aead(self) -> Aead: + """ + This model delegates encryption to the associated user's DEK. + The `dek_aead` property is implemented to return the user's AEAD primitive. + """ + return self.user.dek_aead + +# --- Usage --- + +# Assume 'user' is the User instance created in the previous example. +secret = Secret.objects.create( + user=user, + secret_value="my-super-secret-password" +) + +# The 'secret_value' is encrypted using the DEK from the 'user' object. +# When accessed, it's decrypted using the same key. +print(f"The secret is: {secret.secret_value}") +# >>> The secret is: my-super-secret-password +``` + +--- + +## 6. Sequence Diagrams + +This section contains diagrams that explain what the Django ORM is doing. + +### 1. DEK Generation and Initial Save + +This diagram shows the process that occurs when a new `EncryptedModel` instance (e.g., a `User`) is created and saved for the first time. The `EncryptedModel.save()` method is overridden to lazily manage the creation of the user-specific DEK. + +```mermaid +sequenceDiagram + actor Developer + participant User as User (EncryptedModel) + participant EncryptedModel as EncryptedModel (Base Class) + participant GcpKmsClient as Tink/KMS Client + participant PostgreSQL + + Developer->>User: user = User(name="test") + Developer->>User: user.save() + User->>EncryptedModel: save() + activate EncryptedModel + + Note over EncryptedModel, GcpKmsClient: If self.pk is None (new user) + EncryptedModel->>GcpKmsClient: create_dek() + activate GcpKmsClient + GcpKmsClient-->>EncryptedModel: Returns new encrypted DEK + deactivate GcpKmsClient + + EncryptedModel->>User: self.dek = encrypted_dek + Note over User, PostgreSQL: The model's save() method is called via super() + User->>PostgreSQL: INSERT INTO users (name, dek) + PostgreSQL-->>User: Returns + deactivate EncryptedModel +``` + +### 2. Data Encryption + +This diagram illustrates what happens when a developer sets a value on an encrypted field. The `EncryptedAttribute` descriptor intercepts the assignment, wrapping the plaintext value in a `_PendingEncryption` object. The actual encryption happens later, just before the model is saved to the database. + +```mermaid +sequenceDiagram + actor Developer + participant User as User (Model Instance) + participant EncryptedAttribute as EncryptedAttribute (Descriptor) + participant _PendingEncryption as _PendingEncryption (Wrapper) + participant BaseEncryptedField as BaseEncryptedField + participant GcpKmsClient as Tink/KMS Client + participant PostgreSQL + + Developer->>User: user.ssn = "123-456-7890" + User->>EncryptedAttribute: __set__(user, "123-456-7890") + activate EncryptedAttribute + EncryptedAttribute->>_PendingEncryption: Create(value="123-456-7890") + _PendingEncryption-->>EncryptedAttribute: Returns _PendingEncryption instance + Note left of EncryptedAttribute: Caches are cleared + EncryptedAttribute->>User: instance.__dict__["ssn"] = _PendingEncryption(...) + deactivate EncryptedAttribute + + Developer->>User: user.save() + User->>BaseEncryptedField: pre_save(user, True) + activate BaseEncryptedField + Note right of BaseEncryptedField: Checks for _PendingEncryption + BaseEncryptedField->>GcpKmsClient: Decrypt user's DEK + GcpKmsClient-->>BaseEncryptedField: Returns plaintext DEK + BaseEncryptedField->>GcpKmsClient: encrypt(value, associated_data) + GcpKmsClient-->>BaseEncryptedField: Returns ciphertext + BaseEncryptedField-->>User: Returns ciphertext + deactivate BaseEncryptedField + User->>PostgreSQL: UPDATE users SET ssn=ciphertext + PostgreSQL-->>User: Returns +``` + +### 3. Data Decryption + +This diagram shows the process of reading an encrypted value from a model instance. The `EncryptedAttribute` descriptor checks an in-memory cache for the decrypted value first. If it's not cached, it decrypts the ciphertext from the database and populates the cache. + +```mermaid +sequenceDiagram + actor Developer + participant User as User (Model Instance) + participant EncryptedAttribute as EncryptedAttribute (Descriptor) + participant GcpKmsClient as Tink/KMS Client + + Developer->>User: print(user.ssn) + User->>EncryptedAttribute: __get__(user) + activate EncryptedAttribute + + Note over User, EncryptedAttribute: Check instance.__decrypted_values__ cache + alt Value is cached + EncryptedAttribute-->>Developer: return instance.__decrypted_values__[field_name] + else Value is not cached + EncryptedAttribute->>User: Get ciphertext from instance.__dict__ + User-->>EncryptedAttribute: Returns ciphertext + EncryptedAttribute->>GcpKmsClient: Decrypt user's DEK + GcpKmsClient-->>EncryptedAttribute: Returns plaintext DEK + EncryptedAttribute->>GcpKmsClient: decrypt(ciphertext, associated_data) + GcpKmsClient-->>EncryptedAttribute: Returns plaintext value + EncryptedAttribute->>User: instance.__decrypted_values__[field_name] = plaintext_value + EncryptedAttribute-->>Developer: return plaintext_value + end + deactivate EncryptedAttribute +``` + +### 4. Data Shredding + +Data shredding is achieved by nullifying the user's encrypted DEK. Once the key is gone, the data associated with it is rendered permanently unrecoverable. + +```mermaid +sequenceDiagram + actor Developer + participant User as User (Model Instance) + participant PostgreSQL + + Developer->>User: user.encrypted_dek = None + Developer->>User: user.save() + User->>PostgreSQL: UPDATE users SET encrypted_dek=NULL WHERE id=... + PostgreSQL-->>User: Returns + + Note over Developer, PostgreSQL: The user's data (e.g., SSN) is now permanently unrecoverable. +``` + +### 5. Encrypted Model and Field Initialization + +This diagram shows how an encrypted model and its fields are initialized and validated when Django starts up. + +```mermaid +sequenceDiagram + participant Django as Django Startup + participant EncryptedModel as EncryptedModel (Class) + participant BaseEncryptedField as BaseEncryptedField + + Django->>EncryptedModel: check() + activate EncryptedModel + + Note over EncryptedModel: 1. Validate `associated_data` + alt `associated_data` is not defined, not a string, or empty + EncryptedModel-->>Django: raise Error + end + + Note over EncryptedModel: 2. Check `associated_data` uniqueness + alt `associated_data` is used by another model + EncryptedModel-->>Django: raise Error + end + + Note over EncryptedModel: 3. Validate Manager + alt `objects` is not a subclass of `EncryptedModel.Manager` + EncryptedModel-->>Django: raise Error + end + deactivate EncryptedModel + + Django->>BaseEncryptedField: contribute_to_class(EncryptedModel, "ssn") + activate BaseEncryptedField + + Note over BaseEncryptedField: 4. Validate Model Subclass + alt issubclass(Model, EncryptedModel) is False + BaseEncryptedField-->>Django: raise ValidationError + end + + Note over BaseEncryptedField: 5. Check for Duplicate Fields + alt "ssn" is already in Model.ENCRYPTED_FIELDS + BaseEncryptedField-->>Django: raise ValidationError + end + + Note over BaseEncryptedField: 6. Check for Duplicate Associated Data + alt "model:ssn" is already used by another field + BaseEncryptedField-->>Django: raise ValidationError + end + + Note over BaseEncryptedField: 7. Register Field + BaseEncryptedField->>EncryptedModel: Model.ENCRYPTED_FIELDS.append(self) + + deactivate BaseEncryptedField +``` + +### 6. DEK Model and Field Initialization + +This diagram details the validation that occurs when a `DataEncryptionKeyField` is added to a model. The field's `contribute_to_class` method ensures that the model is a valid `BaseDataEncryptionKeyModel` and that it contains only one DEK field. + +```mermaid +sequenceDiagram + participant Django as Django Startup + participant DataEncryptionKeyField as DataEncryptionKeyField + participant BaseDataEncryptionKeyModel as BaseDataEncryptionKeyModel (Class) + + Django->>DataEncryptionKeyField: contribute_to_class(Model, "dek") + activate DataEncryptionKeyField + + Note over DataEncryptionKeyField: 1. Validate Model Subclass + alt issubclass(Model, BaseDataEncryptionKeyModel) is False + DataEncryptionKeyField-->>Django: raise ValidationError + end + + Note over DataEncryptionKeyField: 2. Check for multiple DEK fields + alt Model.DEK_FIELD is not None + DataEncryptionKeyField-->>Django: raise ValidationError + end + + Note over DataEncryptionKeyField: 3. Register Field on Model + DataEncryptionKeyField->>BaseDataEncryptionKeyModel: Model.DEK_FIELD = self + + deactivate DataEncryptionKeyField +``` + +### 7. DEK AEAD Caching + +To minimize latency and cost associated with decrypting the Data Encryption Key (DEK) via GCP KMS, the system employs an in-memory, time-to-live (TTL) cache for the AEAD primitive (`DEK_AEAD_CACHE`). + +The following diagrams illustrate the two main caching flows: retrieval (cache hit/miss) and invalidation. + +#### 7.1. Cache Retrieval (Hit & Miss) + +This diagram shows how the `dek_aead` property on a `BaseDataEncryptionKeyModel` instance leverages the cache. A cache hit returns the stored AEAD primitive immediately, while a cache miss triggers a call to GCP KMS to decrypt the key, which is then cached for subsequent requests. + +```mermaid +sequenceDiagram + participant User + participant EncryptedField as EncryptedField (__get__) + participant DEKEnabledModel as DEK-Enabled Model + participant DEK_AEAD_CACHE as TTLCache (in-memory) + participant KMS as Google Cloud KMS + + User->>+EncryptedField: Accesses encrypted attribute + EncryptedField->>+DEKEnabledModel: Accesses `dek_aead` property + alt Cache Miss + DEKEnabledModel->>DEK_AEAD_CACHE: Check for cached AEAD primitive (not found) + DEKEnabledModel->>+KMS: Decrypt DEK using KEK + KMS-->>-DEKEnabledModel: Returns AEAD primitive + DEKEnabledModel->>DEK_AEAD_CACHE: Store AEAD primitive with instance PK + else Cache Hit + DEKEnabledModel->>DEK_AEAD_CACHE: Check for cached AEAD primitive (found) + DEK_AEAD_CACHE-->>DEKEnabledModel: Return AEAD primitive + end + DEKEnabledModel-->>-EncryptedField: Return AEAD primitive + EncryptedField->>EncryptedField: Decrypts data using AEAD primitive + EncryptedField-->>-User: Returns decrypted value +``` + +#### 7.2. Cache Invalidation + +The cache must be invalidated whenever the underlying DEK changes to prevent the use of stale keys. This happens automatically when the `DataEncryptionKeyField` is set, for example, during a data shredding operation where the key is set to `None`. + +```mermaid +sequenceDiagram + participant User + participant DEKEnabledModel as DEK-Enabled Model + participant DataEncryptionKeyAttribute as DataEncryptionKeyAttribute (__set__) + participant DEK_AEAD_CACHE as TTLCache (in-memory) + + User->>+DEKEnabledModel: Shred data (e.g., `instance.dek = None`) + DEKEnabledModel->>+DataEncryptionKeyAttribute: Sets `dek` field to new value + DataEncryptionKeyAttribute->>+DEK_AEAD_CACHE: Delete cached AEAD for instance PK + DEK_AEAD_CACHE-->>-DataEncryptionKeyAttribute: + DataEncryptionKeyAttribute->>DEKEnabledModel: Update `dek` value in `__dict__` + DEKEnabledModel-->>-User: +```