diff --git a/Cargo.lock b/Cargo.lock
index f3a62d9..ef263e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1069,6 +1069,7 @@ dependencies = [
  "pile-config",
  "pile-flac",
  "pile-toolbox",
+ "rayon",
  "serde_json",
  "tantivy",
  "thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index 2f24117..2408663 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,3 +102,5 @@ mime = "0.3.17"
 paste = "1.0.15"
 smartstring = "1.0.1"
 chrono = "0.4.43"
+parking_lot = "0.12.5"
+rayon = "1.11.0"
diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml
index 4959e02..72039d1 100644
--- a/crates/pile-dataset/Cargo.toml
+++ b/crates/pile-dataset/Cargo.toml
@@ -22,3 +22,4 @@ jsonpath-rust = { workspace = true }
 chrono = { workspace = true }
 toml = { workspace = true }
 thiserror = { workspace = true }
+rayon = { workspace = true }
diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs
index 7a47ca6..8cfab6f 100644
--- a/crates/pile-dataset/src/dataset.rs
+++ b/crates/pile-dataset/src/dataset.rs
@@ -1,13 +1,22 @@
 use chrono::{DateTime, Utc};
 use pile_config::{ConfigToml, Source};
 use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
-use std::{io::ErrorKind, path::PathBuf, sync::Arc};
+use rayon::{
+    ThreadPoolBuilder,
+    iter::{IntoParallelIterator, ParallelIterator},
+};
+use std::{
+    io::ErrorKind,
+    path::PathBuf,
+    sync::{Arc, mpsc::Receiver},
+    thread::JoinHandle,
+};
 use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
 use thiserror::Error;
-use tracing::{info, trace, warn};
+use tracing::{debug, info, trace, warn};
 
 use crate::{
-    DataSource,
+    DataSource, Item,
     index::{DbFtsIndex, FtsLookupResult},
     path_ts_earliest,
     source::DirDataSource,
@@ -85,6 +94,7 @@ impl Dataset {
     /// Refresh this dataset's fts index
     pub fn fts_refresh(
         &self,
+        threads: usize,
         flag: Option<CancelFlag>,
     ) -> Result<(), CancelableTaskError<DatasetError>> {
         let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
@@ -94,23 +104,9 @@
         if fts_tmp_dir.is_dir() {
             warn!("Removing temporary index in {}", fts_dir.display());
             std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
         }
-        if fts_dir.is_dir() {
-            warn!("Removing existing index in {}", fts_dir.display());
-            std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
-        }
         std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
-        let mut sources = Vec::new();
-        for (name, source) in &self.config.dataset.source {
-            match source {
-                Source::Flac { path: dir } => {
-                    let source = DirDataSource::new(name, dir.clone().to_vec());
-                    sources.push(source);
-                }
-            }
-        }
-
 
         let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
         let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
            .map_err(DatasetError::from)?;
@@ -118,35 +114,64 @@
         let mut index_writer: IndexWriter =
             index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
 
-        for s in sources {
-            info!("Processing source {:?}", s.name);
+        let batch_size = 1000;
+        let (_read_task, read_rx) = start_read_task(&self.config, batch_size);
 
-            for i in s.iter() {
-                let (k, v) = i.map_err(DatasetError::from)?;
+        #[expect(clippy::unwrap_used)]
+        let write_pool = ThreadPoolBuilder::new()
+            .num_threads(threads.max(1))
+            .thread_name(|x| format!("fts_refresh_thread_{x}"))
+            .build()
+            .unwrap();
 
-                let doc = match db_index.entry_to_document(&*v) {
-                    Ok(Some(x)) => x,
-                    Ok(None) => {
-                        warn!("Skipping {k:?}, document is empty");
-                        continue;
-                    }
-                    Err(err) => {
-                        warn!("Could not read {k:?}, skipping. {err}");
-                        continue;
-                    }
-                };
-                index_writer.add_document(doc).map_err(DatasetError::from)?;
+        let mut total = 0u64;
+        while let Ok(batch) = read_rx.recv() {
+            let batch = batch.map_err(DatasetError::from)?;
+            let len = batch.len() as u64;
 
-                if let Some(flag) = flag.as_ref()
-                    && flag.is_cancelled()
-                {
-                    return Err(CancelableTaskError::Cancelled);
-                }
-            }
+            write_pool
+                .install(|| {
+                    batch
+                        .into_par_iter()
+                        .filter_map(|(key, item)| match db_index.entry_to_document(&*item) {
+                            Ok(Some(doc)) => Some((key, doc)),
+                            Ok(None) => {
+                                warn!("Skipping {key:?}, document is empty");
+                                None
+                            }
+                            Err(err) => {
+                                warn!("Could not read {key:?}, skipping. {err}");
+                                None
+                            }
+                        })
+                        .map(|(key, doc)| {
+                            index_writer
+                                .add_document(doc)
+                                .map_err(|err| (key, err))
+                                .map(|_| ())
+                        })
+                        .find_first(|x| x.is_err())
+                        .unwrap_or(Ok(()))
+                })
+                .map_err(|(_key, err)| DatasetError::from(err))?;
+
+            total += len;
+            debug!("Added a batch of {len} ({total} total)");
+        }
+
+        if let Some(flag) = flag.as_ref()
+            && flag.is_cancelled()
+        {
+            return Err(CancelableTaskError::Cancelled);
         }
 
         info!("Committing index");
         index_writer.commit().map_err(DatasetError::from)?;
+
+        if fts_dir.is_dir() {
+            warn!("Removing existing index in {}", fts_dir.display());
+            std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
+        }
         std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
 
         return Ok(());
@@ -229,3 +254,50 @@
         }
     }
 }
+
+//
+// MARK: read_task
+//
+
+fn start_read_task(
+    config: &ConfigToml,
+    batch_size: usize,
+) -> (
+    JoinHandle<()>,
+    Receiver<Result<Vec<(PathBuf, Box<dyn Item<'static>>)>, DatasetError>>,
+) {
+    let config = config.clone();
+    let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2);
+
+    let read_task = std::thread::spawn(move || {
+        let mut batch = Vec::with_capacity(batch_size);
+        for (name, source) in &config.dataset.source {
+            match source {
+                Source::Flac { path: dir } => {
+                    let source = DirDataSource::new(name, dir.clone().to_vec());
+                    for i in source.iter() {
+                        match i {
+                            Ok(x) => batch.push(x),
+                            Err(err) => {
+                                let err = Err(DatasetError::from(err));
+                                let _ = read_tx.send(err);
+                                return;
+                            }
+                        }
+
+                        if batch.len() >= batch_size {
+                            let b = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
+
+                            match read_tx.send(Ok(b)) {
+                                Ok(()) => {}
+                                Err(_) => return,
+                            };
+                        }
+                    }
+                }
+            }
+        }
+    });
+
+    return (read_task, read_rx);
+}
diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs
index 0f89624..c49d258 100644
--- a/crates/pile-dataset/src/source/dir.rs
+++ b/crates/pile-dataset/src/source/dir.rs
@@ -44,20 +44,28 @@
             .iter()
             .flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d)))
             .filter_ok(|(_, entry)| !entry.file_type().is_dir())
