Auto-refresh
This commit is contained in:
@@ -302,6 +302,7 @@ impl Datasets {
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let start = Instant::now();
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
@@ -313,6 +314,14 @@ impl Datasets {
|
||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
debug!(
|
||||
message = "Rebuilding fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir
|
||||
);
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
warn!("Removing temporary index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
@@ -392,9 +401,18 @@ impl Datasets {
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
info!("Committing {total} documents");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
|
||||
debug!(
|
||||
message = "Rebuilt fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir,
|
||||
n_docs = total,
|
||||
time_ms = start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if fts_dir.is_dir() {
|
||||
warn!("Removing existing index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||
|
||||
@@ -8,10 +8,11 @@ use axum::{
|
||||
routing::get,
|
||||
};
|
||||
use clap::Args;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_dataset::{DatasetError, Datasets};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::extract::traits::ExtractState;
|
||||
use serde::Serialize;
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
|
||||
use tracing::{error, info};
|
||||
use utoipa::{OpenApi, ToSchema};
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
@@ -27,6 +28,18 @@ pub struct ServerCommand {
|
||||
/// If provided, do not serve docs
|
||||
#[arg(long)]
|
||||
no_docs: bool,
|
||||
|
||||
/// If provided, never auto-refresh indices
|
||||
#[arg(long)]
|
||||
no_refresh: bool,
|
||||
|
||||
/// Number of threads to use to refresh indices
|
||||
#[arg(long, default_value = "5")]
|
||||
refresh_jobs: usize,
|
||||
|
||||
/// Refresh indices every `n` seconds
|
||||
#[arg(long, default_value = "300")]
|
||||
refresh_delay: usize,
|
||||
}
|
||||
|
||||
impl CliCmd for ServerCommand {
|
||||
@@ -47,6 +60,47 @@ impl CliCmd for ServerCommand {
|
||||
Arc::new(datasets)
|
||||
};
|
||||
|
||||
// Start auto-refresh task
|
||||
if !self.no_refresh {
|
||||
let datasets = datasets.clone();
|
||||
let jobs = self.refresh_jobs.max(1);
|
||||
let delay = self.refresh_delay.max(1);
|
||||
|
||||
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
|
||||
if ds.needs_fts().await? {
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
match ds.fts_refresh(&state, jobs, None).await {
|
||||
Ok(()) => {}
|
||||
Err(CancelableTaskError::Error(err)) => return Err(err),
|
||||
Err(CancelableTaskError::Cancelled) => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
for ds in datasets.iter() {
|
||||
match refresh_dataset(ds, jobs).await {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
error!(
|
||||
message = "Error while refreshing dataset",
|
||||
dataset = ds.config.dataset.name.as_str(),
|
||||
?error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
||||
|
||||
let mut router = Router::new();
|
||||
|
||||
Reference in New Issue
Block a user