Compare commits
1 Commits
d3ab2684f4
...
9b8f825667
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b8f825667 |
@@ -302,7 +302,6 @@ impl Datasets {
|
|||||||
_threads: usize,
|
_threads: usize,
|
||||||
flag: Option<CancelFlag>,
|
flag: Option<CancelFlag>,
|
||||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||||
let start = Instant::now();
|
|
||||||
let workdir = match self.path_workdir.as_ref() {
|
let workdir = match self.path_workdir.as_ref() {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => {
|
None => {
|
||||||
@@ -314,14 +313,6 @@ impl Datasets {
|
|||||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||||
let fts_dir = workdir.join("fts");
|
let fts_dir = workdir.join("fts");
|
||||||
|
|
||||||
debug!(
|
|
||||||
message = "Rebuilding fts index",
|
|
||||||
dataset = self.config.dataset.name.as_str(),
|
|
||||||
?fts_dir,
|
|
||||||
?fts_tmp_dir,
|
|
||||||
?workdir
|
|
||||||
);
|
|
||||||
|
|
||||||
if fts_tmp_dir.is_dir() {
|
if fts_tmp_dir.is_dir() {
|
||||||
warn!("Removing temporary index in {}", fts_dir.display());
|
warn!("Removing temporary index in {}", fts_dir.display());
|
||||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||||
@@ -401,18 +392,9 @@ impl Datasets {
|
|||||||
return Err(CancelableTaskError::Cancelled);
|
return Err(CancelableTaskError::Cancelled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("Committing {total} documents");
|
||||||
index_writer.commit().map_err(DatasetError::from)?;
|
index_writer.commit().map_err(DatasetError::from)?;
|
||||||
|
|
||||||
debug!(
|
|
||||||
message = "Rebuilt fts index",
|
|
||||||
dataset = self.config.dataset.name.as_str(),
|
|
||||||
?fts_dir,
|
|
||||||
?fts_tmp_dir,
|
|
||||||
?workdir,
|
|
||||||
n_docs = total,
|
|
||||||
time_ms = start.elapsed().as_millis()
|
|
||||||
);
|
|
||||||
|
|
||||||
if fts_dir.is_dir() {
|
if fts_dir.is_dir() {
|
||||||
warn!("Removing existing index in {}", fts_dir.display());
|
warn!("Removing existing index in {}", fts_dir.display());
|
||||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ macro_rules! hash_algos {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[expect(clippy::unwrap_used)]
|
||||||
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
|
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
|
||||||
vec![$(Label::new(stringify!($name)).unwrap()),*]
|
vec![$(Label::new(stringify!($name)).unwrap()),*]
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -8,11 +8,10 @@ use axum::{
|
|||||||
routing::get,
|
routing::get,
|
||||||
};
|
};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_dataset::{DatasetError, Datasets};
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::extract::traits::ExtractState;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
|
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
use utoipa::{OpenApi, ToSchema};
|
use utoipa::{OpenApi, ToSchema};
|
||||||
use utoipa_swagger_ui::SwaggerUi;
|
use utoipa_swagger_ui::SwaggerUi;
|
||||||
@@ -28,18 +27,6 @@ pub struct ServerCommand {
|
|||||||
/// If provided, do not serve docs
|
/// If provided, do not serve docs
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
no_docs: bool,
|
no_docs: bool,
|
||||||
|
|
||||||
/// If provided, never auto-refresh indices
|
|
||||||
#[arg(long)]
|
|
||||||
no_refresh: bool,
|
|
||||||
|
|
||||||
/// Number of threads to use to refresh indices
|
|
||||||
#[arg(long, default_value = "5")]
|
|
||||||
refresh_jobs: usize,
|
|
||||||
|
|
||||||
/// Refresh indices every `n` seconds
|
|
||||||
#[arg(long, default_value = "300")]
|
|
||||||
refresh_delay: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CliCmd for ServerCommand {
|
impl CliCmd for ServerCommand {
|
||||||
@@ -60,47 +47,6 @@ impl CliCmd for ServerCommand {
|
|||||||
Arc::new(datasets)
|
Arc::new(datasets)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start auto-refresh task
|
|
||||||
if !self.no_refresh {
|
|
||||||
let datasets = datasets.clone();
|
|
||||||
let jobs = self.refresh_jobs.max(1);
|
|
||||||
let delay = self.refresh_delay.max(1);
|
|
||||||
|
|
||||||
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
|
|
||||||
if ds.needs_fts().await? {
|
|
||||||
let state = ExtractState { ignore_mime: false };
|
|
||||||
match ds.fts_refresh(&state, jobs, None).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(CancelableTaskError::Error(err)) => return Err(err),
|
|
||||||
Err(CancelableTaskError::Cancelled) => unreachable!(),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
for ds in datasets.iter() {
|
|
||||||
match refresh_dataset(ds, jobs).await {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(error) => {
|
|
||||||
error!(
|
|
||||||
message = "Error while refreshing dataset",
|
|
||||||
dataset = ds.config.dataset.name.as_str(),
|
|
||||||
?error
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
||||||
|
|
||||||
let mut router = Router::new();
|
let mut router = Router::new();
|
||||||
|
|||||||
Reference in New Issue
Block a user