Parallel indexing
This commit is contained in:
@@ -4,6 +4,7 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
|||||||
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
|
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
|
||||||
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tokio::task::JoinSet;
|
||||||
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
|
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
|
||||||
use tracing::{debug, info, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
@@ -189,7 +190,7 @@ impl Datasets {
|
|||||||
|
|
||||||
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||||
|
|
||||||
let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
|
let db_index = Arc::new(DbFtsIndex::new(&fts_tmp_dir, &self.config));
|
||||||
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
|
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
|
||||||
.map_err(DatasetError::from)?;
|
.map_err(DatasetError::from)?;
|
||||||
index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?);
|
index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?);
|
||||||
@@ -199,6 +200,25 @@ impl Datasets {
|
|||||||
let mut total = 0u64;
|
let mut total = 0u64;
|
||||||
let mut logged_at = Instant::now();
|
let mut logged_at = Instant::now();
|
||||||
|
|
||||||
|
const CONCURRENCY: usize = 32;
|
||||||
|
let mut join_set = JoinSet::new();
|
||||||
|
|
||||||
|
let mut process = |key, result: Result<_, _>| -> Result<(), DatasetError> {
|
||||||
|
match result {
|
||||||
|
Ok(Some(doc)) => {
|
||||||
|
index_writer.add_document(doc).map_err(DatasetError::from)?;
|
||||||
|
total += 1;
|
||||||
|
if logged_at.elapsed().as_secs() >= 5 {
|
||||||
|
debug!("Indexed {total} documents so far");
|
||||||
|
logged_at = Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => warn!("Skipping {key:?}, document is empty"),
|
||||||
|
Err(err) => warn!("Could not read {key:?}, skipping. {err}"),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
for (name, dataset) in &self.sources {
|
for (name, dataset) in &self.sources {
|
||||||
info!("Loading source {name}");
|
info!("Loading source {name}");
|
||||||
|
|
||||||
@@ -211,27 +231,30 @@ impl Datasets {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let item = item_result.map_err(DatasetError::from)?;
|
let item = item_result.map_err(DatasetError::from)?;
|
||||||
let key = item.key();
|
let db = Arc::clone(&db_index);
|
||||||
|
join_set.spawn(async move {
|
||||||
|
let key = item.key();
|
||||||
|
let result = db.entry_to_document(&item).await;
|
||||||
|
(key, result)
|
||||||
|
});
|
||||||
|
|
||||||
match db_index.entry_to_document(&item).await {
|
while join_set.len() >= CONCURRENCY {
|
||||||
Ok(Some(doc)) => {
|
match join_set.join_next().await {
|
||||||
index_writer.add_document(doc).map_err(DatasetError::from)?;
|
Some(Ok((key, result))) => process(key, result)?,
|
||||||
total += 1;
|
Some(Err(e)) => warn!("Indexing task panicked: {e}"),
|
||||||
if logged_at.elapsed().as_secs() >= 5 {
|
None => break,
|
||||||
debug!("Indexed {total} documents so far");
|
|
||||||
logged_at = Instant::now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
warn!("Skipping {key:?}, document is empty");
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Could not read {key:?}, skipping. {err}");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while let Some(join_result) = join_set.join_next().await {
|
||||||
|
match join_result {
|
||||||
|
Ok((key, result)) => process(key, result)?,
|
||||||
|
Err(e) => warn!("Indexing task panicked: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(flag) = flag.as_ref()
|
if let Some(flag) = flag.as_ref()
|
||||||
&& flag.is_cancelled()
|
&& flag.is_cancelled()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user