Compare commits

..

1 Commits

Author SHA1 Message Date
9b8f825667 Add hash extractor
Some checks failed
CI / Typos (push) Failing after 18s
CI / Build and test (push) Successful in 1m43s
CI / Clippy (push) Successful in 2m31s
Docker / build-and-push (push) Successful in 4m36s
CI / Build and test (all features) (push) Successful in 6m52s
2026-04-02 22:56:31 -07:00
21 changed files with 147 additions and 257 deletions

27
Cargo.lock generated
View File

@@ -1994,7 +1994,6 @@ dependencies = [
"indicatif", "indicatif",
"pile-config", "pile-config",
"pile-dataset", "pile-dataset",
"pile-serve",
"pile-toolbox", "pile-toolbox",
"pile-value", "pile-value",
"serde", "serde",
@@ -2017,7 +2016,7 @@ version = "0.0.2"
dependencies = [ dependencies = [
"axum", "axum",
"bytes", "bytes",
"pile-serve", "pile-dataset",
"reqwest", "reqwest",
"serde", "serde",
"thiserror", "thiserror",
@@ -2038,17 +2037,23 @@ dependencies = [
name = "pile-dataset" name = "pile-dataset"
version = "0.0.2" version = "0.0.2"
dependencies = [ dependencies = [
"axum",
"chrono", "chrono",
"percent-encoding",
"pile-config", "pile-config",
"pile-toolbox", "pile-toolbox",
"pile-value", "pile-value",
"regex", "regex",
"serde",
"serde_json", "serde_json",
"tantivy", "tantivy",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util",
"toml", "toml",
"tracing", "tracing",
"utoipa",
"utoipa-swagger-ui",
] ]
[[package]] [[package]]
@@ -2072,24 +2077,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "pile-serve"
version = "0.0.2"
dependencies = [
"axum",
"percent-encoding",
"pile-config",
"pile-dataset",
"pile-value",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"utoipa",
"utoipa-swagger-ui",
]
[[package]] [[package]]
name = "pile-toolbox" name = "pile-toolbox"
version = "0.0.2" version = "0.0.2"

View File

@@ -71,7 +71,6 @@ pile-dataset = { path = "crates/pile-dataset" }
pile-value = { path = "crates/pile-value" } pile-value = { path = "crates/pile-value" }
pile-io = { path = "crates/pile-io" } pile-io = { path = "crates/pile-io" }
pile-client = { path = "crates/pile-client" } pile-client = { path = "crates/pile-client" }
pile-serve = { path = "crates/pile-serve" }
# MARK: Clients & servers # MARK: Clients & servers
tantivy = "0.25.0" tantivy = "0.25.0"
@@ -88,7 +87,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
"debug-embed", "debug-embed",
"vendored", "vendored",
] } ] }
reqwest = { version = "0.12", features = ["blocking", "json", "stream"] } reqwest = { version = "0.12", features = ["blocking"] }
tracing-loki = "0.2.6" tracing-loki = "0.2.6"
# MARK: Async & Parallelism # MARK: Async & Parallelism

View File

@@ -8,9 +8,9 @@ edition = { workspace = true }
workspace = true workspace = true
[dependencies] [dependencies]
pile-serve = { workspace = true } pile-dataset = { workspace = true, features = ["axum"] }
reqwest = { workspace = true } reqwest = { version = "0.12", features = ["json", "stream"] }
serde = { workspace = true } serde = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
bytes = { workspace = true } bytes = { workspace = true }

View File

@@ -8,7 +8,7 @@ use serde::Deserialize;
use thiserror::Error; use thiserror::Error;
use tracing::{trace, warn}; use tracing::{trace, warn};
pub use pile_serve::{ pub use pile_dataset::serve::{
ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse, ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
SchemaResponse, SchemaResponse,
}; };

View File

@@ -67,7 +67,7 @@ pub enum Source {
base_pattern: String, base_pattern: String,
/// Map of files included in each item.' /// Map of files included in each item.'
/// `{base}` is replaced with the string extracted by base_pattern. /// `{base}` is replaced with the string extraced by base_pattern.
/// Default is `{ item: "{base}" }` /// Default is `{ item: "{base}" }`
#[serde(default = "default_files")] #[serde(default = "default_files")]
files: HashMap<Label, String>, files: HashMap<Label, String>,

View File

@@ -20,7 +20,21 @@ chrono = { workspace = true }
toml = { workspace = true } toml = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
percent-encoding = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features] [features]
default = [] default = []
pdfium = ["pile-value/pdfium"] pdfium = ["pile-value/pdfium"]
axum = [
"dep:axum",
"dep:utoipa",
"dep:utoipa-swagger-ui",
"dep:serde",
"dep:percent-encoding",
]

View File

