From 195a1c78eaec5bf40368901e1d3af10411b3e302 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 10 Mar 2026 09:44:05 -0700 Subject: [PATCH] Parallel indexing --- crates/pile-dataset/src/dataset.rs | 55 +++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 17c9337..2ac2786 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -4,6 +4,7 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; +use tokio::task::JoinSet; use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::{debug, info, trace, warn}; @@ -189,7 +190,7 @@ impl Datasets { std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; - let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config); + let db_index = Arc::new(DbFtsIndex::new(&fts_tmp_dir, &self.config)); let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) .map_err(DatasetError::from)?; index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?); @@ -199,6 +200,25 @@ impl Datasets { let mut total = 0u64; let mut logged_at = Instant::now(); + const CONCURRENCY: usize = 32; + let mut join_set = JoinSet::new(); + + let mut process = |key, result: Result<_, _>| -> Result<(), DatasetError> { + match result { + Ok(Some(doc)) => { + index_writer.add_document(doc).map_err(DatasetError::from)?; + total += 1; + if logged_at.elapsed().as_secs() >= 5 { + debug!("Indexed {total} documents so far"); + logged_at = Instant::now(); + } + } + Ok(None) => warn!("Skipping {key:?}, document is empty"), + Err(err) => warn!("Could not read {key:?}, skipping. {err}"), + } + Ok(()) + }; for (name, dataset) in &self.sources { info!("Loading source {name}"); @@ -211,27 +231,30 @@ } let item = item_result.map_err(DatasetError::from)?; - let key = item.key(); + let db = Arc::clone(&db_index); + join_set.spawn(async move { + let key = item.key(); + let result = db.entry_to_document(&item).await; + (key, result) + }); - match db_index.entry_to_document(&item).await { - Ok(Some(doc)) => { - index_writer.add_document(doc).map_err(DatasetError::from)?; - total += 1; - if logged_at.elapsed().as_secs() >= 5 { - debug!("Indexed {total} documents so far"); - logged_at = Instant::now(); - } - } - Ok(None) => { - warn!("Skipping {key:?}, document is empty"); - } - Err(err) => { - warn!("Could not read {key:?}, skipping. {err}"); + while join_set.len() >= CONCURRENCY { + match join_set.join_next().await { + Some(Ok((key, result))) => process(key, result)?, + Some(Err(e)) => warn!("Indexing task panicked: {e}"), + None => break, } } } } + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok((key, result)) => process(key, result)?, + Err(e) => warn!("Indexing task panicked: {e}"), + } + } + if let Some(flag) = flag.as_ref() && flag.is_cancelled() {