Compare commits

...

6 Commits

Author SHA1 Message Date
450ea7aa86 Typos
Some checks failed
CI / Typos (push) Successful in 30s
CI / Clippy (push) Successful in 1m39s
CI / Build and test (push) Successful in 2m30s
CI / Build and test (all features) (push) Successful in 6m16s
Docker / build-and-push (push) Failing after 58s
2026-04-15 09:29:06 -07:00
3bc66ddc48 Split server into crate
Some checks failed
CI / Typos (push) Failing after 17s
CI / Build and test (push) Successful in 1m41s
CI / Clippy (push) Has been cancelled
CI / Build and test (all features) (push) Has been cancelled
Docker / build-and-push (push) Has been cancelled
2026-04-15 09:23:52 -07:00
251d130987 Tweak status codes
Some checks failed
CI / Clippy (push) Successful in 1m2s
CI / Typos (push) Failing after 1m10s
Docker / build-and-push (push) Failing after 2m21s
CI / Build and test (all features) (push) Successful in 3m27s
CI / Build and test (push) Successful in 6m7s
2026-04-03 12:35:01 -07:00
0281a33f86 Client tweaks
Some checks failed
CI / Typos (push) Failing after 20s
CI / Build and test (push) Successful in 1m47s
CI / Clippy (push) Successful in 2m25s
Docker / build-and-push (push) Successful in 4m4s
CI / Build and test (all features) (push) Successful in 6m44s
2026-04-03 09:01:51 -07:00
d3ab2684f4 Auto-refresh
Some checks failed
CI / Typos (push) Failing after 31s
CI / Clippy (push) Successful in 58s
Docker / build-and-push (push) Failing after 3m13s
CI / Build and test (all features) (push) Successful in 3m33s
CI / Build and test (push) Successful in 4m3s
2026-04-03 08:57:43 -07:00
4d4e9c93a2 Add hash extractor 2026-04-03 08:57:37 -07:00
25 changed files with 396 additions and 185 deletions

42
Cargo.lock generated
View File

@@ -1994,6 +1994,7 @@ dependencies = [
"indicatif",
"pile-config",
"pile-dataset",
"pile-serve",
"pile-toolbox",
"pile-value",
"serde",
@@ -2016,8 +2017,7 @@ version = "0.0.2"
dependencies = [
"axum",
"bytes",
"futures-core",
"pile-dataset",
"pile-serve",
"reqwest",
"serde",
"thiserror",
@@ -2038,23 +2038,17 @@ dependencies = [
name = "pile-dataset"
version = "0.0.2"
dependencies = [
"axum",
"chrono",
"percent-encoding",
"pile-config",
"pile-toolbox",
"pile-value",
"regex",
"serde",
"serde_json",
"tantivy",
"thiserror",
"tokio",
"tokio-util",
"toml",
"tracing",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]
@@ -2078,6 +2072,24 @@ dependencies = [
"tokio",
]
[[package]]
name = "pile-serve"
version = "0.0.2"
dependencies = [
"axum",
"percent-encoding",
"pile-config",
"pile-dataset",
"pile-value",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]
name = "pile-toolbox"
version = "0.0.2"
@@ -2098,6 +2110,7 @@ dependencies = [
"id3",
"image",
"kamadak-exif",
"md5",
"mime",
"mime_guess",
"pdf",
@@ -2109,6 +2122,8 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"sha1",
"sha2 0.11.0-rc.5",
"smartstring",
"strum",
"tokio",
@@ -2654,6 +2669,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.7",
]
[[package]]
name = "sha2"
version = "0.10.9"

View File

@@ -71,6 +71,7 @@ pile-dataset = { path = "crates/pile-dataset" }
pile-value = { path = "crates/pile-value" }
pile-io = { path = "crates/pile-io" }
pile-client = { path = "crates/pile-client" }
pile-serve = { path = "crates/pile-serve" }
# MARK: Clients & servers
tantivy = "0.25.0"
@@ -87,7 +88,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
"debug-embed",
"vendored",
] }
reqwest = { version = "0.12", features = ["blocking"] }
reqwest = { version = "0.12", features = ["blocking", "json", "stream"] }
tracing-loki = "0.2.6"
# MARK: Async & Parallelism
@@ -111,6 +112,8 @@ bytes = "1"
toml = "1.0.3"
toml_edit = "0.25.4"
sha2 = "0.11.0-rc.5"
sha1 = "0.10"
md5 = "0.7"
blake3 = "1.8.3"
dotenvy = "0.15.7"
envy = "0.4.2"

View File

@@ -8,10 +8,9 @@ edition = { workspace = true }
workspace = true
[dependencies]
pile-dataset = { workspace = true, features = ["axum"] }
pile-serve = { workspace = true }
reqwest = { version = "0.12", features = ["json", "stream"] }
futures-core = "0.3"
reqwest = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }

