Compare commits

..

6 Commits

Author SHA1 Message Date
450ea7aa86 Typos
Some checks failed
CI / Typos (push) Successful in 30s
CI / Clippy (push) Successful in 1m39s
CI / Build and test (push) Successful in 2m30s
CI / Build and test (all features) (push) Successful in 6m16s
Docker / build-and-push (push) Failing after 58s
2026-04-15 09:29:06 -07:00
3bc66ddc48 Split server into crate
Some checks failed
CI / Typos (push) Failing after 17s
CI / Build and test (push) Successful in 1m41s
CI / Clippy (push) Has been cancelled
CI / Build and test (all features) (push) Has been cancelled
Docker / build-and-push (push) Has been cancelled
2026-04-15 09:23:52 -07:00
251d130987 Tweak status codes
Some checks failed
CI / Clippy (push) Successful in 1m2s
CI / Typos (push) Failing after 1m10s
Docker / build-and-push (push) Failing after 2m21s
CI / Build and test (all features) (push) Successful in 3m27s
CI / Build and test (push) Successful in 6m7s
2026-04-03 12:35:01 -07:00
0281a33f86 Client tweaks
Some checks failed
CI / Typos (push) Failing after 20s
CI / Build and test (push) Successful in 1m47s
CI / Clippy (push) Successful in 2m25s
Docker / build-and-push (push) Successful in 4m4s
CI / Build and test (all features) (push) Successful in 6m44s
2026-04-03 09:01:51 -07:00
d3ab2684f4 Auto-refresh
Some checks failed
CI / Typos (push) Failing after 31s
CI / Clippy (push) Successful in 58s
Docker / build-and-push (push) Failing after 3m13s
CI / Build and test (all features) (push) Successful in 3m33s
CI / Build and test (push) Successful in 4m3s
2026-04-03 08:57:43 -07:00
4d4e9c93a2 Add hash extractor 2026-04-03 08:57:37 -07:00
21 changed files with 257 additions and 147 deletions

27
Cargo.lock generated
View File

@@ -1994,6 +1994,7 @@ dependencies = [
"indicatif",
"pile-config",
"pile-dataset",
"pile-serve",
"pile-toolbox",
"pile-value",
"serde",
@@ -2016,7 +2017,7 @@ version = "0.0.2"
dependencies = [
"axum",
"bytes",
"pile-dataset",
"pile-serve",
"reqwest",
"serde",
"thiserror",
@@ -2037,23 +2038,17 @@ dependencies = [
name = "pile-dataset"
version = "0.0.2"
dependencies = [
"axum",
"chrono",
"percent-encoding",
"pile-config",
"pile-toolbox",
"pile-value",
"regex",
"serde",
"serde_json",
"tantivy",
"thiserror",
"tokio",
"tokio-util",
"toml",
"tracing",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]
@@ -2077,6 +2072,24 @@ dependencies = [
"tokio",
]
[[package]]
name = "pile-serve"
version = "0.0.2"
dependencies = [
"axum",
"percent-encoding",
"pile-config",
"pile-dataset",
"pile-value",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]
name = "pile-toolbox"
version = "0.0.2"

View File

@@ -71,6 +71,7 @@ pile-dataset = { path = "crates/pile-dataset" }
pile-value = { path = "crates/pile-value" }
pile-io = { path = "crates/pile-io" }
pile-client = { path = "crates/pile-client" }
pile-serve = { path = "crates/pile-serve" }
# MARK: Clients & servers
tantivy = "0.25.0"
@@ -87,7 +88,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
"debug-embed",
"vendored",
] }
reqwest = { version = "0.12", features = ["blocking"] }
reqwest = { version = "0.12", features = ["blocking", "json", "stream"] }
tracing-loki = "0.2.6"
# MARK: Async & Parallelism

View File

@@ -8,9 +8,9 @@ edition = { workspace = true }
workspace = true
[dependencies]
pile-dataset = { workspace = true, features = ["axum"] }
pile-serve = { workspace = true }
reqwest = { version = "0.12", features = ["json", "stream"] }
reqwest = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }

View File

