From 80f4ebdbe61c3746b3b68519496a413ca4572e30 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:37:18 -0700 Subject: [PATCH] Remove S3 + encryption --- Cargo.lock | 985 +--------------------- Cargo.toml | 4 - crates/pile-config/src/lib.rs | 30 - crates/pile-dataset/src/dataset.rs | 90 +- crates/pile-io/Cargo.toml | 4 - crates/pile-io/src/chacha/format.rs | 50 -- crates/pile-io/src/chacha/mod.rs | 9 - crates/pile-io/src/chacha/reader.rs | 151 ---- crates/pile-io/src/chacha/reader_async.rs | 165 ---- crates/pile-io/src/chacha/writer.rs | 91 -- crates/pile-io/src/chacha/writer_async.rs | 260 ------ crates/pile-io/src/lib.rs | 5 - crates/pile-io/src/s3reader.rs | 181 ---- crates/pile-value/Cargo.toml | 2 - crates/pile-value/src/source/mod.rs | 3 - crates/pile-value/src/source/s3.rs | 322 ------- crates/pile-value/src/source/s3reader.rs | 158 ---- crates/pile-value/src/value/item.rs | 51 +- crates/pile-value/src/value/readers.rs | 8 +- crates/pile/Cargo.toml | 3 - crates/pile/src/cli.rs | 3 +- crates/pile/src/command/encrypt.rs | 75 -- crates/pile/src/command/mod.rs | 23 - crates/pile/src/command/upload.rs | 284 ------- 24 files changed, 42 insertions(+), 2915 deletions(-) delete mode 100644 crates/pile-io/src/chacha/format.rs delete mode 100644 crates/pile-io/src/chacha/mod.rs delete mode 100644 crates/pile-io/src/chacha/reader.rs delete mode 100644 crates/pile-io/src/chacha/reader_async.rs delete mode 100644 crates/pile-io/src/chacha/writer.rs delete mode 100644 crates/pile-io/src/chacha/writer_async.rs delete mode 100644 crates/pile-io/src/s3reader.rs delete mode 100644 crates/pile-value/src/source/s3.rs delete mode 100644 crates/pile-value/src/source/s3reader.rs delete mode 100644 crates/pile/src/command/encrypt.rs delete mode 100644 crates/pile/src/command/upload.rs diff --git a/Cargo.lock b/Cargo.lock index 0d668fb..c1dc4a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,16 +14,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" -[[package]] -name = "aead" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" -dependencies = [ - "crypto-common 0.1.7", - "generic-array", -] - [[package]] name = "aes" version = "0.8.4" @@ -133,12 +123,6 @@ dependencies = [ "rustversion", ] -[[package]] -name = "array-init" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" - [[package]] name = "arrayref" version = "0.3.9" @@ -174,335 +158,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" -[[package]] -name = "aws-credential-types" -version = "1.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d203b0bf2626dcba8665f5cd0871d7c2c0930223d6b6be9097592fea21242d0" -dependencies = [ - "aws-smithy-async", - "aws-smithy-runtime-api", - "aws-smithy-types", - "zeroize", -] - -[[package]] -name = "aws-lc-rs" -version = "1.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bffc006df10ac2a68c83692d734a465f8ee6c5b384d8545a636f81d858f4bf" -dependencies = [ - "aws-lc-sys", - "zeroize", -] - -[[package]] -name = "aws-lc-sys" -version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4321e568ed89bb5a7d291a7f37997c2c0df89809d7b6d12062c81ddb54aa782e" -dependencies = [ - "cc", - "cmake", - "dunce", - "fs_extra", -] - -[[package]] -name = "aws-runtime" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ede2ddc593e6c8acc6ce3358c28d6677a6dc49b65ba4b37a2befe14a11297e75" -dependencies = [ - "aws-credential-types", - "aws-sigv4", - "aws-smithy-async", - "aws-smithy-eventstream", - "aws-smithy-http", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes", - "bytes-utils", - "fastrand", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "http-body 1.0.1", - "percent-encoding", - "pin-project-lite", - "tracing", - "uuid", -] - -[[package]] -name = "aws-sdk-s3" -version = "1.124.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744c09d75dfec039a05cf8e117c995ded3b0baffa6eb83f3ed7075a01d8d8947" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-sigv4", - "aws-smithy-async", - "aws-smithy-checksums", - "aws-smithy-eventstream", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-observability", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "bytes", - "fastrand", - "hex", - "hmac", - "http 0.2.12", - "http 1.4.0", - "http-body 1.0.1", - "lru 0.16.3", - "percent-encoding", - "regex-lite", - "sha2 0.10.9", - "tracing", - "url", -] - -[[package]] -name = "aws-sigv4" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37411f8e0f4bea0c3ca0958ce7f18f6439db24d555dbd809787262cd00926aa9" -dependencies = [ - "aws-credential-types", - "aws-smithy-eventstream", - "aws-smithy-http", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes", - "crypto-bigint 0.5.5", - "form_urlencoded", - "hex", - "hmac", - "http 0.2.12", - "http 1.4.0", - "p256", - "percent-encoding", - "ring", - "sha2 0.10.9", - "subtle", - "time", - "tracing", - "zeroize", -] - -[[package]] -name = "aws-smithy-async" -version = "1.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" -dependencies = [ - "futures-util", - "pin-project-lite", - "tokio", -] - -[[package]] -name = "aws-smithy-checksums" -version = "0.64.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180dddf5ef0f52a2f99e2fada10e16ea610e507ef6148a42bdc4d5867596aa00" -dependencies = [ - "aws-smithy-http", - "aws-smithy-types", - "bytes", - "crc-fast", - "hex", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "md-5", - "pin-project-lite", - "sha1", - "sha2 0.10.9", - "tracing", -] - -[[package]] -name = "aws-smithy-eventstream" -version = "0.60.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c0b3e587fbaa5d7f7e870544508af8ce82ea47cd30376e69e1e37c4ac746f79" -dependencies = [ - "aws-smithy-types", - "bytes", - "crc32fast", -] - -[[package]] -name = "aws-smithy-http" -version = "0.63.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d619373d490ad70966994801bc126846afaa0d1ee920697a031f0cf63f2568e7" -dependencies = [ - "aws-smithy-eventstream", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes", - "bytes-utils", - "futures-core", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "percent-encoding", - "pin-project-lite", - "pin-utils", - "tracing", -] - -[[package]] -name = "aws-smithy-http-client" -version = "1.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00ccbb08c10f6bcf912f398188e42ee2eab5f1767ce215a02a73bc5df1bbdd95" -dependencies = [ - "aws-smithy-async", - "aws-smithy-runtime-api", - "aws-smithy-types", - "h2 0.3.27", - "h2 0.4.13", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "hyper 0.14.32", - "hyper 1.8.1", - "hyper-rustls 0.24.2", - "hyper-rustls 0.27.7", - "hyper-util", - "pin-project-lite", - "rustls 0.21.12", - "rustls 0.23.37", - "rustls-native-certs", - "rustls-pki-types", - "tokio", - "tokio-rustls 0.26.4", - "tower", - "tracing", -] - -[[package]] -name = "aws-smithy-json" -version = "0.62.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b3a779093e18cad88bbae08dc4261e1d95018c4c5b9356a52bcae7c0b6e9bb" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-observability" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d3f39d5bb871aaf461d59144557f16d5927a5248a983a40654d9cf3b9ba183b" -dependencies = [ - "aws-smithy-runtime-api", -] - -[[package]] -name = "aws-smithy-runtime" -version = "1.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ccf7f6eba8b2dcf8ce9b74806c6c185659c311665c4bf8d6e71ebd454db6bf" -dependencies = [ - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-http-client", - "aws-smithy-observability", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes", - "fastrand", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "http-body 1.0.1", - "http-body-util", - "pin-project-lite", - "pin-utils", - "tokio", - "tracing", -] - -[[package]] -name = "aws-smithy-runtime-api" -version = "1.11.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af6e5def28be846479bbeac55aa4603d6f7986fc5da4601ba324dd5d377516" -dependencies = [ - "aws-smithy-async", - "aws-smithy-types", - "bytes", - "http 0.2.12", - "http 1.4.0", - "pin-project-lite", - "tokio", - "tracing", - "zeroize", -] - -[[package]] -name = "aws-smithy-types" -version = "1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca2734c16913a45343b37313605d84e7d8b34a4611598ce1d25b35860a2bed3" -dependencies = [ - "base64-simd", - "bytes", - "bytes-utils", - "futures-core", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "http-body 1.0.1", - "http-body-util", - "itoa", - "num-integer", - "pin-project-lite", - "pin-utils", - "ryu", - "serde", - "time", - "tokio", - "tokio-util", -] - -[[package]] -name = "aws-smithy-xml" -version = "0.60.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" -dependencies = [ - "xmlparser", -] - -[[package]] -name = "aws-types" -version = "1.3.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0470cc047657c6e286346bdf10a8719d26efd6a91626992e0e64481e44323e96" -dependencies = [ - "aws-credential-types", - "aws-smithy-async", - "aws-smithy-runtime-api", - "aws-smithy-types", - "rustc_version", - "tracing", -] - [[package]] name = "axum" version = "0.8.8" @@ -514,10 +169,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "itoa", "matchit", @@ -546,8 +201,8 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -568,58 +223,12 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "base16ct" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" - [[package]] name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "base64-simd" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" -dependencies = [ - "outref", - "vsimd", -] - -[[package]] -name = "base64ct" -version = "1.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" - -[[package]] -name = "binrw" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53195f985e88ab94d1cc87e80049dd2929fd39e4a772c5ae96a7e5c4aad3642" -dependencies = [ - "array-init", - "binrw_derive", - "bytemuck", -] - -[[package]] -name = "binrw_derive" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5910da05ee556b789032c8ff5a61fb99239580aa3fd0bfaa8f4d094b2aee00ad" -dependencies = [ - "either", - "owo-colors", - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "bitflags" version = "2.11.0" @@ -731,16 +340,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" -[[package]] -name = "bytes-utils" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" -dependencies = [ - "bytes", - "either", -] - [[package]] name = "cbc" version = "0.1.2" @@ -774,30 +373,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" -[[package]] -name = "chacha20" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - -[[package]] -name = "chacha20poly1305" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" -dependencies = [ - "aead", - "chacha20", - "cipher", - "poly1305", - "zeroize", -] - [[package]] name = "chrono" version = "0.4.43" @@ -819,7 +394,6 @@ checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ "crypto-common 0.1.7", "inout", - "zeroize", ] [[package]] @@ -862,15 +436,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" -[[package]] -name = "cmake" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" -dependencies = [ - "cc", -] - [[package]] name = "colorchoice" version = "1.0.4" @@ -910,12 +475,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "const-oid" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" - [[package]] name = "const-oid" version = "0.10.2" @@ -972,33 +531,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - -[[package]] -name = "crc-fast" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd92aca2c6001b1bf5ba0ff84ee74ec8501b52bbef0cac80bf25a6c1d87a83d" -dependencies = [ - "crc", - "digest 0.10.7", - "rustversion", - "spin 0.10.0", -] - [[package]] name = "crc32fast" version = "1.5.0" @@ -1048,28 +580,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" -[[package]] -name = "crypto-bigint" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" -dependencies = [ - "generic-array", - "rand_core", - "subtle", - "zeroize", -] - -[[package]] -name = "crypto-bigint" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" -dependencies = [ - "rand_core", - "subtle", -] - [[package]] name = "crypto-common" version = "0.1.7" @@ -1077,7 +587,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", - "rand_core", "typenum", ] @@ -1159,16 +668,6 @@ dependencies = [ "adler32", ] -[[package]] -name = "der" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" -dependencies = [ - "const-oid 0.9.6", - "zeroize", -] - [[package]] name = "deranged" version = "0.5.7" @@ -1198,7 +697,6 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer 0.10.4", "crypto-common 0.1.7", - "subtle", ] [[package]] @@ -1208,7 +706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8bf3682cdec91817be507e4aa104314898b95b84d74f3d43882210101a545b6" dependencies = [ "block-buffer 0.11.0", - "const-oid 0.10.2", + "const-oid", "crypto-common 0.2.0", ] @@ -1229,50 +727,12 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" -[[package]] -name = "dunce" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" - -[[package]] -name = "ecdsa" -version = "0.14.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" -dependencies = [ - "der", - "elliptic-curve", - "rfc6979", - "signature", -] - [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "elliptic-curve" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" -dependencies = [ - "base16ct", - "crypto-bigint 0.4.9", - "der", - "digest 0.10.7", - "ff", - "generic-array", - "group", - "pkcs8", - "rand_core", - "sec1", - "subtle", - "zeroize", -] - [[package]] name = "encode_unicode" version = "1.0.0" @@ -1358,16 +818,6 @@ dependencies = [ "simd-adler32", ] -[[package]] -name = "ff" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" -dependencies = [ - "rand_core", - "subtle", -] - [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1437,12 +887,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "fs_extra" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" - [[package]] name = "futures-channel" version = "0.3.32" @@ -1561,36 +1005,6 @@ dependencies = [ "web-time", ] -[[package]] -name = "group" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" -dependencies = [ - "ff", - "rand_core", - "subtle", -] - -[[package]] -name = "h2" -version = "0.3.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h2" version = "0.4.13" @@ -1602,7 +1016,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.4.0", + "http", "indexmap", "slab", "tokio", @@ -1638,38 +1052,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest 0.10.7", -] - [[package]] name = "htmlescape" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9025058dae765dee5070ec375f591e2ba14638c63feff74f13805a72e523163" -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.4.0" @@ -1680,17 +1068,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.1" @@ -1698,7 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.4.0", + "http", ] [[package]] @@ -1709,8 +1086,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "pin-project-lite", ] @@ -1735,30 +1112,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "hyper" -version = "0.14.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.27", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.10", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.8.1" @@ -1769,9 +1122,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2 0.4.13", - "http 1.4.0", - "http-body 1.0.1", + "h2", + "http", + "http-body", "httparse", "httpdate", "itoa", @@ -1782,35 +1135,19 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.32", - "log", - "rustls 0.21.12", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-rustls" version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http 1.4.0", - "hyper 1.8.1", + "http", + "hyper", "hyper-util", - "rustls 0.23.37", - "rustls-native-certs", + "rustls", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower-service", ] @@ -1822,7 +1159,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "native-tls", "tokio", @@ -1840,14 +1177,14 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "hyper 1.8.1", + "http", + "http-body", + "hyper", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.2", + "socket2", "system-configuration", "tokio", "tower-service", @@ -2255,15 +1592,6 @@ dependencies = [ "hashbrown 0.15.5", ] -[[package]] -name = "lru" -version = "0.16.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" -dependencies = [ - "hashbrown 0.16.1", -] - [[package]] name = "lz4_flex" version = "0.11.5" @@ -2291,16 +1619,6 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest 0.10.7", -] - [[package]] name = "md5" version = "0.7.0" @@ -2393,11 +1711,11 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http 1.4.0", + "http", "httparse", "memchr", "mime", - "spin 0.9.8", + "spin", "version_check", ] @@ -2455,15 +1773,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" -[[package]] -name = "num-integer" -version = "0.1.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" -dependencies = [ - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -2492,12 +1801,6 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "269bca4c2591a28585d6bf10d9ed0332b7d76900a1b02bec41bdc3a2cdcda107" -[[package]] -name = "opaque-debug" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" - [[package]] name = "openssl" version = "0.10.76" @@ -2542,12 +1845,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "outref" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" - [[package]] name = "ownedbytes" version = "0.9.0" @@ -2557,23 +1854,6 @@ dependencies = [ "stable_deref_trait", ] -[[package]] -name = "owo-colors" -version = "4.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d" - -[[package]] -name = "p256" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" -dependencies = [ - "ecdsa", - "elliptic-curve", - "sha2 0.10.9", -] - [[package]] name = "parking_lot" version = "0.12.5" @@ -2680,14 +1960,11 @@ version = "0.0.2" dependencies = [ "anstyle", "anyhow", - "aws-sdk-s3", "axum", - "bytes", "clap", "indicatif", "pile-config", "pile-dataset", - "pile-io", "pile-toolbox", "pile-value", "serde", @@ -2766,10 +2043,6 @@ dependencies = [ name = "pile-io" version = "0.0.2" dependencies = [ - "aws-sdk-s3", - "binrw", - "chacha20poly1305", - "smartstring", "tokio", ] @@ -2787,9 +2060,7 @@ version = "0.0.2" dependencies = [ "anyhow", "async-trait", - "base64", "blake3", - "chacha20poly1305", "chrono", "epub", "id3", @@ -2831,16 +2102,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad78bf43dcf80e8f950c92b84f938a0fc7590b7f6866fbcbeca781609c115590" -[[package]] -name = "pkcs8" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" -dependencies = [ - "der", - "spki", -] - [[package]] name = "pkg-config" version = "0.3.32" @@ -2860,17 +2121,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "poly1305" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" -dependencies = [ - "cpufeatures", - "opaque-debug", - "universal-hash", -] - [[package]] name = "portable-atomic" version = "1.13.0" @@ -3033,12 +2283,6 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "regex-lite" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" - [[package]] name = "regex-syntax" version = "0.8.8" @@ -3056,12 +2300,12 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.4.13", - "http 1.4.0", - "http-body 1.0.1", + "h2", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", - "hyper-rustls 0.27.7", + "hyper", + "hyper-rustls", "hyper-tls", "hyper-util", "js-sys", @@ -3088,17 +2332,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "rfc6979" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" -dependencies = [ - "crypto-bigint 0.4.9", - "hmac", - "zeroize", -] - [[package]] name = "ring" version = "0.17.14" @@ -3169,15 +2402,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" -[[package]] -name = "rustc_version" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" -dependencies = [ - "semver", -] - [[package]] name = "rustix" version = "1.1.3" @@ -3191,44 +2415,19 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ - "aws-lc-rs", "once_cell", "rustls-pki-types", - "rustls-webpki 0.103.9", + "rustls-webpki", "subtle", "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" -dependencies = [ - "openssl-probe", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -3238,23 +2437,12 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.103.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" dependencies = [ - "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -3296,30 +2484,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "sec1" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" -dependencies = [ - "base16ct", - "der", - "generic-array", - "pkcs8", - "subtle", - "zeroize", -] - [[package]] name = "security-framework" version = "3.7.0" @@ -3424,17 +2588,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha2" version = "0.10.9" @@ -3482,16 +2635,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signature" -version = "1.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" -dependencies = [ - "digest 0.10.7", - "rand_core", -] - [[package]] name = "simd-adler32" version = "0.3.8" @@ -3551,16 +2694,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "socket2" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "socket2" version = "0.6.2" @@ -3577,22 +2710,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "spin" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" - -[[package]] -name = "spki" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" -dependencies = [ - "base64ct", - "der", -] - [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3736,7 +2853,7 @@ dependencies = [ "itertools 0.14.0", "levenshtein_automata", "log", - "lru 0.12.5", + "lru", "lz4_flex", "measure_time", "memmap2", @@ -3968,7 +3085,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.2", + "socket2", "tokio-macros", "windows-sys 0.61.2", ] @@ -3994,23 +3111,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls 0.23.37", + "rustls", "tokio", ] @@ -4102,8 +3209,8 @@ dependencies = [ "bitflags", "bytes", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "iri-string", "pin-project-lite", "tower", @@ -4289,16 +3396,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81e544489bf3d8ef66c953931f56617f423cd4b5494be343d9b9d3dda037b9a3" -[[package]] -name = "universal-hash" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" -dependencies = [ - "crypto-common 0.1.7", - "subtle", -] - [[package]] name = "untrusted" version = "0.9.0" @@ -4434,12 +3531,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "vsimd" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" - [[package]] name = "vt100" version = "0.16.2" @@ -5013,12 +4104,6 @@ version = "0.8.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ae8337f8a065cfc972643663ea4279e04e7256de865aa66fe25cec5fb912d3f" -[[package]] -name = "xmlparser" -version = "0.13.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" - [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index ba2b402..288ba07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,8 +92,6 @@ utoipa-swagger-ui = { version = "9.0.2", features = [ tokio = { version = "1.49.0", features = ["full"] } tokio-stream = "0.1" async-trait = "0.1" -aws-sdk-s3 = "1" -aws-config = "1" # CLI & logging tracing = "0.1.44" @@ -112,8 +110,6 @@ toml = "1.0.3" toml_edit = "0.25.4" sha2 = "0.11.0-rc.5" blake3 = "1.8.3" -chacha20poly1305 = "0.10.0" -binrw = "0.15.1" # Extractors pdf = "0.10.0" diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index e633485..4012d06 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -40,12 +40,6 @@ pub struct DatasetConfig { pub source: HashMap, } -#[derive(Debug, Clone, Deserialize)] -pub struct S3Credentials { - pub access_key_id: String, - pub secret_access_key: String, -} - #[derive(Debug, Clone, Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "lowercase")] @@ -64,30 +58,6 @@ pub enum Source { #[serde(default)] pattern: GroupPattern, }, - - /// An S3-compatible object store bucket - S3 { - /// If false, ignore this dataset - #[serde(default = "default_true")] - enabled: bool, - - bucket: String, - prefix: Option, - - /// Custom endpoint URL (for MinIO, etc.) - endpoint: Option, - - region: String, - - credentials: S3Credentials, - - /// How to group files into items in this source - #[serde(default)] - pattern: GroupPattern, - - /// If provided, assume objects are encrypted with this secret key. - encryption_key: Option, - }, } // diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index d39ca79..da56f0c 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -5,7 +5,7 @@ use pile_config::{ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ extract::traits::ExtractState, - source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest, string_to_key}, + source::{DataSource, DirDataSource, misc::path_ts_earliest}, value::{Item, PileValue}, }; use serde_json::Value; @@ -33,31 +33,27 @@ pub enum DatasetError { // MARK: Dataset enum // -/// An opened data source — either a local filesystem directory or an S3 bucket. +/// An opened data source pub enum Dataset { Dir(Arc), - S3(Arc), } impl Dataset { pub fn len(&self) -> usize { match self { Self::Dir(ds) => ds.len(), - Self::S3(ds) => ds.len(), } } pub async fn get(&self, key: &str) -> Option { match self { Self::Dir(ds) => ds.get(key).await.ok().flatten(), - Self::S3(ds) => ds.get(key).await.ok().flatten(), } } pub fn iter(&self) -> Box + Send + '_> { match self { Self::Dir(ds) => Box::new(ds.iter()), - Self::S3(ds) => Box::new(ds.iter()), } } @@ -68,14 +64,12 @@ impl Dataset { ) -> Box + Send + '_> { match self { Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)), - Self::S3(ds) => Box::new(ds.iter_page(offset, limit)), } } pub async fn latest_change(&self) -> Result>, std::io::Error> { match self { Self::Dir(ds) => ds.latest_change().await, - Self::S3(ds) => ds.latest_change().await, } } } @@ -148,46 +142,6 @@ impl Datasets { ), ); } - - Source::S3 { - enabled, - bucket, - prefix, - endpoint, - region, - credentials, - pattern, - encryption_key, - } => { - let target = match enabled { - true => &mut sources, - false => &mut disabled_sources, - }; - - let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); - - match S3DataSource::new( - label, - bucket, - prefix.as_ref().map(|x| x.as_str()), - endpoint.as_ref().map(|x| x.as_str()), - region, - &credentials.access_key_id, - &credentials.secret_access_key, - 10_000_000, - pattern.clone(), - encryption_key, - ) - .await - { - Ok(ds) => { - target.insert(label.clone(), Dataset::S3(ds)); - } - Err(err) => { - warn!("Could not open S3 source {label}: {err}"); - } - } - } } } @@ -259,46 +213,6 @@ impl Datasets { ), ); } - - Source::S3 { - enabled, - bucket, - prefix, - endpoint, - region, - credentials, - pattern, - encryption_key, - } => { - let target = match enabled { - true => &mut sources, - false => &mut disabled_sources, - }; - - let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); - - match S3DataSource::new( - label, - bucket, - prefix.as_ref().map(|x| x.as_str()), - endpoint.as_ref().map(|x| x.as_str()), - region, - &credentials.access_key_id, - &credentials.secret_access_key, - 10_000_000, - pattern.clone(), - encryption_key, - ) - .await - { - Ok(ds) => { - target.insert(label.clone(), Dataset::S3(ds)); - } - Err(err) => { - warn!("Could not open S3 source {label}: {err}"); - } - } - } } } diff --git a/crates/pile-io/Cargo.toml b/crates/pile-io/Cargo.toml index e1160e9..937a831 100644 --- a/crates/pile-io/Cargo.toml +++ b/crates/pile-io/Cargo.toml @@ -9,7 +9,3 @@ workspace = true [dependencies] tokio = { workspace = true } -smartstring = { workspace = true } -aws-sdk-s3 = { workspace = true } -chacha20poly1305 = { workspace = true } -binrw = { workspace = true } diff --git a/crates/pile-io/src/chacha/format.rs b/crates/pile-io/src/chacha/format.rs deleted file mode 100644 index a1c315b..0000000 --- a/crates/pile-io/src/chacha/format.rs +++ /dev/null @@ -1,50 +0,0 @@ -use binrw::{binrw, meta::ReadMagic}; - -#[binrw] -#[brw(little, magic = b"PileChaChav1")] -#[derive(Debug, Clone, Copy)] -pub struct ChaChaHeaderv1 { - pub config: ChaChaConfigv1, - pub plaintext_size: u64, -} - -impl ChaChaHeaderv1 { - pub const SIZE: usize = ChaChaHeaderv1::MAGIC.len() + std::mem::size_of::() + 8; -} - -#[test] -fn chachaheader_size() { - assert_eq!( - ChaChaHeaderv1::SIZE, - std::mem::size_of::() - ChaChaHeaderv1::MAGIC.len() - ) -} - -// -// MARK: config -// - -#[binrw] -#[brw(little)] -#[derive(Debug, Clone, Copy)] -pub struct ChaChaConfigv1 { - pub chunk_size: u64, - pub nonce_size: u64, - pub tag_size: u64, -} - -impl Default for ChaChaConfigv1 { - fn default() -> Self { - Self { - chunk_size: 64 * 1024, - nonce_size: 24, - tag_size: 16, - } - } -} - -impl ChaChaConfigv1 { - pub(crate) fn enc_chunk_size(&self) -> u64 { - self.chunk_size + self.nonce_size + self.tag_size - } -} diff --git a/crates/pile-io/src/chacha/mod.rs b/crates/pile-io/src/chacha/mod.rs deleted file mode 100644 index 8f3f6a1..0000000 --- a/crates/pile-io/src/chacha/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod reader; -mod reader_async; -mod writer; -mod writer_async; - -pub use {reader::*, reader_async::*, writer::*, writer_async::*}; - -mod format; -pub use format::*; diff --git a/crates/pile-io/src/chacha/reader.rs b/crates/pile-io/src/chacha/reader.rs deleted file mode 100644 index 2353238..0000000 --- a/crates/pile-io/src/chacha/reader.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::io::{Read, Seek, SeekFrom}; - -use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1}; - -// -// MARK: reader -// - -pub struct ChaChaReaderv1 { - inner: R, - header: ChaChaHeaderv1, - - data_offset: u64, - encryption_key: [u8; 32], - cursor: u64, - plaintext_size: u64, - cached_chunk: Option<(u64, Vec)>, -} - -impl ChaChaReaderv1 { - pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { - use binrw::BinReaderExt; - - inner.seek(SeekFrom::Start(0))?; - let header: ChaChaHeaderv1 = inner.read_le().map_err(std::io::Error::other)?; - let data_offset = inner.stream_position()?; - - Ok(Self { - inner, - header, - data_offset, - encryption_key, - cursor: 0, - plaintext_size: header.plaintext_size, - cached_chunk: None, - }) - } - - fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { - use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - - let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size(); - self.inner.seek(SeekFrom::Start(enc_start))?; - - let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize]; - let n = self.read_exact_or_eof(&mut encrypted)?; - encrypted.truncate(n); - - if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize - { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "encrypted chunk too short", - )); - } - - let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize); - let nonce = XNonce::from_slice(nonce_bytes); - let key = chacha20poly1305::Key::from_slice(&self.encryption_key); - let cipher = XChaCha20Poly1305::new(key); - let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed") - })?; - - self.cached_chunk = Some((chunk_index, plaintext)); - Ok(()) - } - - fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> Result { - let mut total = 0; - while total < buf.len() { - match self.inner.read(&mut buf[total..])? { - 0 => break, - n => total += n, - } - } - Ok(total) - } -} - -impl AsyncReader for ChaChaReaderv1 { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let remaining = self.plaintext_size.saturating_sub(self.cursor); - if remaining == 0 || buf.is_empty() { - return Ok(0); - } - - let chunk_index = self.cursor / self.header.config.chunk_size; - - let need_fetch = match &self.cached_chunk { - None => true, - Some((idx, _)) => *idx != chunk_index, - }; - - if need_fetch { - self.fetch_chunk(chunk_index)?; - } - - #[expect(clippy::unwrap_used)] - let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); - - let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize; - let available = chunk_data.len() - offset_in_chunk; - let to_copy = available.min(buf.len()); - - buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]); - self.cursor += to_copy as u64; - Ok(to_copy) - } -} - -impl AsyncSeekReader for ChaChaReaderv1 { - async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), - - SeekFrom::Current(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.cursor { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor -= abs; - } else { - self.cursor += x as u64; - } - } - - SeekFrom::End(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.plaintext_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor = self.plaintext_size - abs; - } else { - self.cursor = self.plaintext_size + x as u64; - } - } - } - - self.cursor = self.cursor.min(self.plaintext_size); - Ok(self.cursor) - } -} diff --git a/crates/pile-io/src/chacha/reader_async.rs b/crates/pile-io/src/chacha/reader_async.rs deleted file mode 100644 index 4d1aeb3..0000000 --- a/crates/pile-io/src/chacha/reader_async.rs +++ /dev/null @@ -1,165 +0,0 @@ -use std::io::SeekFrom; - -use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1}; - -pub struct ChaChaReaderv1Async { - inner: R, - header: ChaChaHeaderv1, - - data_offset: u64, - encryption_key: [u8; 32], - cursor: u64, - plaintext_size: u64, - cached_chunk: Option<(u64, Vec)>, -} - -impl ChaChaReaderv1Async { - pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { - use binrw::BinReaderExt; - use std::io::Cursor; - - inner.seek(SeekFrom::Start(0)).await?; - let mut buf = [0u8; ChaChaHeaderv1::SIZE]; - read_exact(&mut inner, &mut buf).await?; - let header: ChaChaHeaderv1 = Cursor::new(&buf[..]) - .read_le() - .map_err(std::io::Error::other)?; - - Ok(Self { - inner, - header, - data_offset: buf.len() as u64, - encryption_key, - cursor: 0, - plaintext_size: header.plaintext_size, - cached_chunk: None, - }) - } - - async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { - use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - - let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size(); - self.inner.seek(SeekFrom::Start(enc_start)).await?; - - let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize]; - let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?; - encrypted.truncate(n); - - if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize - { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "encrypted chunk too short", - )); - } - - let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize); - let nonce = XNonce::from_slice(nonce_bytes); - let key = chacha20poly1305::Key::from_slice(&self.encryption_key); - let cipher = XChaCha20Poly1305::new(key); - let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed") - })?; - - self.cached_chunk = Some((chunk_index, plaintext)); - Ok(()) - } -} - -async fn read_exact(inner: &mut R, buf: &mut [u8]) -> Result<(), std::io::Error> { - let n = read_exact_or_eof(inner, buf).await?; - if n < buf.len() { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "unexpected EOF reading header", - )); - } - Ok(()) -} - -async fn read_exact_or_eof( - inner: &mut R, - buf: &mut [u8], -) -> Result { - let mut total = 0; - while total < buf.len() { - match inner.read(&mut buf[total..]).await? { - 0 => break, - n => total += n, - } - } - Ok(total) -} - -impl AsyncReader for ChaChaReaderv1Async { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let remaining = self.plaintext_size.saturating_sub(self.cursor); - if remaining == 0 || buf.is_empty() { - return Ok(0); - } - - let chunk_index = self.cursor / self.header.config.chunk_size; - - let need_fetch = match &self.cached_chunk { - None => true, - Some((idx, _)) => *idx != chunk_index, - }; - - if need_fetch { - self.fetch_chunk(chunk_index).await?; - } - - #[expect(clippy::unwrap_used)] - let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); - - let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize; - let available = chunk_data.len() - offset_in_chunk; - let to_copy = available.min(buf.len()); - - buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]); - self.cursor += to_copy as u64; - Ok(to_copy) - } -} - -impl AsyncSeekReader for ChaChaReaderv1Async { - async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), - - SeekFrom::Current(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.cursor { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor -= abs; - } else { - self.cursor += x as u64; - } - } - - SeekFrom::End(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.plaintext_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor = self.plaintext_size - abs; - } else { - self.cursor = self.plaintext_size + x as u64; - } - } - } - - self.cursor = self.cursor.min(self.plaintext_size); - Ok(self.cursor) - } -} diff --git a/crates/pile-io/src/chacha/writer.rs b/crates/pile-io/src/chacha/writer.rs deleted file mode 100644 index 41e57c3..0000000 --- a/crates/pile-io/src/chacha/writer.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::io::SeekFrom; - -use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; - -use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1}; - -pub struct ChaChaWriterAsync { - inner: W, - header: ChaChaHeaderv1, - - encryption_key: [u8; 32], - buffer: Vec, - plaintext_bytes_written: u64, -} - -impl ChaChaWriterAsync { - pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result { - let header = ChaChaHeaderv1 { - config: ChaChaConfigv1::default(), - plaintext_size: 0, - }; - inner.write_all(&serialize_header(header)?).await?; - - Ok(Self { - inner, - header, - encryption_key, - buffer: Vec::new(), - plaintext_bytes_written: 0, - }) - } - - pub async fn write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> { - self.buffer.extend_from_slice(buf); - self.plaintext_bytes_written += buf.len() as u64; - - let chunk_size = self.header.config.chunk_size as usize; - while self.buffer.len() >= chunk_size { - let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?; - self.inner.write_all(&encrypted).await?; - self.buffer.drain(..chunk_size); - } - - Ok(()) - } - - /// Encrypt and write any buffered plaintext, patch the header with the - /// final `plaintext_size`, then return the inner writer. - pub async fn finish(mut self) -> Result { - if !self.buffer.is_empty() { - let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer)?; - self.inner.write_all(&encrypted).await?; - } - - self.inner.seek(SeekFrom::Start(0)).await?; - let header_bytes = serialize_header(ChaChaHeaderv1 { - config: self.header.config, - plaintext_size: self.plaintext_bytes_written, - })?; - self.inner.write_all(&header_bytes).await?; - - Ok(self.inner) - } -} - -fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result, std::io::Error> { - use chacha20poly1305::{ - XChaCha20Poly1305, - aead::{Aead, AeadCore, KeyInit, OsRng}, - }; - - let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); - let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(key)); - let ciphertext = cipher - .encrypt(&nonce, plaintext) - .map_err(|_| std::io::Error::other("encryption failed"))?; - - let mut output = Vec::with_capacity(nonce.len() + ciphertext.len()); - output.extend_from_slice(&nonce); - output.extend_from_slice(&ciphertext); - Ok(output) -} - -fn serialize_header(header: ChaChaHeaderv1) -> Result, std::io::Error> { - use binrw::BinWriterExt; - use std::io::Cursor; - - let mut buf = Cursor::new(Vec::new()); - buf.write_le(&header).map_err(std::io::Error::other)?; - Ok(buf.into_inner()) -} diff --git a/crates/pile-io/src/chacha/writer_async.rs b/crates/pile-io/src/chacha/writer_async.rs deleted file mode 100644 index db6dbf8..0000000 --- a/crates/pile-io/src/chacha/writer_async.rs +++ /dev/null @@ -1,260 +0,0 @@ -use std::io::{Seek, SeekFrom, Write}; - -use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1}; - -/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`]. -pub fn generate_key() -> [u8; 32] { - use chacha20poly1305::aead::OsRng; - use chacha20poly1305::{KeyInit, XChaCha20Poly1305}; - XChaCha20Poly1305::generate_key(&mut OsRng).into() -} - -pub struct ChaChaWriterv1 { - inner: W, - header: ChaChaHeaderv1, - - encryption_key: [u8; 32], - buffer: Vec, - plaintext_bytes_written: u64, -} - -impl ChaChaWriterv1 { - pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result { - use binrw::BinWriterExt; - - let header = ChaChaHeaderv1 { - config: ChaChaConfigv1::default(), - plaintext_size: 0, - }; - inner.write_le(&header).map_err(std::io::Error::other)?; - - Ok(Self { - inner, - header, - encryption_key, - buffer: Vec::new(), - plaintext_bytes_written: 0, - }) - } - - /// Encrypt and write any buffered plaintext, patch the header with the - /// final `plaintext_size`, then return the inner writer. - pub fn finish(mut self) -> Result { - use binrw::BinWriterExt; - - self.flush_buffer()?; - - self.inner.seek(SeekFrom::Start(0))?; - let header = ChaChaHeaderv1 { - config: self.header.config, - plaintext_size: self.plaintext_bytes_written, - }; - self.inner - .write_le(&header) - .map_err(std::io::Error::other)?; - - Ok(self.inner) - } - - fn encrypt_chunk(&self, plaintext: &[u8]) -> Result, std::io::Error> { - use chacha20poly1305::{ - XChaCha20Poly1305, - aead::{Aead, AeadCore, KeyInit, OsRng}, - }; - - let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); - let key = chacha20poly1305::Key::from_slice(&self.encryption_key); - let cipher = XChaCha20Poly1305::new(key); - let ciphertext = cipher - .encrypt(&nonce, plaintext) - .map_err(|_| std::io::Error::other("encryption failed"))?; - - let mut output = Vec::with_capacity(nonce.len() + ciphertext.len()); - output.extend_from_slice(&nonce); - output.extend_from_slice(&ciphertext); - Ok(output) - } - - fn flush_buffer(&mut self) -> Result<(), std::io::Error> { - if !self.buffer.is_empty() { - let encrypted = self.encrypt_chunk(&self.buffer)?; - self.inner.write_all(&encrypted)?; - self.buffer.clear(); - } - Ok(()) - } -} - -impl Write for ChaChaWriterv1 { - fn write(&mut self, buf: &[u8]) -> Result { - self.buffer.extend_from_slice(buf); - self.plaintext_bytes_written += buf.len() as u64; - - let chunk_size = self.header.config.chunk_size as usize; - while self.buffer.len() >= chunk_size { - let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?; - self.inner.write_all(&encrypted)?; - self.buffer.drain(..chunk_size); - } - - Ok(buf.len()) - } - - /// Encrypts and flushes any buffered plaintext as a partial chunk. - /// - /// Prefer [`finish`](Self::finish) to retrieve the inner writer after - /// all data has been written. Calling `flush` multiple times will produce - /// multiple small encrypted chunks for the same partial data. - fn flush(&mut self) -> Result<(), std::io::Error> { - self.flush_buffer()?; - self.inner.flush() - } -} - -#[cfg(test)] -#[expect(clippy::unwrap_used)] -mod tests { - use std::io::{Cursor, SeekFrom, Write}; - - use super::ChaChaWriterv1; - use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaReaderv1}; - - const KEY: [u8; 32] = [42u8; 32]; - - fn encrypt(data: &[u8]) -> Cursor> { - let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap(); - writer.write_all(data).unwrap(); - let mut buf = writer.finish().unwrap(); - buf.set_position(0); - buf - } - - async fn decrypt_all(buf: Cursor>) -> Vec { - let mut reader = ChaChaReaderv1::new(buf, KEY).unwrap(); - reader.read_to_end().await.unwrap() - } - - #[tokio::test] - async fn roundtrip_empty() { - let buf = encrypt(&[]); - // Header present but no chunks - assert!(!buf.get_ref().is_empty()); - assert!(decrypt_all(buf).await.is_empty()); - } - - #[tokio::test] - async fn roundtrip_small() { - let data = b"hello, world!"; - assert_eq!(decrypt_all(encrypt(data)).await, data); - } - - #[tokio::test] - async fn roundtrip_exact_chunk() { - let data = vec![0xABu8; 65536]; - assert_eq!(decrypt_all(encrypt(&data)).await, data); - } - - #[tokio::test] - async fn roundtrip_multi_chunk() { - // 2.5 chunks - let data: Vec = (0u8..=255).cycle().take(65536 * 2 + 1000).collect(); - assert_eq!(decrypt_all(encrypt(&data)).await, data); - } - - #[tokio::test] - async fn roundtrip_incremental_writes() { - // Write one byte at a time - let data: Vec = (0u8..200).collect(); - let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap(); - for byte in &data { - writer.write_all(&[*byte]).unwrap(); - } - let mut buf = writer.finish().unwrap(); - buf.set_position(0); - assert_eq!(decrypt_all(buf).await, data); - } - - #[tokio::test] - async fn wrong_key_fails() { - let buf = encrypt(b"secret data"); - let mut reader = ChaChaReaderv1::new(buf, [0u8; 32]).unwrap(); - assert!(reader.read_to_end().await.is_err()); - } - - #[tokio::test] - async fn header_magic_checked() { - // Corrupt the magic bytes — reader should fail - let mut buf = encrypt(b"data"); - buf.get_mut()[0] = 0xFF; - buf.set_position(0); - assert!(ChaChaReaderv1::new(buf, KEY).is_err()); - } - - #[tokio::test] - async fn seek_from_start() { - let data: Vec = (0u8..100).collect(); - let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); - - reader.seek(SeekFrom::Start(50)).await.unwrap(); - let mut buf = [0u8; 10]; - let mut read = 0; - while read < buf.len() { - read += reader.read(&mut buf[read..]).await.unwrap(); - } - assert_eq!(buf, data[50..60]); - } - - #[tokio::test] - async fn seek_from_end() { - let data: Vec = (0u8..100).collect(); - let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); - - reader.seek(SeekFrom::End(-10)).await.unwrap(); - assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]); - } - - #[tokio::test] - async fn seek_across_chunk_boundary() { - // Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1 - let data: Vec = (0u8..=255).cycle().take(65536 + 500).collect(); - let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); - - reader.seek(SeekFrom::Start(65530)).await.unwrap(); - let mut buf = vec![0u8; 12]; - let mut read = 0; - while read < buf.len() { - read += reader.read(&mut buf[read..]).await.unwrap(); - } - assert_eq!(buf, data[65530..65542]); - } - - #[tokio::test] - async fn seek_current() { - let data: Vec = (0u8..=255).cycle().take(200).collect(); - let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); - - // Read 10, seek back 5, read 5 — should get bytes 5..10 - let mut first = [0u8; 10]; - let mut n = 0; - while n < first.len() { - n += reader.read(&mut first[n..]).await.unwrap(); - } - reader.seek(SeekFrom::Current(-5)).await.unwrap(); - let mut second = [0u8; 5]; - n = 0; - while n < second.len() { - n += reader.read(&mut second[n..]).await.unwrap(); - } - assert_eq!(second, data[5..10]); - } - - #[tokio::test] - async fn seek_past_end_clamps() { - let data = b"hello"; - let mut reader = ChaChaReaderv1::new(encrypt(data), KEY).unwrap(); - - let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap(); - assert_eq!(pos, data.len() as u64); - assert_eq!(reader.read_to_end().await.unwrap(), b""); - } -} diff --git a/crates/pile-io/src/lib.rs b/crates/pile-io/src/lib.rs index 27eae54..6610a8a 100644 --- a/crates/pile-io/src/lib.rs +++ b/crates/pile-io/src/lib.rs @@ -1,7 +1,2 @@ mod asyncreader; pub use asyncreader::*; - -mod s3reader; -pub use s3reader::*; - -pub mod chacha; diff --git a/crates/pile-io/src/s3reader.rs b/crates/pile-io/src/s3reader.rs deleted file mode 100644 index 77cc871..0000000 --- a/crates/pile-io/src/s3reader.rs +++ /dev/null @@ -1,181 +0,0 @@ -use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; -use smartstring::{LazyCompact, SmartString}; -use std::{fmt::Debug, io::SeekFrom, sync::Arc}; - -use crate::{AsyncReader, AsyncSeekReader}; - -// -// MARK: client -// - -/// An interface to an S3 bucket. -/// -/// TODO: S3 is slow and expensive. Ideally, we'll have this struct cache data -/// so we don't have to download anything twice. This is, however, complicated, -/// and doesn't fully solve the "expensive" problem. -pub struct S3Client { - pub client: aws_sdk_s3::Client, - bucket: SmartString, - - /// maximum number of bytes to use for cached data - cache_limit_bytes: usize, -} - -impl Debug for S3Client { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("S3Client") - .field("bucket", &self.bucket) - .field("cache_limit_bytes", &self.cache_limit_bytes) - .finish() - } -} - -impl S3Client { - pub async fn new( - bucket: &str, - endpoint: Option<&str>, - region: &str, - access_key_id: &str, - secret_access_key: &str, - cache_limit_bytes: usize, - ) -> Arc { - let client = { - let mut s3_config = aws_sdk_s3::config::Builder::new() - .behavior_version(BehaviorVersion::latest()) - .region(Region::new(region.to_owned())) - .credentials_provider(Credentials::new( - access_key_id, - secret_access_key, - None, - None, - "pile", - )); - - if let Some(ep) = endpoint { - s3_config = s3_config.endpoint_url(ep).force_path_style(true); - } - - aws_sdk_s3::Client::from_conf(s3_config.build()) - }; - - return Arc::new(Self { - bucket: bucket.into(), - client, - cache_limit_bytes, - }); - } - - pub fn bucket(&self) -> &str { - &self.bucket - } - - pub async fn get(self: &Arc, key: &str) -> Result { - let head = self - .client - .head_object() - .bucket(self.bucket.as_str()) - .key(key) - .send() - .await - .map_err(std::io::Error::other)?; - - let size = head.content_length().unwrap_or(0) as u64; - - Ok(S3Reader { - client: self.clone(), - bucket: self.bucket.clone(), - key: key.into(), - cursor: 0, - size, - }) - } -} - -// -// MARK: reader -// - -pub struct S3Reader { - pub client: Arc, - pub bucket: SmartString, - pub key: SmartString, - pub cursor: u64, - pub size: u64, -} - -impl AsyncReader for S3Reader { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let len_left = self.size.saturating_sub(self.cursor); - if len_left == 0 || buf.is_empty() { - return Ok(0); - } - - let start_byte = self.cursor; - let len_to_read = (buf.len() as u64).min(len_left); - let end_byte = start_byte + len_to_read - 1; - - let resp = self - .client - .client - .get_object() - .bucket(self.bucket.as_str()) - .key(self.key.as_str()) - .range(format!("bytes={start_byte}-{end_byte}")) - .send() - .await - .map_err(std::io::Error::other)?; - - let bytes = resp - .body - .collect() - .await - .map(|x| x.into_bytes()) - .map_err(std::io::Error::other)?; - - let n = bytes.len().min(buf.len()); - buf[..n].copy_from_slice(&bytes[..n]); - self.cursor += n as u64; - Ok(n) - } -} - -impl AsyncSeekReader for S3Reader { - async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(x) => self.cursor = x.min(self.size), - - SeekFrom::Current(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.cursor { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor -= abs; - } else { - self.cursor += x as u64; - } - } - - std::io::SeekFrom::End(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor = self.size - abs; - } else { - self.cursor = self.size + x as u64; - } - } - } - - self.cursor = self.cursor.min(self.size); - Ok(self.cursor) - } -} diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml index 1ed558f..72c8f2f 100644 --- a/crates/pile-value/Cargo.toml +++ b/crates/pile-value/Cargo.toml @@ -21,8 +21,6 @@ toml = { workspace = true } smartstring = { workspace = true } regex = { workspace = true } blake3 = { workspace = true } -chacha20poly1305 = { workspace = true } -base64 = { workspace = true } epub = { workspace = true } kamadak-exif = { workspace = true } pdf = { workspace = true } diff --git a/crates/pile-value/src/source/mod.rs b/crates/pile-value/src/source/mod.rs index f231967..6f1326e 100644 --- a/crates/pile-value/src/source/mod.rs +++ b/crates/pile-value/src/source/mod.rs @@ -1,9 +1,6 @@ mod dir; pub use dir::*; -mod s3; -pub use s3::*; - pub mod misc; /// A read-only set of [Item]s. diff --git a/crates/pile-value/src/source/s3.rs b/crates/pile-value/src/source/s3.rs deleted file mode 100644 index df0eb28..0000000 --- a/crates/pile-value/src/source/s3.rs +++ /dev/null @@ -1,322 +0,0 @@ -use chrono::{DateTime, Utc}; -use pile_config::{ - Label, - pattern::{GroupPattern, GroupSegment}, -}; -use pile_io::S3Client; -use smartstring::{LazyCompact, SmartString}; -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - sync::{Arc, OnceLock}, -}; - -use crate::{ - extract::traits::ExtractState, - source::DataSource, - value::{Item, PileValue}, -}; - -#[derive(Debug)] -pub struct S3DataSource { - pub name: Label, - pub client: Arc, - - pub prefix: Option>, - pub pattern: GroupPattern, - pub encryption_key: Option<[u8; 32]>, - pub index: OnceLock, Item>>, -} - -impl S3DataSource { - pub async fn new( - name: &Label, - bucket: &str, - prefix: Option<&str>, - endpoint: Option<&str>, - region: &str, - access_key_id: &str, - secret_access_key: &str, - cache_limit_bytes: usize, - pattern: GroupPattern, - encryption_key: Option<[u8; 32]>, - ) -> Result, std::io::Error> { - let client = S3Client::new( - bucket, - endpoint, - region, - access_key_id, - secret_access_key, - cache_limit_bytes, - ) - .await; - - let source = Arc::new(Self { - name: name.clone(), - client, - prefix: prefix.map(|x| x.into()), - pattern, - encryption_key, - index: OnceLock::new(), - }); - - // - // MARK: list keys - // - - let mut all_keys: HashSet> = HashSet::new(); - let mut continuation_token: Option = None; - - loop { - let mut req = source - .client - .client - .list_objects_v2() - .bucket(source.client.bucket()); - - if let Some(prefix) = &source.prefix { - req = req.prefix(prefix.as_str()); - } - - if let Some(token) = continuation_token { - req = req.continuation_token(token); - } - - let resp = req.send().await.map_err(std::io::Error::other)?; - - let next_token = resp.next_continuation_token().map(ToOwned::to_owned); - let is_truncated = resp.is_truncated().unwrap_or(false); - - for obj in resp.contents() { - let Some(full_key) = obj.key() else { continue }; - let raw_key = strip_prefix(full_key, source.prefix.as_deref()); - let key = match &source.encryption_key { - None => raw_key.into(), - Some(enc_key) => match decrypt_path(enc_key, raw_key) { - Some(decrypted) => decrypted.into(), - None => continue, - }, - }; - all_keys.insert(key); - } - - if !is_truncated { - break; - } - continuation_token = next_token; - } - - // - // MARK: resolve groups - // - - let mut keys_grouped: HashSet> = HashSet::new(); - for key in &all_keys { - let groups = resolve_groups(&source.pattern, key).await; - for group_key in groups.into_values() { - if all_keys.contains(&group_key) { - keys_grouped.insert(group_key); - } - } - } - - let mut index = BTreeMap::new(); - for key in all_keys.difference(&keys_grouped) { - let groups = resolve_groups(&source.pattern, key).await; - let group = groups - .into_iter() - .filter(|(_, gk)| all_keys.contains(gk)) - .map(|(label, gk)| { - ( - label, - Box::new(Item::S3 { - source: Arc::clone(&source), - mime: mime_guess::from_path(gk.as_str()).first_or_octet_stream(), - key: gk, - group: Arc::new(HashMap::new()), - }), - ) - }) - .collect::>(); - - let item = Item::S3 { - source: Arc::clone(&source), - mime: mime_guess::from_path(key.as_str()).first_or_octet_stream(), - key: key.clone(), - group: Arc::new(group), - }; - - index.insert(item.key(), item); - } - - source.index.get_or_init(|| index); - Ok(source) - } -} - -impl DataSource for Arc { - #[expect(clippy::expect_used)] - fn len(&self) -> usize { - self.index.get().expect("index should be initialized").len() - } - - #[expect(clippy::expect_used)] - async fn get(&self, key: &str) -> Result, std::io::Error> { - return Ok(self - .index - .get() - .expect("index should be initialized") - .get(key) - .cloned()); - } - - #[expect(clippy::expect_used)] - fn iter(&self) -> impl Iterator { - self.index - .get() - .expect("index should be initialized") - .values() - } - - async fn latest_change(&self) -> Result>, std::io::Error> { - let mut ts: Option> = None; - let mut continuation_token: Option = None; - - loop { - let mut req = self - .client - .client - .list_objects_v2() - .bucket(self.client.bucket()); - - if let Some(prefix) = &self.prefix { - req = req.prefix(prefix.as_str()); - } - - if let Some(token) = continuation_token { - req = req.continuation_token(token); - } - - let resp = match req.send().await { - Err(_) => return Ok(None), - Ok(resp) => resp, - }; - - let next_token = resp.next_continuation_token().map(ToOwned::to_owned); - let is_truncated = resp.is_truncated().unwrap_or(false); - - for obj in resp.contents() { - if let Some(last_modified) = obj.last_modified() { - let dt = DateTime::from_timestamp( - last_modified.secs(), - last_modified.subsec_nanos(), - ); - if let Some(dt) = dt { - ts = Some(match ts { - None => dt, - Some(prev) => prev.max(dt), - }); - } - } - } - - if !is_truncated { - break; - } - continuation_token = next_token; - } - - Ok(ts) - } -} - -/// Derive an encryption key from a password -pub fn string_to_key(password: &str) -> [u8; 32] { - blake3::derive_key("pile s3 encryption", password.as_bytes()) -} - -/// Encrypt a logical path to a base64 S3 key using a deterministic nonce. -pub fn encrypt_path(enc_key: &[u8; 32], path: &str) -> String { - use base64::Engine; - use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - - let hash = blake3::keyed_hash(enc_key, path.as_bytes()); - let nonce_bytes = &hash.as_bytes()[..24]; - let nonce = XNonce::from_slice(nonce_bytes); - let key = chacha20poly1305::Key::from_slice(enc_key); - let cipher = XChaCha20Poly1305::new(key); - #[expect(clippy::expect_used)] - let ciphertext = cipher - .encrypt(nonce, path.as_bytes()) - .expect("path encryption should not fail"); - - let mut result = nonce_bytes.to_vec(); - result.extend_from_slice(&ciphertext); - base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(result) -} - -/// Decrypt a base64 S3 key back to its logical path. -fn decrypt_path(enc_key: &[u8; 32], encrypted: &str) -> Option { - use base64::Engine; - use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - - let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD - .decode(encrypted) - .ok()?; - if bytes.len() < 24 + 16 { - return None; - } - let (nonce_bytes, ciphertext) = bytes.split_at(24); - let nonce = XNonce::from_slice(nonce_bytes); - let key = chacha20poly1305::Key::from_slice(enc_key); - let cipher = XChaCha20Poly1305::new(key); - let plaintext = cipher.decrypt(nonce, ciphertext).ok()?; - String::from_utf8(plaintext).ok() -} - -fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str { - match prefix { - None => key, - Some(p) => { - let with_slash = if p.ends_with('/') { - key.strip_prefix(p) - } else { - key.strip_prefix(&format!("{p}/")) - }; - with_slash.unwrap_or(key) - } - } -} - -async fn resolve_groups( - pattern: &GroupPattern, - key: &str, -) -> HashMap> { - let state = ExtractState { ignore_mime: false }; - let mut group = HashMap::new(); - 'pattern: for (l, pat) in &pattern.pattern { - let item = PileValue::String(Arc::new(key.into())); - let mut target = String::new(); - for p in pat { - match p { - GroupSegment::Literal(x) => target.push_str(x), - GroupSegment::Path(op) => { - let res = match item.query(&state, op).await { - Ok(Some(x)) => x, - _ => continue 'pattern, - }; - - let res = match res.as_str() { - Some(x) => x, - None => continue 'pattern, - }; - - target.push_str(res); - } - } - } - - group.insert(l.clone(), target.into()); - } - - return group; -} diff --git a/crates/pile-value/src/source/s3reader.rs b/crates/pile-value/src/source/s3reader.rs deleted file mode 100644 index 9d36218..0000000 --- a/crates/pile-value/src/source/s3reader.rs +++ /dev/null @@ -1,158 +0,0 @@ -use aws_sdk_s3::{error::SdkError, operation::get_object::GetObjectError}; -use mime::Mime; -use std::io::{Error as IoError, Seek, SeekFrom, Write}; -use thiserror::Error; - -use super::S3Client; -use crate::retry; - -#[derive(Debug, Error)] -#[expect(clippy::large_enum_variant)] -pub enum S3ReaderError { - #[error("sdk error")] - SdkError(#[from] SdkError), - - #[error("byte stream error")] - ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError), - - #[error("i/o error")] - IoError(#[from] IoError), -} - -/// Provides a [`std::io::Read`]-like interface to an S3 object. \ -/// This doesn't actually implement [`std::io::Read`] because Read isn't async. -/// -/// Also implements [`std::io::Seek`] -pub struct S3Reader { - pub(super) client: S3Client, - pub(super) bucket: String, - pub(super) key: String, - - pub(super) cursor: u64, - pub(super) size: u64, - pub(super) mime: Mime, -} - -impl S3Reader { - pub async fn read(&mut self, mut buf: &mut [u8]) -> Result { - let len_left = self.size - self.cursor; - if len_left == 0 || buf.is_empty() { - return Ok(0); - } - - #[expect(clippy::unwrap_used)] // TODO: probably fits? - let start_byte = usize::try_from(self.cursor).unwrap(); - - #[expect(clippy::unwrap_used)] // usize fits in u64 - let len_to_read = u64::try_from(buf.len()).unwrap().min(len_left); - - #[expect(clippy::unwrap_used)] // must fit, we called min() - let len_to_read = usize::try_from(len_to_read).unwrap(); - - let end_byte = start_byte + len_to_read - 1; - - let b = retry!( - self.client.retries, - self.client - .client - .get_object() - .bucket(self.bucket.as_str()) - .key(self.key.as_str()) - .range(format!("bytes={start_byte}-{end_byte}")) - .send() - .await - )?; - - // Looks like `bytes 31000000-31999999/33921176`` - // println!("{:?}", b.content_range); - - let mut bytes = b.body.collect().await?.into_bytes(); - bytes.truncate(len_to_read); - let l = bytes.len(); - - // Memory to memory writes are infallible - #[expect(clippy::unwrap_used)] - buf.write_all(&bytes).unwrap(); - - // Cannot fail, usize should always fit into u64 - #[expect(clippy::unwrap_used)] - { - self.cursor += u64::try_from(l).unwrap(); - } - - return Ok(len_to_read); - } - - pub fn is_done(&self) -> bool { - return self.cursor == self.size; - } - - pub fn mime(&self) -> &Mime { - &self.mime - } - - /// Write the entire contents of this reader to `r`. - /// - /// This method always downloads the whole object, - /// and always preserves `self.cursor`. - pub async fn download(&mut self, r: &mut W) -> Result<(), S3ReaderError> { - let pos = self.stream_position()?; - - const BUF_LEN: usize = 10_000_000; - #[expect(clippy::unwrap_used)] // Cannot fail - let mut buf: Box<[u8; BUF_LEN]> = vec![0u8; BUF_LEN].try_into().unwrap(); - - while !self.is_done() { - let b = self.read(&mut buf[..]).await?; - r.write_all(&buf[0..b])?; - } - - self.seek(SeekFrom::Start(pos))?; - Ok(()) - } -} - -impl Seek for S3Reader { - fn seek(&mut self, pos: SeekFrom) -> std::io::Result { - match pos { - SeekFrom::Start(x) => self.cursor = x.min(self.size - 1), - - // Cannot panic, we handle all cases - #[expect(clippy::unwrap_used)] - SeekFrom::Current(x) => { - if x < 0 { - if u64::try_from(x.abs()).unwrap() > self.cursor { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor -= u64::try_from(x.abs()).unwrap(); - } else { - self.cursor += u64::try_from(x).unwrap(); - } - } - - // Cannot panic, we handle all cases - #[expect(clippy::unwrap_used)] - SeekFrom::End(x) => { - if x < 0 { - if u64::try_from(x.abs()).unwrap() > self.size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - // Cannot fail, is abs - self.cursor = self.size - u64::try_from(x.abs()).unwrap(); - } else { - // Cannot fail, is positive - self.cursor = self.size + u64::try_from(x).unwrap(); - } - } - } - - self.cursor = self.cursor.min(self.size - 1); - return Ok(self.cursor); - } -} diff --git a/crates/pile-value/src/value/item.rs b/crates/pile-value/src/value/item.rs index 281bdc3..af4cbad 100644 --- a/crates/pile-value/src/value/item.rs +++ b/crates/pile-value/src/value/item.rs @@ -1,13 +1,10 @@ use mime::Mime; use pile_config::Label; -use pile_io::{SyncReadBridge, chacha::ChaChaReaderv1Async}; +use pile_io::SyncReadBridge; use smartstring::{LazyCompact, SmartString}; use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; -use crate::{ - source::{DirDataSource, S3DataSource, encrypt_path}, - value::ItemReader, -}; +use crate::{source::DirDataSource, value::ItemReader}; // // MARK: item @@ -23,58 +20,19 @@ pub enum Item { path: PathBuf, group: Arc>>, }, - - S3 { - source: Arc, - mime: Mime, - - key: SmartString, - group: Arc>>, - }, } impl Item { - /// Open the item for reading. For S3, performs a HEAD request to determine - /// the object size. + /// Open the item for reading. pub async fn read(&self) -> Result { Ok(match self { Self::File { path, .. } => ItemReader::File(File::open(path)?), - - Self::S3 { source, key, .. } => { - let logical_key = key.as_str(); - - let s3_key_part: SmartString = match &source.encryption_key { - None => logical_key.into(), - Some(enc_key) => encrypt_path(enc_key, logical_key).into(), - }; - - let full_key: SmartString = match &source.prefix { - None => s3_key_part, - Some(p) => { - if p.ends_with('/') { - format!("{p}{s3_key_part}").into() - } else { - format!("{p}/{s3_key_part}").into() - } - } - }; - - let reader = source.client.get(&full_key).await?; - - match source.encryption_key { - None => ItemReader::S3(reader), - Some(enc_key) => { - ItemReader::EncryptedS3(ChaChaReaderv1Async::new(reader, enc_key).await?) - } - } - } }) } pub fn source_name(&self) -> &pile_config::Label { match self { Self::File { source, .. } => &source.name, - Self::S3 { source, .. } => &source.name, } } @@ -87,7 +45,6 @@ impl Item { .to_str() .expect("path is not utf-8") .into(), - Self::S3 { key, .. } => key.clone(), } } @@ -106,14 +63,12 @@ impl Item { pub fn mime(&self) -> &Mime { match self { Self::File { mime, .. } => mime, - Self::S3 { mime, .. } => mime, } } pub fn group(&self) -> &HashMap> { match self { Self::File { group, .. } => group, - Self::S3 { group, .. } => group, } } } diff --git a/crates/pile-value/src/value/readers.rs b/crates/pile-value/src/value/readers.rs index fbbe299..b7dac78 100644 --- a/crates/pile-value/src/value/readers.rs +++ b/crates/pile-value/src/value/readers.rs @@ -1,4 +1,4 @@ -use pile_io::{AsyncReader, AsyncSeekReader, S3Reader, chacha::ChaChaReaderv1Async}; +use pile_io::{AsyncReader, AsyncSeekReader}; use std::{fs::File, io::Seek}; // @@ -7,16 +7,12 @@ use std::{fs::File, io::Seek}; pub enum ItemReader { File(File), - S3(S3Reader), - EncryptedS3(ChaChaReaderv1Async), } impl AsyncReader for ItemReader { async fn read(&mut self, buf: &mut [u8]) -> Result { match self { Self::File(x) => std::io::Read::read(x, buf), - Self::S3(x) => x.read(buf).await, - Self::EncryptedS3(x) => x.read(buf).await, } } } @@ -25,8 +21,6 @@ impl AsyncSeekReader for ItemReader { async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { match self { Self::File(x) => x.seek(pos), - Self::S3(x) => x.seek(pos).await, - Self::EncryptedS3(x) => x.seek(pos).await, } } } diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index f7c3105..0e5f6b3 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -12,10 +12,7 @@ pile-toolbox = { workspace = true } pile-dataset = { workspace = true, features = ["axum", "pdfium"] } pile-value = { workspace = true, features = ["pdfium"] } pile-config = { workspace = true } -pile-io = { workspace = true } -aws-sdk-s3 = { workspace = true } -bytes = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } tokio = { workspace = true } diff --git a/crates/pile/src/cli.rs b/crates/pile/src/cli.rs index b74ddbb..85e3783 100644 --- a/crates/pile/src/cli.rs +++ b/crates/pile/src/cli.rs @@ -1,5 +1,4 @@ use anstyle::{AnsiColor, Color, Style}; -use indicatif::ProgressStyle; pub fn clap_styles() -> clap::builder::Styles { clap::builder::Styles::styled() @@ -37,6 +36,7 @@ pub fn clap_styles() -> clap::builder::Styles { .placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White)))) } +/* #[expect(clippy::unwrap_used)] pub fn progress_big() -> ProgressStyle { return ProgressStyle::default_bar() @@ -50,7 +50,6 @@ pub fn progress_big() -> ProgressStyle { ]); } -/* #[expect(clippy::unwrap_used)] pub fn spinner_small() -> ProgressStyle { return ProgressStyle::default_bar() diff --git a/crates/pile/src/command/encrypt.rs b/crates/pile/src/command/encrypt.rs deleted file mode 100644 index bc2a13c..0000000 --- a/crates/pile/src/command/encrypt.rs +++ /dev/null @@ -1,75 +0,0 @@ -use anyhow::{Context, Result}; -use clap::Args; -use pile_io::AsyncReader; -use pile_io::chacha::{ChaChaReaderv1, ChaChaWriterv1}; -use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use pile_value::source::string_to_key; -use std::io::{Cursor, Write}; -use std::path::PathBuf; - -use crate::{CliCmd, GlobalContext}; - -#[derive(Debug, Args)] -pub struct EncryptCommand { - /// File to encrypt - path: PathBuf, - - /// Encryption password - password: String, -} - -#[derive(Debug, Args)] -pub struct DecryptCommand { - /// File to decrypt - path: PathBuf, - - /// Encryption password - password: String, -} - -impl CliCmd for EncryptCommand { - async fn run( - self, - _ctx: GlobalContext, - _flag: CancelFlag, - ) -> Result> { - let key = string_to_key(&self.password); - let plaintext = tokio::fs::read(&self.path) - .await - .with_context(|| format!("while reading '{}'", self.path.display()))?; - - let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), key) - .context("while initializing encryptor")?; - writer.write_all(&plaintext).context("while encrypting")?; - let buf = writer.finish().context("while finalizing encryptor")?; - - std::io::stdout() - .write_all(buf.get_ref()) - .context("while writing to stdout")?; - - Ok(0) - } -} - -impl CliCmd for DecryptCommand { - async fn run( - self, - _ctx: GlobalContext, - _flag: CancelFlag, - ) -> Result> { - let key = string_to_key(&self.password); - let ciphertext = tokio::fs::read(&self.path) - .await - .with_context(|| format!("while reading '{}'", self.path.display()))?; - - let mut reader = ChaChaReaderv1::new(Cursor::new(ciphertext), key) - .context("while initializing decryptor")?; - let plaintext = reader.read_to_end().await.context("while decrypting")?; - - std::io::stdout() - .write_all(&plaintext) - .context("while writing to stdout")?; - - Ok(0) - } -} diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 3412b15..94cfc73 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -5,7 +5,6 @@ use pile_toolbox::cancelabletask::{ }; mod check; -mod encrypt; mod fields; mod index; mod init; @@ -15,7 +14,6 @@ mod lookup; mod probe; mod serve; mod server; -mod upload; use crate::{Cli, GlobalContext}; @@ -85,24 +83,6 @@ pub enum SubCommand { #[command(flatten)] cmd: server::ServerCommand, }, - - /// Upload a filesystem source to an S3 source - Upload { - #[command(flatten)] - cmd: upload::UploadCommand, - }, - - /// Encrypt a file to stdout - Encrypt { - #[command(flatten)] - cmd: encrypt::EncryptCommand, - }, - - /// Decrypt a file to stdout - Decrypt { - #[command(flatten)] - cmd: encrypt::DecryptCommand, - }, } impl CliCmdDispatch for SubCommand { @@ -118,9 +98,6 @@ impl CliCmdDispatch for SubCommand { Self::Item { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx), Self::Server { cmd } => cmd.start(ctx), - Self::Upload { cmd } => cmd.start(ctx), - Self::Encrypt { cmd } => cmd.start(ctx), - Self::Decrypt { cmd } => cmd.start(ctx), Self::Docs {} => { print_help_recursively(&mut Cli::command(), None); diff --git a/crates/pile/src/command/upload.rs b/crates/pile/src/command/upload.rs deleted file mode 100644 index a4b501a..0000000 --- a/crates/pile/src/command/upload.rs +++ /dev/null @@ -1,284 +0,0 @@ -use anyhow::{Context, Result}; -use aws_sdk_s3::primitives::ByteStream; -use clap::Args; -use indicatif::ProgressBar; -use pile_config::Label; -use pile_dataset::{Dataset, Datasets}; -use pile_io::chacha::ChaChaWriterv1; -use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path}; -use std::{ - io::{Cursor, Write}, - path::PathBuf, - sync::Arc, - time::Duration, -}; -use tokio::task::JoinSet; -use tracing::info; - -use crate::{CliCmd, GlobalContext, cli::progress_big}; - -#[derive(Debug, Args)] -pub struct UploadCommand { - /// Name of the filesystem source to upload from - dir_source: String, - - /// Name of the S3 source to upload to - s3_source: String, - - /// Prefix path under the S3 source to upload files to - prefix: String, - - /// Path to dataset config - #[arg(long, short = 'c', default_value = "./pile.toml")] - config: PathBuf, - - /// Override the S3 bucket from pile.toml - #[arg(long)] - bucket: Option, - - /// Allow overwriting files that already exist at the target prefix - #[arg(long)] - overwrite: bool, - - /// Delete all files at the target prefix before uploading - #[arg(long)] - delete_existing_forever: bool, - - /// Number of parallel upload jobs - #[arg(long, short = 'j', default_value = "5")] - jobs: usize, -} - -impl CliCmd for UploadCommand { - async fn run( - self, - ctx: GlobalContext, - flag: CancelFlag, - ) -> Result> { - let ds = Datasets::open(&self.config) - .await - .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - - let dir_label = Label::new(&self.dir_source) - .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.dir_source))?; - let s3_label = Label::new(&self.s3_source) - .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.s3_source))?; - - let dir_ds: Arc = get_dir_source(&ds, &dir_label, &self.dir_source)?; - let s3_ds: Arc = get_s3_source(&ds, &s3_label, &self.s3_source)?; - - let bucket = self - .bucket - .as_deref() - .unwrap_or(s3_ds.client.bucket()) - .to_owned(); - let full_prefix = self.prefix.trim_matches('/').to_owned(); - - // Check for existing objects at the target prefix - let existing_keys = list_prefix(&s3_ds.client.client, &bucket, &full_prefix) - .await - .context("while checking for existing objects at target prefix")?; - - if !existing_keys.is_empty() { - if self.delete_existing_forever { - info!( - "Deleting {} existing object(s) at '{}'", - existing_keys.len(), - full_prefix - ); - for key in &existing_keys { - s3_ds - .client - .client - .delete_object() - .bucket(&bucket) - .key(key) - .send() - .await - .with_context(|| format!("while deleting existing object '{key}'"))?; - } - } else if !self.overwrite { - return Err(anyhow::anyhow!( - "{} file(s) already exist at '{}'. \ - Pass --overwrite to allow overwriting, \ - or --delete-existing-forever to delete them first.", - existing_keys.len(), - full_prefix - ) - .into()); - } - } - - // Count total files before uploading so we can show accurate progress - let total = dir_ds.iter().count() as u64; - - // Walk filesystem source and upload files in parallel - let jobs = self.jobs.max(1); - let mut uploaded: u64 = 0; - let mut stream = dir_ds.iter(); - let mut join_set: JoinSet> = JoinSet::new(); - - let pb = ctx.mp.add(ProgressBar::new(total)); - pb.set_style(progress_big()); - pb.enable_steady_tick(Duration::from_millis(100)); - pb.set_message(full_prefix.clone()); - - loop { - // Drain completed tasks before checking for cancellation or new work - while join_set.len() >= jobs { - match join_set.join_next().await { - Some(Ok(Ok(key))) => { - info!("Uploaded {key}"); - pb.set_message(key); - pb.inc(1); - uploaded += 1; - } - Some(Ok(Err(e))) => return Err(e.into()), - Some(Err(e)) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()), - None => break, - } - } - - if flag.is_cancelled() { - join_set.abort_all(); - return Err(CancelableTaskError::Cancelled); - } - - let item = match stream.next() { - None => break, - Some(item) => item.clone(), - }; - - let relative_str = item.key().to_string(); - let item_path = dir_ds.dir.join(&relative_str); - - let enc_key_part = match s3_ds.encryption_key { - None => relative_str.clone(), - Some(ref enc_key) => encrypt_path(enc_key, &relative_str), - }; - let key = format!("{full_prefix}/{enc_key_part}"); - let mime = item.mime().to_string(); - let client = Arc::clone(&s3_ds.client); - let bucket = bucket.clone(); - let encryption_key = s3_ds.encryption_key; - - join_set.spawn(async move { - let body = if let Some(enc_key) = encryption_key { - let path = item_path.clone(); - let encrypted = - tokio::task::spawn_blocking(move || -> anyhow::Result> { - let plaintext = std::fs::read(&path) - .with_context(|| format!("while opening '{}'", path.display()))?; - let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), enc_key) - .context("while initializing encryptor")?; - writer.write_all(&plaintext).context("while encrypting")?; - Ok(writer.finish().context("while finalizing")?.into_inner()) - }) - .await - .context("encryptor task panicked")??; - ByteStream::from(bytes::Bytes::from(encrypted)) - } else { - ByteStream::from_path(&item_path) - .await - .with_context(|| format!("while opening '{}'", item_path.display()))? - }; - - client - .client - .put_object() - .bucket(&bucket) - .key(&key) - .content_type(&mime) - .body(body) - .send() - .await - .with_context(|| { - format!("while uploading '{}' to '{key}'", item_path.display()) - })?; - - Ok(key) - }); - } - - // Drain remaining tasks - while let Some(result) = join_set.join_next().await { - match result { - Ok(Ok(key)) => { - info!("Uploaded {key}"); - pb.set_message(key); - pb.inc(1); - uploaded += 1; - } - Ok(Err(e)) => return Err(e.into()), - Err(e) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()), - } - } - - pb.finish_and_clear(); - info!("Done: uploaded {uploaded} file(s) to '{full_prefix}'"); - Ok(0) - } -} - -fn get_dir_source( - ds: &Datasets, - label: &Label, - name: &str, -) -> Result, anyhow::Error> { - match ds.sources.get(label).or(ds.disabled_sources.get(label)) { - Some(Dataset::Dir(d)) => Ok(Arc::clone(d)), - Some(_) => Err(anyhow::anyhow!( - "source '{name}' is not a filesystem source" - )), - None => Err(anyhow::anyhow!( - "filesystem source '{name}' not found in config" - )), - } -} - -fn get_s3_source( - ds: &Datasets, - label: &Label, - name: &str, -) -> Result, anyhow::Error> { - match ds.sources.get(label).or(ds.disabled_sources.get(label)) { - Some(Dataset::S3(s)) => Ok(Arc::clone(s)), - Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")), - None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")), - } -} - -/// List all S3 object keys under the given prefix. -async fn list_prefix( - client: &aws_sdk_s3::Client, - bucket: &str, - prefix: &str, -) -> Result> { - let mut keys = Vec::new(); - let mut continuation_token: Option = None; - - loop { - let mut req = client.list_objects_v2().bucket(bucket).prefix(prefix); - - if let Some(token) = continuation_token { - req = req.continuation_token(token); - } - - let resp = req.send().await.context("list_objects_v2 failed")?; - - for obj in resp.contents() { - if let Some(k) = obj.key() { - keys.push(k.to_owned()); - } - } - - if !resp.is_truncated().unwrap_or(false) { - break; - } - - continuation_token = resp.next_continuation_token().map(ToOwned::to_owned); - } - - Ok(keys) -}