View File

@@ -3,14 +3,12 @@ use axum::{
routing::any,
};
use bytes::Bytes;
use futures_core::Stream;
use reqwest::{Client, StatusCode, header};
use serde::Deserialize;
use std::pin::Pin;
use thiserror::Error;
use tracing::{trace, warn};
pub use pile_dataset::serve::{
pub use pile_serve::{
ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
SchemaResponse,
};
@@ -120,26 +118,6 @@ impl DatasetClient {
check_status(resp).await?.json().await.map_err(Into::into)
}
/// `GET /item` — stream the raw bytes of an item.
///
/// The returned stream yields chunks as they arrive from the server.
pub async fn get_item(
&self,
source: &str,
key: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
let url = format!("{}/item", self.base_url);
trace!(url, source, key, "GET /item");
let resp = self
.client
.get(url)
.query(&[("source", source), ("key", key)])
.send()
.await?;
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
}
/// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
pub async fn get_extract(
&self,

View File

@@ -67,7 +67,7 @@ pub enum Source {
base_pattern: String,
/// Map of files included in each item.'
/// `{base}` is replaced with the string extraced by base_pattern.
/// `{base}` is replaced with the string extracted by base_pattern.
/// Default is `{ item: "{base}" }`
#[serde(default = "default_files")]
files: HashMap<Label, String>,

View File

@@ -20,21 +20,7 @@ chrono = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
percent-encoding = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]
axum = [
"dep:axum",
"dep:utoipa",
"dep:utoipa-swagger-ui",
"dep:serde",
"dep:percent-encoding",
]

View File

@@ -302,6 +302,7 @@ impl Datasets {
_threads: usize,
flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> {
let start = Instant::now();
let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => {
@@ -313,6 +314,14 @@ impl Datasets {
let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts");
debug!(
message = "Rebuilding fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir
);
if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
@@ -392,9 +401,18 @@ impl Datasets {
return Err(CancelableTaskError::Cancelled);
}
info!("Committing {total} documents");
index_writer.commit().map_err(DatasetError::from)?;
debug!(
message = "Rebuilt fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir,
n_docs = total,
time_ms = start.elapsed().as_millis()
);
if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;

View File

@@ -2,6 +2,3 @@ mod dataset;
pub use dataset::{Dataset, DatasetError, Datasets};
pub mod index;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -1,92 +0,0 @@
use axum::{
Router,
routing::{get, post},
};
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::Datasets;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
impl Datasets {
#[inline]
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
self.router_prefix(with_docs, None)
}
#[inline]
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(self.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}
}

View File

@@ -0,0 +1,28 @@
[package]
name = "pile-serve"
version = { workspace = true }
rust-version = { workspace = true }
edition = { workspace = true }
[lints]
workspace = true
[dependencies]
pile-config = { workspace = true }
pile-value = { workspace = true }
pile-dataset = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true }
axum = { workspace = true }
percent-encoding = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]

View File

@@ -4,12 +4,11 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use std::{collections::HashMap, sync::Arc};
pub use pile_config::FieldSpec;
use crate::Datasets;
pub type FieldsResponse = HashMap<String, FieldSpec>;
/// Retrieve this dataset's schema.

View File

@@ -7,6 +7,7 @@ use axum::{
};
use percent_encoding::percent_decode_str;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
@@ -17,8 +18,6 @@ use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ExtractQuery {
source: String,
@@ -101,17 +100,24 @@ pub async fn get_extract(
let mut value = None;
for path in &paths {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return StatusCode::NOT_FOUND.into_response();
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(
@@ -177,6 +183,7 @@ pub async fn get_extract(
Json(json),
)
.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}

View File

@@ -4,13 +4,12 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ItemsQuery {
#[serde(default)]

View File

@@ -0,0 +1,89 @@
use axum::{
Router,
routing::{get, post},
};
use pile_dataset::Datasets;
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
#[inline]
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
router_prefix(ds, with_docs, None)
}
#[inline]
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(ds.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}

View File

@@ -4,13 +4,12 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest {
pub query: String,

View File

@@ -5,13 +5,12 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)]
pub struct SchemaQuery {
source: String,
@@ -21,7 +20,7 @@ pub struct SchemaQuery {
hidden: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiValue {
Binary { binary: bool, mime: String },

View File

@@ -6,6 +6,7 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
@@ -16,8 +17,6 @@ use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)]
pub struct SchemaFieldQuery {
source: String,
@@ -84,17 +83,24 @@ pub async fn schema_field(
let mut value = None;
for path in paths {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return StatusCode::NOT_FOUND.into_response();
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(

View File

@@ -21,6 +21,9 @@ toml = { workspace = true }
smartstring = { workspace = true }
regex = { workspace = true }
blake3 = { workspace = true }
sha2 = { workspace = true }
sha1 = { workspace = true }
md5 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }

View File

@@ -0,0 +1,111 @@
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{BinaryPileValue, PileValue},
};
use pile_config::Label;
use pile_io::SyncReadBridge;
use std::{io::Read, sync::Arc};
use tokio::sync::OnceCell;
fn to_hex(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
macro_rules! hash_algos {
($($name:ident),* $(,)?) => {
pub struct HashExtractor {
item: BinaryPileValue,
$($name: OnceCell<String>,)*
}
impl HashExtractor {
pub fn new(item: &BinaryPileValue) -> Self {
Self {
item: item.clone(),
$($name: OnceCell::new(),)*
}
}
}
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*]
});
};
}
hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512);
impl HashExtractor {
async fn compute(&self, name: &Label) -> Result<Option<String>, std::io::Error> {
let name_str = name.as_ref();
macro_rules! algo {
($cell:ident, $compute:expr) => {
if name_str == stringify!($cell) {
return Ok(Some(
self.$cell
.get_or_try_init(|| async {
let read = self.item.read().await?;
let mut read = SyncReadBridge::new_current(read);
tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
read.read_to_end(&mut bytes)?;
Ok::<String, std::io::Error>($compute(&bytes))
})
.await?
})
.await?
.clone(),
));
}
};
}
algo!(blake3, |b: &Vec<u8>| blake3::hash(b).to_hex().to_string());
algo!(md5, |b: &Vec<u8>| format!("{:x}", md5::compute(b)));
algo!(sha1, |b: &Vec<u8>| {
use sha1::Digest;
to_hex(sha1::Sha1::digest(b).as_ref())
});
algo!(sha224, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha224::digest(b).as_ref())
});
algo!(sha256, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha256::digest(b).as_ref())
});
algo!(sha384, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha384::digest(b).as_ref())
});
algo!(sha512, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha512::digest(b).as_ref())
});
Ok(None)
}
}
#[async_trait::async_trait]
impl ObjectExtractor for HashExtractor {
async fn field(
&self,
_state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self
.compute(name)
.await?
.map(|s| PileValue::String(Arc::new(s.into()))))
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(LABELS.clone())
}
}