@@ -8,7 +8,7 @@ use serde::Deserialize;
use thiserror::Error;
use tracing::{trace, warn};
pub use pile_dataset::serve::{
pub use pile_serve::{
ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
SchemaResponse,
};

View File

@@ -67,7 +67,7 @@ pub enum Source {
base_pattern: String,
/// Map of files included in each item.'
/// `{base}` is replaced with the string extraced by base_pattern.
/// `{base}` is replaced with the string extracted by base_pattern.
/// Default is `{ item: "{base}" }`
#[serde(default = "default_files")]
files: HashMap<Label, String>,

View File

@@ -20,21 +20,7 @@ chrono = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
percent-encoding = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]
axum = [
"dep:axum",
"dep:utoipa",
"dep:utoipa-swagger-ui",
"dep:serde",
"dep:percent-encoding",
]

View File

@@ -302,6 +302,7 @@ impl Datasets {
_threads: usize,
flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> {
let start = Instant::now();
let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => {
@@ -313,6 +314,14 @@ impl Datasets {
let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts");
debug!(
message = "Rebuilding fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir
);
if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
@@ -392,9 +401,18 @@ impl Datasets {
return Err(CancelableTaskError::Cancelled);
}
info!("Committing {total} documents");
index_writer.commit().map_err(DatasetError::from)?;
debug!(
message = "Rebuilt fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir,
n_docs = total,
time_ms = start.elapsed().as_millis()
);
if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;

View File

@@ -2,6 +2,3 @@ mod dataset;
pub use dataset::{Dataset, DatasetError, Datasets};
pub mod index;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -1,92 +0,0 @@
use axum::{
Router,
routing::{get, post},
};
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::Datasets;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
impl Datasets {
#[inline]
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
self.router_prefix(with_docs, None)
}
#[inline]
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(self.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}
}

View File

@@ -0,0 +1,28 @@
[package]
name = "pile-serve"
version = { workspace = true }
rust-version = { workspace = true }
edition = { workspace = true }
[lints]
workspace = true
[dependencies]
pile-config = { workspace = true }
pile-value = { workspace = true }
pile-dataset = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true }
axum = { workspace = true }
percent-encoding = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]

View File

@@ -4,12 +4,11 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use std::{collections::HashMap, sync::Arc};
pub use pile_config::FieldSpec;
use crate::Datasets;
pub type FieldsResponse = HashMap<String, FieldSpec>;
/// Retrieve this dataset's schema.

View File

@@ -7,6 +7,7 @@ use axum::{
};
use percent_encoding::percent_decode_str;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
@@ -17,8 +18,6 @@ use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ExtractQuery {
source: String,
@@ -101,17 +100,24 @@ pub async fn get_extract(
let mut value = None;
for path in &paths {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return StatusCode::NOT_FOUND.into_response();
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(
@@ -177,6 +183,7 @@ pub async fn get_extract(
Json(json),
)
.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}

View File

@@ -4,13 +4,12 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ItemsQuery {
#[serde(default)]

View File

@@ -0,0 +1,89 @@
use axum::{
Router,
routing::{get, post},
};
use pile_dataset::Datasets;
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
#[inline]
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
router_prefix(ds, with_docs, None)
}
#[inline]
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(ds.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}

View File

@@ -4,13 +4,12 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest {
pub query: String,

View File

@@ -5,13 +5,12 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)]
pub struct SchemaQuery {
source: String,
@@ -21,7 +20,7 @@ pub struct SchemaQuery {
hidden: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiValue {
Binary { binary: bool, mime: String },

View File

@@ -6,6 +6,7 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
@@ -16,8 +17,6 @@ use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)]
pub struct SchemaFieldQuery {
source: String,
@@ -84,17 +83,24 @@ pub async fn schema_field(
let mut value = None;
for path in paths {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return StatusCode::NOT_FOUND.into_response();
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(

View File

@@ -27,7 +27,6 @@ macro_rules! hash_algos {
}
}
#[expect(clippy::unwrap_used)]
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*]
});

View File

@@ -9,8 +9,9 @@ workspace = true
[dependencies]
pile-toolbox = { workspace = true }
pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
pile-value = { workspace = true, features = ["pdfium"] }
pile-dataset = { workspace = true }
pile-serve = { workspace = true }
pile-value = { workspace = true }
pile-config = { workspace = true }
tracing = { workspace = true }
@@ -34,3 +35,7 @@ base64 = { workspace = true }
dotenvy = { workspace = true }
envy = { workspace = true }
thiserror = { workspace = true }
[features]
default = ["pdfium"]
pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]

View File

@@ -57,8 +57,7 @@ impl CliCmd for ServeCommand {
})?;
}
let app = Arc::new(ds)
.router(true)
let app = pile_serve::router(Arc::new(ds), true)
.into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {

View File

@@ -8,10 +8,11 @@ use axum::{
routing::get,
};
use clap::Args;
use pile_dataset::Datasets;
use pile_dataset::{DatasetError, Datasets};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::extract::traits::ExtractState;
use serde::Serialize;
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;
@@ -27,6 +28,18 @@ pub struct ServerCommand {
/// If provided, do not serve docs
#[arg(long)]
no_docs: bool,
/// If provided, never auto-refresh indices
#[arg(long)]
no_refresh: bool,
/// Number of threads to use to refresh indices
#[arg(long, default_value = "5")]
refresh_jobs: usize,
/// Refresh indices every `n` seconds
#[arg(long, default_value = "300")]
refresh_delay: usize,
}
impl CliCmd for ServerCommand {
@@ -47,12 +60,57 @@ impl CliCmd for ServerCommand {
Arc::new(datasets)
};
// Start auto-refresh task
if !self.no_refresh {
let datasets = datasets.clone();
let jobs = self.refresh_jobs.max(1);
let delay = self.refresh_delay.max(1);
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
if ds.needs_fts().await? {
let state = ExtractState { ignore_mime: false };
match ds.fts_refresh(&state, jobs, None).await {
Ok(()) => {}
Err(CancelableTaskError::Error(err)) => return Err(err),
Err(CancelableTaskError::Cancelled) => unreachable!(),
};
}
return Ok(());
}
tokio::task::spawn(async move {
loop {
for ds in datasets.iter() {
match refresh_dataset(ds, jobs).await {
Ok(x) => x,
Err(error) => {
error!(
message = "Error while refreshing dataset",
dataset = ds.config.dataset.name.as_str(),
?error
);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
}
});
}
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
let mut router = Router::new();
for d in datasets.iter() {
let prefix = format!("/{}", d.config.dataset.name);
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
router = router.merge(pile_serve::router_prefix(
d.clone(),
!self.no_docs,
Some(&prefix),
))
}
router = router.merge(