Compare commits
6 Commits
9b8f825667
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 450ea7aa86 | |||
| 3bc66ddc48 | |||
| 251d130987 | |||
| 0281a33f86 | |||
| d3ab2684f4 | |||
| 4d4e9c93a2 |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -1994,6 +1994,7 @@ dependencies = [
|
||||
"indicatif",
|
||||
"pile-config",
|
||||
"pile-dataset",
|
||||
"pile-serve",
|
||||
"pile-toolbox",
|
||||
"pile-value",
|
||||
"serde",
|
||||
@@ -2016,7 +2017,7 @@ version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"pile-dataset",
|
||||
"pile-serve",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -2037,23 +2038,17 @@ dependencies = [
|
||||
name = "pile-dataset"
|
||||
version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"chrono",
|
||||
"percent-encoding",
|
||||
"pile-config",
|
||||
"pile-toolbox",
|
||||
"pile-value",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tantivy",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"tracing",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2077,6 +2072,24 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pile-serve"
|
||||
version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"percent-encoding",
|
||||
"pile-config",
|
||||
"pile-dataset",
|
||||
"pile-value",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pile-toolbox"
|
||||
version = "0.0.2"
|
||||
|
||||
@@ -71,6 +71,7 @@ pile-dataset = { path = "crates/pile-dataset" }
|
||||
pile-value = { path = "crates/pile-value" }
|
||||
pile-io = { path = "crates/pile-io" }
|
||||
pile-client = { path = "crates/pile-client" }
|
||||
pile-serve = { path = "crates/pile-serve" }
|
||||
|
||||
# MARK: Clients & servers
|
||||
tantivy = "0.25.0"
|
||||
@@ -87,7 +88,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
|
||||
"debug-embed",
|
||||
"vendored",
|
||||
] }
|
||||
reqwest = { version = "0.12", features = ["blocking"] }
|
||||
reqwest = { version = "0.12", features = ["blocking", "json", "stream"] }
|
||||
tracing-loki = "0.2.6"
|
||||
|
||||
# MARK: Async & Parallelism
|
||||
|
||||
@@ -8,9 +8,9 @@ edition = { workspace = true }
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-dataset = { workspace = true, features = ["axum"] }
|
||||
pile-serve = { workspace = true }
|
||||
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
pub use pile_dataset::serve::{
|
||||
pub use pile_serve::{
|
||||
ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
|
||||
SchemaResponse,
|
||||
};
|
||||
|
||||
@@ -67,7 +67,7 @@ pub enum Source {
|
||||
base_pattern: String,
|
||||
|
||||
/// Map of files included in each item.'
|
||||
/// `{base}` is replaced with the string extraced by base_pattern.
|
||||
/// `{base}` is replaced with the string extracted by base_pattern.
|
||||
/// Default is `{ item: "{base}" }`
|
||||
#[serde(default = "default_files")]
|
||||
files: HashMap<Label, String>,
|
||||
|
||||
@@ -20,21 +20,7 @@ chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
serde = { workspace = true, optional = true }
|
||||
axum = { workspace = true, optional = true }
|
||||
percent-encoding = { workspace = true, optional = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
utoipa-swagger-ui = { workspace = true, optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
pdfium = ["pile-value/pdfium"]
|
||||
axum = [
|
||||
"dep:axum",
|
||||
"dep:utoipa",
|
||||
"dep:utoipa-swagger-ui",
|
||||
"dep:serde",
|
||||
"dep:percent-encoding",
|
||||
]
|
||||
|
||||
@@ -302,6 +302,7 @@ impl Datasets {
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let start = Instant::now();
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
@@ -313,6 +314,14 @@ impl Datasets {
|
||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
debug!(
|
||||
message = "Rebuilding fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir
|
||||
);
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
warn!("Removing temporary index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
@@ -392,9 +401,18 @@ impl Datasets {
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
info!("Committing {total} documents");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
|
||||
debug!(
|
||||
message = "Rebuilt fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir,
|
||||
n_docs = total,
|
||||
time_ms = start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if fts_dir.is_dir() {
|
||||
warn!("Removing existing index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||
|
||||
@@ -2,6 +2,3 @@ mod dataset;
|
||||
pub use dataset::{Dataset, DatasetError, Datasets};
|
||||
|
||||
pub mod index;
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
pub mod serve;
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
use axum::{
|
||||
Router,
|
||||
routing::{get, post},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
mod lookup;
|
||||
pub use lookup::*;
|
||||
|
||||
mod extract;
|
||||
pub use extract::*;
|
||||
|
||||
mod items;
|
||||
pub use items::*;
|
||||
|
||||
mod config_schema;
|
||||
pub use config_schema::*;
|
||||
|
||||
mod schema_field;
|
||||
pub use schema_field::*;
|
||||
|
||||
mod schema;
|
||||
pub use schema::*;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
tags(),
|
||||
paths(
|
||||
lookup,
|
||||
get_extract,
|
||||
items_list,
|
||||
config_schema,
|
||||
schema_field,
|
||||
schema_all
|
||||
),
|
||||
components(schemas(
|
||||
LookupRequest,
|
||||
LookupResponse,
|
||||
LookupResult,
|
||||
ExtractQuery,
|
||||
ItemsQuery,
|
||||
ItemsResponse,
|
||||
ItemRef
|
||||
))
|
||||
)]
|
||||
pub(crate) struct Api;
|
||||
|
||||
impl Datasets {
|
||||
#[inline]
|
||||
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
|
||||
self.router_prefix(with_docs, None)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||
let mut router = Router::new()
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/extract", get(get_extract))
|
||||
.route("/items", get(items_list))
|
||||
.route("/config/schema", get(config_schema))
|
||||
.route("/schema", get(schema_all))
|
||||
.route("/schema/{field}", get(schema_field))
|
||||
.with_state(self.clone());
|
||||
|
||||
if let Some(prefix) = prefix {
|
||||
router = Router::new().nest(prefix, router);
|
||||
}
|
||||
|
||||
if with_docs {
|
||||
let docs_path = match prefix {
|
||||
None => "/docs".into(),
|
||||
Some(prefix) => format!("{prefix}/docs"),
|
||||
};
|
||||
|
||||
let api = Api::openapi();
|
||||
let api = match prefix {
|
||||
None => api,
|
||||
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
|
||||
};
|
||||
|
||||
let docs =
|
||||
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
|
||||
|
||||
router = router.merge(docs);
|
||||
}
|
||||
router
|
||||
}
|
||||
}
|
||||
28
crates/pile-serve/Cargo.toml
Normal file
28
crates/pile-serve/Cargo.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "pile-serve"
|
||||
version = { workspace = true }
|
||||
rust-version = { workspace = true }
|
||||
edition = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-config = { workspace = true }
|
||||
pile-value = { workspace = true }
|
||||
pile-dataset = { workspace = true }
|
||||
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
serde = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
percent-encoding = { workspace = true }
|
||||
utoipa = { workspace = true }
|
||||
utoipa-swagger-ui = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
pdfium = ["pile-value/pdfium"]
|
||||
@@ -4,12 +4,11 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub use pile_config::FieldSpec;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
pub type FieldsResponse = HashMap<String, FieldSpec>;
|
||||
|
||||
/// Retrieve this dataset's schema.
|
||||
@@ -7,6 +7,7 @@ use axum::{
|
||||
};
|
||||
use percent_encoding::percent_decode_str;
|
||||
use pile_config::{Label, objectpath::ObjectPath};
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
@@ -17,8 +18,6 @@ use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ExtractQuery {
|
||||
source: String,
|
||||
@@ -101,17 +100,24 @@ pub async fn get_extract(
|
||||
let mut value = None;
|
||||
for path in &paths {
|
||||
match item.query(&extract_state, path).await {
|
||||
Ok(Some(PileValue::Null)) | Ok(None) => continue,
|
||||
Ok(None) => continue,
|
||||
|
||||
Ok(Some(PileValue::Null)) => {
|
||||
value = Some(PileValue::Null);
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(Some(v)) => {
|
||||
value = Some(v);
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
let Some(value) = value else {
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
return (StatusCode::BAD_REQUEST, "no value").into_response();
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -177,6 +183,7 @@ pub async fn get_extract(
|
||||
Json(json),
|
||||
)
|
||||
.into_response(),
|
||||
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
},
|
||||
}
|
||||
@@ -4,13 +4,12 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ItemsQuery {
|
||||
#[serde(default)]
|
||||
89
crates/pile-serve/src/lib.rs
Normal file
89
crates/pile-serve/src/lib.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
use axum::{
|
||||
Router,
|
||||
routing::{get, post},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use std::sync::Arc;
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
mod lookup;
|
||||
pub use lookup::*;
|
||||
|
||||
mod extract;
|
||||
pub use extract::*;
|
||||
|
||||
mod items;
|
||||
pub use items::*;
|
||||
|
||||
mod config_schema;
|
||||
pub use config_schema::*;
|
||||
|
||||
mod schema_field;
|
||||
pub use schema_field::*;
|
||||
|
||||
mod schema;
|
||||
pub use schema::*;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
tags(),
|
||||
paths(
|
||||
lookup,
|
||||
get_extract,
|
||||
items_list,
|
||||
config_schema,
|
||||
schema_field,
|
||||
schema_all
|
||||
),
|
||||
components(schemas(
|
||||
LookupRequest,
|
||||
LookupResponse,
|
||||
LookupResult,
|
||||
ExtractQuery,
|
||||
ItemsQuery,
|
||||
ItemsResponse,
|
||||
ItemRef
|
||||
))
|
||||
)]
|
||||
pub(crate) struct Api;
|
||||
|
||||
#[inline]
|
||||
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
|
||||
router_prefix(ds, with_docs, None)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||
let mut router = Router::new()
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/extract", get(get_extract))
|
||||
.route("/items", get(items_list))
|
||||
.route("/config/schema", get(config_schema))
|
||||
.route("/schema", get(schema_all))
|
||||
.route("/schema/{field}", get(schema_field))
|
||||
.with_state(ds.clone());
|
||||
|
||||
if let Some(prefix) = prefix {
|
||||
router = Router::new().nest(prefix, router);
|
||||
}
|
||||
|
||||
if with_docs {
|
||||
let docs_path = match prefix {
|
||||
None => "/docs".into(),
|
||||
Some(prefix) => format!("{prefix}/docs"),
|
||||
};
|
||||
|
||||
let api = Api::openapi();
|
||||
let api = match prefix {
|
||||
None => api,
|
||||
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
|
||||
};
|
||||
|
||||
let docs =
|
||||
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
|
||||
|
||||
router = router.merge(docs);
|
||||
}
|
||||
router
|
||||
}
|
||||
@@ -4,13 +4,12 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Serialize, Deserialize, ToSchema, Debug)]
|
||||
pub struct LookupRequest {
|
||||
pub query: String,
|
||||
@@ -5,13 +5,12 @@ use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
pub struct SchemaQuery {
|
||||
source: String,
|
||||
@@ -21,7 +20,7 @@ pub struct SchemaQuery {
|
||||
hidden: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ApiValue {
|
||||
Binary { binary: bool, mime: String },
|
||||
@@ -6,6 +6,7 @@ use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
@@ -16,8 +17,6 @@ use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
pub struct SchemaFieldQuery {
|
||||
source: String,
|
||||
@@ -84,17 +83,24 @@ pub async fn schema_field(
|
||||
let mut value = None;
|
||||
for path in paths {
|
||||
match item.query(&extract_state, path).await {
|
||||
Ok(Some(PileValue::Null)) | Ok(None) => continue,
|
||||
Ok(None) => continue,
|
||||
|
||||
Ok(Some(PileValue::Null)) => {
|
||||
value = Some(PileValue::Null);
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(Some(v)) => {
|
||||
value = Some(v);
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
let Some(value) = value else {
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
return (StatusCode::BAD_REQUEST, "no value").into_response();
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -27,7 +27,6 @@ macro_rules! hash_algos {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
|
||||
vec![$(Label::new(stringify!($name)).unwrap()),*]
|
||||
});
|
||||
|
||||
@@ -9,8 +9,9 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-toolbox = { workspace = true }
|
||||
pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
|
||||
pile-value = { workspace = true, features = ["pdfium"] }
|
||||
pile-dataset = { workspace = true }
|
||||
pile-serve = { workspace = true }
|
||||
pile-value = { workspace = true }
|
||||
pile-config = { workspace = true }
|
||||
|
||||
tracing = { workspace = true }
|
||||
@@ -34,3 +35,7 @@ base64 = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
envy = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = ["pdfium"]
|
||||
pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]
|
||||
|
||||
@@ -57,8 +57,7 @@ impl CliCmd for ServeCommand {
|
||||
})?;
|
||||
}
|
||||
|
||||
let app = Arc::new(ds)
|
||||
.router(true)
|
||||
let app = pile_serve::router(Arc::new(ds), true)
|
||||
.into_make_service_with_connect_info::<std::net::SocketAddr>();
|
||||
|
||||
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
|
||||
|
||||
@@ -8,10 +8,11 @@ use axum::{
|
||||
routing::get,
|
||||
};
|
||||
use clap::Args;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_dataset::{DatasetError, Datasets};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::extract::traits::ExtractState;
|
||||
use serde::Serialize;
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
|
||||
use tracing::{error, info};
|
||||
use utoipa::{OpenApi, ToSchema};
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
@@ -27,6 +28,18 @@ pub struct ServerCommand {
|
||||
/// If provided, do not serve docs
|
||||
#[arg(long)]
|
||||
no_docs: bool,
|
||||
|
||||
/// If provided, never auto-refresh indices
|
||||
#[arg(long)]
|
||||
no_refresh: bool,
|
||||
|
||||
/// Number of threads to use to refresh indices
|
||||
#[arg(long, default_value = "5")]
|
||||
refresh_jobs: usize,
|
||||
|
||||
/// Refresh indices every `n` seconds
|
||||
#[arg(long, default_value = "300")]
|
||||
refresh_delay: usize,
|
||||
}
|
||||
|
||||
impl CliCmd for ServerCommand {
|
||||
@@ -47,12 +60,57 @@ impl CliCmd for ServerCommand {
|
||||
Arc::new(datasets)
|
||||
};
|
||||
|
||||
// Start auto-refresh task
|
||||
if !self.no_refresh {
|
||||
let datasets = datasets.clone();
|
||||
let jobs = self.refresh_jobs.max(1);
|
||||
let delay = self.refresh_delay.max(1);
|
||||
|
||||
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
|
||||
if ds.needs_fts().await? {
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
match ds.fts_refresh(&state, jobs, None).await {
|
||||
Ok(()) => {}
|
||||
Err(CancelableTaskError::Error(err)) => return Err(err),
|
||||
Err(CancelableTaskError::Cancelled) => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
for ds in datasets.iter() {
|
||||
match refresh_dataset(ds, jobs).await {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
error!(
|
||||
message = "Error while refreshing dataset",
|
||||
dataset = ds.config.dataset.name.as_str(),
|
||||
?error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
||||
|
||||
let mut router = Router::new();
|
||||
for d in datasets.iter() {
|
||||
let prefix = format!("/{}", d.config.dataset.name);
|
||||
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
|
||||
router = router.merge(pile_serve::router_prefix(
|
||||
d.clone(),
|
||||
!self.no_docs,
|
||||
Some(&prefix),
|
||||
))
|
||||
}
|
||||
|
||||
router = router.merge(
|
||||
|
||||
Reference in New Issue
Block a user