View File

@@ -31,6 +31,9 @@ pub use text::*;
mod image;
pub use image::*;
mod hash;
pub use hash::*;
use crate::{
extract::{
misc::MapExtractor,
@@ -85,6 +88,10 @@ impl BinaryExtractor {
Label::new("text").unwrap(),
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
),
(
Label::new("hash").unwrap(),
PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))),
),
]),
};

View File

@@ -1,6 +1,5 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment};
use pile_io::SyncReadBridge;
use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc};
@@ -50,18 +49,6 @@ impl BinaryPileValue {
}
}
pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
let read = self.read().await?;
let mut read = SyncReadBridge::new_current(read);
let out = tokio::task::spawn_blocking(move || {
let mut hasher = blake3::Hasher::new();
std::io::copy(&mut read, &mut hasher)?;
return Ok::<_, std::io::Error>(hasher.finalize());
})
.await??;
return Ok(out);
}
pub fn mime(&self) -> &Mime {
match self {
Self::Blob { mime, .. } => mime,

View File

@@ -9,8 +9,9 @@ workspace = true
[dependencies]
pile-toolbox = { workspace = true }
pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
pile-value = { workspace = true, features = ["pdfium"] }
pile-dataset = { workspace = true }
pile-serve = { workspace = true }
pile-value = { workspace = true }
pile-config = { workspace = true }
tracing = { workspace = true }
@@ -34,3 +35,7 @@ base64 = { workspace = true }
dotenvy = { workspace = true }
envy = { workspace = true }
thiserror = { workspace = true }
[features]
default = ["pdfium"]
pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]

View File

@@ -57,8 +57,7 @@ impl CliCmd for ServeCommand {
})?;
}
let app = Arc::new(ds)
.router(true)
let app = pile_serve::router(Arc::new(ds), true)
.into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {

View File

@@ -8,10 +8,11 @@ use axum::{
routing::get,
};
use clap::Args;
use pile_dataset::Datasets;
use pile_dataset::{DatasetError, Datasets};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::extract::traits::ExtractState;
use serde::Serialize;
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;
@@ -27,6 +28,18 @@ pub struct ServerCommand {
/// If provided, do not serve docs
#[arg(long)]
no_docs: bool,
/// If provided, never auto-refresh indices
#[arg(long)]
no_refresh: bool,
/// Number of threads to use to refresh indices
#[arg(long, default_value = "5")]
refresh_jobs: usize,
/// Refresh indices every `n` seconds
#[arg(long, default_value = "300")]
refresh_delay: usize,
}
impl CliCmd for ServerCommand {
@@ -47,12 +60,57 @@ impl CliCmd for ServerCommand {
Arc::new(datasets)
};
// Start auto-refresh task
if !self.no_refresh {
let datasets = datasets.clone();
let jobs = self.refresh_jobs.max(1);
let delay = self.refresh_delay.max(1);
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
if ds.needs_fts().await? {
let state = ExtractState { ignore_mime: false };
match ds.fts_refresh(&state, jobs, None).await {
Ok(()) => {}
Err(CancelableTaskError::Error(err)) => return Err(err),
Err(CancelableTaskError::Cancelled) => unreachable!(),
};
}
return Ok(());
}
tokio::task::spawn(async move {
loop {
for ds in datasets.iter() {
match refresh_dataset(ds, jobs).await {
Ok(x) => x,
Err(error) => {
error!(
message = "Error while refreshing dataset",
dataset = ds.config.dataset.name.as_str(),
?error
);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
}
});
}
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
let mut router = Router::new();
for d in datasets.iter() {
let prefix = format!("/{}", d.config.dataset.name);
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
router = router.merge(pile_serve::router_prefix(
d.clone(),
!self.no_docs,
Some(&prefix),
))
}
router = router.merge(

View File

@@ -1,6 +1,6 @@
use serde::Deserialize;
use std::{num::NonZeroUsize, path::PathBuf};
use tracing::info;
use tracing::debug;
use crate::config::{
env::load_env,
@@ -89,7 +89,7 @@ impl PileServerConfig {
}
}
info!(message = "Config loaded", ?config);
debug!(message = "Config loaded", ?config);
return config;
}