Parallel indexing
Some checks failed
CI / Build and test (push) Waiting to run
CI / Typos (push) Successful in 25s
CI / Clippy (push) Has been cancelled

This commit is contained in:
2026-02-21 20:31:44 -08:00
parent bf1241e0a5
commit 936826597f
9 changed files with 153 additions and 49 deletions

View File

@@ -1,13 +1,23 @@
use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Source};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use std::{io::ErrorKind, path::PathBuf, sync::Arc};
use rayon::{
ThreadPoolBuilder,
iter::{IntoParallelIterator, ParallelIterator},
};
use std::{
io::ErrorKind,
path::PathBuf,
sync::{Arc, mpsc::Receiver},
thread::JoinHandle,
time::Instant,
};
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
use thiserror::Error;
use tracing::{info, trace, warn};
use tracing::{debug, info, trace, warn};
use crate::{
DataSource,
DataSource, Item,
index::{DbFtsIndex, FtsLookupResult},
path_ts_earliest,
source::DirDataSource,
@@ -85,6 +95,7 @@ impl Dataset {
// NOTE(review): this entire span is a captured unified diff with the leading
// '+'/'-' markers stripped — old (removed) and new (added) versions of several
// statements are interleaved, and raw hunk headers ("@@ ... @@") remain inline.
// It is NOT compilable as-is; restore the real file from the commit before editing.
/// Refresh this dataset's fts index
pub fn fts_refresh(
&self,
// NOTE(review): `threads` appears newly added by this commit — presumably sizes
// the rayon write pool below; confirm callers pass a sensible value.
threads: usize,
flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> {
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
// NOTE(review): diff hunk header residue, not code.
@@ -94,23 +105,9 @@ impl Dataset {
warn!("Removing temporary index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
}
// NOTE(review): the next four lines look like the REMOVED (pre-commit) eager
// deletion of the live index; the new code appears to delete it only after a
// successful commit (an identical block precedes the rename near the end).
if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
}
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
// NOTE(review): removed lines — source enumeration moved into start_read_task.
let mut sources = Vec::new();
for (name, source) in &self.config.dataset.source {
match source {
Source::Flac { path: dir } => {
let source = DirDataSource::new(name, dir.clone().to_vec());
sources.push(source);
}
}
}
let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
.map_err(DatasetError::from)?;
// NOTE(review): diff hunk header residue, not code.
@@ -118,35 +115,72 @@ impl Dataset {
let mut index_writer: IndexWriter =
index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
// NOTE(review): removed — old sequential per-item loop, replaced by the
// batched channel + rayon pool version further down.
for s in sources {
info!("Processing source {:?}", s.name);
// Added: background reader thread streams batches of up to 5000 entries.
let batch_size = 5000;
let (_read_task, read_rx) = start_read_task(&self.config, batch_size);
for i in s.iter() {
let (k, v) = i.map_err(DatasetError::from)?;
#[expect(clippy::unwrap_used)]
let write_pool = ThreadPoolBuilder::new()
.num_threads(threads.max(1))
.thread_name(|x| format!("fts_refresh_thread_{x}"))
.build()
.unwrap();
// NOTE(review): removed — per-item document conversion, now parallelized below.
let doc = match db_index.entry_to_document(&*v) {
Ok(Some(x)) => x,
Ok(None) => {
warn!("Skipping {k:?}, document is empty");
continue;
}
Err(err) => {
warn!("Could not read {k:?}, skipping. {err}");
continue;
}
};
index_writer.add_document(doc).map_err(DatasetError::from)?;
let mut total = 0u64;
// Drain batches until the reader thread closes the channel.
while let Ok(batch) = read_rx.recv() {
let batch = batch.map_err(DatasetError::from)?;
let len = batch.len() as u64;
// NOTE(review): the next two cancellation checks are the old
// (`flag.as_ref()`) and new (`&flag`) spellings of the SAME check —
// only one should exist in the real file.
if let Some(flag) = flag.as_ref()
&& flag.is_cancelled()
{
return Err(CancelableTaskError::Cancelled);
}
if let Some(flag) = &flag
&& flag.is_cancelled()
{
return Err(CancelableTaskError::Cancelled);
}
let start = Instant::now();
// Convert the batch to documents in parallel on the write pool, feed
// them to the writer, and surface the first add_document error.
write_pool
.install(|| {
batch
.into_par_iter()
.filter_map(|(key, item)| match db_index.entry_to_document(&*item) {
Ok(Some(doc)) => Some((key, doc)),
Ok(None) => {
warn!("Skipping {key:?}, document is empty");
None
}
Err(err) => {
warn!("Could not read {key:?}, skipping. {err}");
None
}
})
.map(|(key, doc)| {
index_writer
.add_document(doc)
.map_err(|err| (key, err))
.map(|_| ())
})
.find_first(|x| x.is_err())
.unwrap_or(Ok(()))
})
.map_err(|(_key, err)| DatasetError::from(err))?;
total += len;
let time_ms = start.elapsed().as_millis();
debug!("Added a batch of {len} in {time_ms} ms ({total} total)");
}
// Final cancellation check before the (potentially slow) commit.
if let Some(flag) = flag.as_ref()
&& flag.is_cancelled()
{
return Err(CancelableTaskError::Cancelled);
}
info!("Committing index");
index_writer.commit().map_err(DatasetError::from)?;
// The live index is replaced only after a successful commit, then the
// temporary directory is atomically renamed into place.
if fts_dir.is_dir() {
warn!("Removing existing index in {}", fts_dir.display());
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
}
std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
return Ok(());
@@ -229,3 +263,52 @@ impl Dataset {
}
}
}
//
// MARK: read_task
//
/// Spawn a background thread that walks every configured data source and
/// streams batches of `(key, item)` pairs to the returned channel.
///
/// Batches hold at most `batch_size` entries; the channel buffers at most two
/// batches so the reader can run ahead of the indexer without unbounded memory
/// growth. On a source error, the error is forwarded once and the thread exits.
///
/// Returns the reader thread's `JoinHandle` and the receiving end of the
/// channel; `recv` starts failing once the thread has finished and the sender
/// is dropped.
fn start_read_task(
    config: &ConfigToml,
    batch_size: usize,
) -> (
    JoinHandle<()>,
    Receiver<Result<Vec<(PathBuf, Box<dyn Item<Key = PathBuf>>)>, DatasetError>>,
) {
    let config = config.clone();
    // Bounded to 2 batches: enough to overlap reading with indexing, small
    // enough to cap memory use.
    let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2);
    let read_task = std::thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        for (name, source) in &config.dataset.source {
            debug!("Loading {name}");
            match source {
                Source::Flac { path: dir } => {
                    let source = DirDataSource::new(name, dir.clone().to_vec());
                    for i in source.iter() {
                        match i {
                            Ok(x) => batch.push(x),
                            Err(err) => {
                                // Forward the error once and stop; the consumer
                                // surfaces it as a failed batch. A send error
                                // only means the receiver is already gone.
                                let _ = read_tx.send(Err(DatasetError::from(err)));
                                return;
                            }
                        }
                        if batch.len() >= batch_size {
                            let full =
                                std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                            // Receiver hung up: nothing left to do, stop reading.
                            if read_tx.send(Ok(full)).is_err() {
                                return;
                            }
                        }
                    }
                }
            }
        }
        // BUG FIX: flush the final partial batch. Previously any trailing items
        // (fewer than `batch_size` remaining after the sources were exhausted)
        // were silently dropped, so the last documents never reached the index.
        if !batch.is_empty() {
            let _ = read_tx.send(Ok(batch));
        }
    });
    (read_task, read_rx)
}