From 78ca183aeaf2c128b0848651d145cd284b019685 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Sat, 21 Feb 2026 21:40:32 -0800 Subject: [PATCH] Parallel indexing --- Cargo.lock | 1 + Cargo.toml | 2 + crates/pile-dataset/Cargo.toml | 1 + crates/pile-dataset/src/dataset.rs | 197 ++++++++++++++++----- crates/pile-dataset/src/index/index_fts.rs | 9 +- crates/pile-dataset/src/source/dir.rs | 29 ++- crates/pile-dataset/src/traits.rs | 9 +- crates/pile/src/command/index.rs | 6 +- crates/pile/src/command/lookup.rs | 30 ++-- crates/pile/src/config/logging.rs | 1 + 10 files changed, 213 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3a62d9..ef263e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1069,6 +1069,7 @@ dependencies = [ "pile-config", "pile-flac", "pile-toolbox", + "rayon", "serde_json", "tantivy", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 2f24117..2408663 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,3 +102,5 @@ mime = "0.3.17" paste = "1.0.15" smartstring = "1.0.1" chrono = "0.4.43" +parking_lot = "0.12.5" +rayon = "1.11.0" diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 4959e02..72039d1 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -22,3 +22,4 @@ jsonpath-rust = { workspace = true } chrono = { workspace = true } toml = { workspace = true } thiserror = { workspace = true } +rayon = { workspace = true } diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 7a47ca6..35e5378 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -1,13 +1,23 @@ use chrono::{DateTime, Utc}; -use pile_config::{ConfigToml, Source}; +use pile_config::{ConfigToml, Label, Source}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use std::{io::ErrorKind, path::PathBuf, sync::Arc}; +use rayon::{ + ThreadPoolBuilder, + iter::{IntoParallelIterator, ParallelIterator}, +}; +use std::{ + io::ErrorKind, + path::PathBuf, + sync::{Arc, mpsc::Receiver}, + thread::JoinHandle, + time::Instant, +}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; use crate::{ - DataSource, + DataSource, Item, index::{DbFtsIndex, FtsLookupResult}, path_ts_earliest, source::DirDataSource, @@ -78,6 +88,23 @@ impl Dataset { }); } + // + // MARK: get + // + + pub fn get( + &self, + source: &Label, + key: &PathBuf, + ) -> Option + 'static>> { + let s = self.config.dataset.source.get(source)?; + let s = match s { + Source::Flac { path } => DirDataSource::new(source, path.clone().to_vec()), + }; + + s.get(key).ok().flatten() + } + // // MARK: fts // @@ -85,6 +112,7 @@ impl Dataset { /// Refresh this dataset's fts index pub fn fts_refresh( &self, + threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); @@ -94,23 +122,9 @@ impl Dataset { warn!("Removing temporary index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; } - if fts_dir.is_dir() { - warn!("Removing existing index in {}", fts_dir.display()); - std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; - } std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; - let mut sources = Vec::new(); - for (name, source) in &self.config.dataset.source { - match source { - Source::Flac { path: dir } => { - let source = DirDataSource::new(name, dir.clone().to_vec()); - sources.push(source); - } - } - } - let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config); let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) .map_err(DatasetError::from)?; @@ -118,35 +132,72 @@ impl Dataset { let mut index_writer: IndexWriter = index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?; - for s in sources { - info!("Processing source {:?}", s.name); + let batch_size = 1000; + let (_read_task, read_rx) = start_read_task(&self.config, batch_size); - for i in s.iter() { - let (k, v) = i.map_err(DatasetError::from)?; + #[expect(clippy::unwrap_used)] + let write_pool = ThreadPoolBuilder::new() + .num_threads(threads.max(1)) + .thread_name(|x| format!("fts_refresh_thread_{x}")) + .build() + .unwrap(); - let doc = match db_index.entry_to_document(&*v) { - Ok(Some(x)) => x, - Ok(None) => { - warn!("Skipping {k:?}, document is empty"); - continue; - } - Err(err) => { - warn!("Could not read {k:?}, skipping. {err}"); - continue; - } - }; - index_writer.add_document(doc).map_err(DatasetError::from)?; + let mut total = 0u64; + while let Ok(batch) = read_rx.recv() { + let batch = batch.map_err(DatasetError::from)?; + let len = batch.len() as u64; - if let Some(flag) = flag.as_ref() - && flag.is_cancelled() - { - return Err(CancelableTaskError::Cancelled); - } + if let Some(flag) = &flag + && flag.is_cancelled() + { + return Err(CancelableTaskError::Cancelled); } + + let start = Instant::now(); + write_pool + .install(|| { + batch + .into_par_iter() + .filter_map(|(key, item)| match db_index.entry_to_document(&*item) { + Ok(Some(doc)) => Some((key, doc)), + Ok(None) => { + warn!("Skipping {key:?}, document is empty"); + None + } + Err(err) => { + warn!("Could not read {key:?}, skipping. {err}"); + None + } + }) + .map(|(key, doc)| { + index_writer + .add_document(doc) + .map_err(|err| (key, err)) + .map(|_| ()) + }) + .find_first(|x| x.is_err()) + .unwrap_or(Ok(())) + }) + .map_err(|(_key, err)| DatasetError::from(err))?; + + total += len; + let time_ms = start.elapsed().as_millis(); + debug!("Added a batch of {len} in {time_ms} ms ({total} total)"); + } + + if let Some(flag) = flag.as_ref() + && flag.is_cancelled() + { + return Err(CancelableTaskError::Cancelled); } info!("Committing index"); index_writer.commit().map_err(DatasetError::from)?; + + if fts_dir.is_dir() { + warn!("Removing existing index in {}", fts_dir.display()); + std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; + } std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?; return Ok(()); @@ -215,17 +266,75 @@ impl Dataset { /// Returns true if we do not have an fts index, /// or if our fts index is older than our data. pub fn needs_fts(&self) -> Result { + let start = Instant::now(); let ts_fts = self.ts_fts()?; let ts_data = self.ts_data()?; - match (ts_fts, ts_data) { - (None, Some(_)) => return Ok(true), + let result = match (ts_fts, ts_data) { + (None, Some(_)) => true, (None, None) | (Some(_), None) => { warn!("Could not determine data age"); - return Ok(false); + false } - (Some(ts_fts), Some(ts_data)) => return Ok(ts_data > ts_fts), - } + (Some(ts_fts), Some(ts_data)) => ts_data > ts_fts, + }; + + debug!( + message = "Ran needs_fts", + time_ms = start.elapsed().as_millis(), + ?result + ); + + return Ok(result); } } + +// +// MARK: read_task +// + +fn start_read_task( + config: &ConfigToml, + batch_size: usize, +) -> ( + JoinHandle<()>, + Receiver>)>, DatasetError>>, +) { + let config = config.clone(); + let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2); + + let read_task = std::thread::spawn(move || { + let mut batch = Vec::with_capacity(batch_size); + for (name, source) in &config.dataset.source { + info!("Loading source {name}"); + + match source { + Source::Flac { path: dir } => { + let source = DirDataSource::new(name, dir.clone().to_vec()); + for i in source.iter() { + match i { + Ok(x) => batch.push(x), + Err(err) => { + let err = Err(DatasetError::from(err)); + let _ = read_tx.send(err); + return; + } + } + + if batch.len() >= batch_size { + let b = std::mem::replace(&mut batch, Vec::with_capacity(batch_size)); + + match read_tx.send(Ok(b)) { + Ok(()) => {} + Err(_) => return, + }; + } + } + } + } + } + }); + + return (read_task, read_rx); +} diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index 6649f86..dce31e2 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -15,7 +15,7 @@ use crate::{Item, Key}; #[derive(Debug, Clone)] pub struct FtsLookupResult { pub score: f32, - pub source_name: String, + pub source_name: Label, pub key: String, } @@ -87,7 +87,9 @@ impl DbFtsIndex { let json = item.json()?; let mut empty = true; for name in self.fts_cfg().fields.keys() { - let val = match self.get_field(&json, name)? { + let x = self.get_field(&json, name)?; + + let val = match x { Some(x) => x, None => continue, }; @@ -246,7 +248,6 @@ impl DbFtsIndex { return Ok(Vec::new()); } - let query: String = query.into(); let index = Index::open_in_dir(&self.path)?; let reader = index .reader_builder() @@ -260,6 +261,7 @@ impl DbFtsIndex { fields.push(f.0) } + let query: String = query.into(); let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, fields); let query = query_parser.parse_query(&query)?; @@ -277,6 +279,7 @@ impl DbFtsIndex { for v in retrieved_doc.get_all(f_source) { assert!(source.is_none()); // Must only exist once let v = v.as_str().unwrap().to_owned(); + let v = Label::new(v).unwrap(); source = Some(v) } diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs index 0f89624..fe65270 100644 --- a/crates/pile-dataset/src/source/dir.rs +++ b/crates/pile-dataset/src/source/dir.rs @@ -25,7 +25,10 @@ impl DataSource for DirDataSource { type Key = PathBuf; type Error = std::io::Error; - fn get(&self, key: &Self::Key) -> Result>>, Self::Error> { + fn get( + &self, + key: &Self::Key, + ) -> Result + 'static>>, Self::Error> { if !key.is_file() { return Ok(None); } @@ -44,20 +47,28 @@ impl DataSource for DirDataSource { .iter() .flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d))) .filter_ok(|(_, entry)| !entry.file_type().is_dir()) - .map(|x| match x { + .filter_map(|x| match x { Err(err) => { let msg = format!("other walkdir error: {err:?}"); - Err(err.into_io_error().unwrap_or(std::io::Error::other(msg))) + Some(Err(err + .into_io_error() + .unwrap_or(std::io::Error::other(msg)))) } + Ok((_, entry)) => { let path = entry.into_path(); - let item = FlacItem { - source_name: self.name.clone(), - path: path.clone(), - }; - let item: Box> = Box::new(item); - Ok((path, item)) + let item: Box> = + match path.extension().map(|x| x.to_str()).flatten() { + None => return None, + Some("flac") => Box::new(FlacItem { + source_name: self.name.clone(), + path: path.clone(), + }), + Some(_) => return None, + }; + + Some(Ok((path, item))) } }); } diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-dataset/src/traits.rs index 1f5fb47..aac2dd6 100644 --- a/crates/pile-dataset/src/traits.rs +++ b/crates/pile-dataset/src/traits.rs @@ -7,15 +7,18 @@ pub trait DataSource { /// (e.g, a PathBuf or a primary key) type Key: Key; - type Error: Error; + type Error: Error + Sync + Send; /// Get an item from this datasource - fn get(&self, key: &Self::Key) -> Result>>, Self::Error>; + fn get( + &self, + key: &Self::Key, + ) -> Result + 'static>>, Self::Error>; /// Iterate over all items in this source in an arbitrary order fn iter( &self, - ) -> impl Iterator>), Self::Error>>; + ) -> impl Iterator + 'static>), Self::Error>>; /// Return the time of the latest change to the data in this source fn latest_change(&self) -> Result>, Self::Error>; diff --git a/crates/pile/src/command/index.rs b/crates/pile/src/command/index.rs index d5a99ef..7e54cba 100644 --- a/crates/pile/src/command/index.rs +++ b/crates/pile/src/command/index.rs @@ -11,6 +11,10 @@ pub struct IndexCommand { /// Path to dataset config #[arg(long, short = 'c', default_value = "./pile.toml")] config: PathBuf, + + /// Number of threads to use for indexing + #[arg(long, short = 'j', default_value = "3")] + jobs: usize, } impl CliCmd for IndexCommand { @@ -22,7 +26,7 @@ impl CliCmd for IndexCommand { let ds = Dataset::open(&self.config) .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - ds.fts_refresh(Some(flag)).map_err(|x| { + ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| { x.map_err(|x| { anyhow::Error::from(x).context(format!( "while refreshing fts for {}", diff --git a/crates/pile/src/command/lookup.rs b/crates/pile/src/command/lookup.rs index aea1a80..ec48824 100644 --- a/crates/pile/src/command/lookup.rs +++ b/crates/pile/src/command/lookup.rs @@ -23,9 +23,13 @@ pub struct LookupCommand { #[arg(long, short = 'c', default_value = "./pile.toml")] config: PathBuf, - /// If provided, do not refresh fts + /// If provided, refresh fts if it is out-of-date #[arg(long)] - no_refresh: bool, + refresh: bool, + + /// Number of threads to use for indexing + #[arg(long, short = 'j', default_value = "3")] + jobs: usize, } impl CliCmd for LookupCommand { @@ -38,16 +42,18 @@ impl CliCmd for LookupCommand { let ds = Dataset::open(&self.config) .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - if ds.needs_fts().context("while checking dataset fts")? { - info!("FTS index is missing or out-of-date, regenerating"); - ds.fts_refresh(Some(flag)).map_err(|x| { - x.map_err(|x| { - anyhow::Error::from(x).context(format!( - "while refreshing fts for {}", - self.config.display() - )) - }) - })?; + if self.refresh { + if ds.needs_fts().context("while checking dataset fts")? { + info!("FTS index is missing or out-of-date, regenerating"); + ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| { + x.map_err(|x| { + anyhow::Error::from(x).context(format!( + "while refreshing fts for {}", + self.config.display() + )) + }) + })?; + } } let results = ds diff --git a/crates/pile/src/config/logging.rs b/crates/pile/src/config/logging.rs index 7611378..ccbbd8f 100644 --- a/crates/pile/src/config/logging.rs +++ b/crates/pile/src/config/logging.rs @@ -52,6 +52,7 @@ impl From for EnvFilter { &[ // Fixed sources format!("html5ever={}", LogLevel::Error), + format!("tantivy={}", LogLevel::Error), // Configurable sources format!("pile={}", conf.pile), format!("pile_flac={}", conf.pile_flac),