From d3ab2684f43dea2f43be133fbf87fbb3d80f761e Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Fri, 3 Apr 2026 08:57:43 -0700 Subject: [PATCH] Auto-refresh --- crates/pile-dataset/src/dataset.rs | 20 ++++++++++- crates/pile/src/command/server.rs | 58 ++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index f23d615..d2bd2a3 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -302,6 +302,7 @@ impl Datasets { _threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { + let start = Instant::now(); let workdir = match self.path_workdir.as_ref() { Some(x) => x, None => { @@ -313,6 +314,14 @@ impl Datasets { let fts_tmp_dir = workdir.join(".tmp-fts"); let fts_dir = workdir.join("fts"); + debug!( + message = "Rebuilding fts index", + dataset = self.config.dataset.name.as_str(), + ?fts_dir, + ?fts_tmp_dir, + ?workdir + ); + if fts_tmp_dir.is_dir() { warn!("Removing temporary index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; @@ -392,9 +401,18 @@ impl Datasets { return Err(CancelableTaskError::Cancelled); } - info!("Committing {total} documents"); index_writer.commit().map_err(DatasetError::from)?; + debug!( + message = "Rebuilt fts index", + dataset = self.config.dataset.name.as_str(), + ?fts_dir, + ?fts_tmp_dir, + ?workdir, + n_docs = total, + time_ms = start.elapsed().as_millis() + ); + if fts_dir.is_dir() { warn!("Removing existing index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; diff --git a/crates/pile/src/command/server.rs b/crates/pile/src/command/server.rs index 3fb379c..9f6e4e8 100644 --- a/crates/pile/src/command/server.rs +++ b/crates/pile/src/command/server.rs @@ -8,10 +8,11 @@ use axum::{ routing::get, }; use clap::Args; -use pile_dataset::Datasets; +use 
pile_dataset::{DatasetError, Datasets}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::extract::traits::ExtractState; use serde::Serialize; -use std::{fmt::Debug, path::PathBuf, sync::Arc}; +use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration}; use tracing::{error, info}; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; @@ -27,6 +28,18 @@ pub struct ServerCommand { /// If provided, do not serve docs #[arg(long)] no_docs: bool, + + /// If provided, never auto-refresh indices + #[arg(long)] + no_refresh: bool, + + /// Number of threads to use to refresh indices + #[arg(long, default_value = "5")] + refresh_jobs: usize, + + /// Refresh indices every `n` seconds + #[arg(long, default_value = "300")] + refresh_delay: usize, } impl CliCmd for ServerCommand { @@ -47,6 +60,47 @@ impl CliCmd for ServerCommand { Arc::new(datasets) }; + // Start auto-refresh task + if !self.no_refresh { + let datasets = datasets.clone(); + let jobs = self.refresh_jobs.max(1); + let delay = self.refresh_delay.max(1); + + async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> { + if ds.needs_fts().await? { + let state = ExtractState { ignore_mime: false }; + match ds.fts_refresh(&state, jobs, None).await { + Ok(()) => {} + Err(CancelableTaskError::Error(err)) => return Err(err), + Err(CancelableTaskError::Cancelled) => unreachable!(), + }; + } + + return Ok(()); + } + + tokio::task::spawn(async move { + loop { + for ds in datasets.iter() { + match refresh_dataset(ds, jobs).await { + Ok(x) => x, + Err(error) => { + error!( + message = "Error while refreshing dataset", + dataset = ds.config.dataset.name.as_str(), + ?error ); + } + } + + tokio::time::sleep(Duration::from_secs(10)).await; + } + + tokio::time::sleep(Duration::from_secs(delay as u64)).await; + } + }); + } + let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new)); let mut router = Router::new();