Compare commits

..

1 Commit

Author SHA1 Message Date
9b8f825667 Add hash extractor
Some checks failed
CI / Typos (push) Failing after 18s
CI / Build and test (push) Successful in 1m43s
CI / Clippy (push) Successful in 2m31s
Docker / build-and-push (push) Successful in 4m36s
CI / Build and test (all features) (push) Successful in 6m52s
2026-04-02 22:56:31 -07:00
3 changed files with 4 additions and 75 deletions

View File

@@ -302,7 +302,6 @@ impl Datasets {
_threads: usize,
flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> {
let start = Instant::now();
let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => {
@@ -314,14 +313,6 @@ impl Datasets {
let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts");
debug!(
message = "Rebuilding fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir
);
if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
@@ -401,18 +392,9 @@ impl Datasets {
return Err(CancelableTaskError::Cancelled);
}
info!("Committing {total} documents");
index_writer.commit().map_err(DatasetError::from)?;
debug!(
message = "Rebuilt fts index",
dataset = self.config.dataset.name.as_str(),
?fts_dir,
?fts_tmp_dir,
?workdir,
n_docs = total,
time_ms = start.elapsed().as_millis()
);
if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;

View File

@@ -27,6 +27,7 @@ macro_rules! hash_algos {
}
}
#[expect(clippy::unwrap_used)]
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*]
});

View File

@@ -8,11 +8,10 @@ use axum::{
routing::get,
};
use clap::Args;
use pile_dataset::{DatasetError, Datasets};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::extract::traits::ExtractState;
use serde::Serialize;
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;
@@ -28,18 +27,6 @@ pub struct ServerCommand {
/// If provided, do not serve docs
#[arg(long)]
no_docs: bool,
/// If provided, never auto-refresh indices
#[arg(long)]
no_refresh: bool,
/// Number of threads to use to refresh indices
#[arg(long, default_value = "5")]
refresh_jobs: usize,
/// Refresh indices every `n` seconds
#[arg(long, default_value = "300")]
refresh_delay: usize,
}
impl CliCmd for ServerCommand {
@@ -60,47 +47,6 @@ impl CliCmd for ServerCommand {
Arc::new(datasets)
};
// Start auto-refresh task
if !self.no_refresh {
let datasets = datasets.clone();
let jobs = self.refresh_jobs.max(1);
let delay = self.refresh_delay.max(1);
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
if ds.needs_fts().await? {
let state = ExtractState { ignore_mime: false };
match ds.fts_refresh(&state, jobs, None).await {
Ok(()) => {}
Err(CancelableTaskError::Error(err)) => return Err(err),
Err(CancelableTaskError::Cancelled) => unreachable!(),
};
}
return Ok(());
}
tokio::task::spawn(async move {
loop {
for ds in datasets.iter() {
match refresh_dataset(ds, jobs).await {
Ok(x) => x,
Err(error) => {
error!(
message = "Error while refreshing dataset",
dataset = ds.config.dataset.name.as_str(),
?error
);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
}
});
}
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
let mut router = Router::new();