Compare commits
1 Commits
main
...
9b8f825667
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b8f825667 |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -1994,7 +1994,6 @@ dependencies = [
|
||||
"indicatif",
|
||||
"pile-config",
|
||||
"pile-dataset",
|
||||
"pile-serve",
|
||||
"pile-toolbox",
|
||||
"pile-value",
|
||||
"serde",
|
||||
@@ -2017,7 +2016,7 @@ version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"pile-serve",
|
||||
"pile-dataset",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -2038,17 +2037,23 @@ dependencies = [
|
||||
name = "pile-dataset"
|
||||
version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"chrono",
|
||||
"percent-encoding",
|
||||
"pile-config",
|
||||
"pile-toolbox",
|
||||
"pile-value",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tantivy",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"tracing",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2072,24 +2077,6 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pile-serve"
|
||||
version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"percent-encoding",
|
||||
"pile-config",
|
||||
"pile-dataset",
|
||||
"pile-value",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pile-toolbox"
|
||||
version = "0.0.2"
|
||||
|
||||
@@ -71,7 +71,6 @@ pile-dataset = { path = "crates/pile-dataset" }
|
||||
pile-value = { path = "crates/pile-value" }
|
||||
pile-io = { path = "crates/pile-io" }
|
||||
pile-client = { path = "crates/pile-client" }
|
||||
pile-serve = { path = "crates/pile-serve" }
|
||||
|
||||
# MARK: Clients & servers
|
||||
tantivy = "0.25.0"
|
||||
@@ -88,7 +87,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
|
||||
"debug-embed",
|
||||
"vendored",
|
||||
] }
|
||||
reqwest = { version = "0.12", features = ["blocking", "json", "stream"] }
|
||||
reqwest = { version = "0.12", features = ["blocking"] }
|
||||
tracing-loki = "0.2.6"
|
||||
|
||||
# MARK: Async & Parallelism
|
||||
|
||||
@@ -8,9 +8,9 @@ edition = { workspace = true }
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-serve = { workspace = true }
|
||||
pile-dataset = { workspace = true, features = ["axum"] }
|
||||
|
||||
reqwest = { workspace = true }
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
pub use pile_serve::{
|
||||
pub use pile_dataset::serve::{
|
||||
ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
|
||||
SchemaResponse,
|
||||
};
|
||||
|
||||
@@ -67,7 +67,7 @@ pub enum Source {
|
||||
base_pattern: String,
|
||||
|
||||
/// Map of files included in each item.'
|
||||
/// `{base}` is replaced with the string extracted by base_pattern.
|
||||
/// `{base}` is replaced with the string extraced by base_pattern.
|
||||
/// Default is `{ item: "{base}" }`
|
||||
#[serde(default = "default_files")]
|
||||
files: HashMap<Label, String>,
|
||||
|
||||
@@ -20,7 +20,21 @@ chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
serde = { workspace = true, optional = true }
|
||||
axum = { workspace = true, optional = true }
|
||||
percent-encoding = { workspace = true, optional = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
utoipa-swagger-ui = { workspace = true, optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
pdfium = ["pile-value/pdfium"]
|
||||
axum = [
|
||||
"dep:axum",
|
||||
"dep:utoipa",
|
||||
"dep:utoipa-swagger-ui",
|
||||
"dep:serde",
|
||||
"dep:percent-encoding",
|
||||
]
|
||||
|
||||
@@ -302,7 +302,6 @@ impl Datasets {
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let start = Instant::now();
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
@@ -314,14 +313,6 @@ impl Datasets {
|
||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
debug!(
|
||||
message = "Rebuilding fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir
|
||||
);
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
warn!("Removing temporary index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
@@ -401,18 +392,9 @@ impl Datasets {
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
info!("Committing {total} documents");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
|
||||
debug!(
|
||||
message = "Rebuilt fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir,
|
||||
n_docs = total,
|
||||
time_ms = start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if fts_dir.is_dir() {
|
||||
warn!("Removing existing index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||
|
||||
@@ -2,3 +2,6 @@ mod dataset;
|
||||
pub use dataset::{Dataset, DatasetError, Datasets};
|
||||
|
||||
pub mod index;
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
pub mod serve;
|
||||
|
||||
@@ -4,11 +4,12 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub use pile_config::FieldSpec;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
pub type FieldsResponse = HashMap<String, FieldSpec>;
|
||||
|
||||
/// Retrieve this dataset's schema.
|
||||
@@ -7,7 +7,6 @@ use axum::{
|
||||
};
|
||||
use percent_encoding::percent_decode_str;
|
||||
use pile_config::{Label, objectpath::ObjectPath};
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
@@ -18,6 +17,8 @@ use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ExtractQuery {
|
||||
source: String,
|
||||
@@ -100,24 +101,17 @@ pub async fn get_extract(
|
||||
let mut value = None;
|
||||
for path in &paths {
|
||||
match item.query(&extract_state, path).await {
|
||||
Ok(None) => continue,
|
||||
|
||||
Ok(Some(PileValue::Null)) => {
|
||||
value = Some(PileValue::Null);
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(Some(PileValue::Null)) | Ok(None) => continue,
|
||||
Ok(Some(v)) => {
|
||||
value = Some(v);
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
let Some(value) = value else {
|
||||
return (StatusCode::BAD_REQUEST, "no value").into_response();
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -183,7 +177,6 @@ pub async fn get_extract(
|
||||
Json(json),
|
||||
)
|
||||
.into_response(),
|
||||
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
},
|
||||
}
|
||||
@@ -4,12 +4,13 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ItemsQuery {
|
||||
#[serde(default)]
|
||||
@@ -4,12 +4,13 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Serialize, Deserialize, ToSchema, Debug)]
|
||||
pub struct LookupRequest {
|
||||
pub query: String,
|
||||
92
crates/pile-dataset/src/serve/mod.rs
Normal file
92
crates/pile-dataset/src/serve/mod.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use axum::{
|
||||
Router,
|
||||
routing::{get, post},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
mod lookup;
|
||||
pub use lookup::*;
|
||||
|
||||
mod extract;
|
||||
pub use extract::*;
|
||||
|
||||
mod items;
|
||||
pub use items::*;
|
||||
|
||||
mod config_schema;
|
||||
pub use config_schema::*;
|
||||
|
||||
mod schema_field;
|
||||
pub use schema_field::*;
|
||||
|
||||
mod schema;
|
||||
pub use schema::*;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
tags(),
|
||||
paths(
|
||||
lookup,
|
||||
get_extract,
|
||||
items_list,
|
||||
config_schema,
|
||||
schema_field,
|
||||
schema_all
|
||||
),
|
||||
components(schemas(
|
||||
LookupRequest,
|
||||
LookupResponse,
|
||||
LookupResult,
|
||||
ExtractQuery,
|
||||
ItemsQuery,
|
||||
ItemsResponse,
|
||||
ItemRef
|
||||
))
|
||||
)]
|
||||
pub(crate) struct Api;
|
||||
|
||||
impl Datasets {
|
||||
#[inline]
|
||||
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
|
||||
self.router_prefix(with_docs, None)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||
let mut router = Router::new()
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/extract", get(get_extract))
|
||||
.route("/items", get(items_list))
|
||||
.route("/config/schema", get(config_schema))
|
||||
.route("/schema", get(schema_all))
|
||||
.route("/schema/{field}", get(schema_field))
|
||||
.with_state(self.clone());
|
||||
|
||||
if let Some(prefix) = prefix {
|
||||
router = Router::new().nest(prefix, router);
|
||||
}
|
||||
|
||||
if with_docs {
|
||||
let docs_path = match prefix {
|
||||
None => "/docs".into(),
|
||||
Some(prefix) => format!("{prefix}/docs"),
|
||||
};
|
||||
|
||||
let api = Api::openapi();
|
||||
let api = match prefix {
|
||||
None => api,
|
||||
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
|
||||
};
|
||||
|
||||
let docs =
|
||||
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
|
||||
|
||||
router = router.merge(docs);
|
||||
}
|
||||
router
|
||||
}
|
||||
}
|
||||
@@ -5,12 +5,13 @@ use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
pub struct SchemaQuery {
|
||||
source: String,
|
||||
@@ -20,7 +21,7 @@ pub struct SchemaQuery {
|
||||
hidden: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ApiValue {
|
||||
Binary { binary: bool, mime: String },
|
||||
@@ -6,7 +6,6 @@ use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
@@ -17,6 +16,8 @@ use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
pub struct SchemaFieldQuery {
|
||||
source: String,
|
||||
@@ -83,24 +84,17 @@ pub async fn schema_field(
|
||||
let mut value = None;
|
||||
for path in paths {
|
||||
match item.query(&extract_state, path).await {
|
||||
Ok(None) => continue,
|
||||
|
||||
Ok(Some(PileValue::Null)) => {
|
||||
value = Some(PileValue::Null);
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(Some(PileValue::Null)) | Ok(None) => continue,
|
||||
Ok(Some(v)) => {
|
||||
value = Some(v);
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
let Some(value) = value else {
|
||||
return (StatusCode::BAD_REQUEST, "no value").into_response();
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -1,28 +0,0 @@
|
||||
[package]
|
||||
name = "pile-serve"
|
||||
version = { workspace = true }
|
||||
rust-version = { workspace = true }
|
||||
edition = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-config = { workspace = true }
|
||||
pile-value = { workspace = true }
|
||||
pile-dataset = { workspace = true }
|
||||
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
serde = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
percent-encoding = { workspace = true }
|
||||
utoipa = { workspace = true }
|
||||
utoipa-swagger-ui = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
pdfium = ["pile-value/pdfium"]
|
||||
@@ -1,89 +0,0 @@
|
||||
use axum::{
|
||||
Router,
|
||||
routing::{get, post},
|
||||
};
|
||||
use pile_dataset::Datasets;
|
||||
use std::sync::Arc;
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
mod lookup;
|
||||
pub use lookup::*;
|
||||
|
||||
mod extract;
|
||||
pub use extract::*;
|
||||
|
||||
mod items;
|
||||
pub use items::*;
|
||||
|
||||
mod config_schema;
|
||||
pub use config_schema::*;
|
||||
|
||||
mod schema_field;
|
||||
pub use schema_field::*;
|
||||
|
||||
mod schema;
|
||||
pub use schema::*;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
tags(),
|
||||
paths(
|
||||
lookup,
|
||||
get_extract,
|
||||
items_list,
|
||||
config_schema,
|
||||
schema_field,
|
||||
schema_all
|
||||
),
|
||||
components(schemas(
|
||||
LookupRequest,
|
||||
LookupResponse,
|
||||
LookupResult,
|
||||
ExtractQuery,
|
||||
ItemsQuery,
|
||||
ItemsResponse,
|
||||
ItemRef
|
||||
))
|
||||
)]
|
||||
pub(crate) struct Api;
|
||||
|
||||
#[inline]
|
||||
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
|
||||
router_prefix(ds, with_docs, None)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||
let mut router = Router::new()
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/extract", get(get_extract))
|
||||
.route("/items", get(items_list))
|
||||
.route("/config/schema", get(config_schema))
|
||||
.route("/schema", get(schema_all))
|
||||
.route("/schema/{field}", get(schema_field))
|
||||
.with_state(ds.clone());
|
||||
|
||||
if let Some(prefix) = prefix {
|
||||
router = Router::new().nest(prefix, router);
|
||||
}
|
||||
|
||||
if with_docs {
|
||||
let docs_path = match prefix {
|
||||
None => "/docs".into(),
|
||||
Some(prefix) => format!("{prefix}/docs"),
|
||||
};
|
||||
|
||||
let api = Api::openapi();
|
||||
let api = match prefix {
|
||||
None => api,
|
||||
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
|
||||
};
|
||||
|
||||
let docs =
|
||||
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
|
||||
|
||||
router = router.merge(docs);
|
||||
}
|
||||
router
|
||||
}
|
||||
@@ -27,6 +27,7 @@ macro_rules! hash_algos {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
|
||||
vec![$(Label::new(stringify!($name)).unwrap()),*]
|
||||
});
|
||||
|
||||
@@ -9,9 +9,8 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-toolbox = { workspace = true }
|
||||
pile-dataset = { workspace = true }
|
||||
pile-serve = { workspace = true }
|
||||
pile-value = { workspace = true }
|
||||
pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
|
||||
pile-value = { workspace = true, features = ["pdfium"] }
|
||||
pile-config = { workspace = true }
|
||||
|
||||
tracing = { workspace = true }
|
||||
@@ -35,7 +34,3 @@ base64 = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
envy = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = ["pdfium"]
|
||||
pdfium = ["pile-dataset/pdfium", "pile-serve/pdfium", "pile-value/pdfium"]
|
||||
|
||||
@@ -57,7 +57,8 @@ impl CliCmd for ServeCommand {
|
||||
})?;
|
||||
}
|
||||
|
||||
let app = pile_serve::router(Arc::new(ds), true)
|
||||
let app = Arc::new(ds)
|
||||
.router(true)
|
||||
.into_make_service_with_connect_info::<std::net::SocketAddr>();
|
||||
|
||||
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
|
||||
|
||||
@@ -8,11 +8,10 @@ use axum::{
|
||||
routing::get,
|
||||
};
|
||||
use clap::Args;
|
||||
use pile_dataset::{DatasetError, Datasets};
|
||||
use pile_dataset::Datasets;
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::extract::traits::ExtractState;
|
||||
use serde::Serialize;
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||
use tracing::{error, info};
|
||||
use utoipa::{OpenApi, ToSchema};
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
@@ -28,18 +27,6 @@ pub struct ServerCommand {
|
||||
/// If provided, do not serve docs
|
||||
#[arg(long)]
|
||||
no_docs: bool,
|
||||
|
||||
/// If provided, never auto-refresh indices
|
||||
#[arg(long)]
|
||||
no_refresh: bool,
|
||||
|
||||
/// Number of threads to use to refresh indices
|
||||
#[arg(long, default_value = "5")]
|
||||
refresh_jobs: usize,
|
||||
|
||||
/// Refresh indices every `n` seconds
|
||||
#[arg(long, default_value = "300")]
|
||||
refresh_delay: usize,
|
||||
}
|
||||
|
||||
impl CliCmd for ServerCommand {
|
||||
@@ -60,57 +47,12 @@ impl CliCmd for ServerCommand {
|
||||
Arc::new(datasets)
|
||||
};
|
||||
|
||||
// Start auto-refresh task
|
||||
if !self.no_refresh {
|
||||
let datasets = datasets.clone();
|
||||
let jobs = self.refresh_jobs.max(1);
|
||||
let delay = self.refresh_delay.max(1);
|
||||
|
||||
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
|
||||
if ds.needs_fts().await? {
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
match ds.fts_refresh(&state, jobs, None).await {
|
||||
Ok(()) => {}
|
||||
Err(CancelableTaskError::Error(err)) => return Err(err),
|
||||
Err(CancelableTaskError::Cancelled) => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
for ds in datasets.iter() {
|
||||
match refresh_dataset(ds, jobs).await {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
error!(
|
||||
message = "Error while refreshing dataset",
|
||||
dataset = ds.config.dataset.name.as_str(),
|
||||
?error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
||||
|
||||
let mut router = Router::new();
|
||||
for d in datasets.iter() {
|
||||
let prefix = format!("/{}", d.config.dataset.name);
|
||||
router = router.merge(pile_serve::router_prefix(
|
||||
d.clone(),
|
||||
!self.no_docs,
|
||||
Some(&prefix),
|
||||
))
|
||||
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
|
||||
}
|
||||
|
||||
router = router.merge(
|
||||
|
||||
Reference in New Issue
Block a user