@@ -302,7 +302,6 @@ impl Datasets {
_threads: usize, _threads: usize,
flag: Option<CancelFlag>, flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> { ) -> Result<(), CancelableTaskError<DatasetError>> {
let start = Instant::now();
let workdir = match self.path_workdir.as_ref() { let workdir = match self.path_workdir.as_ref() {
Some(x) => x, Some(x) => x,
None => { None => {
@@ -314,14 +313,6 @@ impl Datasets {
let fts_tmp_dir = workdir.join(".tmp-fts"); let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts"); let fts_dir = workdir.join("fts");
debug!(
message = "Rebuilding fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir
);
if fts_tmp_dir.is_dir() { if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display()); warn!("Removing temporary index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
@@ -401,18 +392,9 @@ impl Datasets {
return Err(CancelableTaskError::Cancelled); return Err(CancelableTaskError::Cancelled);
} }
info!("Committing {total} documents");
index_writer.commit().map_err(DatasetError::from)?; index_writer.commit().map_err(DatasetError::from)?;
debug!(
message = "Rebuilt fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir,
n_docs = total,
time_ms = start.elapsed().as_millis()
);
if fts_dir.is_dir() { if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display()); warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;

View File

@@ -2,3 +2,6 @@ mod dataset;
pub use dataset::{Dataset, DatasetError, Datasets}; pub use dataset::{Dataset, DatasetError, Datasets};
pub mod index; pub mod index;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -4,11 +4,12 @@ use axum::{
http::StatusCode, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_dataset::Datasets;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
pub use pile_config::FieldSpec; pub use pile_config::FieldSpec;
use crate::Datasets;
pub type FieldsResponse = HashMap<String, FieldSpec>; pub type FieldsResponse = HashMap<String, FieldSpec>;
/// Retrieve this dataset's schema. /// Retrieve this dataset's schema.

View File

@@ -7,7 +7,6 @@ use axum::{
}; };
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use pile_config::{Label, objectpath::ObjectPath}; use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
value::{BinaryPileValue, PileValue}, value::{BinaryPileValue, PileValue},
@@ -18,6 +17,8 @@ use tokio_util::io::ReaderStream;
use tracing::debug; use tracing::debug;
use utoipa::ToSchema; use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)] #[derive(Deserialize, ToSchema)]
pub struct ExtractQuery { pub struct ExtractQuery {
source: String, source: String,
@@ -100,24 +101,17 @@ pub async fn get_extract(
let mut value = None; let mut value = None;
for path in &paths { for path in &paths {
match item.query(&extract_state, path).await { match item.query(&extract_state, path).await {
Ok(None) => continue, Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => { Ok(Some(v)) => {
value = Some(v); value = Some(v);
break; break;
} }
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
} }
} }
let Some(value) = value else { let Some(value) = value else {
return (StatusCode::BAD_REQUEST, "no value").into_response(); return StatusCode::NOT_FOUND.into_response();
}; };
debug!( debug!(
@@ -183,7 +177,6 @@ pub async fn get_extract(
Json(json), Json(json),
) )
.into_response(), .into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}, },
} }

View File

@@ -4,12 +4,13 @@ use axum::{
http::StatusCode, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tracing::debug; use tracing::debug;
use utoipa::ToSchema; use utoipa::ToSchema;
use crate::Datasets;
#[derive(Deserialize, ToSchema)] #[derive(Deserialize, ToSchema)]
pub struct ItemsQuery { pub struct ItemsQuery {
#[serde(default)] #[serde(default)]

View File

@@ -4,12 +4,13 @@ use axum::{
http::StatusCode, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, time::Instant};
use tracing::debug; use tracing::debug;
use utoipa::ToSchema; use utoipa::ToSchema;
use crate::Datasets;
#[derive(Serialize, Deserialize, ToSchema, Debug)] #[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest { pub struct LookupRequest {
pub query: String, pub query: String,

View File

@@ -0,0 +1,92 @@
use axum::{
Router,
routing::{get, post},
};
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::Datasets;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
impl Datasets {
#[inline]
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
self.router_prefix(with_docs, None)
}
#[inline]
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(self.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}
}

View File

@@ -5,12 +5,13 @@ use axum::{
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_config::Label; use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{extract::traits::ExtractState, value::PileValue}; use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use utoipa::IntoParams; use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)] #[derive(Deserialize, IntoParams)]
pub struct SchemaQuery { pub struct SchemaQuery {
source: String, source: String,
@@ -20,7 +21,7 @@ pub struct SchemaQuery {
hidden: bool, hidden: bool,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum ApiValue { pub enum ApiValue {
Binary { binary: bool, mime: String }, Binary { binary: bool, mime: String },

View File

@@ -6,7 +6,6 @@ use axum::{
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_config::Label; use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
value::{BinaryPileValue, PileValue}, value::{BinaryPileValue, PileValue},
@@ -17,6 +16,8 @@ use tokio_util::io::ReaderStream;
use tracing::debug; use tracing::debug;
use utoipa::IntoParams; use utoipa::IntoParams;
use crate::Datasets;
#[derive(Deserialize, IntoParams)] #[derive(Deserialize, IntoParams)]
pub struct SchemaFieldQuery { pub struct SchemaFieldQuery {
source: String, source: String,
@@ -83,24 +84,17 @@ pub async fn schema_field(
let mut value = None; let mut value = None;
for path in paths { for path in paths {
match item.query(&extract_state, path).await { match item.query(&extract_state, path).await {
Ok(None) => continue, Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => { Ok(Some(v)) => {
value = Some(v); value = Some(v);
break; break;
} }
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
} }
} }
let Some(value) = value else { let Some(value) = value else {
return (StatusCode::BAD_REQUEST, "no value").into_response(); return StatusCode::NOT_FOUND.into_response();
}; };
debug!( debug!(

View File

@@ -1,28 +0,0 @@
[package]
name = "pile-serve"
version = { workspace = true }
rust-version = { workspace = true }
edition = { workspace = true }
[lints]
workspace = true
[dependencies]
pile-config = { workspace = true }
pile-value = { workspace = true }
pile-dataset = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true }
axum = { workspace = true }
percent-encoding = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]

View File

@@ -1,89 +0,0 @@
use axum::{
Router,
routing::{get, post},
};
use pile_dataset::Datasets;
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
#[inline]
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
router_prefix(ds, with_docs, None)
}
#[inline]
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(ds.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}

View File

@@ -27,6 +27,7 @@ macro_rules! hash_algos {
} }
} }
#[expect(clippy::unwrap_used)]
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| { static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*] vec![$(Label::new(stringify!($name)).unwrap()),*]
}); });

View File

@@ -9,9 +9,8 @@ workspace = true
[dependencies] [dependencies]
pile-toolbox = { workspace = true } pile-toolbox = { workspace = true }
pile-dataset = { workspace = true } pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
pile-serve = { workspace = true } pile-value = { workspace = true, features = ["pdfium"] }
pile-value = { workspace = true }
pile-config = { workspace = true } pile-config = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
@@ -35,7 +34,3 @@ base64 = { workspace = true }
dotenvy = { workspace = true } dotenvy = { workspace = true }
envy = { workspace = true } envy = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
[features]
default = ["pdfium"]
pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]

View File

@@ -57,7 +57,8 @@ impl CliCmd for ServeCommand {
})?; })?;
} }
let app = pile_serve::router(Arc::new(ds), true) let app = Arc::new(ds)
.router(true)
.into_make_service_with_connect_info::<std::net::SocketAddr>(); .into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await { let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {

View File

@@ -8,11 +8,10 @@ use axum::{
routing::get, routing::get,
}; };
use clap::Args; use clap::Args;
use pile_dataset::{DatasetError, Datasets}; use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::extract::traits::ExtractState;
use serde::Serialize; use serde::Serialize;
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration}; use std::{fmt::Debug, path::PathBuf, sync::Arc};
use tracing::{error, info}; use tracing::{error, info};
use utoipa::{OpenApi, ToSchema}; use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi; use utoipa_swagger_ui::SwaggerUi;
@@ -28,18 +27,6 @@ pub struct ServerCommand {
/// If provided, do not serve docs /// If provided, do not serve docs
#[arg(long)] #[arg(long)]
no_docs: bool, no_docs: bool,
/// If provided, never auto-refresh indices
#[arg(long)]
no_refresh: bool,
/// Number of threads to use to refresh indices
#[arg(long, default_value = "5")]
refresh_jobs: usize,
/// Refresh indices every `n` seconds
#[arg(long, default_value = "300")]
refresh_delay: usize,
} }
impl CliCmd for ServerCommand { impl CliCmd for ServerCommand {
@@ -60,57 +47,12 @@ impl CliCmd for ServerCommand {
Arc::new(datasets) Arc::new(datasets)
}; };
// Start auto-refresh task
if !self.no_refresh {
let datasets = datasets.clone();
let jobs = self.refresh_jobs.max(1);
let delay = self.refresh_delay.max(1);
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
if ds.needs_fts().await? {
let state = ExtractState { ignore_mime: false };
match ds.fts_refresh(&state, jobs, None).await {
Ok(()) => {}
Err(CancelableTaskError::Error(err)) => return Err(err),
Err(CancelableTaskError::Cancelled) => unreachable!(),
};
}
return Ok(());
}
tokio::task::spawn(async move {
loop {
for ds in datasets.iter() {
match refresh_dataset(ds, jobs).await {
Ok(x) => x,
Err(error) => {
error!(
message = "Error while refreshing dataset",
dataset = ds.config.dataset.name.as_str(),
?error
);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
}
});
}
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new)); let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
let mut router = Router::new(); let mut router = Router::new();
for d in datasets.iter() { for d in datasets.iter() {
let prefix = format!("/{}", d.config.dataset.name); let prefix = format!("/{}", d.config.dataset.name);
router = router.merge(pile_serve::router_prefix( router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
d.clone(),
!self.no_docs,
Some(&prefix),
))
} }
router = router.merge( router = router.merge(