diff --git a/Pipfile b/Pipfile index 2916f5d..4da785d 100644 --- a/Pipfile +++ b/Pipfile @@ -35,7 +35,7 @@ codeforlife-portal = "==8.9.9" # TODO: remove rapid-router = "==7.6.10" # TODO: remove phonenumbers = "==8.12.12" # TODO: remove google-auth = "==2.40.3" -google-cloud-storage = "==3.4.0" +google-cloud-bigquery = "==3.38.0" [dev-packages] celery-types = "==0.23.0" diff --git a/Pipfile.lock b/Pipfile.lock index 971205b..d03b579 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "d6fba6b8d21808ea6a5b67532ea3d09dea92c22c4b79762e6354c47336b9346a" + "sha256": "3ac74425cc8f71710c8ee76591fcc54fa4d073c09dfc058fe9b77ad5c6e22e24" }, "pipfile-spec": 6, "requires": { @@ -579,6 +579,9 @@ "version": "==2.2.1" }, "google-api-core": { + "extras": [ + "grpc" + ], "hashes": [ "sha256:2b405df02d68e68ce0fbc138559e6036559e685159d148ae5861013dc201baf8", "sha256:4021b0f8ceb77a6fb4de6fde4502cecab45062e66ff4f2895169e0b35bc9466c" @@ -595,6 +598,15 @@ "markers": "python_version >= '3.7'", "version": "==2.40.3" }, + "google-cloud-bigquery": { + "hashes": [ + "sha256:8afcb7116f5eac849097a344eb8bfda78b7cfaae128e60e019193dd483873520", + "sha256:e06e93ff7b245b239945ef59cb59616057598d369edac457ebf292bd61984da6" + ], + "index": "pypi", + "markers": "python_version >= '3.9'", + "version": "==3.38.0" + }, "google-cloud-core": { "hashes": [ "sha256:67d977b41ae6c7211ee830c7912e41003ea8194bff15ae7d72fd6f51e57acabc", @@ -603,15 +615,6 @@ "markers": "python_version >= '3.7'", "version": "==2.5.0" }, - "google-cloud-storage": { - "hashes": [ - "sha256:16eeca305e4747a6871f8f7627eef3b862fdd365b872ca74d4a89e9841d0f8e8", - "sha256:4c77ec00c98ccc6428e4c39404926f41e2152f48809b02af29d5116645c3c317" - ], - "index": "pypi", - "markers": "python_version >= '3.7'", - "version": "==3.4.0" - }, "google-crc32c": { "hashes": [ "sha256:0f99eaa09a9a7e642a61e06742856eec8b19fc0037832e03f941fe7cf0c8e4db", @@ -668,6 +671,81 @@ "markers": "python_version >= '3.7'", "version": "==1.72.0" }, + "grpcio": { + "hashes": [ + "sha256:035d90bc79eaa4bed83f524331d55e35820725c9fbb00ffa1904d5550ed7ede3", + "sha256:04bbe1bfe3a68bbfd4e52402ab7d4eb59d72d02647ae2042204326cf4bbad280", + "sha256:063065249d9e7e0782d03d2bca50787f53bd0fb89a67de9a7b521c4a01f1989b", + "sha256:06c3d6b076e7b593905d04fdba6a0525711b3466f43b3400266f04ff735de0cd", + "sha256:08caea849a9d3c71a542827d6df9d5a69067b0a1efbea8a855633ff5d9571465", + "sha256:0aaa82d0813fd4c8e589fac9b65d7dd88702555f702fb10417f96e2a2a6d4c0f", + "sha256:0b7604868b38c1bfd5cf72d768aedd7db41d78cb6a4a18585e33fb0f9f2363fd", + "sha256:0c37db8606c258e2ee0c56b78c62fc9dee0e901b5dbdcf816c2dd4ad652b8b0c", + "sha256:1c9b93f79f48b03ada57ea24725d83a30284a012ec27eab2cf7e50a550cbbbcc", + "sha256:2107b0c024d1b35f4083f11245c0e23846ae64d02f40b2b226684840260ed054", + "sha256:2229ae655ec4e8999599469559e97630185fdd53ae1e8997d147b7c9b2b72cba", + "sha256:25a18e9810fbc7e7f03ec2516addc116a957f8cbb8cbc95ccc80faa072743d03", + "sha256:26ef06c73eb53267c2b319f43e6634c7556ea37672029241a056629af27c10e2", + "sha256:2e1743fbd7f5fa713a1b0a8ac8ebabf0ec980b5d8809ec358d488e273b9cf02a", + "sha256:32483fe2aab2c3794101c2a159070584e5db11d0aa091b2c0ea9c4fc43d0d749", + "sha256:3bf0f392c0b806905ed174dcd8bdd5e418a40d5567a05615a030a5aeddea692d", + "sha256:3e2a27c89eb9ac3d81ec8835e12414d73536c6e620355d65102503064a4ed6eb", + "sha256:40ad3afe81676fd9ec6d9d406eda00933f218038433980aa19d401490e46ecde", + "sha256:4215d3a102bd95e2e11b5395c78562967959824156af11fa93d18fdd18050990", + 
"sha256:45d59a649a82df5718fd9527ce775fd66d1af35e6d31abdcdc906a49c6822958", + "sha256:45e0111e73f43f735d70786557dc38141185072d7ff8dc1829d6a77ac1471468", + "sha256:479496325ce554792dba6548fae3df31a72cef7bad71ca2e12b0e58f9b336bfc", + "sha256:490fa6d203992c47c7b9e4a9d39003a0c2bcc1c9aa3c058730884bbbb0ee9f09", + "sha256:49ce47231818806067aea3324d4bf13825b658ad662d3b25fada0bdad9b8a6af", + "sha256:4baf3cbe2f0be3289eb68ac8ae771156971848bb8aaff60bad42005539431980", + "sha256:522175aba7af9113c48ec10cc471b9b9bd4f6ceb36aeb4544a8e2c80ed9d252d", + "sha256:5e8571632780e08526f118f74170ad8d50fb0a48c23a746bef2a6ebade3abd6f", + "sha256:615ba64c208aaceb5ec83bfdce7728b80bfeb8be97562944836a7a0a9647d882", + "sha256:61f69297cba3950a524f61c7c8ee12e55c486cb5f7db47ff9dcee33da6f0d3ae", + "sha256:65a20de41e85648e00305c1bb09a3598f840422e522277641145a32d42dcefcc", + "sha256:6a15c17af8839b6801d554263c546c69c4d7718ad4321e3166175b37eaacca77", + "sha256:747fa73efa9b8b1488a95d0ba1039c8e2dca0f741612d80415b1e1c560febf4e", + "sha256:7be78388d6da1a25c0d5ec506523db58b18be22d9c37d8d3a32c08be4987bd73", + "sha256:81fd9652b37b36f16138611c7e884eb82e0cec137c40d3ef7c3f9b3ed00f6ed8", + "sha256:83d57312a58dcfe2a3a0f9d1389b299438909a02db60e2f2ea2ae2d8034909d3", + "sha256:8843114c0cfce61b40ad48df65abcfc00d4dba82eae8718fab5352390848c5da", + "sha256:8cc3309d8e08fd79089e13ed4819d0af72aa935dd8f435a195fd152796752ff2", + "sha256:8ebe63ee5f8fa4296b1b8cfc743f870d10e902ca18afc65c68cf46fd39bb0783", + "sha256:8eddfb4d203a237da6f3cc8a540dad0517d274b5a1e9e636fd8d2c79b5c1d397", + "sha256:922fa70ba549fce362d2e2871ab542082d66e2aaf0c19480ea453905b01f384e", + "sha256:931091142fd8cc14edccc0845a79248bc155425eee9a98b2db2ea4f00a235a42", + "sha256:971fd5a1d6e62e00d945423a567e42eb1fa678ba89072832185ca836a94daaa6", + "sha256:980a846182ce88c4f2f7e2c22c56aefd515daeb36149d1c897f83cf57999e0b6", + "sha256:9d9adda641db7207e800a7f089068f6f645959f2df27e870ee81d44701dd9db3", + "sha256:9f8f757bebaaea112c00dba718fc0d3260052ce714e25804a03f93f5d1c6cc11", + "sha256:a6ae758eb08088d36812dd5d9af7a9859c05b1e0f714470ea243694b49278e7b", + "sha256:a8c2cf1209497cf659a667d7dea88985e834c24b7c3b605e6254cbb5076d985c", + "sha256:acab0277c40eff7143c2323190ea57b9ee5fd353d8190ee9652369fae735668a", + "sha256:b331680e46239e090f5b3cead313cc772f6caa7d0fc8de349337563125361a4a", + "sha256:c088e7a90b6017307f423efbb9d1ba97a22aa2170876223f9709e9d1de0b5347", + "sha256:d099566accf23d21037f18a2a63d323075bebace807742e4b0ac210971d4dd70", + "sha256:d388087771c837cdb6515539f43b9d4bf0b0f23593a24054ac16f7a960be16f4", + "sha256:dcfe41187da8992c5f40aa8c5ec086fa3672834d2be57a32384c08d5a05b4c00", + "sha256:e6d1db20594d9daba22f90da738b1a0441a7427552cc6e2e3d1297aeddc00378", + "sha256:ebea5cc3aa8ea72e04df9913492f9a96d9348db876f9dda3ad729cfedf7ac416", + "sha256:ebebf83299b0cb1721a8859ea98f3a77811e35dce7609c5c963b9ad90728f886", + "sha256:f0e34c2079d47ae9f6188211db9e777c619a21d4faba6977774e8fa43b085e48", + "sha256:f92f88e6c033db65a5ae3d97905c8fea9c725b63e28d5a75cb73b49bda5024d8", + "sha256:f9f7bd5faab55f47231ad8dba7787866b69f5e93bc306e3915606779bbfb4ba8", + "sha256:fd5ef5932f6475c436c4a55e4336ebbe47bd3272be04964a03d316bbf4afbcbc", + "sha256:ff8a59ea85a1f2191a0ffcc61298c571bc566332f82e5f5be1b83c9d8e668a62" + ], + "markers": "python_version >= '3.9'", + "version": "==1.76.0" + }, + "grpcio-status": { + "hashes": [ + "sha256:25fcbfec74c15d1a1cb5da3fab8ee9672852dc16a5a9eeb5baf7d7a9952943cd", + "sha256:380568794055a8efbbd8871162df92012e0228a5f6dffaf57f2a00c534103b18" + ], + "markers": "python_version >= '3.9'", + "version": 
"==1.76.0" + }, "gunicorn": { "hashes": [ "sha256:ec400d38950de4dfd418cff8328b2c8faed0edb0d517d3394e457c317908ca4d", @@ -1180,19 +1258,19 @@ }, "protobuf": { "hashes": [ - "sha256:023af8449482fa884d88b4563d85e83accab54138ae098924a985bcbb734a213", - "sha256:0f4cf01222c0d959c2b399142deb526de420be8236f22c71356e2a544e153c53", - "sha256:8fd7d5e0eb08cd5b87fd3df49bc193f5cfd778701f47e11d127d0afc6c39f1d1", - "sha256:923aa6d27a92bf44394f6abf7ea0500f38769d4b07f4be41cb52bd8b1123b9ed", - "sha256:97f65757e8d09870de6fd973aeddb92f85435607235d20b2dfed93405d00c85b", - "sha256:d595a9fd694fdeb061a62fbe10eb039cc1e444df81ec9bb70c7fc59ebcb1eafa", - "sha256:df051de4fd7e5e4371334e234c62ba43763f15ab605579e04c7008c05735cd82", - "sha256:f8adba2e44cde2d7618996b3fc02341f03f5bc3f2748be72dc7b063319276178", - "sha256:f8d3fdbc966aaab1d05046d0240dd94d40f2a8c62856d41eaa141ff64a79de6b", - "sha256:fe34575f2bdde76ac429ec7b570235bf0c788883e70aee90068e9981806f2490" + "sha256:1f8017c48c07ec5859106533b682260ba3d7c5567b1ca1f24297ce03384d1b4f", + "sha256:2981c58f582f44b6b13173e12bb8656711189c2a70250845f264b877f00b1913", + "sha256:56dc370c91fbb8ac85bc13582c9e373569668a290aa2e66a590c2a0d35ddb9e4", + "sha256:7109dcc38a680d033ffb8bf896727423528db9163be1b6a02d6a49606dcadbfe", + "sha256:7636aad9bb01768870266de5dc009de2d1b936771b38a793f73cbbf279c91c5c", + "sha256:87eb388bd2d0f78febd8f4c8779c79247b26a5befad525008e49a6955787ff3d", + "sha256:8cd7640aee0b7828b6d03ae518b5b4806fdfc1afe8de82f79c3454f8aef29872", + "sha256:b5d3b5625192214066d99b2b605f5783483575656784de223f00a8d00754fc0e", + "sha256:d9b19771ca75935b3a4422957bc518b0cecb978b31d1dd12037b088f6bcc0e43", + "sha256:fc2a0e8b05b180e5fc0dd1559fe8ebdae21a27e81ac77728fb6c42b12c7419b4" ], "markers": "python_version >= '3.9'", - "version": "==6.33.1" + "version": "==6.33.2" }, "psutil": { "hashes": [ @@ -1858,11 +1936,11 @@ }, "botocore-stubs": { "hashes": [ - "sha256:181417984475967abbc76bb75a1cc60e01eb7068430c185a9e48ce1314af79bb", - "sha256:f23dc7ecffde89918077c3f935a1aef1e4af672882cb98e2362a8b71d8ab0939" + "sha256:0b6711abe0ceffe32fa572c0c683e93d9a17b2c4cff1b47a7b48bb784fe2dbcc", + "sha256:5b8e63a1f62800945dcc824514fa0465f78ad2c8fe7a592b59f77ca19f1aa10b" ], "markers": "python_version >= '3.9'", - "version": "==1.42.1" + "version": "==1.42.4" }, "celery-types": { "hashes": [ @@ -2013,101 +2091,101 @@ "toml" ], "hashes": [ - "sha256:01d24af36fedda51c2b1aca56e4330a3710f83b02a5ff3743a6b015ffa7c9384", - "sha256:04a79245ab2b7a61688958f7a855275997134bc84f4a03bc240cf64ff132abf6", - "sha256:083631eeff5eb9992c923e14b810a179798bb598e6a0dd60586819fc23be6e60", - "sha256:099d11698385d572ceafb3288a5b80fe1fc58bf665b3f9d362389de488361d3d", - "sha256:09a86acaaa8455f13d6a99221d9654df249b33937b4e212b4e5a822065f12aa7", - "sha256:159d50c0b12e060b15ed3d39f87ed43d4f7f7ad40b8a534f4dd331adbb51104a", - "sha256:172cf3a34bfef42611963e2b661302a8931f44df31629e5b1050567d6b90287d", - "sha256:22a7aade354a72dff3b59c577bfd18d6945c61f97393bc5fb7bd293a4237024b", - "sha256:24cff9d1f5743f67db7ba46ff284018a6e9aeb649b67aa1e70c396aa1b7cb23c", - "sha256:29644c928772c78512b48e14156b81255000dcfd4817574ff69def189bcb3647", - "sha256:297bc2da28440f5ae51c845a47c8175a4db0553a53827886e4fb25c66633000c", - "sha256:2fd8354ed5d69775ac42986a691fbf68b4084278710cee9d7c3eaa0c28fa982a", - "sha256:313672140638b6ddb2c6455ddeda41c6a0b208298034544cfca138978c6baed6", - "sha256:31b8b2e38391a56e3cea39d22a23faaa7c3fc911751756ef6d2621d2a9daf742", - "sha256:32b75c2ba3f324ee37af3ccee5b30458038c50b349ad9b88cee85096132a575b", - 
"sha256:33baadc0efd5c7294f436a632566ccc1f72c867f82833eb59820ee37dc811c6f", - "sha256:3ff651dcd36d2fea66877cd4a82de478004c59b849945446acb5baf9379a1b64", - "sha256:40c867af715f22592e0d0fb533a33a71ec9e0f73a6945f722a0c85c8c1cbe3a2", - "sha256:42435d46d6461a3b305cdfcad7cdd3248787771f53fe18305548cba474e6523b", - "sha256:459443346509476170d553035e4a3eed7b860f4fe5242f02de1010501956ce87", - "sha256:4648158fd8dd9381b5847622df1c90ff314efbfc1df4550092ab6013c238a5fc", - "sha256:47324fffca8d8eae7e185b5bb20c14645f23350f870c1649003618ea91a78941", - "sha256:473dc45d69694069adb7680c405fb1e81f60b2aff42c81e2f2c3feaf544d878c", - "sha256:4b59b501455535e2e5dde5881739897967b272ba25988c89145c12d772810ccb", - "sha256:4c589361263ab2953e3c4cd2a94db94c4ad4a8e572776ecfbad2389c626e4507", - "sha256:51777647a749abdf6f6fd8c7cffab12de68ab93aab15efc72fbbb83036c2a068", - "sha256:52ca620260bd8cd6027317bdd8b8ba929be1d741764ee765b42c4d79a408601e", - "sha256:5560c7e0d82b42eb1951e4f68f071f8017c824ebfd5a6ebe42c60ac16c6c2434", - "sha256:5734b5d913c3755e72f70bf6cc37a0518d4f4745cde760c5d8e12005e62f9832", - "sha256:583f9adbefd278e9de33c33d6846aa8f5d164fa49b47144180a0e037f0688bb9", - "sha256:58c1c6aa677f3a1411fe6fb28ec3a942e4f665df036a3608816e0847fad23296", - "sha256:5b3c889c0b8b283a24d721a9eabc8ccafcfc3aebf167e4cd0d0e23bf8ec4e339", - "sha256:5bcead88c8423e1855e64b8057d0544e33e4080b95b240c2a355334bb7ced937", - "sha256:5ea5a9f7dc8877455b13dd1effd3202e0bca72f6f3ab09f9036b1bcf728f69ac", - "sha256:5f3738279524e988d9da2893f307c2093815c623f8d05a8f79e3eff3a7a9e553", - "sha256:68b0d0a2d84f333de875666259dadf28cc67858bc8fd8b3f1eae84d3c2bec455", - "sha256:6d907ddccbca819afa2cd014bc69983b146cca2735a0b1e6259b2a6c10be1e70", - "sha256:6e1a8c066dabcde56d5d9fed6a66bc19a2883a3fe051f0c397a41fc42aedd4cc", - "sha256:6ff7651cc01a246908eac162a6a86fc0dbab6de1ad165dfb9a1e2ec660b44984", - "sha256:737c3814903be30695b2de20d22bcc5428fdae305c61ba44cdc8b3252984c49c", - "sha256:73f9e7fbd51a221818fd11b7090eaa835a353ddd59c236c57b2199486b116c6d", - "sha256:76336c19a9ef4a94b2f8dc79f8ac2da3f193f625bb5d6f51a328cd19bfc19933", - "sha256:7670d860e18b1e3ee5930b17a7d55ae6287ec6e55d9799982aa103a2cc1fa2ef", - "sha256:79a44421cd5fba96aa57b5e3b5a4d3274c449d4c622e8f76882d76635501fd13", - "sha256:7c1059b600aec6ef090721f8f633f60ed70afaffe8ecab85b59df748f24b31fe", - "sha256:8638cbb002eaa5d7c8d04da667813ce1067080b9a91099801a0053086e52b736", - "sha256:874fe69a0785d96bd066059cd4368022cebbec1a8958f224f0016979183916e6", - "sha256:8787b0f982e020adb732b9f051f3e49dd5054cebbc3f3432061278512a2b1360", - "sha256:8bb5b894b3ec09dcd6d3743229dc7f2c42ef7787dc40596ae04c0edda487371e", - "sha256:907e0df1b71ba77463687a74149c6122c3f6aac56c2510a5d906b2f368208560", - "sha256:90d58ac63bc85e0fb919f14d09d6caa63f35a5512a2205284b7816cafd21bb03", - "sha256:9157a5e233c40ce6613dead4c131a006adfda70e557b6856b97aceed01b0e27a", - "sha256:91b810a163ccad2e43b1faa11d70d3cf4b6f3d83f9fd5f2df82a32d47b648e0d", - "sha256:950411f1eb5d579999c5f66c62a40961f126fc71e5e14419f004471957b51508", - "sha256:99d5415c73ca12d558e07776bd957c4222c687b9f1d26fa0e1b57e3598bdcde8", - "sha256:9b57e2d0ddd5f0582bae5437c04ee71c46cd908e7bc5d4d0391f9a41e812dd12", - "sha256:9bb44c889fb68004e94cab71f6a021ec83eac9aeabdbb5a5a88821ec46e1da73", - "sha256:a00594770eb715854fb1c57e0dea08cce6720cfbc531accdb9850d7c7770396c", - "sha256:a1783ed5bd0d5938d4435014626568dc7f93e3cb99bc59188cc18857c47aa3c4", - "sha256:a1c59b7dc169809a88b21a936eccf71c3895a78f5592051b1af8f4d59c2b4f92", - "sha256:aa124a3683d2af98bd9d9c2bfa7a5076ca7e5ab09fdb96b81fa7d89376ae928f", - 
"sha256:aa7d48520a32cb21c7a9b31f81799e8eaec7239db36c3b670be0fa2403828d1d", - "sha256:b1518ecbad4e6173f4c6e6c4a46e49555ea5679bf3feda5edb1b935c7c44e8a0", - "sha256:b1aab7302a87bafebfe76b12af681b56ff446dc6f32ed178ff9c092ca776e6bc", - "sha256:b2089cc445f2dc0af6f801f0d1355c025b76c24481935303cf1af28f636688f0", - "sha256:b365adc70a6936c6b0582dc38746b33b2454148c02349345412c6e743efb646d", - "sha256:b527a08cdf15753279b7afb2339a12073620b761d79b81cbe2cdebdb43d90daa", - "sha256:bc13baf85cd8a4cfcf4a35c7bc9d795837ad809775f782f697bf630b7e200211", - "sha256:bcec6f47e4cb8a4c2dc91ce507f6eefc6a1b10f58df32cdc61dff65455031dfc", - "sha256:c406a71f544800ef7e9e0000af706b88465f3573ae8b8de37e5f96c59f689ad1", - "sha256:c5a6f20bf48b8866095c6820641e7ffbe23f2ac84a2efc218d91235e404c7777", - "sha256:c87395744f5c77c866d0f5a43d97cc39e17c7f1cb0115e54a2fe67ca75c5d14d", - "sha256:ca8ecfa283764fdda3eae1bdb6afe58bf78c2c3ec2b2edcb05a671f0bba7b3f9", - "sha256:cb2a1b6ab9fe833714a483a915de350abc624a37149649297624c8d57add089c", - "sha256:ccf3b2ede91decd2fb53ec73c1f949c3e034129d1e0b07798ff1d02ea0c8fa4a", - "sha256:ce61969812d6a98a981d147d9ac583a36ac7db7766f2e64a9d4d059c2fe29d07", - "sha256:d6c2e26b481c9159c2773a37947a9718cfdc58893029cdfb177531793e375cfc", - "sha256:d7e0d0303c13b54db495eb636bc2465b2fb8475d4c8bcec8fe4b5ca454dfbae8", - "sha256:d8842f17095b9868a05837b7b1b73495293091bed870e099521ada176aa3e00e", - "sha256:d93fbf446c31c0140208dcd07c5d882029832e8ed7891a39d6d44bd65f2316c3", - "sha256:dcbb630ab034e86d2a0f79aefd2be07e583202f41e037602d438c80044957baa", - "sha256:e0d68c1f7eabbc8abe582d11fa393ea483caf4f44b0af86881174769f185c94d", - "sha256:e0f483ab4f749039894abaf80c2f9e7ed77bbf3c737517fb88c8e8e305896a17", - "sha256:e71bba6a40883b00c6d571599b4627f50c360b3d0d02bfc658168936be74027b", - "sha256:e84da3a0fd233aeec797b981c51af1cabac74f9bd67be42458365b30d11b5291", - "sha256:e949ebf60c717c3df63adb4a1a366c096c8d7fd8472608cd09359e1bd48ef59f", - "sha256:f3433ffd541380f3a0e423cff0f4926d55b0cc8c1d160fdc3be24a4c03aa65f7", - "sha256:f7ba9da4726e446d8dd8aae5a6cd872511184a5d861de80a86ef970b5dacce3e", - "sha256:f7bbb321d4adc9f65e402c677cd1c8e4c2d0105d3ce285b51b4d87f1d5db5245", - "sha256:f999813dddeb2a56aab5841e687b68169da0d3f6fc78ccf50952fa2463746022", - "sha256:fc11e0a4e372cb5f282f16ef90d4a585034050ccda536451901abfb19a57f40c", - "sha256:fdba9f15849534594f60b47c9a30bc70409b54947319a7c4fd0e8e3d8d2f355d" + "sha256:0018f73dfb4301a89292c73be6ba5f58722ff79f51593352759c1790ded1cabe", + "sha256:00c3d22cf6fb1cf3bf662aaaa4e563be8243a5ed2630339069799835a9cc7f9b", + "sha256:02d9fb9eccd48f6843c98a37bd6817462f130b86da8660461e8f5e54d4c06070", + "sha256:0602f701057c6823e5db1b74530ce85f17c3c5be5c85fc042ac939cbd909426e", + "sha256:06cac81bf10f74034e055e903f5f946e3e26fc51c09fc9f584e4a1605d977053", + "sha256:086cede306d96202e15a4b77ace8472e39d9f4e5f9fd92dd4fecdfb2313b2080", + "sha256:0900872f2fdb3ee5646b557918d02279dc3af3dfb39029ac4e945458b13f73bc", + "sha256:0a3a30f0e257df382f5f9534d4ce3d4cf06eafaf5192beb1a7bd066cb10e78fb", + "sha256:0b3d67d31383c4c68e19a88e28fc4c2e29517580f1b0ebec4a069d502ce1e0bf", + "sha256:0dfa3855031070058add1a59fdfda0192fd3e8f97e7c81de0596c145dea51820", + "sha256:0f4872f5d6c54419c94c25dd6ae1d015deeb337d06e448cd890a1e89a8ee7f3b", + "sha256:11c21557d0e0a5a38632cbbaca5f008723b26a89d70db6315523df6df77d6232", + "sha256:166ad2a22ee770f5656e1257703139d3533b4a0b6909af67c6b4a3adc1c98657", + "sha256:193c3887285eec1dbdb3f2bd7fbc351d570ca9c02ca756c3afbc71b3c98af6ef", + "sha256:1d84e91521c5e4cb6602fe11ece3e1de03b2760e14ae4fcf1a4b56fa3c801fcd", + 
"sha256:1ed5630d946859de835a85e9a43b721123a8a44ec26e2830b296d478c7fd4259", + "sha256:22486cdafba4f9e471c816a2a5745337742a617fef68e890d8baf9f3036d7833", + "sha256:22ccfe8d9bb0d6134892cbe1262493a8c70d736b9df930f3f3afae0fe3ac924d", + "sha256:24e4e56304fdb56f96f80eabf840eab043b3afea9348b88be680ec5986780a0f", + "sha256:25dc33618d45456ccb1d37bce44bc78cf269909aa14c4db2e03d63146a8a1493", + "sha256:263c3dbccc78e2e331e59e90115941b5f53e85cfcc6b3b2fbff1fd4e3d2c6ea8", + "sha256:28ee1c96109974af104028a8ef57cec21447d42d0e937c0275329272e370ebcf", + "sha256:30a3a201a127ea57f7e14ba43c93c9c4be8b7d17a26e03bb49e6966d019eede9", + "sha256:3188936845cd0cb114fa6a51842a304cdbac2958145d03be2377ec41eb285d19", + "sha256:367449cf07d33dc216c083f2036bb7d976c6e4903ab31be400ad74ad9f85ce98", + "sha256:37eee4e552a65866f15dedd917d5e5f3d59805994260720821e2c1b51ac3248f", + "sha256:3a10260e6a152e5f03f26db4a407c4c62d3830b9af9b7c0450b183615f05d43b", + "sha256:3a7b1cd820e1b6116f92c6128f1188e7afe421c7e1b35fa9836b11444e53ebd9", + "sha256:3ab483ea0e251b5790c2aac03acde31bff0c736bf8a86829b89382b407cd1c3b", + "sha256:3ad968d1e3aa6ce5be295ab5fe3ae1bf5bb4769d0f98a80a0252d543a2ef2e9e", + "sha256:445badb539005283825959ac9fa4a28f712c214b65af3a2c464f1adc90f5fcbc", + "sha256:453b7ec753cf5e4356e14fe858064e5520c460d3bbbcb9c35e55c0d21155c256", + "sha256:494f5459ffa1bd45e18558cd98710c36c0b8fbfa82a5eabcbe671d80ecffbfe8", + "sha256:4b5de7d4583e60d5fd246dd57fcd3a8aa23c6e118a8c72b38adf666ba8e7e927", + "sha256:4f3e223b2b2db5e0db0c2b97286aba0036ca000f06aca9b12112eaa9af3d92ae", + "sha256:4fdb6f54f38e334db97f72fa0c701e66d8479af0bc3f9bfb5b90f1c30f54500f", + "sha256:51a202e0f80f241ccb68e3e26e19ab5b3bf0f813314f2c967642f13ebcf1ddfe", + "sha256:581f086833d24a22c89ae0fe2142cfaa1c92c930adf637ddf122d55083fb5a0f", + "sha256:583221913fbc8f53b88c42e8dbb8fca1d0f2e597cb190ce45916662b8b9d9621", + "sha256:58632b187be6f0be500f553be41e277712baa278147ecb7559983c6d9faf7ae1", + "sha256:5c67dace46f361125e6b9cace8fe0b729ed8479f47e70c89b838d319375c8137", + "sha256:5e70f92ef89bac1ac8a99b3324923b4749f008fdbd7aa9cb35e01d7a284a04f9", + "sha256:5f5d9bd30756fff3e7216491a0d6d520c448d5124d3d8e8f56446d6412499e74", + "sha256:5f8a0297355e652001015e93be345ee54393e45dc3050af4a0475c5a2b767d46", + "sha256:62d7c4f13102148c78d7353c6052af6d899a7f6df66a32bddcc0c0eb7c5326f8", + "sha256:69ac2c492918c2461bc6ace42d0479638e60719f2a4ef3f0815fa2df88e9f940", + "sha256:6abb3a4c52f05e08460bd9acf04fec027f8718ecaa0d09c40ffbc3fbd70ecc39", + "sha256:6e63ccc6e0ad8986386461c3c4b737540f20426e7ec932f42e030320896c311a", + "sha256:6e9e451dee940a86789134b6b0ffbe31c454ade3b849bb8a9d2cca2541a8e91d", + "sha256:6fb2d5d272341565f08e962cce14cdf843a08ac43bd621783527adb06b089c4b", + "sha256:71936a8b3b977ddd0b694c28c6a34f4fff2e9dd201969a4ff5d5fc7742d614b0", + "sha256:73419b89f812f498aca53f757dd834919b48ce4799f9d5cad33ca0ae442bdb1a", + "sha256:739c6c051a7540608d097b8e13c76cfa85263ced467168dc6b477bae3df7d0e2", + "sha256:7464663eaca6adba4175f6c19354feea61ebbdd735563a03d1e472c7072d27bb", + "sha256:74c136e4093627cf04b26a35dab8cbfc9b37c647f0502fc313376e11726ba303", + "sha256:76541dc8d53715fb4f7a3a06b34b0dc6846e3c69bc6204c55653a85dd6220971", + "sha256:7a485ff48fbd231efa32d58f479befce52dcb6bfb2a88bb7bf9a0b89b1bc8030", + "sha256:7e442c013447d1d8d195be62852270b78b6e255b79b8675bad8479641e21fd96", + "sha256:7f15a931a668e58087bc39d05d2b4bf4b14ff2875b49c994bbdb1c2217a8daeb", + "sha256:7f88ae3e69df2ab62fb0bc5219a597cb890ba5c438190ffa87490b315190bb33", + "sha256:8069e831f205d2ff1f3d355e82f511eb7c5522d7d413f5db5756b772ec8697f8", + 
"sha256:850d2998f380b1e266459ca5b47bc9e7daf9af1d070f66317972f382d46f1904", + "sha256:898cce66d0836973f48dda4e3514d863d70142bdf6dfab932b9b6a90ea5b222d", + "sha256:9097818b6cc1cfb5f174e3263eba4a62a17683bcfe5c4b5d07f4c97fa51fbf28", + "sha256:936bc20503ce24770c71938d1369461f0c5320830800933bc3956e2a4ded930e", + "sha256:9372dff5ea15930fea0445eaf37bbbafbc771a49e70c0aeed8b4e2c2614cc00e", + "sha256:9987a9e4f8197a1000280f7cc089e3ea2c8b3c0a64d750537809879a7b4ceaf9", + "sha256:99acd4dfdfeb58e1937629eb1ab6ab0899b131f183ee5f23e0b5da5cba2fec74", + "sha256:9b01c22bc74a7fb44066aaf765224c0d933ddf1f5047d6cdfe4795504a4493f8", + "sha256:a00d3a393207ae12f7c49bb1c113190883b500f48979abb118d8b72b8c95c032", + "sha256:a23e5a1f8b982d56fa64f8e442e037f6ce29322f1f9e6c2344cd9e9f4407ee57", + "sha256:a2bdb3babb74079f021696cb46b8bb5f5661165c385d3a238712b031a12355be", + "sha256:a394aa27f2d7ff9bc04cf703817773a59ad6dfbd577032e690f961d2460ee936", + "sha256:a6c6e16b663be828a8f0b6c5027d36471d4a9f90d28444aa4ced4d48d7d6ae8f", + "sha256:af0a583efaacc52ae2521f8d7910aff65cdb093091d76291ac5820d5e947fc1c", + "sha256:af827b7cbb303e1befa6c4f94fd2bf72f108089cfa0f8abab8f4ca553cf5ca5a", + "sha256:c4be718e51e86f553bcf515305a158a1cd180d23b72f07ae76d6017c3cc5d791", + "sha256:cdb3c9f8fef0a954c632f64328a3935988d33a6604ce4bf67ec3e39670f12ae5", + "sha256:d10fd186aac2316f9bbb46ef91977f9d394ded67050ad6d84d94ed6ea2e8e54e", + "sha256:d1e97353dcc5587b85986cda4ff3ec98081d7e84dd95e8b2a6d59820f0545f8a", + "sha256:d2a9d7f1c11487b1c69367ab3ac2d81b9b3721f097aa409a3191c3e90f8f3dd7", + "sha256:de7f6748b890708578fc4b7bb967d810aeb6fcc9bff4bb77dbca77dab2f9df6a", + "sha256:e5330fa0cc1f5c3c4c3bb8e101b742025933e7848989370a1d4c8c5e401ea753", + "sha256:e999e2dcc094002d6e2c7bbc1fb85b58ba4f465a760a8014d97619330cdbbbf3", + "sha256:eb76670874fdd6091eedcc856128ee48c41a9bbbb9c3f1c7c3cf169290e3ffd6", + "sha256:f1c23e24a7000da892a312fb17e33c5f94f8b001de44b7cf8ba2e36fbd15859e", + "sha256:f2ffc92b46ed6e6760f1d47a71e56b5664781bc68986dbd1836b2b70c0ce2071", + "sha256:f4f72a85316d8e13234cafe0a9f81b40418ad7a082792fa4165bd7d45d96066b", + "sha256:f59883c643cb19630500f57016f76cfdcd6845ca8c5b5ea1f6e17f74c8e5f511", + "sha256:f6aaef16d65d1787280943f1c8718dc32e9cf141014e4634d64446702d26e0ff", + "sha256:fe81055d8c6c9de76d60c94ddea73c290b416e061d40d542b24a5871bad498b7", + "sha256:ff45e0cd8451e293b63ced93161e189780baf444119391b3e7d25315060368a6" ], "markers": "python_version >= '3.10'", - "version": "==7.12.0" + "version": "==7.13.0" }, "dill": { "hashes": [ @@ -2331,11 +2409,11 @@ }, "platformdirs": { "hashes": [ - "sha256:70ddccdd7c99fc5942e9fc25636a8b34d04c24b335100223152c2803e4063312", - "sha256:e578a81bb873cbb89a41fcc904c7ef523cc18284b7e3b3ccf06aca1403b7ebd3" + "sha256:61d5cdcc6065745cdd94f0f878977f8de9437be93de97c1c12f853c9c0cdcbda", + "sha256:d03afa3963c806a9bed9d5125c8f4cb2fdaf74a55ab60e5d59b3fde758104d31" ], "markers": "python_version >= '3.10'", - "version": "==4.5.0" + "version": "==4.5.1" }, "pluggy": { "hashes": [ @@ -2480,11 +2558,11 @@ }, "types-awscrt": { "hashes": [ - "sha256:545f100e17d7633aa1791f92a8a4716f8ee1fc7cc9ee98dd31676d6c5e07e733", - "sha256:f583bef3d42a307b3f3c02ef8d5d49fbec04e02bf3aacd3ada3953213c9150a7" + "sha256:3f5d1e6c99b0b551af6365f9c04d8ce2effbcfe18bb719a34501efea279ae7bb", + "sha256:41e01e14d646877bd310e7e3c49ff193f8361480b9568e97b1639775009bbefa" ], "markers": "python_version >= '3.8'", - "version": "==0.29.1" + "version": "==0.29.2" }, "types-psutil": { "hashes": [ @@ -2530,11 +2608,11 @@ }, "types-s3transfer": { "hashes": [ - 
"sha256:1e617b14a9d3ce5be565f4b187fafa1d96075546b52072121f8fda8e0a444aed", - "sha256:43a523e0c43a88e447dfda5f4f6b63bf3da85316fdd2625f650817f2b170b5f7" + "sha256:1c0cd111ecf6e21437cb410f5cddb631bfb2263b77ad973e79b9c6d0cb24e0ef", + "sha256:b4636472024c5e2b62278c5b759661efeb52a81851cde5f092f24100b1ecb443" ], - "markers": "python_version >= '3.8'", - "version": "==0.15.0" + "markers": "python_version >= '3.9'", + "version": "==0.16.0" }, "typing-extensions": { "hashes": [ diff --git a/codeforlife/auth.py b/codeforlife/auth.py new file mode 100644 index 0000000..0be7f67 --- /dev/null +++ b/codeforlife/auth.py @@ -0,0 +1,90 @@ +""" +© Ocado Group +Created on 04/12/2025 at 18:58:44(+00:00). + +Authentication credentials. +""" + +import typing as t + +from boto3 import Session as AwsSession +from django.conf import settings +from google.auth.aws import ( + AwsSecurityCredentials, + AwsSecurityCredentialsSupplier, +) +from google.auth.aws import Credentials as AwsCredentials +from google.auth.credentials import Credentials +from google.oauth2.service_account import ( + Credentials as GcpServiceAccountCredentials, +) + + +class AwsSessionSecurityCredentialsSupplier(AwsSecurityCredentialsSupplier): + """Supplies AWS security credentials from the current boto3 session.""" + + def get_aws_region(self, _, __): + return settings.AWS_REGION + + def get_aws_security_credentials(self, _, __) -> AwsSecurityCredentials: + aws_credentials = AwsSession().get_credentials() + assert aws_credentials + + aws_read_only_credentials = aws_credentials.get_frozen_credentials() + assert aws_read_only_credentials.access_key + assert aws_read_only_credentials.secret_key + assert aws_read_only_credentials.token + + return AwsSecurityCredentials( + access_key_id=aws_read_only_credentials.access_key, + secret_access_key=aws_read_only_credentials.secret_key, + session_token=aws_read_only_credentials.token, + ) + + +# pylint: disable-next=abstract-method,too-many-ancestors +class GcpWifCredentials(AwsCredentials): + """Workload Identity Federation credentials for GCP using AWS IAM roles.""" + + def __init__(self, token_lifetime_seconds: int = 600): + super().__init__( + subject_token_type="urn:ietf:params:aws:token-type:aws4_request", + audience=settings.GCP_WIF_AUDIENCE, + universe_domain="googleapis.com", + token_url="https://sts.googleapis.com/v1/token", + service_account_impersonation_url=( + "https://iamcredentials.googleapis.com/v1/projects/-/" + f"serviceAccounts/{settings.GCP_WIF_SERVICE_ACCOUNT}" + ":generateAccessToken" + ), + service_account_impersonation_options={ + "token_lifetime_seconds": token_lifetime_seconds + }, + aws_security_credentials_supplier=( + AwsSessionSecurityCredentialsSupplier() + ), + ) + + +def get_gcp_service_account_credentials( + token_lifetime_seconds: int = 600, + service_account_json: t.Optional[str] = None, +) -> Credentials: + """Get GCP service account credentials. + + Args: + token_lifetime_seconds: The lifetime of the token in seconds. + service_account_json: The path to the service account JSON file. + + Returns: + The GCP service account credentials. + """ + if settings.ENV != "local": + return GcpWifCredentials(token_lifetime_seconds=token_lifetime_seconds) + assert ( + service_account_json + ), "Service account JSON file path must be provided in local environment." 
+ + return GcpServiceAccountCredentials.from_service_account_file( + service_account_json + ) diff --git a/codeforlife/settings/custom.py b/codeforlife/settings/custom.py index 2135f0c..294083f 100644 --- a/codeforlife/settings/custom.py +++ b/codeforlife/settings/custom.py @@ -140,12 +140,7 @@ def get_redis_url(): "GOOGLE_CLOUD_PROJECT_ID", "decent-digit-629" ) -# The bucket on Google Cloud Storage used to export data to BigQuery and the -# service account to impersonate which has access to the bucket. -GOOGLE_CLOUD_STORAGE_BUCKET_NAME = os.getenv( - "GOOGLE_CLOUD_STORAGE_BUCKET_NAME", "REPLACE_ME" -) -GOOGLE_CLOUD_STORAGE_SERVICE_ACCOUNT_NAME = ( - os.getenv("GOOGLE_CLOUD_STORAGE_SERVICE_ACCOUNT_NAME", "REPLACE_ME") - + f"@{GOOGLE_CLOUD_PROJECT_ID}.iam.gserviceaccount.com" +# The ID of our BigQuery dataset. +GOOGLE_CLOUD_BIGQUERY_DATASET_ID = os.getenv( + "GOOGLE_CLOUD_BIGQUERY_DATASET_ID", "REPLACE_ME" ) diff --git a/codeforlife/settings/otp.py b/codeforlife/settings/otp.py index 1711ad5..30ebc79 100644 --- a/codeforlife/settings/otp.py +++ b/codeforlife/settings/otp.py @@ -60,6 +60,11 @@ f"{AWS_S3_APP_FOLDER}/elasticacheMetadata/{CACHE_CLUSTER_ID}.dbdata" ) +# GCP + +GCP_WIF_AUDIENCE = os.getenv("GCP_WIF_AUDIENCE") +GCP_WIF_SERVICE_ACCOUNT = os.getenv("GCP_WIF_SERVICE_ACCOUNT") + # SQS SQS_URL = os.getenv("SQS_URL") diff --git a/codeforlife/tasks/__init__.py b/codeforlife/tasks/__init__.py index c4328f6..265c097 100644 --- a/codeforlife/tasks/__init__.py +++ b/codeforlife/tasks/__init__.py @@ -3,5 +3,5 @@ Created on 06/10/2025 at 17:14:31(+01:00). """ -from .data_warehouse import DataWarehouseTask +from .bigquery import BigQueryTask from .utils import get_local_sqs_url, get_task_name, shared_task diff --git a/codeforlife/tasks/bigquery.py b/codeforlife/tasks/bigquery.py new file mode 100644 index 0000000..05ad2c8 --- /dev/null +++ b/codeforlife/tasks/bigquery.py @@ -0,0 +1,410 @@ +""" +© Ocado Group +Created on 06/10/2025 at 17:15:37(+01:00). +""" + +import csv +import io +import logging +import typing as t +from dataclasses import dataclass, field +from datetime import date, datetime, time, timezone +from tempfile import NamedTemporaryFile, _TemporaryFileWrapper + +from celery import Task +from celery import shared_task as _shared_task +from django.conf import settings as django_settings +from django.core.exceptions import ValidationError +from django.db.models.query import QuerySet +from google.cloud.bigquery import ( + Client, + CreateDisposition, + LoadJobConfig, + SourceFormat, + WriteDisposition, +) + +from ..auth import get_gcp_service_account_credentials +from ..types import KwArgs +from .utils import get_task_name + +if t.TYPE_CHECKING: + CsvFile = _TemporaryFileWrapper[bytes] + + +# pylint: disable-next=abstract-method +class BigQueryTask(Task): + """A task which loads data from a Django queryset into a BigQuery table.""" + + TABLE_NAMES: t.Set[str] = set() + + WriteDisposition: t.TypeAlias = WriteDisposition # shorthand + GetQuerySet: t.TypeAlias = t.Callable[..., QuerySet[t.Any]] + + @dataclass(frozen=True) + # pylint: disable-next=too-many-instance-attributes + class Settings: + """The settings for a BigQuery task.""" + + # The BigQuery table's write disposition. + write_disposition: str + # The number of rows to write at a time. Must be a multiple of 10. + chunk_size: int + # The [Django model] fields to include in the CSV. + fields: t.List[str] + # The name of the field used to identify each row. 
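+        # It is appended to `fields` automatically if not already included.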
+ id_field: str = "id" + # The maximum amount of time this task is allowed to take before it's + # hard-killed. + time_limit: int = 3600 + # The name of the BigQuery table where the data will ultimately be + # saved. If not provided, the name of the decorated function is used. + table_name: t.Optional[str] = None + # The maximum number of retries allowed. + max_retries: int = 5 + # The countdown before attempting the next retry. + retry_countdown: int = 10 + # The additional keyword arguments to pass to the Celery task decorator. + kwargs: KwArgs = field(default_factory=dict) + + def __post_init__(self): + # Set required values as defaults. + self.kwargs.setdefault("bind", True) + self.kwargs.setdefault("base", BigQueryTask) + + # Ensure the ID field is always present. + if self.id_field not in self.fields: + self.fields.append(self.id_field) + + # Validate args. + if not self.write_disposition.startswith("WRITE_") or not hasattr( + WriteDisposition, self.write_disposition + ): + raise ValidationError( + f'The write disposition "{self.write_disposition}"' + " does not exist.", + code="write_disposition_does_not_exist", + ) + if self.chunk_size <= 0: + raise ValidationError( + "The chunk size must be > 0.", + code="chunk_size_lte_0", + ) + if self.chunk_size % 10 != 0: + raise ValidationError( + "The chunk size must be a multiple of 10.", + code="chunk_size_not_multiple_of_10", + ) + if len(self.fields) <= 1: + raise ValidationError( + "Must provide at least 1 field (not including ID field).", + code="no_fields", + ) + if len(self.fields) != len(set(self.fields)): + raise ValidationError( + "Fields must be unique.", + code="duplicate_fields", + ) + if self.time_limit <= 0: + raise ValidationError( + "The time limit must be > 0.", + code="time_limit_lte_0", + ) + if self.time_limit > 3600: + raise ValidationError( + "The time limit must be <= 3600 (1 hour).", + code="time_limit_gt_3600", + ) + if self.max_retries < 0: + raise ValidationError( + "The max retries must be >= 0.", + code="max_retries_lt_0", + ) + if self.retry_countdown < 0: + raise ValidationError( + "The retry countdown must be >= 0.", + code="retry_countdown_lt_0", + ) + if self.kwargs["bind"] is not True: + raise ValidationError( + "The task must be bound.", code="task_unbound" + ) + if not issubclass(self.kwargs["base"], BigQueryTask): + raise ValidationError( + f"The base must be a subclass of " + f"'{BigQueryTask.__module__}." + f"{BigQueryTask.__qualname__}'.", + code="base_not_subclass", + ) + + settings: Settings + get_queryset: GetQuerySet + + @classmethod + def register_table_name(cls, table_name: str): + """Register a table name to ensure it is unique. + + Args: + table_name: The name of the table to register. + + Raises: + ValidationError: If the table name is already registered. + """ + + if table_name in cls.TABLE_NAMES: + raise ValidationError( + f'The table name "{table_name}" is already registered.', + code="table_name_already_registered", + ) + + cls.TABLE_NAMES.add(table_name) + + def get_ordered_queryset(self, *task_args, **task_kwargs): + """Get the ordered queryset. + + Args: + task_args: The positional arguments passed to the task. + task_kwargs: The keyword arguments passed to the task. + + Returns: + The ordered queryset. + """ + + queryset = self.get_queryset(*task_args, **task_kwargs) + if not queryset.ordered: + queryset = queryset.order_by(self.settings.id_field) + + return queryset + + @staticmethod + def format_value_for_csv(value: t.Any) -> str: + """Format a value for inclusion in a CSV file. 
+ + Args: + value: The value to format. + + Returns: + The formatted value as a string. + """ + + if value is None: + return "" # BigQuery treats an empty string as NULL/None. + if isinstance(value, datetime): + return ( + value.astimezone(timezone.utc) + .replace(tzinfo=None) + .isoformat(sep=" ") + ) + if isinstance(value, (date, time)): + return value.isoformat() + if not isinstance(value, str): + return str(value) + + return value + + @classmethod + def write_queryset_to_csv( + cls, + fields: t.List[str], + chunk_size: int, + queryset: QuerySet[t.Any], + csv_file: "CsvFile", + ): + """Write a queryset to a CSV file. + + Args: + fields: The list of fields to include in the CSV. + chunk_size: The number of rows to write at a time. + queryset: The queryset to write. + csv_file: The CSV file to write to. + + Returns: + Whether any values were written to the CSV file. + """ + + text_wrapper = io.TextIOWrapper(csv_file, encoding="utf-8", newline="") + + csv_writer = csv.writer( + text_wrapper, lineterminator="\n", quoting=csv.QUOTE_MINIMAL + ) + csv_writer.writerow(fields) # Write the headers. + + chunk_index = 1 # 1 based index. For logging. + wrote_values = False # Track if any values were written. + + for row_index, values in enumerate( + t.cast( + t.Iterator[t.Tuple[t.Any, ...]], + # Iterate chunks to avoid OOM for large querysets. + queryset.values_list(*fields).iterator(chunk_size), + ) + ): + if row_index % chunk_size == 0: + logging.info("Writing chunk %d", chunk_index) + chunk_index += 1 + + csv_row = [cls.format_value_for_csv(value) for value in values] + csv_writer.writerow(csv_row) + wrote_values = True + + # Move back 1 byte (because lineterminator is "\n"). + text_wrapper.seek(text_wrapper.tell() - 1) + # Chop off the trailing newline. + text_wrapper.truncate() + # Detach the wrapper to flush data to the binary file. + text_wrapper.detach() + + return wrote_values + + @staticmethod + def load_csv_into_bq( + write_disposition: str, + time_limit: int, + table_name: str, + csv_file: "CsvFile", + ): + """Load a CSV file into a BigQuery table. + + Args: + write_disposition: Write disposition for the BigQuery table. + time_limit: The maximum time to wait for the load job to complete. + table_name: The table name in BigQuery. + csv_file: The CSV file to load into BigQuery. + """ + + bq_client = Client( + project=django_settings.GOOGLE_CLOUD_PROJECT_ID, + credentials=get_gcp_service_account_credentials( + token_lifetime_seconds=time_limit + ), + ) + + full_table_id = ".".join( + [ + django_settings.GOOGLE_CLOUD_PROJECT_ID, + django_settings.GOOGLE_CLOUD_BIGQUERY_DATASET_ID, + table_name, + ] + ) + + csv_file.seek(0) # Reset file pointer to the start. + + logging.info("Starting BigQuery load job.") + # Load the temporary CSV file into BigQuery. 
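+        # The load job skips the CSV header row and applies the task's write
+        # disposition to the "<project>.<dataset>.<table>" destination built
+        # above.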
+ bq_load_job = bq_client.load_table_from_file( + file_obj=csv_file, + destination=full_table_id, + job_config=LoadJobConfig( + create_disposition=CreateDisposition.CREATE_IF_NEEDED, + source_format=SourceFormat.CSV, + skip_leading_rows=1, + write_disposition=write_disposition, + time_zone="Etc/UTC", + date_format="YYYY-MM-DD", + time_format="HH24:MI:SS", + datetime_format="YYYY-MM-DD HH24:MI:SS", + ), + ) + + bq_load_job.result() + logging.info( + "Successfully loaded %d rows into to BigQuery table %s.", + bq_load_job.output_rows, + full_table_id, + ) + + @staticmethod + # pylint: disable-next=too-many-locals,bad-staticmethod-argument + def _load_data_into_bq( + self: "BigQueryTask", table_name: str, *task_args, **task_kwargs + ): + queryset = self.get_ordered_queryset(*task_args, **task_kwargs) + + with NamedTemporaryFile( + mode="w+b", suffix=".csv", delete=True + ) as csv_file: + if self.write_queryset_to_csv( + fields=self.settings.fields, + chunk_size=self.settings.chunk_size, + queryset=queryset, + csv_file=csv_file, + ): + self.load_csv_into_bq( + write_disposition=self.settings.write_disposition, + time_limit=self.settings.time_limit, + table_name=table_name, + csv_file=csv_file, + ) + + @classmethod + def shared(cls, settings: Settings): + """Create a shared BigQuery task. + + This decorator creates a Celery task that saves the queryset to a + BigQuery table. + + Each task *must* be given a distinct table name and queryset to avoid + unintended consequences. + + Examples: + ``` + @BigQueryTask.shared( + BigQueryTask.Settings( + # table_name = "example", <- explicitly set the table name + write_disposition=BigQueryTask.WriteDisposition.WRITE_TRUNCATE, + chunk_size=1000, + fields=["first_name", "joined_at", "is_active"], + ) + ) + def user(): # All users will be saved to a BQ table named "user". + return User.objects.all() + ``` + + Args: + settings: The settings for this BigQuery task. + + Returns: + A wrapper-function which expects to receive a callable that returns + a queryset and returns a Celery task to save the queryset to + BigQuery. + """ + + def wrapper(get_queryset: "BigQueryTask.GetQuerySet"): + table_name = settings.table_name or get_queryset.__name__ + cls.register_table_name(table_name) + + # Wraps the task with retry logic. + def task(self: "BigQueryTask", *task_args, **task_kwargs): + try: + cls._load_data_into_bq( + self, table_name, *task_args, **task_kwargs + ) + except Exception as exc: + raise self.retry( + args=task_args, + kwargs=task_kwargs, + exc=exc, + countdown=settings.retry_countdown, + ) + + # Namespace the task with service's name. If the name is not + # explicitly provided, it defaults to the name of the decorated + # function. + name = settings.kwargs.pop("name", None) + name = get_task_name( + name if isinstance(name, str) else get_queryset + ) + + return t.cast( + BigQueryTask, + _shared_task( # type: ignore[call-overload] + **settings.kwargs, + name=name, + time_limit=settings.time_limit, + max_retries=settings.max_retries, + settings=settings, + get_queryset=staticmethod(get_queryset), + )(task), + ) + + return wrapper diff --git a/codeforlife/tasks/bigquery_test.py b/codeforlife/tasks/bigquery_test.py new file mode 100644 index 0000000..1cbdf98 --- /dev/null +++ b/codeforlife/tasks/bigquery_test.py @@ -0,0 +1,378 @@ +""" +© Ocado Group +Created on 02/10/2025 at 17:22:38(+01:00). 
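+
+Tests for BigQueryTask: settings validation, CSV formatting and loading
+querysets into BigQuery via the shared task decorator.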
+""" + +import csv +import io +import os +import typing as t +from datetime import date, datetime, time, timedelta, timezone +from tempfile import NamedTemporaryFile +from unittest.mock import MagicMock + +from celery import Celery +from django.conf import settings +from django.db.models.query import QuerySet +from google.cloud.bigquery import CreateDisposition, SourceFormat + +from ..tests import CeleryTestCase +from ..types import KwArgs +from ..user.models import User +from .bigquery import BigQueryTask + +if t.TYPE_CHECKING: + from tempfile import _TemporaryFileWrapper + +CsvFile = t.Union[io.BufferedReader, "_TemporaryFileWrapper[bytes]"] + +# pylint: disable=missing-class-docstring + + +# pylint: disable-next=too-many-instance-attributes,too-many-public-methods +class TestLoadDataIntoBigQueryTask(CeleryTestCase): + + append_users: BigQueryTask + truncate_users: BigQueryTask + + @staticmethod + def _get_users(order_by: t.Optional[str] = None): + queryset = User.objects.all() + if order_by: + queryset = queryset.order_by(order_by) + return queryset + + @classmethod + def setUpClass(cls): + cls.app = Celery(broker="memory://") + + cls.append_users = BigQueryTask.shared( + BigQueryTask.Settings( + table_name="user__append", + write_disposition=BigQueryTask.WriteDisposition.WRITE_APPEND, + chunk_size=10, + fields=["first_name", "is_active"], + ) + )(cls._get_users) + + cls.truncate_users = BigQueryTask.shared( + BigQueryTask.Settings( + table_name="user__truncate", + write_disposition=BigQueryTask.WriteDisposition.WRITE_TRUNCATE, + chunk_size=10, + fields=["first_name", "is_active"], + ) + )(cls._get_users) + + return super().setUpClass() + + def setUp(self): + def target(relative_dot_path: str): # Shortcut for patching. + return f"{BigQueryTask.__module__}.{relative_dot_path}" + + # Mock creating a NamedTemporaryFile. + # pylint: disable-next=consider-using-with + self.csv_file = NamedTemporaryFile( + mode="w+b", suffix=".csv", delete=False + ) + self.addCleanup(os.remove, self.csv_file.name) + self.mock_named_temporary_file = self.patch( + target("NamedTemporaryFile"), return_value=self.csv_file + ) + + # Mock getting GCP service account credentials. + self.credentials = "I can haz cheezburger?" + self.mock_get_gcp_service_account_credentials = self.patch( + target("get_gcp_service_account_credentials"), + return_value=self.credentials, + ) + + # Mock BigQuery client and its methods. + self.mock_bq_client = MagicMock() + self.mock_bq_client_class = self.patch( + target("Client"), return_value=self.mock_bq_client + ) + + # Mock load_table_from_file method and its result(). + self.mock_load_table_from_file: MagicMock = ( + self.mock_bq_client.load_table_from_file + ) + self.mock_load_job = MagicMock() + self.mock_load_table_from_file.return_value = self.mock_load_job + self.mock_load_job_result: MagicMock = self.mock_load_job.result + self.job_config = MagicMock() + self.mock_load_job_config_class = self.patch( + target("LoadJobConfig"), return_value=self.job_config + ) + + return super().setUp() + + # assertions + + def _assert_queryset_written_to_csv( + self, + queryset: QuerySet[t.Any], + fields: t.List[str], + csv_file: t.Optional[CsvFile] = None, + ): + # Read the actual CSV content. + csv_file = csv_file or self.csv_file + csv_file.seek(0) + actual_content = csv_file.read().decode("utf-8") + + # Generate the expected CSV content. 
+ csv_content = io.StringIO() + csv_writer = csv.writer( + csv_content, lineterminator="\n", quoting=csv.QUOTE_MINIMAL + ) + csv_writer.writerow(fields) # Write the headers. + for obj in queryset: + csv_writer.writerow( + [ + BigQueryTask.format_value_for_csv(getattr(obj, field)) + for field in fields + ] + ) + expected_content = csv_content.getvalue().rstrip() + + # Assert the actual CSV content matches the expected content. + assert actual_content == expected_content + + def _assert_csv_file_loaded_into_bigquery( + self, + table_name: str, + token_lifetime_seconds: int, + write_disposition: str, + csv_file: CsvFile, + ): + # Assert BigQuery client was created. + self.mock_get_gcp_service_account_credentials.assert_called_once_with( + token_lifetime_seconds=token_lifetime_seconds + ) + self.mock_bq_client_class.assert_called_once_with( + project=settings.GOOGLE_CLOUD_PROJECT_ID, + credentials=self.credentials, + ) + + # Assert load job was created and run. + self.mock_load_job_config_class.assert_called_once_with( + create_disposition=CreateDisposition.CREATE_IF_NEEDED, + source_format=SourceFormat.CSV, + skip_leading_rows=1, + write_disposition=write_disposition, + time_zone="Etc/UTC", + date_format="YYYY-MM-DD", + time_format="HH24:MI:SS", + datetime_format="YYYY-MM-DD HH24:MI:SS", + ) + self.mock_load_table_from_file.assert_called_once_with( + file_obj=csv_file, + destination=".".join( + [ + settings.GOOGLE_CLOUD_PROJECT_ID, + settings.GOOGLE_CLOUD_BIGQUERY_DATASET_ID, + table_name, + ] + ), + job_config=self.job_config, + ) + self.mock_load_job_result.assert_called_once_with() + + # settings + + # pylint: disable-next=too-many-arguments + def _test_settings( + self, + code: str, + write_disposition: str = BigQueryTask.WriteDisposition.WRITE_APPEND, + chunk_size: int = 10, + fields: t.Optional[t.List[str]] = None, + kwargs: t.Optional[KwArgs] = None, + **settings_kwargs, + ): + with self.assert_raises_validation_error(code=code): + BigQueryTask.Settings( + write_disposition=write_disposition, + chunk_size=chunk_size, + fields=fields or ["some_field"], + kwargs=kwargs or {}, + **settings_kwargs, + ) + + def test_settings__write_disposition_does_not_exist(self): + """Write disposition must exist.""" + self._test_settings( + code="write_disposition_does_not_exist", + write_disposition="WRITE_INVALID", + ) + + def test_settings__chunk_size_lte_0(self): + """Chunk size must be > 0.""" + self._test_settings(code="chunk_size_lte_0", chunk_size=0) + + def test_settings__chunk_size_not_multiple_of_10(self): + """Chunk size must be a multiple of 10.""" + self._test_settings(code="chunk_size_not_multiple_of_10", chunk_size=9) + + def test_settings__no_fields(self): + """Must provide at least 1 field (not including ID field).""" + self._test_settings(code="no_fields", fields=["id"]) + + def test_settings__duplicate_fields(self): + """Fields must be unique.""" + self._test_settings(code="duplicate_fields", fields=["email", "email"]) + + def test_settings__time_limit_lte_0(self): + """Time limit must be > 0.""" + self._test_settings(code="time_limit_lte_0", time_limit=0) + + def test_settings__time_limit_gt_3600(self): + """Time limit must be <= 3600 (1 hour).""" + self._test_settings(code="time_limit_gt_3600", time_limit=3601) + + def test_settings__max_retries_lt_0(self): + """Max retries must be >= 0.""" + self._test_settings(code="max_retries_lt_0", max_retries=-1) + + def test_settings__retry_countdown_lt_0(self): + """Retry countdown must be >= 0.""" + 
self._test_settings(code="retry_countdown_lt_0", retry_countdown=-1) + + def test_settings__task_unbound(self): + """BigQueryTask must be bound.""" + self._test_settings(code="task_unbound", kwargs={"bind": False}) + + def test_settings__base_not_subclass(self): + """Base must be a subclass of BigQueryTask.""" + self._test_settings(code="base_not_subclass", kwargs={"base": int}) + + # register_table_name + + def test_register_table_name__registered(self): + """An already registered table name raises a ValidationError.""" + table_name = self.append_users.settings.table_name + assert table_name + assert table_name in BigQueryTask.TABLE_NAMES + with self.assert_raises_validation_error( + code="table_name_already_registered" + ): + BigQueryTask.register_table_name(table_name) + + def test_register_table_name__unregistered(self): + """An unregistered table name does not raise an error.""" + table_name = "some_unique_table_name" + assert table_name not in BigQueryTask.TABLE_NAMES + BigQueryTask.register_table_name(table_name) + assert table_name in BigQueryTask.TABLE_NAMES + + # format_value_for_csv + + def test_format_value_for_csv__none(self): + """None is converted to an empty string.""" + assert "" == BigQueryTask.format_value_for_csv(None) + + def test_format_value_for_csv__bool(self): + """Booleans are converted to 0 or 1.""" + assert "True" == BigQueryTask.format_value_for_csv(True) + assert "False" == BigQueryTask.format_value_for_csv(False) + + def test_format_value_for_csv__datetime(self): + """Datetimes are converted to ISO 8601 format with a space separator.""" + assert "2025-02-01 11:30:15" == BigQueryTask.format_value_for_csv( + datetime( + year=2025, month=2, day=1, hour=12, minute=30, second=15 + ).replace(tzinfo=timezone(timedelta(hours=1))) + ) + + def test_format_value_for_csv__date(self): + """Dates are converted to ISO 8601 format.""" + assert "2025-02-01" == BigQueryTask.format_value_for_csv( + date(year=2025, month=2, day=1) + ) + + def test_format_value_for_csv__time(self): + """Times are converted to ISO 8601 format, ignoring timezone info.""" + assert "12:30:15" == BigQueryTask.format_value_for_csv( + time(hour=12, minute=30, second=15) + ) + + # get_ordered_queryset + + def _test_get_ordered_queryset(self, order_by: t.Optional[str] = None): + task = self.append_users + queryset = task.get_ordered_queryset(order_by=order_by) + assert queryset.ordered + assert list(queryset) == list( + User.objects.order_by(order_by or task.settings.id_field) + ) + + def test_get_ordered_queryset__pre_ordered(self): + """Does not reorder an already ordered queryset.""" + self._test_get_ordered_queryset(order_by="first_name") + + def test_get_ordered_queryset__post_ordered(self): + """Orders the queryset if not already ordered. 
The default is by ID.""" + self._test_get_ordered_queryset() + + # write_queryset_to_csv + + def _test_write_queryset_to_csv( + self, + queryset: QuerySet[t.Any], + fields: t.List[str], + chunk_size: int = 10, + ): + assert queryset.exists() == BigQueryTask.write_queryset_to_csv( + fields=fields, + chunk_size=chunk_size, + queryset=queryset, + csv_file=self.csv_file, + ) + + self._assert_queryset_written_to_csv(queryset, fields) + + def test_write_queryset_to_csv__all(self): + """Values are written to the CSV file.""" + queryset = User.objects.all() + assert queryset.exists() + self._test_write_queryset_to_csv(queryset, fields=["first_name"]) + + def test_write_queryset_to_csv__none(self): + """No values are written to the CSV file.""" + queryset = User.objects.none() + assert not queryset.exists() + self._test_write_queryset_to_csv(queryset, fields=["first_name"]) + + # shared + + def _test_shared__write(self, task: BigQueryTask): + self.apply_task(name=task.name) + + # Assert CSV file was created. + self.mock_named_temporary_file.assert_called_once_with( + mode="w+b", suffix=".csv", delete=True + ) + + # Assert queryset was written to CSV. + assert self.csv_file.closed + with open(self.csv_file.name, "rb") as csv_file: + self._assert_queryset_written_to_csv( + queryset=task.get_ordered_queryset(), + fields=task.settings.fields, + csv_file=csv_file, + ) + + self._assert_csv_file_loaded_into_bigquery( + table_name=task.settings.table_name or task.get_queryset.__name__, + token_lifetime_seconds=task.settings.time_limit, + write_disposition=task.settings.write_disposition, + csv_file=self.csv_file, + ) + + def test_shared__write_append(self): + """The append_users task writes data to BigQuery in append mode.""" + self._test_shared__write(self.append_users) + + def test_shared__write_truncate(self): + """The append_users task writes data to BigQuery in truncate mode.""" + self._test_shared__write(self.truncate_users) diff --git a/codeforlife/tasks/data_warehouse.py b/codeforlife/tasks/data_warehouse.py deleted file mode 100644 index 4b8a6d0..0000000 --- a/codeforlife/tasks/data_warehouse.py +++ /dev/null @@ -1,592 +0,0 @@ -""" -© Ocado Group -Created on 06/10/2025 at 17:15:37(+01:00). 
-""" - -import csv -import io -import logging -import typing as t -from dataclasses import dataclass -from datetime import date, datetime, time, timezone - -from celery import Task -from celery import shared_task as _shared_task -from django.conf import settings as django_settings -from django.core.exceptions import ValidationError -from django.db.models.query import QuerySet -from google.auth import default, impersonated_credentials -from google.cloud import storage as gcs # type: ignore[import-untyped] -from google.oauth2 import service_account - -from .utils import get_task_name - -_BQ_TABLE_NAMES: t.Set[str] = set() - - -# pylint: disable-next=abstract-method -class DataWarehouseTask(Task): - """A task that saves a queryset as CSV files in the GCS bucket.""" - - timestamp_key = "_timestamp" - - GetQuerySet: t.TypeAlias = t.Callable[..., QuerySet[t.Any]] - BqTableWriteMode: t.TypeAlias = t.Literal["overwrite", "append"] - - # pylint: disable-next=too-many-instance-attributes - class Settings: - """The settings for a data warehouse task.""" - - # pylint: disable-next=too-many-arguments,too-many-branches - def __init__( - self, - bq_table_write_mode: "DataWarehouseTask.BqTableWriteMode", - chunk_size: int, - fields: t.List[str], - id_field: str = "id", - time_limit: int = 3600, - bq_table_name: t.Optional[str] = None, - max_retries: int = 5, - retry_countdown: int = 10, - **kwargs, - ): - # pylint: disable=line-too-long - """Create the settings for a data warehouse task. - - Args: - bq_table_write_mode: The BigQuery table's write-mode. - chunk_size: The number of objects/rows per CSV. Must be a multiple of 10. - fields: The [Django model] fields to include in the CSV. - id_field: The name of the field used to identify each object. - time_limit: The maximum amount of time this task is allowed to take before it's hard-killed. - bq_table_name: The name of the BigQuery table where these CSV files will ultimately be imported into. If not provided, the name of the decorated function will be used instead. - max_retries: The maximum number of retries allowed. - retry_countdown: The countdown before attempting the next retry. - """ - # pylint: enable=line-too-long - - # Set required values as defaults. - kwargs.setdefault("bind", True) - kwargs.setdefault("base", DataWarehouseTask) - - # Ensure the ID field is always present. - if id_field not in fields: - fields.append(id_field) - - # Validate args. 
- if chunk_size <= 0: - raise ValidationError( - "The chunk size must be > 0.", - code="chunk_size_lte_0", - ) - if chunk_size % 10 != 0: - raise ValidationError( - "The chunk size must be a multiple of 10.", - code="chunk_size_not_multiple_of_10", - ) - if len(fields) <= 1: - raise ValidationError( - "Must provide at least 1 field (not including ID field).", - code="no_fields", - ) - if len(fields) != len(set(fields)): - raise ValidationError( - "Fields must be unique.", - code="duplicate_fields", - ) - if time_limit <= 0: - raise ValidationError( - "The time limit must be > 0.", - code="time_limit_lte_0", - ) - if time_limit > 3600: - raise ValidationError( - "The time limit must be <= 3600 (1 hour).", - code="time_limit_gt_3600", - ) - if max_retries < 0: - raise ValidationError( - "The max retries must be >= 0.", - code="max_retries_lt_0", - ) - if retry_countdown < 0: - raise ValidationError( - "The retry countdown must be >= 0.", - code="retry_countdown_lt_0", - ) - if kwargs["bind"] is not True: - raise ValidationError( - "The task must be bound.", code="task_unbound" - ) - if not issubclass(kwargs["base"], DataWarehouseTask): - raise ValidationError( - f"The base must be a subclass of " - f"'{DataWarehouseTask.__module__}." - f"{DataWarehouseTask.__qualname__}'.", - code="base_not_subclass", - ) - - self._bq_table_write_mode = bq_table_write_mode - self._chunk_size = chunk_size - self._fields = fields - self._id_field = id_field - self._time_limit = time_limit - self._bq_table_name = bq_table_name - self._max_retries = max_retries - self._retry_countdown = retry_countdown - self._kwargs = kwargs - - # Get the runtime settings based on the BigQuery table's write-mode. - bq_table_write_mode_is_append = bq_table_write_mode == "append" - self._only_list_blobs_from_current_timestamp = ( - bq_table_write_mode_is_append - ) - self._delete_blobs_not_from_current_timestamp = ( - not bq_table_write_mode_is_append - ) - - @property - def bq_table_write_mode(self): - """The BigQuery table's write-mode.""" - return self._bq_table_write_mode - - @property - def chunk_size(self): - """The number of objects/rows per CSV. Must be a multiple of 10.""" - return self._chunk_size - - @property - def fields(self): - """The [Django model] fields to include in the CSV.""" - return self._fields - - @property - def id_field(self): - """The name of the field used to identify each object.""" - return self._id_field - - @property - def time_limit(self): - """ - The maximum amount of time this task is allowed to take before it's - hard-killed. - """ - return self._time_limit - - @property - def bq_table_name(self): - """ - The name of the BigQuery table where the CSV files will ultimately - be imported into. 
- """ - return self._bq_table_name - - @property - def max_retries(self): - """The maximum number of retries allowed.""" - return self._max_retries - - @property - def retry_countdown(self): - """The countdown before attempting the next retry.""" - return self._retry_countdown - - @property - def only_list_blobs_from_current_timestamp(self): - """Whether to only list blobs from the current timestamp.""" - return self._only_list_blobs_from_current_timestamp - - @property - def delete_blobs_not_from_current_timestamp(self): - """Whether to delete all blobs not from the current timestamp.""" - return self._delete_blobs_not_from_current_timestamp - - settings: Settings - get_queryset: GetQuerySet - - @dataclass - class ChunkMetadata: - """All of the metadata used to track a chunk.""" - - bq_table_name: str # the name of the BigQuery table - bq_table_write_mode: "DataWarehouseTask.BqTableWriteMode" - timestamp: str # when the task was first run - obj_i_start: int # object index span start - obj_i_end: int # object index span end - - def to_blob_name(self): - """Convert this chunk metadata into a blob name.""" - - # E.g. "user__append/2025-01-01_00:00:00__1_1000.csv" - return ( - f"{self.bq_table_name}__{self.bq_table_write_mode}/" - f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv" - ) - - @classmethod - def from_blob_name(cls, blob_name: str): - """Extract the chunk metadata from a blob name.""" - - # E.g. "user__append/2025-01-01_00:00:00__1_1000.csv" - # "user__append", "2025-01-01_00:00:00__1_1000.csv" - dir_name, file_name = blob_name.split("/") - # "user", "append" - bq_table_name, bq_table_write_mode = dir_name.rsplit( - "__", maxsplit=1 - ) - assert bq_table_write_mode in ("overwrite", "append") - # "2025-01-01_00:00:00__1_1000" - file_name = file_name.removesuffix(".csv") - # "2025-01-01_00:00:00", "1_1000" - timestamp, obj_i_span = file_name.split("__") - # "1", "1000" - obj_i_start, obj_i_end = obj_i_span.split("_") - - return cls( - bq_table_name=bq_table_name, - bq_table_write_mode=t.cast( - DataWarehouseTask.BqTableWriteMode, bq_table_write_mode - ), - timestamp=timestamp, - obj_i_start=int(obj_i_start), - obj_i_end=int(obj_i_end), - ) - - def _get_gcs_bucket(self): - # Set the scopes of the credentials. - # https://cloud.google.com/storage/docs/oauth-scopes - scopes = ["https://www.googleapis.com/auth/devstorage.full_control"] - - if django_settings.ENV == "local": - # Load the credentials from a local JSON file. - credentials = service_account.Credentials.from_service_account_file( - "/replace/me/with/path/to/service_account.json", - scopes=scopes, - ) - else: - # Use Workload Identity Federation to get the default credentials - # from the environment. These are the short-lived credentials from - # the AWS IAM role. - source_credentials, _ = default() - - # Create the impersonated credentials object - credentials = impersonated_credentials.Credentials( - source_credentials=source_credentials, - target_principal=( - django_settings.GOOGLE_CLOUD_STORAGE_SERVICE_ACCOUNT_NAME - ), - target_scopes=scopes, - # The lifetime of the impersonated credentials in seconds. - lifetime=self.settings.time_limit, - ) - - # Create a client with the impersonated credentials and get the bucket. - return gcs.Client(credentials=credentials).bucket( - django_settings.GOOGLE_CLOUD_STORAGE_BUCKET_NAME - ) - - def init_csv_writer(self): - """Initializes a CSV writer. 
-
-        Returns:
-            A tuple where the first value is the string buffer containing the
-            CSV content and the second value is a CSV writer which handles
-            writing new rows to the CSV buffer.
-        """
-        csv_content = io.StringIO()
-        csv_writer = csv.writer(
-            csv_content, lineterminator="\n", quoting=csv.QUOTE_MINIMAL
-        )
-        csv_writer.writerow(self.settings.fields)  # Write the headers.
-        return csv_content, csv_writer
-
-    @staticmethod
-    def write_csv_row(
-        writer: "csv.Writer",  # type: ignore[name-defined]
-        values: t.Tuple[t.Any, ...],
-    ):
-        """Write the values to the CSV file in a format BigQuery accepts.
-
-        Args:
-            writer: The CSV writer which handles formatting the row.
-            values: The values to write to the CSV file.
-        """
-        # Reimport required to avoid being mocked during testing.
-        # pylint: disable-next=reimported,import-outside-toplevel
-        from datetime import datetime as _datetime
-
-        # Transform the values into their SQL representations.
-        csv_row: t.List[str] = []
-        for value in values:
-            if value is None:
-                value = ""  # BigQuery treats an empty string as NULL/None.
-            elif isinstance(value, _datetime):
-                value = (
-                    value.astimezone(timezone.utc)
-                    .replace(tzinfo=None)
-                    .isoformat(sep=" ")
-                )
-            elif isinstance(value, (date, time)):
-                value = value.isoformat()
-            elif not isinstance(value, str):
-                value = str(value)
-
-            csv_row.append(value)
-
-        writer.writerow(csv_row)
-
-    @staticmethod
-    def to_timestamp(dt: datetime):
-        """
-        Formats a datetime to a timestamp to be used in a CSV name.
-        E.g. "2025-01-01_00:00:00"
-        """
-        return dt.strftime("%Y-%m-%d_%H:%M:%S")
-
-    @staticmethod
-    # pylint: disable-next=too-many-locals,bad-staticmethod-argument
-    def _save_query_set_as_csvs_in_gcs_bucket(
-        self: "DataWarehouseTask", timestamp: str, *task_args, **task_kwargs
-    ):
-        # Get the queryset.
-        queryset = self.get_queryset(*task_args, **task_kwargs)
-
-        # Count the objects in the queryset and ensure there's at least 1.
-        obj_count = queryset.count()
-        if obj_count == 0:
-            return
-
-        # If the queryset is not ordered, order it by ID by default.
-        if not queryset.ordered:
-            queryset = queryset.order_by(self.settings.id_field)
-
-        # Limit the queryset to the object count to ensure the number of
-        # digits in the count remains consistent.
-        queryset = queryset[:obj_count]
-
-        # Impersonate the service account and get access to the GCS bucket.
-        bucket = self._get_gcs_bucket()
-
-        # The name of the last blob from the current timestamp.
-        last_blob_name_from_current_timestamp: t.Optional[str] = None
-
-        # The name of the directory where the blobs are expected to be located.
-        blob_dir_name = (
-            f"{self.settings.bq_table_name}__"
-            f"{self.settings.bq_table_write_mode}/"
-        )
-
-        # List all the existing blobs.
-        for blob in t.cast(
-            t.Iterator[gcs.Blob],
-            bucket.list_blobs(
-                prefix=blob_dir_name
-                + (
-                    timestamp
-                    if self.settings.only_list_blobs_from_current_timestamp
-                    else ""
-                )
-            ),
-        ):
-            blob_name = t.cast(str, blob.name)
-
-            # Check if found first blob from current timestamp.
-            if (
-                self.settings.only_list_blobs_from_current_timestamp
-                or blob_name.startswith(blob_dir_name + timestamp)
-            ):
-                last_blob_name_from_current_timestamp = blob_name
-            # Check if blobs not from the current timestamp should be deleted.
-            elif self.settings.delete_blobs_not_from_current_timestamp:
-                logging.info('Deleting blob "%s".', blob_name)
-                blob.delete()
-
-        # Track the current and starting object index (1-based).
-        obj_i = obj_i_start = (
-            # ...extract the starting object index from its name.
-            self.ChunkMetadata.from_blob_name(
-                last_blob_name_from_current_timestamp
-            ).obj_i_end
-            + 1
-            # If found a blob from the current timestamp...
-            if last_blob_name_from_current_timestamp is not None
-            else 1  # ...else start with the 1st object.
-        )
-
-        # If the queryset is not starting with the first object...
-        if obj_i != 1:
-            # ...offset the queryset...
-            offset = obj_i - 1
-            logging.info("Offsetting queryset by %d objects.", offset)
-            queryset = queryset[offset:]
-
-            # ...and ensure there's at least 1 object.
-            if not queryset.exists():
-                return
-
-        chunk_i = obj_i // self.settings.chunk_size  # Chunk index (0-based).
-
-        # Track content of the current CSV file.
-        csv_content, csv_writer = self.init_csv_writer()
-
-        # Uploads the current CSV file to the GCS bucket.
-        def upload_csv(obj_i_end: int):
-            # Calculate the starting object index for the current chunk.
-            obj_i_start = (chunk_i * self.settings.chunk_size) + 1
-
-            # Generate the path to the CSV in the bucket.
-            blob_name = self.ChunkMetadata(
-                bq_table_name=self.settings.bq_table_name,
-                bq_table_write_mode=self.settings.bq_table_write_mode,
-                timestamp=timestamp,
-                obj_i_start=obj_i_start,
-                obj_i_end=obj_i_end,
-            ).to_blob_name()
-
-            # Create a blob object for the CSV file's path and upload it.
-            logging.info("Uploading %s to bucket.", blob_name)
-            blob = bucket.blob(blob_name)
-            blob.upload_from_string(
-                csv_content.getvalue().strip(), content_type="text/csv"
-            )
-
-        # Iterate through the all the objects in the queryset. The objects
-        # are retrieved in chunks (no caching) to avoid OOM errors. For each
-        # object, a tuple of values is returned. The order of the values in
-        # the tuple is determined by the order of the fields.
-        for obj_i, values in enumerate(
-            t.cast(
-                t.Iterator[t.Tuple[t.Any, ...]],
-                queryset.values_list(*self.settings.fields).iterator(
-                    chunk_size=self.settings.chunk_size
-                ),
-            ),
-            start=obj_i_start,
-        ):
-            if obj_i % self.settings.chunk_size == 1:  # If start of a chunk...
-                if obj_i != obj_i_start:  # ...and not the 1st iteration...
-                    # ...upload the chunk's CSV and increment its index...
-                    upload_csv(obj_i_end=obj_i - 1)
-                    chunk_i += 1
-
-                    # ...and start a new CSV.
-                    csv_content, csv_writer = self.init_csv_writer()
-
-            self.write_csv_row(csv_writer, values)
-
-        upload_csv(obj_i_end=obj_i)  # Upload final (maybe partial) chunk.
-
-    @classmethod
-    def shared(cls, settings: Settings):
-        # pylint: disable=line-too-long,anomalous-backslash-in-string
-        """Create a Celery task that saves a queryset as CSV files in the GCS
-        bucket.
-
-        This decorator handles chunking a queryset to avoid out-of-memory (OOM)
-        errors. Each chunk is saved as a separate CSV file and follows a naming
-        convention that tracks 2 dimensions:
-
-        1. timestamp - When this task first ran (in case of retries).
-        2. object index (obj_i) - The start and end index of the objects.
-
-        The naming convention follows the format:
-        `{timestamp}__{i_start}_{i_end}.csv`
-        The timestamp follows the format:
-        `{YYYY}-{MM}-{DD}_{HH}:{MM}:{SS}` (e.g. `2025-12-01_23:59:59`)
-
-        NOTE: The index is padded with zeros to ensure sorting by name is
-        consistent. For example, the index span from 1 to 500 would be
-        `001_500`.
-
-        Ultimately, these CSV files are imported into a BigQuery table, after
-        which they are deleted from the GCS bucket.
-
-        Each task *must* be given a distinct table name and queryset to avoid
-        unintended consequences.
-
-        Examples:
-            ```
-            @DataWarehouseTask.shared(
-                DataWarehouseTask.Options(
-                    # bq_table_name = "example", <- Alternatively, set the table name like so.
-                    bq_table_write_mode="append",
-                    chunk_size=1000,
-                    fields=["first_name", "joined_at", "is_active"],
-                )
-            )
-            def user():  # CSVs will be saved to a BQ table named "user"
-                return User.objects.all()
-            ```
-
-        Args:
-            settings: The settings for this data warehouse task.
-
-        Returns:
-            A wrapper-function which expects to receive a callable that returns
-            a queryset and returns a Celery task to save the queryset as CSV
-            files in the GCS bucket.
-        """
-        # pylint: enable=line-too-long,anomalous-backslash-in-string
-
-        def wrapper(get_queryset: "DataWarehouseTask.GetQuerySet"):
-            # Get BigQuery table name and validate it's not already registered.
-            bq_table_name = settings.bq_table_name or get_queryset.__name__
-            if bq_table_name in _BQ_TABLE_NAMES:
-                raise ValueError(
-                    f'The BigQuery table name "{bq_table_name}" is already'
-                    "registered."
-                )
-            _BQ_TABLE_NAMES.add(bq_table_name)
-
-            # Overwrite BigQuery table name.
-            # pylint: disable-next=protected-access
-            settings._bq_table_name = bq_table_name
-
-            # Wraps the task with retry logic.
-            def task(self: "DataWarehouseTask", *task_args, **task_kwargs):
-                # If this is not the first run...
-                if self.request.retries:
-                    # ...pop the timestamp passed from the first run.
-                    timestamp = t.cast(str, task_kwargs.pop(self.timestamp_key))
-                else:  # ...else get the current timestamp.
-                    timestamp = self.to_timestamp(datetime.now(timezone.utc))
-
-                try:
-                    cls._save_query_set_as_csvs_in_gcs_bucket(
-                        self, timestamp, *task_args, **task_kwargs
-                    )
-                except Exception as exc:
-                    # Pass the timestamp to the retry.
-                    task_kwargs[self.timestamp_key] = timestamp
-
-                    raise self.retry(
-                        args=task_args,
-                        kwargs=task_kwargs,
-                        exc=exc,
-                        countdown=settings.retry_countdown,
-                    )
-
-            # pylint: disable-next=protected-access
-            kwargs = settings._kwargs
-
-            # Namespace the task with service's name. If the name is not
-            # explicitly provided, it defaults to the name of the decorated
-            # function.
-            name = kwargs.pop("name", None)
-            name = get_task_name(
-                name if isinstance(name, str) else get_queryset
-            )
-
-            return t.cast(
-                DataWarehouseTask,
-                _shared_task(  # type: ignore[call-overload]
-                    **kwargs,
-                    name=name,
-                    time_limit=settings.time_limit,
-                    max_retries=settings.max_retries,
-                    settings=settings,
-                    get_queryset=staticmethod(get_queryset),
-                )(task),
-            )
-
-        return wrapper
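Note: the removed `DataWarehouseTask.shared` docstring above describes the blob naming convention that `ChunkMetadata` implemented. Below is a minimal, self-contained sketch of that round trip for reference; the `ChunkName` class, the `user` table name and the 1-1000 index span are illustrative stand-ins, not part of this codebase.

```python
from dataclasses import dataclass


@dataclass
class ChunkName:
    """Illustrative mirror of the removed ChunkMetadata naming scheme."""

    bq_table_name: str
    bq_table_write_mode: str  # "overwrite" or "append"
    timestamp: str  # e.g. "2025-01-01_00:00:00"
    obj_i_start: int
    obj_i_end: int

    def to_blob_name(self) -> str:
        # "{table}__{write_mode}/{timestamp}__{start}_{end}.csv"
        return (
            f"{self.bq_table_name}__{self.bq_table_write_mode}/"
            f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv"
        )

    @classmethod
    def from_blob_name(cls, blob_name: str) -> "ChunkName":
        # Reverse the format above, splitting the directory from the file name.
        dir_name, file_name = blob_name.split("/")
        table, mode = dir_name.rsplit("__", maxsplit=1)
        timestamp, span = file_name.removesuffix(".csv").split("__")
        start, end = span.split("_")
        return cls(table, mode, timestamp, int(start), int(end))


blob_name = ChunkName(
    "user", "append", "2025-01-01_00:00:00", 1, 1000
).to_blob_name()
assert blob_name == "user__append/2025-01-01_00:00:00__1_1000.csv"
assert ChunkName.from_blob_name(blob_name).obj_i_end == 1000
```

Because parsing uses `rsplit("__", maxsplit=1)`, as the removed `from_blob_name` did, a table name that itself contains a double underscore still separates cleanly from the write mode.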
-""" - -import typing as t -from datetime import date, datetime, time, timedelta, timezone -from unittest.mock import MagicMock, call, patch - -from celery import Celery - -from ..tests import CeleryTestCase -from ..types import Args, KwArgs -from ..user.models import User -from .data_warehouse import DataWarehouseTask as DWT - -# pylint: disable=missing-class-docstring - - -@DWT.shared( - DWT.Settings( - bq_table_name="user__append", - bq_table_write_mode="append", - chunk_size=10, - fields=["first_name", "is_active"], - ) -) -def append_users(): - """Append all users in the "user__append" BigQuery table.""" - return User.objects.all() - - -@DWT.shared( - DWT.Settings( - bq_table_name="user__overwrite", - bq_table_write_mode="overwrite", - chunk_size=10, - fields=["first_name", "is_active"], - ) -) -def overwrite_users(): - """Overwrite all users in the "user__overwrite" BigQuery table.""" - return User.objects.all() - - -class MockGcsBlob: - def __init__(self, chunk_metadata: DWT.ChunkMetadata): - self.chunk_metadata = chunk_metadata - self.upload_from_string = MagicMock() - self.delete = MagicMock() - - @property - def name(self): - """The name of the blob.""" - return self.chunk_metadata.to_blob_name() - - def __repr__(self): - return self.name - - @classmethod - # pylint: disable-next=too-many-arguments - def generate_list( - cls, task: DWT, timestamp: str, obj_i_start: int, obj_i_end: int - ): - """Generate a list of mock GCS blobs. - - Args: - task: The task that produced these blobs. - timestamp: When the task first ran. - obj_i_start: The object index span start. - obj_i_end: The object index span end. - obj_count_digits: The number of digits in the object count - - Returns: - A list of mock GCS blobs. - """ - return [ - cls( - chunk_metadata=DWT.ChunkMetadata( - bq_table_name=task.settings.bq_table_name, - bq_table_write_mode=task.settings.bq_table_write_mode, - timestamp=timestamp, - obj_i_start=obj_i_start, - obj_i_end=min( - obj_i_start + task.settings.chunk_size - 1, obj_i_end - ), - ) - ) - for obj_i_start in range( - obj_i_start, obj_i_end + 1, task.settings.chunk_size - ) - ] - - -# pylint: disable-next=too-few-public-methods -class MockGcsBucket: - def __init__( - self, - list_blobs_return: t.List[MockGcsBlob], - new_blobs: t.List[MockGcsBlob], - ): - self.list_blobs = MagicMock(return_value=list_blobs_return) - self.blob = MagicMock(side_effect=new_blobs) # Returns 1 blob per call. 
-        self.copy_blob = MagicMock()
-
-
-# pylint: disable-next=too-many-instance-attributes,too-many-public-methods
-class TestDataWarehouseTask(CeleryTestCase):
-
-    @classmethod
-    def setUpClass(cls):
-        cls.app = Celery(broker="memory://")
-
-        return super().setUpClass()
-
-    def setUp(self):
-        self.date = date(year=2025, month=2, day=1)
-        self.time = time(hour=12, minute=30, second=15)
-        self.datetime = datetime.combine(self.date, self.time)
-
-        self.bq_table_name = "example"
-        self.bq_table_write_mode: DWT.BqTableWriteMode = "append"
-        self.timestamp = DWT.to_timestamp(self.datetime)
-        self.obj_i_start = 1
-        self.obj_i_end = 100
-
-        self.blob_name = (
-            f"{self.bq_table_name}__{self.bq_table_write_mode}/"
-            f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv"
-        )
-
-        return super().setUp()
-
-    # Settings
-
-    def _test_settings(
-        self,
-        code: str,
-        bq_table_write_mode: DWT.BqTableWriteMode = "append",
-        chunk_size: int = 10,
-        fields: t.Optional[t.List[str]] = None,
-        **kwargs,
-    ):
-        with self.assert_raises_validation_error(code=code):
-            DWT.Settings(
-                bq_table_write_mode=bq_table_write_mode,
-                chunk_size=chunk_size,
-                fields=fields or ["some_field"],
-                **kwargs,
-            )
-
-    def test_settings__chunk_size_lte_0(self):
-        """Chunk size must be > 0."""
-        self._test_settings(code="chunk_size_lte_0", chunk_size=0)
-
-    def test_settings__chunk_size_not_multiple_of_10(self):
-        """Chunk size must be a multiple of 10."""
-        self._test_settings(code="chunk_size_not_multiple_of_10", chunk_size=9)
-
-    def test_settings__no_fields(self):
-        """Must provide at least 1 field (not including ID field)."""
-        self._test_settings(code="no_fields", fields=["id"])
-
-    def test_settings__duplicate_fields(self):
-        """Fields must be unique."""
-        self._test_settings(code="duplicate_fields", fields=["email", "email"])
-
-    def test_settings__time_limit_lte_0(self):
-        """Time limit must be > 0."""
-        self._test_settings(code="time_limit_lte_0", time_limit=0)
-
-    def test_settings__time_limit_gt_3600(self):
-        """Time limit must be <= 3600 (1 hour)."""
-        self._test_settings(code="time_limit_gt_3600", time_limit=3601)
-
-    def test_settings__max_retries_lt_0(self):
-        """Max retries must be >= 0."""
-        self._test_settings(code="max_retries_lt_0", max_retries=-1)
-
-    def test_settings__retry_countdown_lt_0(self):
-        """Retry countdown must be >= 0."""
-        self._test_settings(code="retry_countdown_lt_0", retry_countdown=-1)
-
-    def test_settings__task_unbound(self):
-        """Task must be bound."""
-        self._test_settings(code="task_unbound", bind=False)
-
-    def test_settings__base_not_subclass(self):
-        """Base must be a subclass of DWT."""
-        self._test_settings(code="base_not_subclass", base=int)
-
-    # To timestamp
-
-    def test_to_timestamp(self):
-        """Format should match YYYY-MM-DD_HH:MM:SS."""
-        timestamp = DWT.to_timestamp(self.datetime)
-        assert timestamp == "2025-02-01_12:30:15"
-
-    # Chunk metadata
-
-    def test_chunk_metadata__to_blob_name(self):
-        """Can successfully convert a chunk's metadata into a blob name."""
-        blob_name = DWT.ChunkMetadata(
-            bq_table_name=self.bq_table_name,
-            bq_table_write_mode=self.bq_table_write_mode,
-            timestamp=self.timestamp,
-            obj_i_start=self.obj_i_start,
-            obj_i_end=self.obj_i_end,
-        ).to_blob_name()
-        assert blob_name == self.blob_name
-
-    def test_chunk_metadata__from_blob_name(self):
-        """Can successfully convert a chunk's metadata into a blob name."""
-        chunk_metadata = DWT.ChunkMetadata.from_blob_name(self.blob_name)
-        assert chunk_metadata.bq_table_name == self.bq_table_name
-        assert chunk_metadata.bq_table_write_mode == self.bq_table_write_mode
-        assert chunk_metadata.timestamp == self.timestamp
-        assert chunk_metadata.obj_i_start == self.obj_i_start
-        assert chunk_metadata.obj_i_end == self.obj_i_end
-
-    # Init CSV writer
-
-    def test_init_csv_writer(self):
-        """
-        Initializing a CSV writer returns the content stream and writer. The
-        fields should be pre-written as headers and the writer should write to
-        the buffer.
-        """
-        task = append_users
-        csv_content, csv_writer = task.init_csv_writer()
-
-        csv_row = ["a", "b", "c"]
-        csv_writer.writerow(csv_row)
-
-        assert csv_content.getvalue().strip() == "\n".join(
-            [",".join(task.settings.fields), ",".join(csv_row)]
-        )
-
-    # Write CSV row
-
-    def _test_write_csv_row(self, *values: t.Tuple[t.Any, str]):
-        original_values = tuple(value[0] for value in values)
-        formatted_values = [value[1] for value in values]
-
-        csv_content, csv_writer = append_users.init_csv_writer()
-        DWT.write_csv_row(csv_writer, original_values)
-
-        assert csv_content.getvalue().strip().split("\n", maxsplit=1)[
-            1
-        ] == ",".join(formatted_values)
-
-    def test_write_csv_row__bool(self):
-        """Booleans are converted to 0 or 1."""
-        self._test_write_csv_row((True, "True"), (False, "False"))
-
-    def test_write_csv_row__datetime(self):
-        """Datetimes are converted to ISO 8601 format with a space separator."""
-        tz_aware_dt = self.datetime.replace(tzinfo=timezone(timedelta(hours=1)))
-        self._test_write_csv_row((tz_aware_dt, "2025-02-01 11:30:15"))
-
-    def test_write_csv_row__date(self):
-        """Dates are converted to ISO 8601 format."""
-        self._test_write_csv_row((self.date, "2025-02-01"))
-
-    def test_write_csv_row__time(self):
-        """Times are converted to ISO 8601 format, ignoring timezone info."""
-        self._test_write_csv_row((self.time, "12:30:15"))
-
-    def test_write_csv_row__str(self):
-        """
-        Strings containing commas and/or double quotes are correctly escaped.
-        """
-        self._test_write_csv_row(
-            ("a", "a"), ("b,c", '"b,c"'), ('"d","e"', '"""d"",""e"""')
-        )
-
-    # Task
-
-    # pylint: disable-next=too-many-arguments,too-many-locals
-    def _test_task(
-        self,
-        task: DWT,
-        retries: int = 0,
-        since_previous_run: t.Optional[timedelta] = None,
-        task_args: t.Optional[Args] = None,
-        task_kwargs: t.Optional[KwArgs] = None,
-    ):
-        """Assert that a data warehouse task uploads chunks of data as CSVs.
-
-        Args:
-            task: The task to make assertions on.
-            retries: How many times the task has been retried.
-            since_previous_run: How long ago since the task was previously run.
-            task_args: The arguments passed to the task.
-            task_kwargs: The keyword arguments passed to the task.
-        """
-
-        # Validate args.
-        assert 0 <= retries <= task.settings.max_retries
-
-        # Get the queryset and order it if not already ordered.
-        task_args, task_kwargs = task_args or tuple(), task_kwargs or {}
-        queryset = task.get_queryset(*task_args, **task_kwargs)
-        if not queryset.ordered:
-            queryset = queryset.order_by(task.settings.id_field)
-
-        # Count the objects in the queryset.
-        obj_count = queryset.count()
-        # Assume we've uploaded 1 chunk if retrying.
-        uploaded_obj_count = task.settings.chunk_size if retries else 0
-        assert uploaded_obj_count <= obj_count
-        assert (obj_count - uploaded_obj_count) > 0
-
-        # Get the current datetime.
-        now = datetime.now(timezone.utc)
-
-        # If not the first run, generate blobs for the last timestamp.
-        # Assume the same object count and timedelta.
-        uploaded_blobs_from_last_timestamp = (
-            MockGcsBlob.generate_list(
-                task=task,
-                timestamp=DWT.to_timestamp(now - since_previous_run),
-                obj_i_start=1,
-                obj_i_end=obj_count,
-            )
-            if since_previous_run is not None
-            else []
-        )
-
-        # Generate blobs for the current timestamp.
-        timestamp = DWT.to_timestamp(now)
-        uploaded_blobs_from_current_timestamp = MockGcsBlob.generate_list(
-            task=task,
-            timestamp=timestamp,
-            obj_i_start=1,
-            obj_i_end=uploaded_obj_count,
-        )
-        non_uploaded_blobs_from_current_timestamp = MockGcsBlob.generate_list(
-            task=task,
-            timestamp=timestamp,
-            obj_i_start=uploaded_obj_count + 1,
-            obj_i_end=obj_count,
-        )
-
-        # Generate a mock GCS bucket.
-        bucket = MockGcsBucket(
-            # Return the appropriate list based on the filters.
-            list_blobs_return=(
-                uploaded_blobs_from_current_timestamp
-                if task.settings.only_list_blobs_from_current_timestamp
-                else uploaded_blobs_from_last_timestamp
-                + uploaded_blobs_from_current_timestamp
-            ),
-            # Return the blobs not yet uploaded in the current timestamp.
-            new_blobs=non_uploaded_blobs_from_current_timestamp,
-        )
-
-        # Patch methods called in the task to create predetermined results.
-        with patch.object(
-            DWT, "_get_gcs_bucket", return_value=bucket
-        ) as get_gcs_bucket:
-            with patch("codeforlife.tasks.data_warehouse.datetime") as dt:
-                dt_now = t.cast(MagicMock, dt.now)
-                dt_now.return_value = now
-                if retries:
-                    task_kwargs[DWT.timestamp_key] = timestamp
-
-                self.apply_task(
-                    task.name,
-                    task_args,
-                    task_kwargs,
-                    retries=retries,
-                )
-
-                if retries:
-                    dt_now.assert_not_called()
-                else:
-                    dt_now.assert_called_once_with(timezone.utc)
-                get_gcs_bucket.assert_called_once()
-
-        # Assert that the blobs for the BigQuery table were listed. If the
-        # table's write-mode is append, assert only the blobs in the current
-        # timestamp were listed.
-        bucket.list_blobs.assert_called_once_with(
-            prefix=(
-                f"{task.settings.bq_table_name}__"
-                f"{task.settings.bq_table_write_mode}/"
-            )
-            + (
-                timestamp
-                if task.settings.only_list_blobs_from_current_timestamp
-                else ""
-            )
-        )
-
-        # Assert that all blobs not in the current timestamp were (not) deleted.
-        for blob in uploaded_blobs_from_last_timestamp:
-            if task.settings.delete_blobs_not_from_current_timestamp:
-                blob.delete.assert_called_once()
-            else:
-                blob.delete.assert_not_called()
-
-        # Assert that a blob was created for each non-uploaded blob.
-        bucket.blob.assert_has_calls(
-            [
-                call(blob.name)
-                for blob in non_uploaded_blobs_from_current_timestamp
-            ]
-        )
-
-        # Assert that each blob was uploaded from a CSV string.
-        for blob in non_uploaded_blobs_from_current_timestamp:
-            csv_content, csv_writer = task.init_csv_writer()
-            for values in t.cast(
-                t.List[t.Tuple[t.Any, ...]],
-                queryset.values_list(*task.settings.fields)[
-                    blob.chunk_metadata.obj_i_start
-                    - 1 : blob.chunk_metadata.obj_i_end
-                ],
-            ):
-                DWT.write_csv_row(csv_writer, values)
-
-            blob.upload_from_string.assert_called_once_with(
-                csv_content.getvalue().strip(), content_type="text/csv"
-            )
-
-    def test_task__append__no_retry__no_previous_blobs(self):
-        """
-        1. All blobs are uploaded on the first run - no retries are required.
-        2. Blobs from the previous timestamp are not in the bucket.
-        """
-        self._test_task(append_users)
-
-    def test_task__append__no_retry__previous_blobs(self):
-        """
-        1. All blobs are uploaded on the first run - no retries are required.
-        2. Blobs from the previous timestamp are in the bucket.
-        3. The blobs from the previous timestamp are not deleted.
-        """
-        self._test_task(append_users, since_previous_run=timedelta(days=1))
-
-    def test_task__append__retry__no_previous_blobs(self):
-        """
-        1. Some blobs are uploaded on the first run - retries are required.
-        2. The order of magnitude in the object count has changed - the blobs
-            uploaded in the previous run(s) need to be renamed.
-        """
-        self._test_task(append_users, retries=1)
-
-    def test_task__overwrite__no_retry__previous_blobs(self):
-        """
-        1. All blobs are uploaded on the first run - no retries are required.
-        2. Blobs from the previous timestamp are in the bucket.
-        3. The blobs from the previous timestamp are deleted.
-        """
-        self._test_task(overwrite_users, since_previous_run=timedelta(days=1))
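Note: the removed tests above isolate the task from Google Cloud Storage by patching `DataWarehouseTask._get_gcs_bucket` to return a mock bucket and by freezing `datetime.now`. The following is a condensed, self-contained sketch of that isolation pattern only; `Uploader`, its `run` method and the `user__append/` prefix are hypothetical stand-ins so the example runs without the deleted module.

```python
from unittest.mock import MagicMock, patch


class Uploader:
    """Hypothetical stand-in for the removed DataWarehouseTask."""

    def _get_gcs_bucket(self):
        # In production this would build an impersonated GCS client.
        raise RuntimeError("should be patched out in tests")

    def run(self):
        # List the existing chunk blobs for the table/write-mode directory.
        return list(self._get_gcs_bucket().list_blobs(prefix="user__append/"))


fake_bucket = MagicMock()
fake_bucket.list_blobs.return_value = []

# Same shape as the removed _test_task: swap the bucket factory for a mock.
with patch.object(Uploader, "_get_gcs_bucket", return_value=fake_bucket):
    assert Uploader().run() == []

fake_bucket.list_blobs.assert_called_once_with(prefix="user__append/")
```

The `codeforlife/tests/test.py` hunk further down adds `patch`/`patch_object` helpers built on the same `start()`/`addCleanup(stop)` flow, which serve the same purpose as the explicit `with` block shown here.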
- """ - self._test_task(append_users, since_previous_run=timedelta(days=1)) - - def test_task__append__retry__no_previous_blobs(self): - """ - 1. Some blobs are uploaded on the first run - retries are required. - 2. The order of magnitude in the object count has changed - the blobs - uploaded in the previous run(s) need to be renamed. - """ - self._test_task(append_users, retries=1) - - def test_task__overwrite__no_retry__previous_blobs(self): - """ - 1. All blobs are uploaded on the first run - no retries are required. - 2. Blobs from the previous timestamp are in the bucket. - 3. The blobs from the previous timestamp are deleted. - """ - self._test_task(overwrite_users, since_previous_run=timedelta(days=1)) diff --git a/codeforlife/tests/celery.py b/codeforlife/tests/celery.py index f73287e..98a902c 100644 --- a/codeforlife/tests/celery.py +++ b/codeforlife/tests/celery.py @@ -9,7 +9,7 @@ from celery import Celery, Task from django.db.models import QuerySet -from ..tasks import DataWarehouseTask, get_task_name +from ..tasks import BigQueryTask, get_task_name from ..types import Args, KwArgs from .test import TestCase @@ -48,9 +48,9 @@ def apply_task( task: Task = self.app.tasks[get_task_name(name)] task.apply(args=args, kwargs=kwargs, **options) - def assert_data_warehouse_task( + def assert_bigquery_task( self, - task: DataWarehouseTask, + task: BigQueryTask, args: t.Optional[Args] = None, kwargs: t.Optional[KwArgs] = None, ): diff --git a/codeforlife/tests/test.py b/codeforlife/tests/test.py index 712b4af..e4ddc99 100644 --- a/codeforlife/tests/test.py +++ b/codeforlife/tests/test.py @@ -5,12 +5,16 @@ import typing as t from unittest.case import _AssertRaisesContext +from unittest.mock import MagicMock, patch from django.core.exceptions import ValidationError from django.http import HttpResponse from django.test import Client as _Client from django.test import TestCase as _TestCase +if t.TYPE_CHECKING: + from unittest.mock import _patch_pass_arg # type: ignore[attr-defined] + class Client(_Client): """A Django client with type hints.""" @@ -40,6 +44,40 @@ def options(self, *args, **kwargs): class TestCase(_TestCase): """Base test case for all tests to inherit.""" + def _start_and_stop_patch(self, p: "_patch_pass_arg"): + mock = t.cast(MagicMock, p.start()) + self.addCleanup(p.stop) + return mock + + def patch(self, target: str, **kwargs): + """Patch a target. + + Sets up automatic unpatching on test cleanup. + + Args: + target: The target to patch. + + Returns: + The mock object. + """ + return self._start_and_stop_patch(patch(target, **kwargs)) + + def patch_object(self, target: t.Any, attribute: str, **kwargs): + """Patch an attribute on a target. + + Sets up automatic unpatching on test cleanup. + + Args: + target: The target to patch. + attribute: The attribute to patch. + + Returns: + The mock object. + """ + return self._start_and_stop_patch( + patch.object(target, attribute, **kwargs) + ) + def assert_raises_validation_error(self, code: str, *args, **kwargs): """Assert code block raises a validation error.