Parallel indexing
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1069,6 +1069,7 @@ dependencies = [
|
|||||||
"pile-config",
|
"pile-config",
|
||||||
"pile-flac",
|
"pile-flac",
|
||||||
"pile-toolbox",
|
"pile-toolbox",
|
||||||
|
"rayon",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tantivy",
|
"tantivy",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
|||||||
@@ -102,3 +102,5 @@ mime = "0.3.17"
|
|||||||
paste = "1.0.15"
|
paste = "1.0.15"
|
||||||
smartstring = "1.0.1"
|
smartstring = "1.0.1"
|
||||||
chrono = "0.4.43"
|
chrono = "0.4.43"
|
||||||
|
parking_lot = "0.12.5"
|
||||||
|
rayon = "1.11.0"
|
||||||
|
|||||||
@@ -22,3 +22,4 @@ jsonpath-rust = { workspace = true }
|
|||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
toml = { workspace = true }
|
toml = { workspace = true }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
|
rayon = { workspace = true }
|
||||||
|
|||||||
@@ -1,13 +1,22 @@
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use pile_config::{ConfigToml, Source};
|
use pile_config::{ConfigToml, Source};
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use std::{io::ErrorKind, path::PathBuf, sync::Arc};
|
use rayon::{
|
||||||
|
ThreadPoolBuilder,
|
||||||
|
iter::{IntoParallelIterator, ParallelIterator},
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
io::ErrorKind,
|
||||||
|
path::PathBuf,
|
||||||
|
sync::{Arc, mpsc::Receiver},
|
||||||
|
thread::JoinHandle,
|
||||||
|
};
|
||||||
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{info, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
DataSource,
|
DataSource, Item,
|
||||||
index::{DbFtsIndex, FtsLookupResult},
|
index::{DbFtsIndex, FtsLookupResult},
|
||||||
path_ts_earliest,
|
path_ts_earliest,
|
||||||
source::DirDataSource,
|
source::DirDataSource,
|
||||||
@@ -85,6 +94,7 @@ impl Dataset {
|
|||||||
/// Refresh this dataset's fts index
|
/// Refresh this dataset's fts index
|
||||||
pub fn fts_refresh(
|
pub fn fts_refresh(
|
||||||
&self,
|
&self,
|
||||||
|
threads: usize,
|
||||||
flag: Option<CancelFlag>,
|
flag: Option<CancelFlag>,
|
||||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||||
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
|
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
|
||||||
@@ -94,23 +104,9 @@ impl Dataset {
|
|||||||
warn!("Removing temporary index in {}", fts_dir.display());
|
warn!("Removing temporary index in {}", fts_dir.display());
|
||||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||||
}
|
}
|
||||||
if fts_dir.is_dir() {
|
|
||||||
warn!("Removing existing index in {}", fts_dir.display());
|
|
||||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||||
|
|
||||||
let mut sources = Vec::new();
|
|
||||||
for (name, source) in &self.config.dataset.source {
|
|
||||||
match source {
|
|
||||||
Source::Flac { path: dir } => {
|
|
||||||
let source = DirDataSource::new(name, dir.clone().to_vec());
|
|
||||||
sources.push(source);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
|
let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
|
||||||
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
|
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
|
||||||
.map_err(DatasetError::from)?;
|
.map_err(DatasetError::from)?;
|
||||||
@@ -118,35 +114,64 @@ impl Dataset {
|
|||||||
let mut index_writer: IndexWriter =
|
let mut index_writer: IndexWriter =
|
||||||
index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
|
index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
|
||||||
|
|
||||||
for s in sources {
|
let batch_size = 1000;
|
||||||
info!("Processing source {:?}", s.name);
|
let (_read_task, read_rx) = start_read_task(&self.config, batch_size);
|
||||||
|
|
||||||
for i in s.iter() {
|
#[expect(clippy::unwrap_used)]
|
||||||
let (k, v) = i.map_err(DatasetError::from)?;
|
let write_pool = ThreadPoolBuilder::new()
|
||||||
|
.num_threads(threads.max(1))
|
||||||
|
.thread_name(|x| format!("fts_refresh_thread_{x}"))
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let doc = match db_index.entry_to_document(&*v) {
|
let mut total = 0u64;
|
||||||
Ok(Some(x)) => x,
|
while let Ok(batch) = read_rx.recv() {
|
||||||
Ok(None) => {
|
let batch = batch.map_err(DatasetError::from)?;
|
||||||
warn!("Skipping {k:?}, document is empty");
|
let len = batch.len() as u64;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Could not read {k:?}, skipping. {err}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
index_writer.add_document(doc).map_err(DatasetError::from)?;
|
|
||||||
|
|
||||||
if let Some(flag) = flag.as_ref()
|
write_pool
|
||||||
&& flag.is_cancelled()
|
.install(|| {
|
||||||
{
|
batch
|
||||||
return Err(CancelableTaskError::Cancelled);
|
.into_par_iter()
|
||||||
}
|
.filter_map(|(key, item)| match db_index.entry_to_document(&*item) {
|
||||||
}
|
Ok(Some(doc)) => Some((key, doc)),
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("Skipping {key:?}, document is empty");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Could not read {key:?}, skipping. {err}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(|(key, doc)| {
|
||||||
|
index_writer
|
||||||
|
.add_document(doc)
|
||||||
|
.map_err(|err| (key, err))
|
||||||
|
.map(|_| ())
|
||||||
|
})
|
||||||
|
.find_first(|x| x.is_err())
|
||||||
|
.unwrap_or(Ok(()))
|
||||||
|
})
|
||||||
|
.map_err(|(_key, err)| DatasetError::from(err))?;
|
||||||
|
|
||||||
|
total += len;
|
||||||
|
debug!("Added a batch of {len} ({total} total)");
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(flag) = flag.as_ref()
|
||||||
|
&& flag.is_cancelled()
|
||||||
|
{
|
||||||
|
return Err(CancelableTaskError::Cancelled);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Committing index");
|
info!("Committing index");
|
||||||
index_writer.commit().map_err(DatasetError::from)?;
|
index_writer.commit().map_err(DatasetError::from)?;
|
||||||
|
|
||||||
|
if fts_dir.is_dir() {
|
||||||
|
warn!("Removing existing index in {}", fts_dir.display());
|
||||||
|
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||||
|
}
|
||||||
std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
|
std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -229,3 +254,50 @@ impl Dataset {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: read_task
|
||||||
|
//
|
||||||
|
|
||||||
|
fn start_read_task(
|
||||||
|
config: &ConfigToml,
|
||||||
|
batch_size: usize,
|
||||||
|
) -> (
|
||||||
|
JoinHandle<()>,
|
||||||
|
Receiver<Result<Vec<(PathBuf, Box<dyn Item<Key = PathBuf>>)>, DatasetError>>,
|
||||||
|
) {
|
||||||
|
let config = config.clone();
|
||||||
|
let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2);
|
||||||
|
|
||||||
|
let read_task = std::thread::spawn(move || {
|
||||||
|
let mut batch = Vec::with_capacity(batch_size);
|
||||||
|
for (name, source) in &config.dataset.source {
|
||||||
|
match source {
|
||||||
|
Source::Flac { path: dir } => {
|
||||||
|
let source = DirDataSource::new(name, dir.clone().to_vec());
|
||||||
|
for i in source.iter() {
|
||||||
|
match i {
|
||||||
|
Ok(x) => batch.push(x),
|
||||||
|
Err(err) => {
|
||||||
|
let err = Err(DatasetError::from(err));
|
||||||
|
let _ = read_tx.send(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if batch.len() >= batch_size {
|
||||||
|
let b = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
|
||||||
|
|
||||||
|
match read_tx.send(Ok(b)) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(_) => return,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return (read_task, read_rx);
|
||||||
|
}
|
||||||
|
|||||||
@@ -44,20 +44,28 @@ impl DataSource for DirDataSource {
|
|||||||
.iter()
|
.iter()
|
||||||
.flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d)))
|
.flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d)))
|
||||||
.filter_ok(|(_, entry)| !entry.file_type().is_dir())
|
.filter_ok(|(_, entry)| !entry.file_type().is_dir())
|
||||||
.map(|x| match x {
|
.filter_map(|x| match x {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let msg = format!("other walkdir error: {err:?}");
|
let msg = format!("other walkdir error: {err:?}");
|
||||||
Err(err.into_io_error().unwrap_or(std::io::Error::other(msg)))
|
Some(Err(err
|
||||||
|
.into_io_error()
|
||||||
|
.unwrap_or(std::io::Error::other(msg))))
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((_, entry)) => {
|
Ok((_, entry)) => {
|
||||||
let path = entry.into_path();
|
let path = entry.into_path();
|
||||||
let item = FlacItem {
|
|
||||||
source_name: self.name.clone(),
|
|
||||||
path: path.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let item: Box<dyn Item<Key = Self::Key>> = Box::new(item);
|
let item: Box<dyn Item<Key = Self::Key>> =
|
||||||
Ok((path, item))
|
match path.extension().map(|x| x.to_str()).flatten() {
|
||||||
|
None => return None,
|
||||||
|
Some("flac") => Box::new(FlacItem {
|
||||||
|
source_name: self.name.clone(),
|
||||||
|
path: path.clone(),
|
||||||
|
}),
|
||||||
|
Some(_) => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(Ok((path, item)))
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ pub trait DataSource {
|
|||||||
/// (e.g, a PathBuf or a primary key)
|
/// (e.g, a PathBuf or a primary key)
|
||||||
type Key: Key;
|
type Key: Key;
|
||||||
|
|
||||||
type Error: Error;
|
type Error: Error + Sync + Send;
|
||||||
|
|
||||||
/// Get an item from this datasource
|
/// Get an item from this datasource
|
||||||
fn get(&self, key: &Self::Key) -> Result<Option<Box<dyn Item<Key = Self::Key>>>, Self::Error>;
|
fn get(&self, key: &Self::Key) -> Result<Option<Box<dyn Item<Key = Self::Key>>>, Self::Error>;
|
||||||
|
|||||||
@@ -11,6 +11,10 @@ pub struct IndexCommand {
|
|||||||
/// Path to dataset config
|
/// Path to dataset config
|
||||||
#[arg(long, short = 'c', default_value = "./pile.toml")]
|
#[arg(long, short = 'c', default_value = "./pile.toml")]
|
||||||
config: PathBuf,
|
config: PathBuf,
|
||||||
|
|
||||||
|
/// Number of threads to use for indexing
|
||||||
|
#[arg(long, default_value = "3")]
|
||||||
|
jobs: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CliCmd for IndexCommand {
|
impl CliCmd for IndexCommand {
|
||||||
@@ -22,7 +26,7 @@ impl CliCmd for IndexCommand {
|
|||||||
let ds = Dataset::open(&self.config)
|
let ds = Dataset::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
ds.fts_refresh(Some(flag)).map_err(|x| {
|
ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| {
|
||||||
x.map_err(|x| {
|
x.map_err(|x| {
|
||||||
anyhow::Error::from(x).context(format!(
|
anyhow::Error::from(x).context(format!(
|
||||||
"while refreshing fts for {}",
|
"while refreshing fts for {}",
|
||||||
|
|||||||
@@ -26,6 +26,10 @@ pub struct LookupCommand {
|
|||||||
/// If provided, do not refresh fts
|
/// If provided, do not refresh fts
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
no_refresh: bool,
|
no_refresh: bool,
|
||||||
|
|
||||||
|
/// Number of threads to use for indexing
|
||||||
|
#[arg(long, default_value = "3")]
|
||||||
|
jobs: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CliCmd for LookupCommand {
|
impl CliCmd for LookupCommand {
|
||||||
@@ -40,7 +44,7 @@ impl CliCmd for LookupCommand {
|
|||||||
|
|
||||||
if ds.needs_fts().context("while checking dataset fts")? {
|
if ds.needs_fts().context("while checking dataset fts")? {
|
||||||
info!("FTS index is missing or out-of-date, regenerating");
|
info!("FTS index is missing or out-of-date, regenerating");
|
||||||
ds.fts_refresh(Some(flag)).map_err(|x| {
|
ds.fts_refresh(self.jobs, Some(flag)).map_err(|x| {
|
||||||
x.map_err(|x| {
|
x.map_err(|x| {
|
||||||
anyhow::Error::from(x).context(format!(
|
anyhow::Error::from(x).context(format!(
|
||||||
"while refreshing fts for {}",
|
"while refreshing fts for {}",
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ impl From<LoggingConfig> for EnvFilter {
|
|||||||
&[
|
&[
|
||||||
// Fixed sources
|
// Fixed sources
|
||||||
format!("html5ever={}", LogLevel::Error),
|
format!("html5ever={}", LogLevel::Error),
|
||||||
|
format!("tantivy={}", LogLevel::Error),
|
||||||
// Configurable sources
|
// Configurable sources
|
||||||
format!("pile={}", conf.pile),
|
format!("pile={}", conf.pile),
|
||||||
format!("pile_flac={}", conf.pile_flac),
|
format!("pile_flac={}", conf.pile_flac),
|
||||||
|
|||||||
Reference in New Issue
Block a user