Compare commits
3 Commits
2a22086992
...
e83c522e78
| Author | SHA1 | Date | |
|---|---|---|---|
| e83c522e78 | |||
| dfcb4b0a24 | |||
| 76d38d48c5 |
258
Cargo.lock
generated
258
Cargo.lock
generated
@@ -928,6 +928,16 @@ version = "0.4.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b"
|
checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "core-foundation"
|
||||||
|
version = "0.9.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f"
|
||||||
|
dependencies = [
|
||||||
|
"core-foundation-sys",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "core-foundation"
|
name = "core-foundation"
|
||||||
version = "0.10.1"
|
version = "0.10.1"
|
||||||
@@ -1393,6 +1403,21 @@ version = "0.2.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
|
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "foreign-types"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||||
|
dependencies = [
|
||||||
|
"foreign-types-shared",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "foreign-types-shared"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "form_urlencoded"
|
name = "form_urlencoded"
|
||||||
version = "1.2.2"
|
version = "1.2.2"
|
||||||
@@ -1433,6 +1458,12 @@ version = "0.3.32"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
|
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-io"
|
||||||
|
version = "0.3.32"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-macro"
|
name = "futures-macro"
|
||||||
version = "0.3.32"
|
version = "0.3.32"
|
||||||
@@ -1463,8 +1494,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
|
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
"futures-macro",
|
"futures-macro",
|
||||||
|
"futures-sink",
|
||||||
"futures-task",
|
"futures-task",
|
||||||
|
"memchr",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"slab",
|
"slab",
|
||||||
]
|
]
|
||||||
@@ -1780,6 +1814,22 @@ dependencies = [
|
|||||||
"tower-service",
|
"tower-service",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hyper-tls"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"http-body-util",
|
||||||
|
"hyper 1.8.1",
|
||||||
|
"hyper-util",
|
||||||
|
"native-tls",
|
||||||
|
"tokio",
|
||||||
|
"tokio-native-tls",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper-util"
|
name = "hyper-util"
|
||||||
version = "0.1.20"
|
version = "0.1.20"
|
||||||
@@ -1798,9 +1848,11 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2 0.6.2",
|
"socket2 0.6.2",
|
||||||
|
"system-configuration",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"windows-registry",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2019,6 +2071,16 @@ version = "2.12.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
|
checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "iri-string"
|
||||||
|
version = "0.7.11"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "is_terminal_polyfill"
|
name = "is_terminal_polyfill"
|
||||||
version = "1.70.2"
|
version = "1.70.2"
|
||||||
@@ -2351,6 +2413,23 @@ version = "0.1.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "13d2233c9842d08cfe13f9eac96e207ca6a2ea10b80259ebe8ad0268be27d2af"
|
checksum = "13d2233c9842d08cfe13f9eac96e207ca6a2ea10b80259ebe8ad0268be27d2af"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "native-tls"
|
||||||
|
version = "0.2.18"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"openssl",
|
||||||
|
"openssl-probe",
|
||||||
|
"openssl-sys",
|
||||||
|
"schannel",
|
||||||
|
"security-framework",
|
||||||
|
"security-framework-sys",
|
||||||
|
"tempfile",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nom"
|
name = "nom"
|
||||||
version = "7.1.3"
|
version = "7.1.3"
|
||||||
@@ -2419,12 +2498,50 @@ version = "0.3.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
|
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "openssl"
|
||||||
|
version = "0.10.76"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "951c002c75e16ea2c65b8c7e4d3d51d5530d8dfa7d060b4776828c88cfb18ecf"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"cfg-if",
|
||||||
|
"foreign-types",
|
||||||
|
"libc",
|
||||||
|
"once_cell",
|
||||||
|
"openssl-macros",
|
||||||
|
"openssl-sys",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "openssl-macros"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.117",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "openssl-probe"
|
name = "openssl-probe"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
|
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "openssl-sys"
|
||||||
|
version = "0.9.112"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "57d55af3b3e226502be1526dfdba67ab0e9c96fc293004e79576b2b9edb0dbdb"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
"pkg-config",
|
||||||
|
"vcpkg",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "outref"
|
name = "outref"
|
||||||
version = "0.5.2"
|
version = "0.5.2"
|
||||||
@@ -2580,6 +2697,19 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
"tracing-indicatif",
|
"tracing-indicatif",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
"utoipa",
|
||||||
|
"utoipa-swagger-ui",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pile-client"
|
||||||
|
version = "0.0.2"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"reqwest",
|
||||||
|
"serde",
|
||||||
|
"thiserror",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2653,7 +2783,6 @@ version = "0.0.2"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"aws-sdk-s3",
|
|
||||||
"base64",
|
"base64",
|
||||||
"blake3",
|
"blake3",
|
||||||
"chacha20poly1305",
|
"chacha20poly1305",
|
||||||
@@ -2910,6 +3039,49 @@ version = "0.8.8"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
|
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "reqwest"
|
||||||
|
version = "0.12.28"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"bytes",
|
||||||
|
"encoding_rs",
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"h2 0.4.13",
|
||||||
|
"http 1.4.0",
|
||||||
|
"http-body 1.0.1",
|
||||||
|
"http-body-util",
|
||||||
|
"hyper 1.8.1",
|
||||||
|
"hyper-rustls 0.27.7",
|
||||||
|
"hyper-tls",
|
||||||
|
"hyper-util",
|
||||||
|
"js-sys",
|
||||||
|
"log",
|
||||||
|
"mime",
|
||||||
|
"native-tls",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project-lite",
|
||||||
|
"rustls-pki-types",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_urlencoded",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tokio",
|
||||||
|
"tokio-native-tls",
|
||||||
|
"tokio-util",
|
||||||
|
"tower",
|
||||||
|
"tower-http",
|
||||||
|
"tower-service",
|
||||||
|
"url",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"wasm-streams",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rfc6979"
|
name = "rfc6979"
|
||||||
version = "0.3.1"
|
version = "0.3.1"
|
||||||
@@ -3149,7 +3321,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
|
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"core-foundation",
|
"core-foundation 0.10.1",
|
||||||
"core-foundation-sys",
|
"core-foundation-sys",
|
||||||
"libc",
|
"libc",
|
||||||
"security-framework-sys",
|
"security-framework-sys",
|
||||||
@@ -3498,6 +3670,9 @@ name = "sync_wrapper"
|
|||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
|
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "synstructure"
|
name = "synstructure"
|
||||||
@@ -3510,6 +3685,27 @@ dependencies = [
|
|||||||
"syn 2.0.117",
|
"syn 2.0.117",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "system-configuration"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"core-foundation 0.9.4",
|
||||||
|
"system-configuration-sys",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "system-configuration-sys"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
|
||||||
|
dependencies = [
|
||||||
|
"core-foundation-sys",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tantivy"
|
name = "tantivy"
|
||||||
version = "0.25.0"
|
version = "0.25.0"
|
||||||
@@ -3782,6 +3978,16 @@ dependencies = [
|
|||||||
"syn 2.0.117",
|
"syn 2.0.117",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-native-tls"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
|
||||||
|
dependencies = [
|
||||||
|
"native-tls",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-rustls"
|
name = "tokio-rustls"
|
||||||
version = "0.24.1"
|
version = "0.24.1"
|
||||||
@@ -3881,6 +4087,24 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-http"
|
||||||
|
version = "0.6.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http 1.4.0",
|
||||||
|
"http-body 1.0.1",
|
||||||
|
"iri-string",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-layer"
|
name = "tower-layer"
|
||||||
version = "0.3.3"
|
version = "0.3.3"
|
||||||
@@ -4183,6 +4407,12 @@ version = "0.1.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "vcpkg"
|
||||||
|
version = "0.2.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vecmath"
|
name = "vecmath"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
@@ -4348,6 +4578,19 @@ dependencies = [
|
|||||||
"wasmparser",
|
"wasmparser",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasm-streams"
|
||||||
|
version = "0.4.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasmparser"
|
name = "wasmparser"
|
||||||
version = "0.244.0"
|
version = "0.244.0"
|
||||||
@@ -4458,6 +4701,17 @@ version = "0.2.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
|
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows-registry"
|
||||||
|
version = "0.6.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720"
|
||||||
|
dependencies = [
|
||||||
|
"windows-link",
|
||||||
|
"windows-result",
|
||||||
|
"windows-strings",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-result"
|
name = "windows-result"
|
||||||
version = "0.4.1"
|
version = "0.4.1"
|
||||||
|
|||||||
@@ -70,9 +70,11 @@ pile-flac = { path = "crates/pile-flac" }
|
|||||||
pile-dataset = { path = "crates/pile-dataset" }
|
pile-dataset = { path = "crates/pile-dataset" }
|
||||||
pile-value = { path = "crates/pile-value" }
|
pile-value = { path = "crates/pile-value" }
|
||||||
pile-io = { path = "crates/pile-io" }
|
pile-io = { path = "crates/pile-io" }
|
||||||
|
pile-client = { path = "crates/pile-client" }
|
||||||
|
|
||||||
# Clients & servers
|
# Clients & servers
|
||||||
tantivy = "0.25.0"
|
tantivy = "0.25.0"
|
||||||
|
servable = { version = "0.0.7", features = ["image"] }
|
||||||
axum = { version = "0.8.8", features = ["macros", "multipart"] }
|
axum = { version = "0.8.8", features = ["macros", "multipart"] }
|
||||||
utoipa = { version = "5.4.0", features = [
|
utoipa = { version = "5.4.0", features = [
|
||||||
"axum_extras",
|
"axum_extras",
|
||||||
|
|||||||
15
crates/pile-client/Cargo.toml
Normal file
15
crates/pile-client/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
[package]
|
||||||
|
name = "pile-client"
|
||||||
|
version = { workspace = true }
|
||||||
|
rust-version = { workspace = true }
|
||||||
|
edition = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||||
|
futures-core = "0.3"
|
||||||
|
serde = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
230
crates/pile-client/src/lib.rs
Normal file
230
crates/pile-client/src/lib.rs
Normal file
@@ -0,0 +1,230 @@
|
|||||||
|
use bytes::Bytes;
|
||||||
|
use futures_core::Stream;
|
||||||
|
use reqwest::{Client, StatusCode, header};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: Error
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum ClientError {
|
||||||
|
#[error("invalid bearer token")]
|
||||||
|
InvalidToken,
|
||||||
|
|
||||||
|
#[error("HTTP {status}: {body}")]
|
||||||
|
Http { status: StatusCode, body: String },
|
||||||
|
|
||||||
|
#[error(transparent)]
|
||||||
|
Reqwest(#[from] reqwest::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: Response types
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct DatasetInfo {
|
||||||
|
pub name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct LookupRequest {
|
||||||
|
pub query: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub limit: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct LookupResult {
|
||||||
|
pub score: f32,
|
||||||
|
pub source: String,
|
||||||
|
pub key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct LookupResponse {
|
||||||
|
pub results: Vec<LookupResult>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct ItemRef {
|
||||||
|
pub source: String,
|
||||||
|
pub key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct ItemsResponse {
|
||||||
|
pub items: Vec<ItemRef>,
|
||||||
|
pub total: usize,
|
||||||
|
pub offset: usize,
|
||||||
|
pub limit: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Raw field response: the content-type and body bytes as returned by the server.
|
||||||
|
pub struct FieldResponse {
|
||||||
|
pub content_type: String,
|
||||||
|
pub data: Bytes,
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: PileClient
|
||||||
|
//
|
||||||
|
|
||||||
|
/// A client for a pile server. Use [`PileClient::dataset`] to get a dataset-scoped client.
|
||||||
|
pub struct PileClient {
|
||||||
|
base_url: String,
|
||||||
|
client: Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PileClient {
|
||||||
|
pub fn new(base_url: impl Into<String>, token: Option<&str>) -> Result<Self, ClientError> {
|
||||||
|
let mut headers = header::HeaderMap::new();
|
||||||
|
|
||||||
|
if let Some(token) = token {
|
||||||
|
let value = header::HeaderValue::from_str(&format!("Bearer {token}"))
|
||||||
|
.map_err(|_| ClientError::InvalidToken)?;
|
||||||
|
headers.insert(header::AUTHORIZATION, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = Client::builder()
|
||||||
|
.default_headers(headers)
|
||||||
|
.build()
|
||||||
|
.map_err(ClientError::Reqwest)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
base_url: base_url.into(),
|
||||||
|
client,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a client scoped to a specific dataset (i.e. `/{name}/...`).
|
||||||
|
pub fn dataset(&self, name: &str) -> DatasetClient {
|
||||||
|
DatasetClient {
|
||||||
|
base_url: format!("{}/{name}", self.base_url),
|
||||||
|
client: self.client.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `GET /datasets` — list all datasets served by this server.
|
||||||
|
pub async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, ClientError> {
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.get(format!("{}/datasets", self.base_url))
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
check_status(resp).await?.json().await.map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: DatasetClient
|
||||||
|
//
|
||||||
|
|
||||||
|
/// A client scoped to a single dataset on the server.
|
||||||
|
pub struct DatasetClient {
|
||||||
|
base_url: String,
|
||||||
|
client: Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatasetClient {
|
||||||
|
/// `POST /lookup` — full-text search within this dataset.
|
||||||
|
pub async fn lookup(
|
||||||
|
&self,
|
||||||
|
query: impl Into<String>,
|
||||||
|
limit: Option<usize>,
|
||||||
|
) -> Result<LookupResponse, ClientError> {
|
||||||
|
let body = LookupRequest {
|
||||||
|
query: query.into(),
|
||||||
|
limit,
|
||||||
|
};
|
||||||
|
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.post(format!("{}/lookup", self.base_url))
|
||||||
|
.json(&body)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
check_status(resp).await?.json().await.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `GET /item` — stream the raw bytes of an item.
|
||||||
|
///
|
||||||
|
/// The returned stream yields chunks as they arrive from the server.
|
||||||
|
pub async fn get_item(
|
||||||
|
&self,
|
||||||
|
source: &str,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.get(format!("{}/item", self.base_url))
|
||||||
|
.query(&[("source", source), ("key", key)])
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `GET /field` — extract a field from an item by object path (e.g. `$.flac.title`).
|
||||||
|
pub async fn get_field(
|
||||||
|
&self,
|
||||||
|
source: &str,
|
||||||
|
key: &str,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<FieldResponse, ClientError> {
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.get(format!("{}/field", self.base_url))
|
||||||
|
.query(&[("source", source), ("key", key), ("path", path)])
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let resp = check_status(resp).await?;
|
||||||
|
|
||||||
|
let content_type = resp
|
||||||
|
.headers()
|
||||||
|
.get(header::CONTENT_TYPE)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.unwrap_or("application/octet-stream")
|
||||||
|
.to_owned();
|
||||||
|
|
||||||
|
let data = resp.bytes().await?;
|
||||||
|
|
||||||
|
Ok(FieldResponse { content_type, data })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `GET /items` — paginate over all items in this dataset, ordered by (source, key).
|
||||||
|
pub async fn list_items(
|
||||||
|
&self,
|
||||||
|
offset: usize,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<ItemsResponse, ClientError> {
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.get(format!("{}/items", self.base_url))
|
||||||
|
.query(&[("offset", offset), ("limit", limit)])
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
check_status(resp).await?.json().await.map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: helpers
|
||||||
|
//
|
||||||
|
|
||||||
|
async fn check_status(resp: reqwest::Response) -> Result<reqwest::Response, ClientError> {
|
||||||
|
let status = resp.status();
|
||||||
|
if status.is_success() {
|
||||||
|
return Ok(resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = resp.text().await.unwrap_or_default();
|
||||||
|
Err(ClientError::Http { status, body })
|
||||||
|
}
|
||||||
@@ -61,6 +61,13 @@ impl Dataset {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn iter_page(&self, offset: usize, limit: usize) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
|
||||||
|
match self {
|
||||||
|
Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||||
|
Self::S3(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||||
match self {
|
match self {
|
||||||
Self::Dir(ds) => ds.latest_change().await,
|
Self::Dir(ds) => ds.latest_change().await,
|
||||||
@@ -81,6 +88,7 @@ pub struct Datasets {
|
|||||||
|
|
||||||
pub config: ConfigToml,
|
pub config: ConfigToml,
|
||||||
pub sources: HashMap<Label, Dataset>,
|
pub sources: HashMap<Label, Dataset>,
|
||||||
|
pub disabled_sources: HashMap<Label, Dataset>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Datasets {
|
impl Datasets {
|
||||||
@@ -114,6 +122,8 @@ impl Datasets {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut sources = HashMap::new();
|
let mut sources = HashMap::new();
|
||||||
|
let mut disabled_sources = HashMap::new();
|
||||||
|
|
||||||
for (label, source) in &config.dataset.source {
|
for (label, source) in &config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem {
|
Source::Filesystem {
|
||||||
@@ -121,11 +131,12 @@ impl Datasets {
|
|||||||
path,
|
path,
|
||||||
pattern,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
sources.insert(
|
target.insert(
|
||||||
label.clone(),
|
label.clone(),
|
||||||
Dataset::Dir(
|
Dataset::Dir(
|
||||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||||
@@ -144,26 +155,29 @@ impl Datasets {
|
|||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
||||||
|
|
||||||
match S3DataSource::new(
|
match S3DataSource::new(
|
||||||
label,
|
label,
|
||||||
bucket.clone(),
|
bucket,
|
||||||
prefix.clone(),
|
prefix.as_ref().map(|x| x.as_str()),
|
||||||
endpoint.clone(),
|
endpoint.as_ref().map(|x| x.as_str()),
|
||||||
region.clone(),
|
region,
|
||||||
credentials,
|
&credentials.access_key_id,
|
||||||
|
&credentials.secret_access_key,
|
||||||
|
10_000_000,
|
||||||
pattern.clone(),
|
pattern.clone(),
|
||||||
encryption_key,
|
encryption_key,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(ds) => {
|
Ok(ds) => {
|
||||||
sources.insert(label.clone(), Dataset::S3(ds));
|
target.insert(label.clone(), Dataset::S3(ds));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Could not open S3 source {label}: {err}");
|
warn!("Could not open S3 source {label}: {err}");
|
||||||
@@ -179,6 +193,7 @@ impl Datasets {
|
|||||||
path_parent,
|
path_parent,
|
||||||
config,
|
config,
|
||||||
sources,
|
sources,
|
||||||
|
disabled_sources,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -219,6 +234,7 @@ impl Datasets {
|
|||||||
.join(config.dataset.name.as_str());
|
.join(config.dataset.name.as_str());
|
||||||
|
|
||||||
let mut sources = HashMap::new();
|
let mut sources = HashMap::new();
|
||||||
|
let mut disabled_sources = HashMap::new();
|
||||||
for (label, source) in &config.dataset.source {
|
for (label, source) in &config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem {
|
Source::Filesystem {
|
||||||
@@ -226,11 +242,12 @@ impl Datasets {
|
|||||||
path,
|
path,
|
||||||
pattern,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
sources.insert(
|
target.insert(
|
||||||
label.clone(),
|
label.clone(),
|
||||||
Dataset::Dir(
|
Dataset::Dir(
|
||||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||||
@@ -249,26 +266,29 @@ impl Datasets {
|
|||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
||||||
|
|
||||||
match S3DataSource::new(
|
match S3DataSource::new(
|
||||||
label,
|
label,
|
||||||
bucket.clone(),
|
bucket,
|
||||||
prefix.clone(),
|
prefix.as_ref().map(|x| x.as_str()),
|
||||||
endpoint.clone(),
|
endpoint.as_ref().map(|x| x.as_str()),
|
||||||
region.clone(),
|
region,
|
||||||
credentials,
|
&credentials.access_key_id,
|
||||||
|
&credentials.secret_access_key,
|
||||||
|
10_000_000,
|
||||||
pattern.clone(),
|
pattern.clone(),
|
||||||
encryption_key,
|
encryption_key,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(ds) => {
|
Ok(ds) => {
|
||||||
sources.insert(label.clone(), Dataset::S3(ds));
|
target.insert(label.clone(), Dataset::S3(ds));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Could not open S3 source {label}: {err}");
|
warn!("Could not open S3 source {label}: {err}");
|
||||||
@@ -284,6 +304,7 @@ impl Datasets {
|
|||||||
path_parent,
|
path_parent,
|
||||||
config,
|
config,
|
||||||
sources,
|
sources,
|
||||||
|
disabled_sources,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
104
crates/pile-dataset/src/serve/items.rs
Normal file
104
crates/pile-dataset/src/serve/items.rs
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
use axum::{
|
||||||
|
Json,
|
||||||
|
extract::{Query, State},
|
||||||
|
http::StatusCode,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tracing::debug;
|
||||||
|
use utoipa::ToSchema;
|
||||||
|
|
||||||
|
use crate::Datasets;
|
||||||
|
|
||||||
|
#[derive(Deserialize, ToSchema)]
|
||||||
|
pub struct ItemsQuery {
|
||||||
|
#[serde(default)]
|
||||||
|
offset: usize,
|
||||||
|
#[serde(default = "default_limit")]
|
||||||
|
limit: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Page size used when a request does not specify `limit`
/// (wired in through `#[serde(default = "default_limit")]`).
fn default_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 100;
    DEFAULT_PAGE_SIZE
}
|
||||||
|
|
||||||
|
#[derive(Serialize, ToSchema)]
|
||||||
|
pub struct ItemsResponse {
|
||||||
|
pub items: Vec<ItemRef>,
|
||||||
|
pub total: usize,
|
||||||
|
pub offset: usize,
|
||||||
|
pub limit: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, ToSchema)]
|
||||||
|
pub struct ItemRef {
|
||||||
|
pub source: String,
|
||||||
|
pub key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all items across all sources with consistent ordering, paginated by offset and limit
|
||||||
|
#[utoipa::path(
|
||||||
|
get,
|
||||||
|
path = "/items",
|
||||||
|
params(
|
||||||
|
("offset" = usize, Query, description = "Number of items to skip"),
|
||||||
|
("limit" = usize, Query, description = "Maximum number of items to return (max 1000)"),
|
||||||
|
),
|
||||||
|
responses(
|
||||||
|
(status = 200, description = "Paginated list of items", body = ItemsResponse),
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
pub async fn items_list(
|
||||||
|
State(state): State<Arc<Datasets>>,
|
||||||
|
Query(params): Query<ItemsQuery>,
|
||||||
|
) -> Response {
|
||||||
|
let limit = params.limit.min(1000);
|
||||||
|
let offset = params.offset;
|
||||||
|
|
||||||
|
debug!(message = "Serving /items", offset, limit);
|
||||||
|
|
||||||
|
// Sort sources by label for a consistent global order: (source, key)
|
||||||
|
let mut source_labels: Vec<_> = state.sources.keys().collect();
|
||||||
|
source_labels.sort();
|
||||||
|
|
||||||
|
let mut items: Vec<ItemRef> = Vec::with_capacity(limit);
|
||||||
|
let mut total = 0usize;
|
||||||
|
let mut remaining_offset = offset;
|
||||||
|
|
||||||
|
for label in source_labels {
|
||||||
|
let dataset = &state.sources[label];
|
||||||
|
let source_len = dataset.len();
|
||||||
|
|
||||||
|
if remaining_offset >= source_len {
|
||||||
|
// This entire source is before our window; skip it efficiently
|
||||||
|
remaining_offset -= source_len;
|
||||||
|
total += source_len;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let want = (limit - items.len()).min(source_len - remaining_offset);
|
||||||
|
let source_str = label.as_str().to_owned();
|
||||||
|
for item in dataset.iter_page(remaining_offset, want) {
|
||||||
|
items.push(ItemRef {
|
||||||
|
source: source_str.clone(),
|
||||||
|
key: item.key().to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
remaining_offset = 0;
|
||||||
|
total += source_len;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(message = "Served /items", offset, limit, total);
|
||||||
|
|
||||||
|
(
|
||||||
|
StatusCode::OK,
|
||||||
|
Json(ItemsResponse {
|
||||||
|
items,
|
||||||
|
total,
|
||||||
|
offset,
|
||||||
|
limit,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
@@ -48,13 +48,10 @@ pub async fn lookup(
|
|||||||
Json(body): Json<LookupRequest>,
|
Json(body): Json<LookupRequest>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
debug!(
|
let limit = body.limit.unwrap_or(128).min(1024);
|
||||||
message = "Serving /lookup",
|
debug!(message = "Serving /lookup", query = body.query, limit);
|
||||||
query = body.query,
|
|
||||||
limit = body.limit.unwrap_or(10)
|
|
||||||
);
|
|
||||||
|
|
||||||
let results: Vec<LookupResult> = match state.fts_lookup(&body.query, body.limit.unwrap_or(10)) {
|
let results: Vec<LookupResult> = match state.fts_lookup(&body.query, limit) {
|
||||||
Ok(x) => x
|
Ok(x) => x
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|x| LookupResult {
|
.map(|x| LookupResult {
|
||||||
|
|||||||
@@ -17,31 +17,47 @@ pub use item::*;
|
|||||||
mod field;
|
mod field;
|
||||||
pub use field::*;
|
pub use field::*;
|
||||||
|
|
||||||
|
mod items;
|
||||||
|
pub use items::*;
|
||||||
|
|
||||||
#[derive(OpenApi)]
|
#[derive(OpenApi)]
|
||||||
#[openapi(
|
#[openapi(
|
||||||
tags(),
|
tags(),
|
||||||
paths(lookup, item_get, get_field),
|
paths(lookup, item_get, get_field, items_list),
|
||||||
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery))
|
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery, ItemsQuery, ItemsResponse, ItemRef))
|
||||||
)]
|
)]
|
||||||
pub(crate) struct Api;
|
pub(crate) struct Api;
|
||||||
|
|
||||||
impl Datasets {
|
impl Datasets {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
|
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
|
||||||
|
self.router_prefix(with_docs, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||||
let mut router = Router::new()
|
let mut router = Router::new()
|
||||||
.route("/lookup", post(lookup))
|
.route("/lookup", post(lookup))
|
||||||
.route("/item", get(item_get))
|
.route("/item", get(item_get))
|
||||||
.route("/field", get(get_field))
|
.route("/field", get(get_field))
|
||||||
|
.route("/items", get(items_list))
|
||||||
.with_state(self.clone());
|
.with_state(self.clone());
|
||||||
|
|
||||||
|
if let Some(prefix) = prefix {
|
||||||
|
router = Router::new().nest(prefix, router);
|
||||||
|
}
|
||||||
|
|
||||||
if with_docs {
|
if with_docs {
|
||||||
let docs_path = "/docs";
|
let docs_path = match prefix {
|
||||||
let docs = SwaggerUi::new(docs_path)
|
None => "/docs".into(),
|
||||||
|
Some(prefix) => format!("{prefix}/docs"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let docs = SwaggerUi::new(docs_path.clone())
|
||||||
.url(format!("{}/openapi.json", docs_path), Api::openapi());
|
.url(format!("{}/openapi.json", docs_path), Api::openapi());
|
||||||
|
|
||||||
router = router.merge(docs);
|
router = router.merge(docs);
|
||||||
}
|
}
|
||||||
|
|
||||||
router
|
router
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
47
crates/pile-io/src/chacha/format.rs
Normal file
47
crates/pile-io/src/chacha/format.rs
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
use binrw::{binrw, meta::ReadMagic};
|
||||||
|
|
||||||
|
#[binrw]
|
||||||
|
#[brw(little, magic = b"PileChaChav1")]
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ChaChaHeaderv1 {
|
||||||
|
pub config: ChaChaConfigv1,
|
||||||
|
pub plaintext_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChaChaHeaderv1 {
|
||||||
|
pub const SIZE: usize = ChaChaHeaderv1::MAGIC.len() + std::mem::size_of::<ChaChaConfigv1>() + 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks that `ChaChaHeaderv1::SIZE` equals the number of bytes a header
/// actually serializes to.
///
/// The magic exists only in the encoded form, not as a struct field, so
/// `size_of::<ChaChaHeaderv1>()` is smaller than `SIZE` and must not be used
/// as the reference value.
#[test]
fn chachaheader_size() {
    use binrw::BinWriterExt;
    use std::io::Cursor;

    let header = ChaChaHeaderv1 {
        config: ChaChaConfigv1::default(),
        plaintext_size: 0,
    };

    let mut encoded = Cursor::new(Vec::<u8>::new());
    assert!(encoded.write_le(&header).is_ok(), "header must serialize");
    assert_eq!(ChaChaHeaderv1::SIZE, encoded.into_inner().len());
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: config
|
||||||
|
//
|
||||||
|
|
||||||
|
#[binrw]
|
||||||
|
#[brw(little)]
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ChaChaConfigv1 {
|
||||||
|
pub chunk_size: u64,
|
||||||
|
pub nonce_size: u64,
|
||||||
|
pub tag_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ChaChaConfigv1 {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
chunk_size: 64 * 1024,
|
||||||
|
nonce_size: 24,
|
||||||
|
tag_size: 16,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChaChaConfigv1 {
|
||||||
|
pub(crate) fn enc_chunk_size(&self) -> u64 {
|
||||||
|
self.chunk_size + self.nonce_size + self.tag_size
|
||||||
|
}
|
||||||
|
}
|
||||||
9
crates/pile-io/src/chacha/mod.rs
Normal file
9
crates/pile-io/src/chacha/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
mod reader;
|
||||||
|
mod reader_async;
|
||||||
|
mod writer;
|
||||||
|
mod writer_async;
|
||||||
|
|
||||||
|
pub use {reader::*, reader_async::*, writer::*, writer_async::*};
|
||||||
|
|
||||||
|
mod format;
|
||||||
|
pub use format::*;
|
||||||
@@ -1,70 +1,15 @@
|
|||||||
use std::io::{Read, Seek, SeekFrom};
|
use std::io::{Read, Seek, SeekFrom};
|
||||||
|
|
||||||
use binrw::binrw;
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader};
|
|
||||||
|
|
||||||
//
|
|
||||||
// MARK: header
|
|
||||||
//
|
|
||||||
|
|
||||||
/// Serialized size of [`ChaChaHeader`] in bytes: 12 magic + 3×8 config + 8 plaintext_size.
|
|
||||||
pub const HEADER_SIZE: usize = 44;
|
|
||||||
|
|
||||||
#[binrw]
|
|
||||||
#[brw(little, magic = b"PileChaChav1")]
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct ChaChaHeader {
|
|
||||||
pub chunk_size: u64,
|
|
||||||
pub nonce_size: u64,
|
|
||||||
pub tag_size: u64,
|
|
||||||
pub plaintext_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// MARK: config
|
|
||||||
//
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct ChaChaReaderConfig {
|
|
||||||
pub chunk_size: u64,
|
|
||||||
pub nonce_size: u64,
|
|
||||||
pub tag_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ChaChaReaderConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
chunk_size: 1_048_576, // 1MiB
|
|
||||||
nonce_size: 24,
|
|
||||||
tag_size: 16,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ChaChaReaderConfig {
|
|
||||||
pub(crate) fn enc_chunk_size(&self) -> u64 {
|
|
||||||
self.chunk_size + self.nonce_size + self.tag_size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ChaChaHeader> for ChaChaReaderConfig {
|
|
||||||
fn from(h: ChaChaHeader) -> Self {
|
|
||||||
Self {
|
|
||||||
chunk_size: h.chunk_size,
|
|
||||||
nonce_size: h.nonce_size,
|
|
||||||
tag_size: h.tag_size,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// MARK: reader
|
// MARK: reader
|
||||||
//
|
//
|
||||||
|
|
||||||
pub struct ChaChaReader<R: Read + Seek> {
|
pub struct ChaChaReaderv1<R: Read + Seek> {
|
||||||
inner: R,
|
inner: R,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
data_offset: u64,
|
data_offset: u64,
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
cursor: u64,
|
cursor: u64,
|
||||||
@@ -72,17 +17,17 @@ pub struct ChaChaReader<R: Read + Seek> {
|
|||||||
cached_chunk: Option<(u64, Vec<u8>)>,
|
cached_chunk: Option<(u64, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek> ChaChaReader<R> {
|
impl<R: Read + Seek> ChaChaReaderv1<R> {
|
||||||
pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinReaderExt;
|
use binrw::BinReaderExt;
|
||||||
|
|
||||||
inner.seek(SeekFrom::Start(0))?;
|
inner.seek(SeekFrom::Start(0))?;
|
||||||
let header: ChaChaHeader = inner.read_le().map_err(std::io::Error::other)?;
|
let header: ChaChaHeaderv1 = inner.read_le().map_err(std::io::Error::other)?;
|
||||||
let data_offset = inner.stream_position()?;
|
let data_offset = inner.stream_position()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config: header.into(),
|
header,
|
||||||
data_offset,
|
data_offset,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
cursor: 0,
|
cursor: 0,
|
||||||
@@ -94,21 +39,22 @@ impl<R: Read + Seek> ChaChaReader<R> {
|
|||||||
fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
||||||
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
||||||
|
|
||||||
let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size();
|
let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
|
||||||
self.inner.seek(SeekFrom::Start(enc_start))?;
|
self.inner.seek(SeekFrom::Start(enc_start))?;
|
||||||
|
|
||||||
let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize];
|
let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
|
||||||
let n = self.read_exact_or_eof(&mut encrypted)?;
|
let n = self.read_exact_or_eof(&mut encrypted)?;
|
||||||
encrypted.truncate(n);
|
encrypted.truncate(n);
|
||||||
|
|
||||||
if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize {
|
if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
|
||||||
|
{
|
||||||
return Err(std::io::Error::new(
|
return Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::InvalidData,
|
std::io::ErrorKind::InvalidData,
|
||||||
"encrypted chunk too short",
|
"encrypted chunk too short",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize);
|
let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
|
||||||
let nonce = XNonce::from_slice(nonce_bytes);
|
let nonce = XNonce::from_slice(nonce_bytes);
|
||||||
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
||||||
let cipher = XChaCha20Poly1305::new(key);
|
let cipher = XChaCha20Poly1305::new(key);
|
||||||
@@ -132,14 +78,14 @@ impl<R: Read + Seek> ChaChaReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
impl<R: Read + Seek + Send> AsyncReader for ChaChaReaderv1<R> {
|
||||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
||||||
if remaining == 0 || buf.is_empty() {
|
if remaining == 0 || buf.is_empty() {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_index = self.cursor / self.config.chunk_size;
|
let chunk_index = self.cursor / self.header.config.chunk_size;
|
||||||
|
|
||||||
let need_fetch = match &self.cached_chunk {
|
let need_fetch = match &self.cached_chunk {
|
||||||
None => true,
|
None => true,
|
||||||
@@ -153,7 +99,7 @@ impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
|||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
||||||
|
|
||||||
let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize;
|
let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
|
||||||
let available = chunk_data.len() - offset_in_chunk;
|
let available = chunk_data.len() - offset_in_chunk;
|
||||||
let to_copy = available.min(buf.len());
|
let to_copy = available.min(buf.len());
|
||||||
|
|
||||||
@@ -163,7 +109,7 @@ impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek + Send> AsyncSeekReader for ChaChaReader<R> {
|
impl<R: Read + Seek + Send> AsyncSeekReader for ChaChaReaderv1<R> {
|
||||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||||
match pos {
|
match pos {
|
||||||
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
||||||
@@ -1,10 +1,11 @@
|
|||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader, ChaChaHeader, ChaChaReaderConfig, HEADER_SIZE};
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
|
||||||
|
|
||||||
pub struct ChaChaReaderAsync<R: AsyncSeekReader> {
|
pub struct ChaChaReaderv1Async<R: AsyncSeekReader> {
|
||||||
inner: R,
|
inner: R,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
data_offset: u64,
|
data_offset: u64,
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
cursor: u64,
|
cursor: u64,
|
||||||
@@ -12,22 +13,22 @@ pub struct ChaChaReaderAsync<R: AsyncSeekReader> {
|
|||||||
cached_chunk: Option<(u64, Vec<u8>)>,
|
cached_chunk: Option<(u64, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> ChaChaReaderv1Async<R> {
|
||||||
pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinReaderExt;
|
use binrw::BinReaderExt;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
inner.seek(SeekFrom::Start(0)).await?;
|
inner.seek(SeekFrom::Start(0)).await?;
|
||||||
let mut buf = [0u8; HEADER_SIZE];
|
let mut buf = [0u8; ChaChaHeaderv1::SIZE];
|
||||||
read_exact(&mut inner, &mut buf).await?;
|
read_exact(&mut inner, &mut buf).await?;
|
||||||
let header: ChaChaHeader = Cursor::new(&buf[..])
|
let header: ChaChaHeaderv1 = Cursor::new(&buf[..])
|
||||||
.read_le()
|
.read_le()
|
||||||
.map_err(std::io::Error::other)?;
|
.map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config: header.into(),
|
header,
|
||||||
data_offset: HEADER_SIZE as u64,
|
data_offset: buf.len() as u64,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
cursor: 0,
|
cursor: 0,
|
||||||
plaintext_size: header.plaintext_size,
|
plaintext_size: header.plaintext_size,
|
||||||
@@ -38,21 +39,22 @@ impl<R: AsyncSeekReader> ChaChaReaderAsync<R> {
|
|||||||
async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
||||||
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
||||||
|
|
||||||
let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size();
|
let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
|
||||||
self.inner.seek(SeekFrom::Start(enc_start)).await?;
|
self.inner.seek(SeekFrom::Start(enc_start)).await?;
|
||||||
|
|
||||||
let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize];
|
let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
|
||||||
let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?;
|
let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?;
|
||||||
encrypted.truncate(n);
|
encrypted.truncate(n);
|
||||||
|
|
||||||
if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize {
|
if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
|
||||||
|
{
|
||||||
return Err(std::io::Error::new(
|
return Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::InvalidData,
|
std::io::ErrorKind::InvalidData,
|
||||||
"encrypted chunk too short",
|
"encrypted chunk too short",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize);
|
let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
|
||||||
let nonce = XNonce::from_slice(nonce_bytes);
|
let nonce = XNonce::from_slice(nonce_bytes);
|
||||||
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
||||||
let cipher = XChaCha20Poly1305::new(key);
|
let cipher = XChaCha20Poly1305::new(key);
|
||||||
@@ -90,14 +92,14 @@ async fn read_exact_or_eof<R: AsyncReader>(
|
|||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderv1Async<R> {
|
||||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
||||||
if remaining == 0 || buf.is_empty() {
|
if remaining == 0 || buf.is_empty() {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_index = self.cursor / self.config.chunk_size;
|
let chunk_index = self.cursor / self.header.config.chunk_size;
|
||||||
|
|
||||||
let need_fetch = match &self.cached_chunk {
|
let need_fetch = match &self.cached_chunk {
|
||||||
None => true,
|
None => true,
|
||||||
@@ -111,7 +113,7 @@ impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
|||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
||||||
|
|
||||||
let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize;
|
let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
|
||||||
let available = chunk_data.len() - offset_in_chunk;
|
let available = chunk_data.len() - offset_in_chunk;
|
||||||
let to_copy = available.min(buf.len());
|
let to_copy = available.min(buf.len());
|
||||||
|
|
||||||
@@ -121,7 +123,7 @@ impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> AsyncSeekReader for ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> AsyncSeekReader for ChaChaReaderv1Async<R> {
|
||||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||||
match pos {
|
match pos {
|
||||||
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
||||||
@@ -2,11 +2,12 @@ use std::io::SeekFrom;
|
|||||||
|
|
||||||
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
use crate::{ChaChaHeader, ChaChaReaderConfig};
|
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
|
||||||
|
|
||||||
pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
||||||
inner: W,
|
inner: W,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
plaintext_bytes_written: u64,
|
plaintext_bytes_written: u64,
|
||||||
@@ -14,18 +15,15 @@ pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
|||||||
|
|
||||||
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
||||||
pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
let config = ChaChaReaderConfig::default();
|
let header = ChaChaHeaderv1 {
|
||||||
let header_bytes = serialize_header(ChaChaHeader {
|
config: ChaChaConfigv1::default(),
|
||||||
chunk_size: config.chunk_size,
|
|
||||||
nonce_size: config.nonce_size,
|
|
||||||
tag_size: config.tag_size,
|
|
||||||
plaintext_size: 0,
|
plaintext_size: 0,
|
||||||
})?;
|
};
|
||||||
inner.write_all(&header_bytes).await?;
|
inner.write_all(&serialize_header(header)?).await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config,
|
header,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
plaintext_bytes_written: 0,
|
plaintext_bytes_written: 0,
|
||||||
@@ -36,7 +34,7 @@ impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
|||||||
self.buffer.extend_from_slice(buf);
|
self.buffer.extend_from_slice(buf);
|
||||||
self.plaintext_bytes_written += buf.len() as u64;
|
self.plaintext_bytes_written += buf.len() as u64;
|
||||||
|
|
||||||
let chunk_size = self.config.chunk_size as usize;
|
let chunk_size = self.header.config.chunk_size as usize;
|
||||||
while self.buffer.len() >= chunk_size {
|
while self.buffer.len() >= chunk_size {
|
||||||
let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
|
let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
|
||||||
self.inner.write_all(&encrypted).await?;
|
self.inner.write_all(&encrypted).await?;
|
||||||
@@ -55,10 +53,8 @@ impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.inner.seek(SeekFrom::Start(0)).await?;
|
self.inner.seek(SeekFrom::Start(0)).await?;
|
||||||
let header_bytes = serialize_header(ChaChaHeader {
|
let header_bytes = serialize_header(ChaChaHeaderv1 {
|
||||||
chunk_size: self.config.chunk_size,
|
config: self.header.config,
|
||||||
nonce_size: self.config.nonce_size,
|
|
||||||
tag_size: self.config.tag_size,
|
|
||||||
plaintext_size: self.plaintext_bytes_written,
|
plaintext_size: self.plaintext_bytes_written,
|
||||||
})?;
|
})?;
|
||||||
self.inner.write_all(&header_bytes).await?;
|
self.inner.write_all(&header_bytes).await?;
|
||||||
@@ -85,7 +81,7 @@ fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, std::io::E
|
|||||||
Ok(output)
|
Ok(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_header(header: ChaChaHeader) -> Result<Vec<u8>, std::io::Error> {
|
fn serialize_header(header: ChaChaHeaderv1) -> Result<Vec<u8>, std::io::Error> {
|
||||||
use binrw::BinWriterExt;
|
use binrw::BinWriterExt;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::io::{Seek, SeekFrom, Write};
|
use std::io::{Seek, SeekFrom, Write};
|
||||||
|
|
||||||
use crate::{ChaChaHeader, ChaChaReaderConfig};
|
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
|
||||||
|
|
||||||
/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`].
|
/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`].
|
||||||
pub fn generate_key() -> [u8; 32] {
|
pub fn generate_key() -> [u8; 32] {
|
||||||
@@ -9,30 +9,28 @@ pub fn generate_key() -> [u8; 32] {
|
|||||||
XChaCha20Poly1305::generate_key(&mut OsRng).into()
|
XChaCha20Poly1305::generate_key(&mut OsRng).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ChaChaWriter<W: Write + Seek> {
|
pub struct ChaChaWriterv1<W: Write + Seek> {
|
||||||
inner: W,
|
inner: W,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
plaintext_bytes_written: u64,
|
plaintext_bytes_written: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: Write + Seek> ChaChaWriter<W> {
|
impl<W: Write + Seek> ChaChaWriterv1<W> {
|
||||||
pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinWriterExt;
|
use binrw::BinWriterExt;
|
||||||
|
|
||||||
let config = ChaChaReaderConfig::default();
|
let header = ChaChaHeaderv1 {
|
||||||
let header = ChaChaHeader {
|
config: ChaChaConfigv1::default(),
|
||||||
chunk_size: config.chunk_size,
|
|
||||||
nonce_size: config.nonce_size,
|
|
||||||
tag_size: config.tag_size,
|
|
||||||
plaintext_size: 0,
|
plaintext_size: 0,
|
||||||
};
|
};
|
||||||
inner.write_le(&header).map_err(std::io::Error::other)?;
|
inner.write_le(&header).map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config,
|
header,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
plaintext_bytes_written: 0,
|
plaintext_bytes_written: 0,
|
||||||
@@ -47,10 +45,8 @@ impl<W: Write + Seek> ChaChaWriter<W> {
|
|||||||
self.flush_buffer()?;
|
self.flush_buffer()?;
|
||||||
|
|
||||||
self.inner.seek(SeekFrom::Start(0))?;
|
self.inner.seek(SeekFrom::Start(0))?;
|
||||||
let header = ChaChaHeader {
|
let header = ChaChaHeaderv1 {
|
||||||
chunk_size: self.config.chunk_size,
|
config: self.header.config,
|
||||||
nonce_size: self.config.nonce_size,
|
|
||||||
tag_size: self.config.tag_size,
|
|
||||||
plaintext_size: self.plaintext_bytes_written,
|
plaintext_size: self.plaintext_bytes_written,
|
||||||
};
|
};
|
||||||
self.inner
|
self.inner
|
||||||
@@ -89,12 +85,12 @@ impl<W: Write + Seek> ChaChaWriter<W> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: Write + Seek> Write for ChaChaWriter<W> {
|
impl<W: Write + Seek> Write for ChaChaWriterv1<W> {
|
||||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||||
self.buffer.extend_from_slice(buf);
|
self.buffer.extend_from_slice(buf);
|
||||||
self.plaintext_bytes_written += buf.len() as u64;
|
self.plaintext_bytes_written += buf.len() as u64;
|
||||||
|
|
||||||
let chunk_size = self.config.chunk_size as usize;
|
let chunk_size = self.header.config.chunk_size as usize;
|
||||||
while self.buffer.len() >= chunk_size {
|
while self.buffer.len() >= chunk_size {
|
||||||
let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?;
|
let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?;
|
||||||
self.inner.write_all(&encrypted)?;
|
self.inner.write_all(&encrypted)?;
|
||||||
@@ -120,13 +116,13 @@ impl<W: Write + Seek> Write for ChaChaWriter<W> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::io::{Cursor, SeekFrom, Write};
|
use std::io::{Cursor, SeekFrom, Write};
|
||||||
|
|
||||||
use super::ChaChaWriter;
|
use super::ChaChaWriterv1;
|
||||||
use crate::{AsyncReader, AsyncSeekReader, ChaChaReader};
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaReaderv1};
|
||||||
|
|
||||||
const KEY: [u8; 32] = [42u8; 32];
|
const KEY: [u8; 32] = [42u8; 32];
|
||||||
|
|
||||||
fn encrypt(data: &[u8]) -> Cursor<Vec<u8>> {
|
fn encrypt(data: &[u8]) -> Cursor<Vec<u8>> {
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap();
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
|
||||||
writer.write_all(data).unwrap();
|
writer.write_all(data).unwrap();
|
||||||
let mut buf = writer.finish().unwrap();
|
let mut buf = writer.finish().unwrap();
|
||||||
buf.set_position(0);
|
buf.set_position(0);
|
||||||
@@ -134,7 +130,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn decrypt_all(buf: Cursor<Vec<u8>>) -> Vec<u8> {
|
async fn decrypt_all(buf: Cursor<Vec<u8>>) -> Vec<u8> {
|
||||||
let mut reader = ChaChaReader::new(buf, KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(buf, KEY).unwrap();
|
||||||
reader.read_to_end().await.unwrap()
|
reader.read_to_end().await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,7 +165,7 @@ mod tests {
|
|||||||
async fn roundtrip_incremental_writes() {
|
async fn roundtrip_incremental_writes() {
|
||||||
// Write one byte at a time
|
// Write one byte at a time
|
||||||
let data: Vec<u8> = (0u8..200).collect();
|
let data: Vec<u8> = (0u8..200).collect();
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap();
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
|
||||||
for byte in &data {
|
for byte in &data {
|
||||||
writer.write_all(&[*byte]).unwrap();
|
writer.write_all(&[*byte]).unwrap();
|
||||||
}
|
}
|
||||||
@@ -181,7 +177,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn wrong_key_fails() {
|
async fn wrong_key_fails() {
|
||||||
let buf = encrypt(b"secret data");
|
let buf = encrypt(b"secret data");
|
||||||
let mut reader = ChaChaReader::new(buf, [0u8; 32]).unwrap();
|
let mut reader = ChaChaReaderv1::new(buf, [0u8; 32]).unwrap();
|
||||||
assert!(reader.read_to_end().await.is_err());
|
assert!(reader.read_to_end().await.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,13 +187,13 @@ mod tests {
|
|||||||
let mut buf = encrypt(b"data");
|
let mut buf = encrypt(b"data");
|
||||||
buf.get_mut()[0] = 0xFF;
|
buf.get_mut()[0] = 0xFF;
|
||||||
buf.set_position(0);
|
buf.set_position(0);
|
||||||
assert!(ChaChaReader::new(buf, KEY).is_err());
|
assert!(ChaChaReaderv1::new(buf, KEY).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_from_start() {
|
async fn seek_from_start() {
|
||||||
let data: Vec<u8> = (0u8..100).collect();
|
let data: Vec<u8> = (0u8..100).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::Start(50)).await.unwrap();
|
reader.seek(SeekFrom::Start(50)).await.unwrap();
|
||||||
let mut buf = [0u8; 10];
|
let mut buf = [0u8; 10];
|
||||||
@@ -211,7 +207,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_from_end() {
|
async fn seek_from_end() {
|
||||||
let data: Vec<u8> = (0u8..100).collect();
|
let data: Vec<u8> = (0u8..100).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::End(-10)).await.unwrap();
|
reader.seek(SeekFrom::End(-10)).await.unwrap();
|
||||||
assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]);
|
assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]);
|
||||||
@@ -221,7 +217,7 @@ mod tests {
|
|||||||
async fn seek_across_chunk_boundary() {
|
async fn seek_across_chunk_boundary() {
|
||||||
// Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1
|
// Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1
|
||||||
let data: Vec<u8> = (0u8..=255).cycle().take(65536 + 500).collect();
|
let data: Vec<u8> = (0u8..=255).cycle().take(65536 + 500).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::Start(65530)).await.unwrap();
|
reader.seek(SeekFrom::Start(65530)).await.unwrap();
|
||||||
let mut buf = vec![0u8; 12];
|
let mut buf = vec![0u8; 12];
|
||||||
@@ -235,7 +231,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_current() {
|
async fn seek_current() {
|
||||||
let data: Vec<u8> = (0u8..=255).cycle().take(200).collect();
|
let data: Vec<u8> = (0u8..=255).cycle().take(200).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
// Read 10, seek back 5, read 5 — should get bytes 5..10
|
// Read 10, seek back 5, read 5 — should get bytes 5..10
|
||||||
let mut first = [0u8; 10];
|
let mut first = [0u8; 10];
|
||||||
@@ -255,7 +251,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_past_end_clamps() {
|
async fn seek_past_end_clamps() {
|
||||||
let data = b"hello";
|
let data = b"hello";
|
||||||
let mut reader = ChaChaReader::new(encrypt(data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(data), KEY).unwrap();
|
||||||
|
|
||||||
let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
|
let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
|
||||||
assert_eq!(pos, data.len() as u64);
|
assert_eq!(pos, data.len() as u64);
|
||||||
@@ -4,14 +4,4 @@ pub use asyncreader::*;
|
|||||||
mod s3reader;
|
mod s3reader;
|
||||||
pub use s3reader::*;
|
pub use s3reader::*;
|
||||||
|
|
||||||
mod chachareader;
|
pub mod chacha;
|
||||||
pub use chachareader::*;
|
|
||||||
|
|
||||||
mod chachawriter;
|
|
||||||
pub use chachawriter::*;
|
|
||||||
|
|
||||||
mod chachareader_async;
|
|
||||||
pub use chachareader_async::*;
|
|
||||||
|
|
||||||
mod chachawriter_async;
|
|
||||||
pub use chachawriter_async::*;
|
|
||||||
|
|||||||
@@ -1,10 +1,102 @@
|
|||||||
|
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{io::SeekFrom, sync::Arc};
|
use std::{fmt::Debug, io::SeekFrom, sync::Arc};
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader};
|
use crate::{AsyncReader, AsyncSeekReader};
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: client
|
||||||
|
//
|
||||||
|
|
||||||
|
/// An interface to an S3 bucket.
|
||||||
|
///
|
||||||
|
/// TODO: S3 is slow and expensive. Ideally, we'll have this struct cache data
|
||||||
|
/// so we don't have to download anything twice. This is, however, complicated,
|
||||||
|
/// and doesn't fully solve the "expensive" problem.
|
||||||
|
pub struct S3Client {
|
||||||
|
pub client: aws_sdk_s3::Client,
|
||||||
|
bucket: SmartString<LazyCompact>,
|
||||||
|
|
||||||
|
/// maximum number of bytes to use for cached data
|
||||||
|
cache_limit_bytes: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for S3Client {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("S3Client")
|
||||||
|
.field("bucket", &self.bucket)
|
||||||
|
.field("cache_limit_bytes", &self.cache_limit_bytes)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl S3Client {
|
||||||
|
pub async fn new(
|
||||||
|
bucket: &str,
|
||||||
|
endpoint: Option<&str>,
|
||||||
|
region: &str,
|
||||||
|
access_key_id: &str,
|
||||||
|
secret_access_key: &str,
|
||||||
|
cache_limit_bytes: usize,
|
||||||
|
) -> Arc<Self> {
|
||||||
|
let client = {
|
||||||
|
let mut s3_config = aws_sdk_s3::config::Builder::new()
|
||||||
|
.behavior_version(BehaviorVersion::latest())
|
||||||
|
.region(Region::new(region.to_owned()))
|
||||||
|
.credentials_provider(Credentials::new(
|
||||||
|
access_key_id,
|
||||||
|
secret_access_key,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"pile",
|
||||||
|
));
|
||||||
|
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
s3_config = s3_config.endpoint_url(ep).force_path_style(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
aws_sdk_s3::Client::from_conf(s3_config.build())
|
||||||
|
};
|
||||||
|
|
||||||
|
return Arc::new(Self {
|
||||||
|
bucket: bucket.into(),
|
||||||
|
client,
|
||||||
|
cache_limit_bytes,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bucket(&self) -> &str {
|
||||||
|
&self.bucket
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(self: &Arc<Self>, key: &str) -> Result<S3Reader, std::io::Error> {
|
||||||
|
let head = self
|
||||||
|
.client
|
||||||
|
.head_object()
|
||||||
|
.bucket(self.bucket.as_str())
|
||||||
|
.key(key)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
|
let size = head.content_length().unwrap_or(0) as u64;
|
||||||
|
|
||||||
|
Ok(S3Reader {
|
||||||
|
client: self.clone(),
|
||||||
|
bucket: self.bucket.clone(),
|
||||||
|
key: key.into(),
|
||||||
|
cursor: 0,
|
||||||
|
size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: reader
|
||||||
|
//
|
||||||
|
|
||||||
pub struct S3Reader {
|
pub struct S3Reader {
|
||||||
pub client: Arc<aws_sdk_s3::Client>,
|
pub client: Arc<S3Client>,
|
||||||
pub bucket: SmartString<LazyCompact>,
|
pub bucket: SmartString<LazyCompact>,
|
||||||
pub key: SmartString<LazyCompact>,
|
pub key: SmartString<LazyCompact>,
|
||||||
pub cursor: u64,
|
pub cursor: u64,
|
||||||
@@ -23,6 +115,7 @@ impl AsyncReader for S3Reader {
|
|||||||
let end_byte = start_byte + len_to_read - 1;
|
let end_byte = start_byte + len_to_read - 1;
|
||||||
|
|
||||||
let resp = self
|
let resp = self
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.get_object()
|
.get_object()
|
||||||
.bucket(self.bucket.as_str())
|
.bucket(self.bucket.as_str())
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ image = { workspace = true, optional = true }
|
|||||||
id3 = { workspace = true }
|
id3 = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
aws-sdk-s3 = { workspace = true }
|
|
||||||
mime = { workspace = true }
|
mime = { workspace = true }
|
||||||
mime_guess = { workspace = true }
|
mime_guess = { workspace = true }
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use pile_config::{
|
|||||||
};
|
};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::{Arc, OnceLock},
|
sync::{Arc, OnceLock},
|
||||||
};
|
};
|
||||||
@@ -22,7 +22,7 @@ pub struct DirDataSource {
|
|||||||
pub name: Label,
|
pub name: Label,
|
||||||
pub dir: PathBuf,
|
pub dir: PathBuf,
|
||||||
pub pattern: GroupPattern,
|
pub pattern: GroupPattern,
|
||||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
pub index: OnceLock<BTreeMap<SmartString<LazyCompact>, Item>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DirDataSource {
|
impl DirDataSource {
|
||||||
@@ -73,7 +73,7 @@ impl DirDataSource {
|
|||||||
// MARK: resolve groups
|
// MARK: resolve groups
|
||||||
//
|
//
|
||||||
|
|
||||||
let mut index = HashMap::new();
|
let mut index = BTreeMap::new();
|
||||||
'entry: for path in paths_items.difference(&paths_grouped_items) {
|
'entry: for path in paths_items.difference(&paths_grouped_items) {
|
||||||
let path_str = match path.to_str() {
|
let path_str = match path.to_str() {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
|
|||||||
@@ -17,9 +17,18 @@ pub trait DataSource {
|
|||||||
key: &str,
|
key: &str,
|
||||||
) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;
|
) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;
|
||||||
|
|
||||||
/// Iterate over all items in this source in an arbitrary order
|
/// Iterate over all items in this source in sorted key order
|
||||||
fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
|
fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
|
||||||
|
|
||||||
|
/// Iterate over a page of items, sorted by key
|
||||||
|
fn iter_page(
|
||||||
|
&self,
|
||||||
|
offset: usize,
|
||||||
|
limit: usize,
|
||||||
|
) -> impl Iterator<Item = &crate::value::Item> {
|
||||||
|
self.iter().skip(offset).take(limit)
|
||||||
|
}
|
||||||
|
|
||||||
/// Return the time of the latest change to the data in this source
|
/// Return the time of the latest change to the data in this source
|
||||||
fn latest_change(
|
fn latest_change(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use pile_config::{
|
use pile_config::{
|
||||||
Label, S3Credentials,
|
Label,
|
||||||
pattern::{GroupPattern, GroupSegment},
|
pattern::{GroupPattern, GroupSegment},
|
||||||
};
|
};
|
||||||
|
use pile_io::S3Client;
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
sync::{Arc, OnceLock},
|
sync::{Arc, OnceLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -19,51 +19,41 @@ use crate::{
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct S3DataSource {
|
pub struct S3DataSource {
|
||||||
pub name: Label,
|
pub name: Label,
|
||||||
pub bucket: SmartString<LazyCompact>,
|
pub client: Arc<S3Client>,
|
||||||
|
|
||||||
pub prefix: Option<SmartString<LazyCompact>>,
|
pub prefix: Option<SmartString<LazyCompact>>,
|
||||||
pub client: Arc<aws_sdk_s3::Client>,
|
|
||||||
pub pattern: GroupPattern,
|
pub pattern: GroupPattern,
|
||||||
pub encryption_key: Option<[u8; 32]>,
|
pub encryption_key: Option<[u8; 32]>,
|
||||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
pub index: OnceLock<BTreeMap<SmartString<LazyCompact>, Item>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl S3DataSource {
|
impl S3DataSource {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
name: &Label,
|
name: &Label,
|
||||||
bucket: String,
|
bucket: &str,
|
||||||
prefix: Option<String>,
|
prefix: Option<&str>,
|
||||||
endpoint: Option<String>,
|
endpoint: Option<&str>,
|
||||||
region: String,
|
region: &str,
|
||||||
credentials: &S3Credentials,
|
access_key_id: &str,
|
||||||
|
secret_access_key: &str,
|
||||||
|
cache_limit_bytes: usize,
|
||||||
pattern: GroupPattern,
|
pattern: GroupPattern,
|
||||||
encryption_key: Option<[u8; 32]>,
|
encryption_key: Option<[u8; 32]>,
|
||||||
) -> Result<Arc<Self>, std::io::Error> {
|
) -> Result<Arc<Self>, std::io::Error> {
|
||||||
let client = {
|
let client = S3Client::new(
|
||||||
let creds = Credentials::new(
|
bucket,
|
||||||
&credentials.access_key_id,
|
endpoint,
|
||||||
&credentials.secret_access_key,
|
region,
|
||||||
None,
|
access_key_id,
|
||||||
None,
|
secret_access_key,
|
||||||
"pile",
|
cache_limit_bytes,
|
||||||
);
|
)
|
||||||
|
.await;
|
||||||
let mut s3_config = aws_sdk_s3::config::Builder::new()
|
|
||||||
.behavior_version(BehaviorVersion::latest())
|
|
||||||
.region(Region::new(region))
|
|
||||||
.credentials_provider(creds);
|
|
||||||
|
|
||||||
if let Some(ep) = endpoint {
|
|
||||||
s3_config = s3_config.endpoint_url(ep).force_path_style(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
aws_sdk_s3::Client::from_conf(s3_config.build())
|
|
||||||
};
|
|
||||||
|
|
||||||
let source = Arc::new(Self {
|
let source = Arc::new(Self {
|
||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
bucket: bucket.into(),
|
client,
|
||||||
prefix: prefix.map(|x| x.into()),
|
prefix: prefix.map(|x| x.into()),
|
||||||
client: Arc::new(client),
|
|
||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
index: OnceLock::new(),
|
index: OnceLock::new(),
|
||||||
@@ -78,9 +68,10 @@ impl S3DataSource {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut req = source
|
let mut req = source
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(source.bucket.as_str());
|
.bucket(source.client.bucket());
|
||||||
|
|
||||||
if let Some(prefix) = &source.prefix {
|
if let Some(prefix) = &source.prefix {
|
||||||
req = req.prefix(prefix.as_str());
|
req = req.prefix(prefix.as_str());
|
||||||
@@ -128,7 +119,7 @@ impl S3DataSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut index = HashMap::new();
|
let mut index = BTreeMap::new();
|
||||||
for key in all_keys.difference(&keys_grouped) {
|
for key in all_keys.difference(&keys_grouped) {
|
||||||
let groups = resolve_groups(&source.pattern, key).await;
|
let groups = resolve_groups(&source.pattern, key).await;
|
||||||
let group = groups
|
let group = groups
|
||||||
@@ -191,7 +182,11 @@ impl DataSource for Arc<S3DataSource> {
|
|||||||
let mut continuation_token: Option<String> = None;
|
let mut continuation_token: Option<String> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str());
|
let mut req = self
|
||||||
|
.client
|
||||||
|
.client
|
||||||
|
.list_objects_v2()
|
||||||
|
.bucket(self.client.bucket());
|
||||||
|
|
||||||
if let Some(prefix) = &self.prefix {
|
if let Some(prefix) = &self.prefix {
|
||||||
req = req.prefix(prefix.as_str());
|
req = req.prefix(prefix.as_str());
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use pile_io::{ChaChaReaderAsync, S3Reader, SyncReadBridge};
|
use pile_io::{SyncReadBridge, chacha::ChaChaReaderv1Async};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
|
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
@@ -59,39 +59,13 @@ impl Item {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let head = source
|
let reader = source.client.get(&full_key).await?;
|
||||||
.client
|
|
||||||
.head_object()
|
|
||||||
.bucket(source.bucket.as_str())
|
|
||||||
.key(full_key.as_str())
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(std::io::Error::other)?;
|
|
||||||
|
|
||||||
let size = head.content_length().unwrap_or(0) as u64;
|
|
||||||
|
|
||||||
match source.encryption_key {
|
match source.encryption_key {
|
||||||
None => ItemReader::S3(S3Reader {
|
None => ItemReader::S3(reader),
|
||||||
client: source.client.clone(),
|
Some(enc_key) => {
|
||||||
bucket: source.bucket.clone(),
|
ItemReader::EncryptedS3(ChaChaReaderv1Async::new(reader, enc_key).await?)
|
||||||
key: full_key,
|
}
|
||||||
cursor: 0,
|
|
||||||
size,
|
|
||||||
}),
|
|
||||||
|
|
||||||
Some(enc_key) => ItemReader::EncryptedS3(
|
|
||||||
ChaChaReaderAsync::new(
|
|
||||||
S3Reader {
|
|
||||||
client: source.client.clone(),
|
|
||||||
bucket: source.bucket.clone(),
|
|
||||||
key: full_key,
|
|
||||||
cursor: 0,
|
|
||||||
size,
|
|
||||||
},
|
|
||||||
enc_key,
|
|
||||||
)
|
|
||||||
.await?,
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use pile_io::{AsyncReader, AsyncSeekReader, ChaChaReaderAsync, S3Reader};
|
use pile_io::{AsyncReader, AsyncSeekReader, S3Reader, chacha::ChaChaReaderv1Async};
|
||||||
use std::{fs::File, io::Seek};
|
use std::{fs::File, io::Seek};
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -8,7 +8,7 @@ use std::{fs::File, io::Seek};
|
|||||||
pub enum ItemReader {
|
pub enum ItemReader {
|
||||||
File(File),
|
File(File),
|
||||||
S3(S3Reader),
|
S3(S3Reader),
|
||||||
EncryptedS3(ChaChaReaderAsync<S3Reader>),
|
EncryptedS3(ChaChaReaderv1Async<S3Reader>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncReader for ItemReader {
|
impl AsyncReader for ItemReader {
|
||||||
|
|||||||
@@ -29,3 +29,5 @@ anstyle = { workspace = true }
|
|||||||
toml = { workspace = true }
|
toml = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
axum = { workspace = true }
|
axum = { workspace = true }
|
||||||
|
utoipa = { workspace = true }
|
||||||
|
utoipa-swagger-ui = { workspace = true }
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_io::{AsyncReader, ChaChaReader, ChaChaWriter};
|
use pile_io::AsyncReader;
|
||||||
|
use pile_io::chacha::{ChaChaReaderv1, ChaChaWriterv1};
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::source::string_to_key;
|
use pile_value::source::string_to_key;
|
||||||
use std::io::{Cursor, Write};
|
use std::io::{Cursor, Write};
|
||||||
@@ -37,7 +38,7 @@ impl CliCmd for EncryptCommand {
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
||||||
|
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), key)
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), key)
|
||||||
.context("while initializing encryptor")?;
|
.context("while initializing encryptor")?;
|
||||||
writer.write_all(&plaintext).context("while encrypting")?;
|
writer.write_all(&plaintext).context("while encrypting")?;
|
||||||
let buf = writer.finish().context("while finalizing encryptor")?;
|
let buf = writer.finish().context("while finalizing encryptor")?;
|
||||||
@@ -61,7 +62,7 @@ impl CliCmd for DecryptCommand {
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
||||||
|
|
||||||
let mut reader = ChaChaReader::new(Cursor::new(ciphertext), key)
|
let mut reader = ChaChaReaderv1::new(Cursor::new(ciphertext), key)
|
||||||
.context("while initializing decryptor")?;
|
.context("while initializing decryptor")?;
|
||||||
let plaintext = reader.read_to_end().await.context("while decrypting")?;
|
let plaintext = reader.read_to_end().await.context("while decrypting")?;
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ mod list;
|
|||||||
mod lookup;
|
mod lookup;
|
||||||
mod probe;
|
mod probe;
|
||||||
mod serve;
|
mod serve;
|
||||||
|
mod server;
|
||||||
mod upload;
|
mod upload;
|
||||||
|
|
||||||
use crate::{Cli, GlobalContext};
|
use crate::{Cli, GlobalContext};
|
||||||
@@ -73,12 +74,18 @@ pub enum SubCommand {
|
|||||||
cmd: item::ItemCommand,
|
cmd: item::ItemCommand,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Expose a dataset via an http api
|
/// Expose one dataset via a simple http api
|
||||||
Serve {
|
Serve {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
cmd: serve::ServeCommand,
|
cmd: serve::ServeCommand,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Serve many datasets under an authenticated http api
|
||||||
|
Server {
|
||||||
|
#[command(flatten)]
|
||||||
|
cmd: server::ServerCommand,
|
||||||
|
},
|
||||||
|
|
||||||
/// Upload a filesystem source to an S3 source
|
/// Upload a filesystem source to an S3 source
|
||||||
Upload {
|
Upload {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
@@ -110,6 +117,7 @@ impl CliCmdDispatch for SubCommand {
|
|||||||
Self::Probe { cmd } => cmd.start(ctx),
|
Self::Probe { cmd } => cmd.start(ctx),
|
||||||
Self::Item { cmd } => cmd.start(ctx),
|
Self::Item { cmd } => cmd.start(ctx),
|
||||||
Self::Serve { cmd } => cmd.start(ctx),
|
Self::Serve { cmd } => cmd.start(ctx),
|
||||||
|
Self::Server { cmd } => cmd.start(ctx),
|
||||||
Self::Upload { cmd } => cmd.start(ctx),
|
Self::Upload { cmd } => cmd.start(ctx),
|
||||||
Self::Encrypt { cmd } => cmd.start(ctx),
|
Self::Encrypt { cmd } => cmd.start(ctx),
|
||||||
Self::Decrypt { cmd } => cmd.start(ctx),
|
Self::Decrypt { cmd } => cmd.start(ctx),
|
||||||
|
|||||||
191
crates/pile/src/command/server.rs
Normal file
191
crates/pile/src/command/server.rs
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use axum::{
|
||||||
|
Json, Router,
|
||||||
|
extract::{Request, State},
|
||||||
|
http::StatusCode,
|
||||||
|
middleware::{Next, from_fn_with_state},
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
routing::get,
|
||||||
|
};
|
||||||
|
use clap::Args;
|
||||||
|
use pile_dataset::Datasets;
|
||||||
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||||
|
use tracing::{error, info};
|
||||||
|
use utoipa::{OpenApi, ToSchema};
|
||||||
|
use utoipa_swagger_ui::SwaggerUi;
|
||||||
|
|
||||||
|
use crate::{CliCmd, GlobalContext};
|
||||||
|
|
||||||
|
#[derive(Debug, Args)]
|
||||||
|
pub struct ServerCommand {
|
||||||
|
/// Address to bind to
|
||||||
|
#[arg(default_value = "0.0.0.0:9000")]
|
||||||
|
addr: String,
|
||||||
|
|
||||||
|
/// The datasets we should serve. Can be repeated.
|
||||||
|
#[arg(long, short = 'c')]
|
||||||
|
config: Vec<PathBuf>,
|
||||||
|
|
||||||
|
/// If provided, do not serve docs
|
||||||
|
#[arg(long)]
|
||||||
|
no_docs: bool,
|
||||||
|
|
||||||
|
/// If provided, require this bearer token for all requests
|
||||||
|
#[arg(long)]
|
||||||
|
token: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CliCmd for ServerCommand {
|
||||||
|
async fn run(
|
||||||
|
self,
|
||||||
|
_ctx: GlobalContext,
|
||||||
|
flag: CancelFlag,
|
||||||
|
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
||||||
|
let datasets = {
|
||||||
|
let mut datasets = Vec::new();
|
||||||
|
for c in &self.config {
|
||||||
|
let ds = Datasets::open(&c)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("while opening dataset for {}", c.display()))?;
|
||||||
|
datasets.push(Arc::new(ds));
|
||||||
|
}
|
||||||
|
|
||||||
|
Arc::new(datasets)
|
||||||
|
};
|
||||||
|
|
||||||
|
let bearer = BearerToken(self.token.map(Arc::new));
|
||||||
|
|
||||||
|
let mut router = Router::new();
|
||||||
|
for d in datasets.iter() {
|
||||||
|
let prefix = format!("/{}", d.config.dataset.name);
|
||||||
|
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
|
||||||
|
}
|
||||||
|
|
||||||
|
router = router.merge(
|
||||||
|
Router::new()
|
||||||
|
.route("/datasets", get(list_datasets))
|
||||||
|
.with_state(datasets.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
|
if !self.no_docs {
|
||||||
|
let docs_path = "/docs";
|
||||||
|
let docs = SwaggerUi::new(docs_path)
|
||||||
|
.url(format!("{}/openapi.json", docs_path), Api::openapi());
|
||||||
|
|
||||||
|
router = router.merge(docs);
|
||||||
|
}
|
||||||
|
|
||||||
|
router = router.layer(from_fn_with_state(bearer, bearer_auth_middleware));
|
||||||
|
|
||||||
|
let app = router.into_make_service_with_connect_info::<std::net::SocketAddr>();
|
||||||
|
|
||||||
|
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(error) => {
|
||||||
|
match error.kind() {
|
||||||
|
std::io::ErrorKind::AddrInUse => {
|
||||||
|
error!(
|
||||||
|
message = "Cannot bind to address, already in use",
|
||||||
|
addr = self.addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!(message = "Error while starting server", ?error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match listener.local_addr() {
|
||||||
|
Ok(x) => info!("listening on http://{x}"),
|
||||||
|
Err(error) => {
|
||||||
|
error!(message = "Could not determine local address", ?error);
|
||||||
|
return Err(anyhow::Error::from(error).into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match axum::serve(listener, app)
|
||||||
|
.with_graceful_shutdown(async move { flag.await_cancel().await })
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(error) => {
|
||||||
|
error!(message = "Error while serving api", ?error);
|
||||||
|
return Err(anyhow::Error::from(error).into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(CancelableTaskError::Cancelled);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: bearer auth middleware
|
||||||
|
//
|
||||||
|
|
||||||
|
/// Middleware state: the bearer token requests must present.
///
/// `None` means no token was configured.
#[derive(Clone)]
struct BearerToken(Option<Arc<String>>);
|
||||||
|
|
||||||
|
async fn bearer_auth_middleware(
|
||||||
|
State(BearerToken(expected)): State<BearerToken>,
|
||||||
|
request: Request,
|
||||||
|
next: Next,
|
||||||
|
) -> Response {
|
||||||
|
let Some(expected) = expected else {
|
||||||
|
return next.run(request).await;
|
||||||
|
};
|
||||||
|
|
||||||
|
let authorized = request
|
||||||
|
.headers()
|
||||||
|
.get(axum::http::header::AUTHORIZATION)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.and_then(|v| v.strip_prefix("Bearer "))
|
||||||
|
.is_some_and(|token| token == expected.as_str());
|
||||||
|
|
||||||
|
if authorized {
|
||||||
|
next.run(request).await
|
||||||
|
} else {
|
||||||
|
StatusCode::UNAUTHORIZED.into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: routes
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(OpenApi)]
|
||||||
|
#[openapi(
|
||||||
|
tags(),
|
||||||
|
paths(list_datasets),
|
||||||
|
components(schemas(ListDatasetsResponse))
|
||||||
|
)]
|
||||||
|
pub(crate) struct Api;
|
||||||
|
|
||||||
|
#[derive(Serialize, ToSchema)]
|
||||||
|
pub struct ListDatasetsResponse {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all datasets served by this server
|
||||||
|
#[utoipa::path(
|
||||||
|
get,
|
||||||
|
path = "/list_datasets",
|
||||||
|
responses(
|
||||||
|
(status = 200, description = "List of datasets"),
|
||||||
|
(status = 500, description = "Internal server error"),
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
pub async fn list_datasets(State(state): State<Arc<Vec<Arc<Datasets>>>>) -> Response {
|
||||||
|
let datasets = state
|
||||||
|
.iter()
|
||||||
|
.map(|x| ListDatasetsResponse {
|
||||||
|
name: x.config.dataset.name.clone().into(),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
return (StatusCode::OK, Json(datasets)).into_response();
|
||||||
|
}
|
||||||
@@ -4,7 +4,7 @@ use clap::Args;
|
|||||||
use indicatif::ProgressBar;
|
use indicatif::ProgressBar;
|
||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use pile_dataset::{Dataset, Datasets};
|
use pile_dataset::{Dataset, Datasets};
|
||||||
use pile_io::ChaChaWriter;
|
use pile_io::chacha::ChaChaWriterv1;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path};
|
use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path};
|
||||||
use std::{
|
use std::{
|
||||||
@@ -71,12 +71,12 @@ impl CliCmd for UploadCommand {
|
|||||||
let bucket = self
|
let bucket = self
|
||||||
.bucket
|
.bucket
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.unwrap_or(s3_ds.bucket.as_str())
|
.unwrap_or(s3_ds.client.bucket())
|
||||||
.to_owned();
|
.to_owned();
|
||||||
let full_prefix = self.prefix.trim_matches('/').to_owned();
|
let full_prefix = self.prefix.trim_matches('/').to_owned();
|
||||||
|
|
||||||
// Check for existing objects at the target prefix
|
// Check for existing objects at the target prefix
|
||||||
let existing_keys = list_prefix(&s3_ds.client, &bucket, &full_prefix)
|
let existing_keys = list_prefix(&s3_ds.client.client, &bucket, &full_prefix)
|
||||||
.await
|
.await
|
||||||
.context("while checking for existing objects at target prefix")?;
|
.context("while checking for existing objects at target prefix")?;
|
||||||
|
|
||||||
@@ -89,6 +89,7 @@ impl CliCmd for UploadCommand {
|
|||||||
);
|
);
|
||||||
for key in &existing_keys {
|
for key in &existing_keys {
|
||||||
s3_ds
|
s3_ds
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.delete_object()
|
.delete_object()
|
||||||
.bucket(&bucket)
|
.bucket(&bucket)
|
||||||
@@ -169,7 +170,7 @@ impl CliCmd for UploadCommand {
|
|||||||
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
|
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
|
||||||
let plaintext = std::fs::read(&path)
|
let plaintext = std::fs::read(&path)
|
||||||
.with_context(|| format!("while opening '{}'", path.display()))?;
|
.with_context(|| format!("while opening '{}'", path.display()))?;
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), enc_key)
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), enc_key)
|
||||||
.context("while initializing encryptor")?;
|
.context("while initializing encryptor")?;
|
||||||
writer.write_all(&plaintext).context("while encrypting")?;
|
writer.write_all(&plaintext).context("while encrypting")?;
|
||||||
Ok(writer.finish().context("while finalizing")?.into_inner())
|
Ok(writer.finish().context("while finalizing")?.into_inner())
|
||||||
@@ -184,6 +185,7 @@ impl CliCmd for UploadCommand {
|
|||||||
};
|
};
|
||||||
|
|
||||||
client
|
client
|
||||||
|
.client
|
||||||
.put_object()
|
.put_object()
|
||||||
.bucket(&bucket)
|
.bucket(&bucket)
|
||||||
.key(&key)
|
.key(&key)
|
||||||
@@ -224,7 +226,7 @@ fn get_dir_source(
|
|||||||
label: &Label,
|
label: &Label,
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<Arc<DirDataSource>, anyhow::Error> {
|
) -> Result<Arc<DirDataSource>, anyhow::Error> {
|
||||||
match ds.sources.get(label) {
|
match ds.sources.get(label).or(ds.disabled_sources.get(label)) {
|
||||||
Some(Dataset::Dir(d)) => Ok(Arc::clone(d)),
|
Some(Dataset::Dir(d)) => Ok(Arc::clone(d)),
|
||||||
Some(_) => Err(anyhow::anyhow!(
|
Some(_) => Err(anyhow::anyhow!(
|
||||||
"source '{name}' is not a filesystem source"
|
"source '{name}' is not a filesystem source"
|
||||||
@@ -240,7 +242,7 @@ fn get_s3_source(
|
|||||||
label: &Label,
|
label: &Label,
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<Arc<S3DataSource>, anyhow::Error> {
|
) -> Result<Arc<S3DataSource>, anyhow::Error> {
|
||||||
match ds.sources.get(label) {
|
match ds.sources.get(label).or(ds.disabled_sources.get(label)) {
|
||||||
Some(Dataset::S3(s)) => Ok(Arc::clone(s)),
|
Some(Dataset::S3(s)) => Ok(Arc::clone(s)),
|
||||||
Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")),
|
Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")),
|
||||||
None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")),
|
None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")),
|
||||||
|
|||||||
Reference in New Issue
Block a user