Compare commits
6 Commits: 9b8f825667...main

| Author | SHA1 | Date |
|---|---|---|
| | 450ea7aa86 | |
| | 3bc66ddc48 | |
| | 251d130987 | |
| | 0281a33f86 | |
| | d3ab2684f4 | |
| | 4d4e9c93a2 | |
Cargo.lock (generated, 42 changes)

```diff
@@ -1994,6 +1994,7 @@ dependencies = [
  "indicatif",
  "pile-config",
  "pile-dataset",
+ "pile-serve",
  "pile-toolbox",
  "pile-value",
  "serde",
@@ -2016,8 +2017,7 @@ version = "0.0.2"
 dependencies = [
  "axum",
  "bytes",
- "futures-core",
- "pile-dataset",
+ "pile-serve",
  "reqwest",
  "serde",
  "thiserror",
@@ -2038,23 +2038,17 @@ dependencies = [
 name = "pile-dataset"
 version = "0.0.2"
 dependencies = [
- "axum",
  "chrono",
- "percent-encoding",
  "pile-config",
  "pile-toolbox",
  "pile-value",
  "regex",
- "serde",
  "serde_json",
  "tantivy",
  "thiserror",
  "tokio",
- "tokio-util",
  "toml",
  "tracing",
- "utoipa",
- "utoipa-swagger-ui",
 ]
 
 [[package]]
@@ -2078,6 +2072,24 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "pile-serve"
+version = "0.0.2"
+dependencies = [
+ "axum",
+ "percent-encoding",
+ "pile-config",
+ "pile-dataset",
+ "pile-value",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tokio-util",
+ "tracing",
+ "utoipa",
+ "utoipa-swagger-ui",
+]
+
 [[package]]
 name = "pile-toolbox"
 version = "0.0.2"
@@ -2098,6 +2110,7 @@ dependencies = [
  "id3",
  "image",
  "kamadak-exif",
+ "md5",
  "mime",
  "mime_guess",
  "pdf",
@@ -2109,6 +2122,8 @@ dependencies = [
  "reqwest",
  "serde",
  "serde_json",
+ "sha1",
+ "sha2 0.11.0-rc.5",
  "smartstring",
  "strum",
  "tokio",
@@ -2654,6 +2669,17 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest 0.10.7",
+]
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
```
Cargo.toml (workspace root)

```diff
@@ -71,6 +71,7 @@ pile-dataset = { path = "crates/pile-dataset" }
 pile-value = { path = "crates/pile-value" }
 pile-io = { path = "crates/pile-io" }
 pile-client = { path = "crates/pile-client" }
+pile-serve = { path = "crates/pile-serve" }
 
 # MARK: Clients & servers
 tantivy = "0.25.0"
@@ -87,7 +88,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
     "debug-embed",
     "vendored",
 ] }
-reqwest = { version = "0.12", features = ["blocking"] }
+reqwest = { version = "0.12", features = ["blocking", "json", "stream"] }
 tracing-loki = "0.2.6"
 
 # MARK: Async & Parallelism
@@ -111,6 +112,8 @@ bytes = "1"
 toml = "1.0.3"
 toml_edit = "0.25.4"
 sha2 = "0.11.0-rc.5"
+sha1 = "0.10"
+md5 = "0.7"
 blake3 = "1.8.3"
 dotenvy = "0.15.7"
 envy = "0.4.2"
```
crates/pile-client/Cargo.toml

```diff
@@ -8,10 +8,9 @@ edition = { workspace = true }
 workspace = true
 
 [dependencies]
-pile-dataset = { workspace = true, features = ["axum"] }
+pile-serve = { workspace = true }
 
-reqwest = { version = "0.12", features = ["json", "stream"] }
-futures-core = "0.3"
+reqwest = { workspace = true }
 serde = { workspace = true }
 thiserror = { workspace = true }
 bytes = { workspace = true }
```
pile-client source (`DatasetClient`):

```diff
@@ -3,14 +3,12 @@ use axum::{
     routing::any,
 };
 use bytes::Bytes;
-use futures_core::Stream;
 use reqwest::{Client, StatusCode, header};
 use serde::Deserialize;
-use std::pin::Pin;
 use thiserror::Error;
 use tracing::{trace, warn};
 
-pub use pile_dataset::serve::{
+pub use pile_serve::{
     ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
     SchemaResponse,
 };
@@ -120,26 +118,6 @@ impl DatasetClient {
         check_status(resp).await?.json().await.map_err(Into::into)
     }
 
-    /// `GET /item` — stream the raw bytes of an item.
-    ///
-    /// The returned stream yields chunks as they arrive from the server.
-    pub async fn get_item(
-        &self,
-        source: &str,
-        key: &str,
-    ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
-        let url = format!("{}/item", self.base_url);
-        trace!(url, source, key, "GET /item");
-        let resp = self
-            .client
-            .get(url)
-            .query(&[("source", source), ("key", key)])
-            .send()
-            .await?;
-
-        Ok(Box::pin(check_status(resp).await?.bytes_stream()))
-    }
-
     /// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
     pub async fn get_extract(
         &self,
```
`Source` config enum:

```diff
@@ -67,7 +67,7 @@ pub enum Source {
     base_pattern: String,
 
     /// Map of files included in each item.'
-    /// `{base}` is replaced with the string extraced by base_pattern.
+    /// `{base}` is replaced with the string extracted by base_pattern.
     /// Default is `{ item: "{base}" }`
     #[serde(default = "default_files")]
     files: HashMap<Label, String>,
```
crates/pile-dataset/Cargo.toml

```diff
@@ -20,21 +20,7 @@ chrono = { workspace = true }
 toml = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
-tokio-util = { version = "0.7", features = ["io"] }
 
-serde = { workspace = true, optional = true }
-axum = { workspace = true, optional = true }
-percent-encoding = { workspace = true, optional = true }
-utoipa = { workspace = true, optional = true }
-utoipa-swagger-ui = { workspace = true, optional = true }
-
 [features]
 default = []
 pdfium = ["pile-value/pdfium"]
-axum = [
-    "dep:axum",
-    "dep:utoipa",
-    "dep:utoipa-swagger-ui",
-    "dep:serde",
-    "dep:percent-encoding",
-]
```
pile-dataset source (fts index rebuild):

```diff
@@ -302,6 +302,7 @@ impl Datasets {
         _threads: usize,
         flag: Option<CancelFlag>,
     ) -> Result<(), CancelableTaskError<DatasetError>> {
+        let start = Instant::now();
         let workdir = match self.path_workdir.as_ref() {
             Some(x) => x,
             None => {
@@ -313,6 +314,14 @@ impl Datasets {
         let fts_tmp_dir = workdir.join(".tmp-fts");
         let fts_dir = workdir.join("fts");
 
+        debug!(
+            message = "Rebuilding fts index",
+            dataset = self.config.dataset.name.as_str(),
+            ?fts_dir,
+            ?fts_tmp_dir,
+            ?workdir
+        );
+
         if fts_tmp_dir.is_dir() {
             warn!("Removing temporary index in {}", fts_dir.display());
             std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
@@ -392,9 +401,18 @@ impl Datasets {
             return Err(CancelableTaskError::Cancelled);
         }
 
-        info!("Committing {total} documents");
         index_writer.commit().map_err(DatasetError::from)?;
 
+        debug!(
+            message = "Rebuilt fts index",
+            dataset = self.config.dataset.name.as_str(),
+            ?fts_dir,
+            ?fts_tmp_dir,
+            ?workdir,
+            n_docs = total,
+            time_ms = start.elapsed().as_millis()
+        );
+
         if fts_dir.is_dir() {
             warn!("Removing existing index in {}", fts_dir.display());
             std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
```
pile-dataset module root:

```diff
@@ -2,6 +2,3 @@ mod dataset;
 pub use dataset::{Dataset, DatasetError, Datasets};
 
 pub mod index;
-
-#[cfg(feature = "axum")]
-pub mod serve;
```
pile-dataset `serve` module (deleted, 92 lines):

```diff
@@ -1,92 +0,0 @@
-use axum::{
-    Router,
-    routing::{get, post},
-};
-use std::sync::Arc;
-use utoipa::OpenApi;
-use utoipa_swagger_ui::SwaggerUi;
-
-use crate::Datasets;
-
-mod lookup;
-pub use lookup::*;
-
-mod extract;
-pub use extract::*;
-
-mod items;
-pub use items::*;
-
-mod config_schema;
-pub use config_schema::*;
-
-mod schema_field;
-pub use schema_field::*;
-
-mod schema;
-pub use schema::*;
-
-#[derive(OpenApi)]
-#[openapi(
-    tags(),
-    paths(
-        lookup,
-        get_extract,
-        items_list,
-        config_schema,
-        schema_field,
-        schema_all
-    ),
-    components(schemas(
-        LookupRequest,
-        LookupResponse,
-        LookupResult,
-        ExtractQuery,
-        ItemsQuery,
-        ItemsResponse,
-        ItemRef
-    ))
-)]
-pub(crate) struct Api;
-
-impl Datasets {
-    #[inline]
-    pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
-        self.router_prefix(with_docs, None)
-    }
-
-    #[inline]
-    pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
-        let mut router = Router::new()
-            .route("/lookup", post(lookup))
-            .route("/extract", get(get_extract))
-            .route("/items", get(items_list))
-            .route("/config/schema", get(config_schema))
-            .route("/schema", get(schema_all))
-            .route("/schema/{field}", get(schema_field))
-            .with_state(self.clone());
-
-        if let Some(prefix) = prefix {
-            router = Router::new().nest(prefix, router);
-        }
-
-        if with_docs {
-            let docs_path = match prefix {
-                None => "/docs".into(),
-                Some(prefix) => format!("{prefix}/docs"),
-            };
-
-            let api = Api::openapi();
-            let api = match prefix {
-                None => api,
-                Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
-            };
-
-            let docs =
-                SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
-
-            router = router.merge(docs);
-        }
-        router
-    }
-}
```
crates/pile-serve/Cargo.toml (new file, 28 lines)

```diff
@@ -0,0 +1,28 @@
+[package]
+name = "pile-serve"
+version = { workspace = true }
+rust-version = { workspace = true }
+edition = { workspace = true }
+
+[lints]
+workspace = true
+
+[dependencies]
+pile-config = { workspace = true }
+pile-value = { workspace = true }
+pile-dataset = { workspace = true }
+
+serde_json = { workspace = true }
+tracing = { workspace = true }
+tokio = { workspace = true }
+tokio-util = { version = "0.7", features = ["io"] }
+
+serde = { workspace = true }
+axum = { workspace = true }
+percent-encoding = { workspace = true }
+utoipa = { workspace = true }
+utoipa-swagger-ui = { workspace = true }
+
+[features]
+default = []
+pdfium = ["pile-value/pdfium"]
```
`config_schema` handler (moved into pile-serve):

```diff
@@ -4,12 +4,11 @@ use axum::{
     http::StatusCode,
     response::{IntoResponse, Response},
 };
+use pile_dataset::Datasets;
 use std::{collections::HashMap, sync::Arc};
 
 pub use pile_config::FieldSpec;
 
-use crate::Datasets;
-
 pub type FieldsResponse = HashMap<String, FieldSpec>;
 
 /// Retrieve this dataset's schema.
```
`extract` handler:

```diff
@@ -7,6 +7,7 @@ use axum::{
 };
 use percent_encoding::percent_decode_str;
 use pile_config::{Label, objectpath::ObjectPath};
+use pile_dataset::Datasets;
 use pile_value::{
     extract::traits::ExtractState,
     value::{BinaryPileValue, PileValue},
@@ -17,8 +18,6 @@ use tokio_util::io::ReaderStream;
 use tracing::debug;
 use utoipa::ToSchema;
 
-use crate::Datasets;
-
 #[derive(Deserialize, ToSchema)]
 pub struct ExtractQuery {
     source: String,
@@ -101,17 +100,24 @@ pub async fn get_extract(
     let mut value = None;
     for path in &paths {
         match item.query(&extract_state, path).await {
-            Ok(Some(PileValue::Null)) | Ok(None) => continue,
+            Ok(None) => continue,
+
+            Ok(Some(PileValue::Null)) => {
+                value = Some(PileValue::Null);
+                continue;
+            }
+
             Ok(Some(v)) => {
                 value = Some(v);
                 break;
             }
+
             Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
         }
     }
 
     let Some(value) = value else {
-        return StatusCode::NOT_FOUND.into_response();
+        return (StatusCode::BAD_REQUEST, "no value").into_response();
     };
 
     debug!(
@@ -177,6 +183,7 @@ pub async fn get_extract(
                 Json(json),
             )
                 .into_response(),
+
             Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
         },
     }
```
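The arm split above (and the identical one in `schema_field` below) changes what an explicit `Null` means when probing several object paths: previously `Null` and a missing value both meant "keep looking", and exhausting all paths yielded 404; now `Null` is kept as a fallback while later paths may still produce a real value, and coming up empty is reported as 400. A minimal sketch of the new selection rule, with `Val` standing in for `PileValue`:

```rust
// Sketch only: `Val` is a stand-in for PileValue.
#[derive(Clone, Debug, PartialEq)]
enum Val {
    Null,
    Str(String),
}

// Scan candidates in order: a missing value is skipped, an explicit Null
// is remembered as a fallback, and the first real value wins.
fn pick(candidates: Vec<Option<Val>>) -> Option<Val> {
    let mut value = None;
    for c in candidates {
        match c {
            None => continue,
            Some(Val::Null) => {
                value = Some(Val::Null);
                continue;
            }
            Some(v) => return Some(v),
        }
    }
    value // Some(Val::Null) if only nulls were seen; None if nothing matched
}
```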
`items` handler:

```diff
@@ -4,13 +4,12 @@ use axum::{
     http::StatusCode,
     response::{IntoResponse, Response},
 };
+use pile_dataset::Datasets;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use tracing::debug;
 use utoipa::ToSchema;
 
-use crate::Datasets;
-
 #[derive(Deserialize, ToSchema)]
 pub struct ItemsQuery {
     #[serde(default)]
```
crates/pile-serve/src/lib.rs (new file, 89 lines)

```diff
@@ -0,0 +1,89 @@
+use axum::{
+    Router,
+    routing::{get, post},
+};
+use pile_dataset::Datasets;
+use std::sync::Arc;
+use utoipa::OpenApi;
+use utoipa_swagger_ui::SwaggerUi;
+
+mod lookup;
+pub use lookup::*;
+
+mod extract;
+pub use extract::*;
+
+mod items;
+pub use items::*;
+
+mod config_schema;
+pub use config_schema::*;
+
+mod schema_field;
+pub use schema_field::*;
+
+mod schema;
+pub use schema::*;
+
+#[derive(OpenApi)]
+#[openapi(
+    tags(),
+    paths(
+        lookup,
+        get_extract,
+        items_list,
+        config_schema,
+        schema_field,
+        schema_all
+    ),
+    components(schemas(
+        LookupRequest,
+        LookupResponse,
+        LookupResult,
+        ExtractQuery,
+        ItemsQuery,
+        ItemsResponse,
+        ItemRef
+    ))
+)]
+pub(crate) struct Api;
+
+#[inline]
+pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
+    router_prefix(ds, with_docs, None)
+}
+
+#[inline]
+pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
+    let mut router = Router::new()
+        .route("/lookup", post(lookup))
+        .route("/extract", get(get_extract))
+        .route("/items", get(items_list))
+        .route("/config/schema", get(config_schema))
+        .route("/schema", get(schema_all))
+        .route("/schema/{field}", get(schema_field))
+        .with_state(ds.clone());
+
+    if let Some(prefix) = prefix {
+        router = Router::new().nest(prefix, router);
+    }
+
+    if with_docs {
+        let docs_path = match prefix {
+            None => "/docs".into(),
+            Some(prefix) => format!("{prefix}/docs"),
+        };
+
+        let api = Api::openapi();
+        let api = match prefix {
+            None => api,
+            Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
+        };
+
+        let docs =
+            SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
+
+        router = router.merge(docs);
+    }
+    router
+}
```
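The shape of the call-site change implied by this move: the router is no longer a method on `pile_dataset::Datasets` gated behind an `axum` feature, but free functions in the new `pile-serve` crate. A hypothetical helper contrasting the two forms (both appear verbatim in the CLI hunks below):

```rust
use pile_dataset::Datasets;
use std::sync::Arc;

// Sketch only: `ds` would come from dataset loading exactly as in the
// serve/server commands further down.
fn make_app(ds: Datasets) -> axum::Router<()> {
    // Before the split (method behind pile-dataset's "axum" feature):
    //     Arc::new(ds).router(true)
    // After the split (free function in the new pile-serve crate):
    pile_serve::router(Arc::new(ds), /* with_docs: */ true)
}
```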
`lookup` handler:

```diff
@@ -4,13 +4,12 @@ use axum::{
     http::StatusCode,
     response::{IntoResponse, Response},
 };
+use pile_dataset::Datasets;
 use serde::{Deserialize, Serialize};
 use std::{sync::Arc, time::Instant};
 use tracing::debug;
 use utoipa::ToSchema;
 
-use crate::Datasets;
-
 #[derive(Serialize, Deserialize, ToSchema, Debug)]
 pub struct LookupRequest {
     pub query: String,
```
`schema` handler:

```diff
@@ -5,13 +5,12 @@
     response::{IntoResponse, Response},
 };
 use pile_config::Label;
+use pile_dataset::Datasets;
 use pile_value::{extract::traits::ExtractState, value::PileValue};
 use serde::{Deserialize, Serialize};
 use std::{collections::HashMap, sync::Arc};
 use utoipa::IntoParams;
 
-use crate::Datasets;
-
 #[derive(Deserialize, IntoParams)]
 pub struct SchemaQuery {
     source: String,
@@ -21,7 +20,7 @@ pub struct SchemaQuery {
     hidden: bool,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(untagged)]
 pub enum ApiValue {
     Binary { binary: bool, mime: String },
```
`schema_field` handler:

```diff
@@ -6,6 +6,7 @@ use axum::{
     response::{IntoResponse, Response},
 };
 use pile_config::Label;
+use pile_dataset::Datasets;
 use pile_value::{
     extract::traits::ExtractState,
     value::{BinaryPileValue, PileValue},
@@ -16,8 +17,6 @@ use tokio_util::io::ReaderStream;
 use tracing::debug;
 use utoipa::IntoParams;
 
-use crate::Datasets;
-
 #[derive(Deserialize, IntoParams)]
 pub struct SchemaFieldQuery {
     source: String,
@@ -84,17 +83,24 @@ pub async fn schema_field(
     let mut value = None;
     for path in paths {
         match item.query(&extract_state, path).await {
-            Ok(Some(PileValue::Null)) | Ok(None) => continue,
+            Ok(None) => continue,
+
+            Ok(Some(PileValue::Null)) => {
+                value = Some(PileValue::Null);
+                continue;
+            }
+
             Ok(Some(v)) => {
                 value = Some(v);
                 break;
             }
+
             Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
         }
     }
 
     let Some(value) = value else {
-        return StatusCode::NOT_FOUND.into_response();
+        return (StatusCode::BAD_REQUEST, "no value").into_response();
     };
 
     debug!(
```
crates/pile-value/Cargo.toml

```diff
@@ -21,6 +21,9 @@ toml = { workspace = true }
 smartstring = { workspace = true }
 regex = { workspace = true }
 blake3 = { workspace = true }
+sha2 = { workspace = true }
+sha1 = { workspace = true }
+md5 = { workspace = true }
 epub = { workspace = true }
 kamadak-exif = { workspace = true }
 pdf = { workspace = true }
```
crates/pile-value/src/extract/blob/hash.rs (new file, 111 lines)

```diff
@@ -0,0 +1,111 @@
+use crate::{
+    extract::traits::{ExtractState, ObjectExtractor},
+    value::{BinaryPileValue, PileValue},
+};
+use pile_config::Label;
+use pile_io::SyncReadBridge;
+use std::{io::Read, sync::Arc};
+use tokio::sync::OnceCell;
+
+fn to_hex(bytes: &[u8]) -> String {
+    bytes.iter().map(|b| format!("{b:02x}")).collect()
+}
+
+macro_rules! hash_algos {
+    ($($name:ident),* $(,)?) => {
+        pub struct HashExtractor {
+            item: BinaryPileValue,
+            $($name: OnceCell<String>,)*
+        }
+
+        impl HashExtractor {
+            pub fn new(item: &BinaryPileValue) -> Self {
+                Self {
+                    item: item.clone(),
+                    $($name: OnceCell::new(),)*
+                }
+            }
+        }
+
+        static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
+            vec![$(Label::new(stringify!($name)).unwrap()),*]
+        });
+    };
+}
+
+hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512);
+
+impl HashExtractor {
+    async fn compute(&self, name: &Label) -> Result<Option<String>, std::io::Error> {
+        let name_str = name.as_ref();
+
+        macro_rules! algo {
+            ($cell:ident, $compute:expr) => {
+                if name_str == stringify!($cell) {
+                    return Ok(Some(
+                        self.$cell
+                            .get_or_try_init(|| async {
+                                let read = self.item.read().await?;
+                                let mut read = SyncReadBridge::new_current(read);
+                                tokio::task::spawn_blocking(move || {
+                                    let mut bytes = Vec::new();
+                                    read.read_to_end(&mut bytes)?;
+                                    Ok::<String, std::io::Error>($compute(&bytes))
+                                })
+                                .await?
+                            })
+                            .await?
+                            .clone(),
+                    ));
+                }
+            };
+        }
+
+        algo!(blake3, |b: &Vec<u8>| blake3::hash(b).to_hex().to_string());
+        algo!(md5, |b: &Vec<u8>| format!("{:x}", md5::compute(b)));
+        algo!(sha1, |b: &Vec<u8>| {
+            use sha1::Digest;
+            to_hex(sha1::Sha1::digest(b).as_ref())
+        });
+        algo!(sha224, |b: &Vec<u8>| {
+            use sha2::Digest;
+            to_hex(sha2::Sha224::digest(b).as_ref())
+        });
+        algo!(sha256, |b: &Vec<u8>| {
+            use sha2::Digest;
+            to_hex(sha2::Sha256::digest(b).as_ref())
+        });
+        algo!(sha384, |b: &Vec<u8>| {
+            use sha2::Digest;
+            to_hex(sha2::Sha384::digest(b).as_ref())
+        });
+        algo!(sha512, |b: &Vec<u8>| {
+            use sha2::Digest;
+            to_hex(sha2::Sha512::digest(b).as_ref())
+        });
+
+        Ok(None)
+    }
+}
+
+#[async_trait::async_trait]
+impl ObjectExtractor for HashExtractor {
+    async fn field(
+        &self,
+        _state: &ExtractState,
+        name: &Label,
+        args: Option<&str>,
+    ) -> Result<Option<PileValue>, std::io::Error> {
+        if args.is_some() {
+            return Ok(None);
+        }
+        Ok(self
+            .compute(name)
+            .await?
+            .map(|s| PileValue::String(Arc::new(s.into()))))
+    }
+
+    async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
+        Ok(LABELS.clone())
+    }
+}
```
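For orientation, a standalone sketch of what each `algo!` arm above computes once the `OnceCell` caching and `SyncReadBridge`/`spawn_blocking` plumbing are stripped away (same crates and digest calls as the new file; the input data and variable names are illustrative):

```rust
// Hex-encode a byte slice, as in the new hash.rs.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    let data = b"hello";

    // blake3 ships its own hex helper.
    let h_blake3 = blake3::hash(data).to_hex().to_string();
    // md5 0.7 hex-formats its digest via Display.
    let h_md5 = format!("{:x}", md5::compute(data));
    // sha1/sha2 go through the RustCrypto Digest trait.
    let h_sha1 = {
        use sha1::Digest;
        to_hex(sha1::Sha1::digest(data).as_ref())
    };
    let h_sha256 = {
        use sha2::Digest;
        to_hex(sha2::Sha256::digest(data).as_ref())
    };

    println!("{h_blake3}\n{h_md5}\n{h_sha1}\n{h_sha256}");
}
```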
pile-value blob extractor module:

```diff
@@ -31,6 +31,9 @@ pub use text::*;
 mod image;
 pub use image::*;
 
+mod hash;
+pub use hash::*;
+
 use crate::{
     extract::{
         misc::MapExtractor,
@@ -85,6 +88,10 @@ impl BinaryExtractor {
             Label::new("text").unwrap(),
             PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
         ),
+        (
+            Label::new("hash").unwrap(),
+            PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))),
+        ),
     ]),
 };
 
```
pile-value binary value source:

```diff
@@ -1,6 +1,5 @@
 use mime::Mime;
 use pile_config::objectpath::{ObjectPath, PathSegment};
-use pile_io::SyncReadBridge;
 use serde_json::{Map, Value};
 use smartstring::{LazyCompact, SmartString};
 use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc};
@@ -50,18 +49,6 @@ impl BinaryPileValue {
         }
     }
 
-    pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
-        let read = self.read().await?;
-        let mut read = SyncReadBridge::new_current(read);
-        let out = tokio::task::spawn_blocking(move || {
-            let mut hasher = blake3::Hasher::new();
-            std::io::copy(&mut read, &mut hasher)?;
-            return Ok::<_, std::io::Error>(hasher.finalize());
-        })
-        .await??;
-        return Ok(out);
-    }
-
     pub fn mime(&self) -> &Mime {
         match self {
             Self::Blob { mime, .. } => mime,
```
CLI crate Cargo.toml:

```diff
@@ -9,8 +9,9 @@ workspace = true
 
 [dependencies]
 pile-toolbox = { workspace = true }
-pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
-pile-value = { workspace = true, features = ["pdfium"] }
+pile-dataset = { workspace = true }
+pile-serve = { workspace = true }
+pile-value = { workspace = true }
 pile-config = { workspace = true }
 
 tracing = { workspace = true }
@@ -34,3 +35,7 @@ base64 = { workspace = true }
 dotenvy = { workspace = true }
 envy = { workspace = true }
 thiserror = { workspace = true }
+
+[features]
+default = ["pdfium"]
+pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]
```
`serve` command:

```diff
@@ -57,8 +57,7 @@ impl CliCmd for ServeCommand {
             })?;
         }
 
-        let app = Arc::new(ds)
-            .router(true)
+        let app = pile_serve::router(Arc::new(ds), true)
             .into_make_service_with_connect_info::<std::net::SocketAddr>();
 
         let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
```
`server` command:

```diff
@@ -8,10 +8,11 @@ use axum::{
     routing::get,
 };
 use clap::Args;
-use pile_dataset::Datasets;
+use pile_dataset::{DatasetError, Datasets};
 use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
+use pile_value::extract::traits::ExtractState;
 use serde::Serialize;
-use std::{fmt::Debug, path::PathBuf, sync::Arc};
+use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
 use tracing::{error, info};
 use utoipa::{OpenApi, ToSchema};
 use utoipa_swagger_ui::SwaggerUi;
@@ -27,6 +28,18 @@ pub struct ServerCommand {
     /// If provided, do not serve docs
     #[arg(long)]
     no_docs: bool,
+
+    /// If provided, never auto-refresh indices
+    #[arg(long)]
+    no_refresh: bool,
+
+    /// Number of threads to use to refresh indices
+    #[arg(long, default_value = "5")]
+    refresh_jobs: usize,
+
+    /// Refresh indices every `n` seconds
+    #[arg(long, default_value = "300")]
+    refresh_delay: usize,
 }
 
 impl CliCmd for ServerCommand {
@@ -47,12 +60,57 @@ impl CliCmd for ServerCommand {
             Arc::new(datasets)
         };
 
+        // Start auto-refresh task
+        if !self.no_refresh {
+            let datasets = datasets.clone();
+            let jobs = self.refresh_jobs.max(1);
+            let delay = self.refresh_delay.max(1);
+
+            async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
+                if ds.needs_fts().await? {
+                    let state = ExtractState { ignore_mime: false };
+                    match ds.fts_refresh(&state, jobs, None).await {
+                        Ok(()) => {}
+                        Err(CancelableTaskError::Error(err)) => return Err(err),
+                        Err(CancelableTaskError::Cancelled) => unreachable!(),
+                    };
+                }
+
+                return Ok(());
+            }
+
+            tokio::task::spawn(async move {
+                loop {
+                    for ds in datasets.iter() {
+                        match refresh_dataset(ds, jobs).await {
+                            Ok(x) => x,
+                            Err(error) => {
+                                error!(
+                                    message = "Error while refreshing dataset",
+                                    dataset = ds.config.dataset.name.as_str(),
+                                    ?error
+                                );
+                            }
+                        }
+
+                        tokio::time::sleep(Duration::from_secs(10)).await;
+                    }
+
+                    tokio::time::sleep(Duration::from_secs(delay as u64)).await;
+                }
+            });
+        }
+
         let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
 
         let mut router = Router::new();
         for d in datasets.iter() {
             let prefix = format!("/{}", d.config.dataset.name);
-            router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
+            router = router.merge(pile_serve::router_prefix(
+                d.clone(),
+                !self.no_docs,
+                Some(&prefix),
+            ))
         }
 
         router = router.merge(
```
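One timing subtlety in the refresh task above: the ten-second sleep sits inside the per-dataset loop, so a full pass pauses 10 s after every dataset and then waits `refresh_delay` seconds (default 300, clamped to at least 1) before the next pass. A minimal sketch of just that scheduling, with a print standing in for the real refresh call:

```rust
use std::time::Duration;

// Sketch of the refresh scheduling only; the println! stands in for
// refresh_dataset(ds, jobs) from the hunk above.
async fn refresh_loop(datasets: Vec<&'static str>, delay_secs: u64) {
    loop {
        for name in &datasets {
            println!("refreshing {name}");
            tokio::time::sleep(Duration::from_secs(10)).await; // per-dataset pause
        }
        tokio::time::sleep(Duration::from_secs(delay_secs)).await; // between passes
    }
}
```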
server config:

```diff
@@ -1,6 +1,6 @@
 use serde::Deserialize;
 use std::{num::NonZeroUsize, path::PathBuf};
-use tracing::info;
+use tracing::debug;
 
 use crate::config::{
     env::load_env,
@@ -89,7 +89,7 @@ impl PileServerConfig {
         }
     }
 
-    info!(message = "Config loaded", ?config);
+    debug!(message = "Config loaded", ?config);
 
     return config;
 }
```