-            .map(|x| match x {
+            .filter_map(|x| match x {
                 Err(err) => {
                     let msg = format!("other walkdir error: {err:?}");
-                    Err(err.into_io_error().unwrap_or(std::io::Error::other(msg)))
+                    Some(Err(err
+                        .into_io_error()
+                        .unwrap_or(std::io::Error::other(msg))))
                 }
+
                 Ok((_, entry)) => {
                     let path = entry.into_path();
-                    let item = FlacItem {
-                        source_name: self.name.clone(),
-                        path: path.clone(),
-                    };
-                    let item: Box<dyn Item<'static>> = Box::new(item);
-                    Ok((path, item))
+                    let item: Box<dyn Item<'static>> =
+                        match path.extension().map(|x| x.to_str()).flatten() {
+                            None => return None,
+                            Some("flac") => Box::new(FlacItem {
+                                source_name: self.name.clone(),
+                                path: path.clone(),
+                            }),
+                            Some(_) => return None,
+                        };
+
+                    Some(Ok((path, item)))
                 }
             });
     }
 }
diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-dataset/src/traits.rs
index 1f5fb47..a9a1d62 100644
--- a/crates/pile-dataset/src/traits.rs
+++ b/crates/pile-dataset/src/traits.rs
@@ -7,7 +7,7 @@ pub trait DataSource {
     /// (e.g, a PathBuf or a primary key)
     type Key: Key;
 
-    type Error: Error;
+    type Error: Error + Sync + Send;
 
     /// Get an item from this datasource
     fn get(&self, key: &Self::Key) -> Result<Option<Box<dyn Item<'static>>>, Self::Error>;
diff --git a/crates/pile/src/command/index.rs b/crates/pile/src/command/index.rs
index d5a99ef..944bf6c 100644
--- a/crates/pile/src/command/index.rs
+++ b/crates/pile/src/command/index.rs
@@ -11,6 +11,10 @@ pub struct IndexCommand {
     /// Path to dataset config
     #[arg(long, short = 'c', default_value = "./pile.toml")]
     config: PathBuf,
+
+    /// Number of threads to use for indexing
+    #[arg(long, default_value = "3")]
+    jobs: usize,
 }
 
 impl CliCmd for IndexCommand {
@@ -22,7 +26,7 @@
         let ds = Dataset::open(&self.config)
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
 
-        ds.fts_refresh(Some(flag)).map_err(|x| {
+        ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| {
             x.map_err(|x| {
                 anyhow::Error::from(x).context(format!(
                     "while refreshing fts for {}",
diff --git a/crates/pile/src/command/lookup.rs b/crates/pile/src/command/lookup.rs
index aea1a80..243299c 100644
--- a/crates/pile/src/command/lookup.rs
+++ b/crates/pile/src/command/lookup.rs
@@ -26,6 +26,10 @@ pub struct LookupCommand {
     /// If provided, do not refresh fts
     #[arg(long)]
     no_refresh: bool,
+
+    /// Number of threads to use for indexing
+    #[arg(long, default_value = "3")]
+    jobs: usize,
 }
 
 impl CliCmd for LookupCommand {
@@ -40,7 +44,7 @@
         if ds.needs_fts().context("while checking dataset fts")? {
             info!("FTS index is missing or out-of-date, regenerating");
 
-            ds.fts_refresh(Some(flag)).map_err(|x| {
+            ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| {
                 x.map_err(|x| {
                     anyhow::Error::from(x).context(format!(
                         "while refreshing fts for {}",
diff --git a/crates/pile/src/config/logging.rs b/crates/pile/src/config/logging.rs
index 7611378..ccbbd8f 100644
--- a/crates/pile/src/config/logging.rs
+++ b/crates/pile/src/config/logging.rs
@@ -52,6 +52,7 @@ impl From<LoggingConfig> for EnvFilter {
             &[
                 // Fixed sources
                 format!("html5ever={}", LogLevel::Error),
+                format!("tantivy={}", LogLevel::Error),
                 // Configurable sources
                 format!("pile={}", conf.pile),
                 format!("pile_flac={}", conf.pile_flac),