Auto-update fts index
This commit is contained in:
231
crates/pile-dataset/src/dataset.rs
Normal file
231
crates/pile-dataset/src/dataset.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{ConfigToml, Source};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use std::{io::ErrorKind, path::PathBuf, sync::Arc};
|
||||
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
||||
use thiserror::Error;
|
||||
use tracing::{info, trace, warn};
|
||||
|
||||
use crate::{
|
||||
DataSource,
|
||||
index::{DbFtsIndex, FtsLookupResult},
|
||||
path_ts_earliest,
|
||||
source::DirDataSource,
|
||||
};
|
||||
|
||||
/// Errors produced by [`Dataset`] operations.
#[derive(Debug, Error)]
pub enum DatasetError {
    /// Underlying filesystem failure (reading the config, removing or
    /// renaming index directories, ...).
    #[error("{0}")]
    IoError(#[from] std::io::Error),

    /// Error bubbled up from the tantivy search library.
    #[error("{0}")]
    TantivyError(#[from] TantivyError),

    /// An fts operation was requested but no index directory exists
    /// for this dataset.
    #[error("this dataset does not have an fts index")]
    NoFtsIndex,
}
|
||||
|
||||
/// A dataset rooted at a TOML config file.
///
/// All paths are resolved once in [`Dataset::open`] and kept alongside the
/// parsed configuration.
pub struct Dataset {
    /// Path to the TOML config file this dataset was opened from.
    pub path_config: PathBuf,
    /// Directory containing the config file.
    pub path_parent: PathBuf,
    /// Per-dataset working directory (the fts index lives under
    /// `path_workdir/fts`).
    pub path_workdir: PathBuf,

    /// Parsed contents of `path_config`.
    pub config: ConfigToml,
}
|
||||
|
||||
impl Dataset {
|
||||
pub fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
|
||||
let path_config = config.into();
|
||||
let path_parent = path_config
|
||||
.parent()
|
||||
.ok_or(std::io::Error::new(
|
||||
ErrorKind::NotADirectory,
|
||||
format!("Config file {} has no parent", path_config.display()),
|
||||
))?
|
||||
.to_owned();
|
||||
|
||||
let config = {
|
||||
let config = std::fs::read_to_string(&path_config)?;
|
||||
let config: Result<ConfigToml, _> = toml::from_str(&config);
|
||||
|
||||
match config {
|
||||
Ok(config) => {
|
||||
trace!(message = "Loaded config", ?config);
|
||||
config
|
||||
}
|
||||
|
||||
Err(error) => {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("{} is invalid:\n{error}", path_config.display()),
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let path_workdir = config
|
||||
.dataset
|
||||
.working_dir
|
||||
.clone()
|
||||
.unwrap_or(path_parent.join(".pile"))
|
||||
.join(config.dataset.name.as_str());
|
||||
|
||||
return Ok(Self {
|
||||
path_config,
|
||||
path_parent,
|
||||
path_workdir,
|
||||
config,
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: fts
|
||||
//
|
||||
|
||||
/// Refresh this dataset's fts index
|
||||
pub fn fts_refresh(
|
||||
&self,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
warn!("Removing temporary index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
}
|
||||
if fts_dir.is_dir() {
|
||||
warn!("Removing existing index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||
}
|
||||
|
||||
std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
|
||||
let mut sources = Vec::new();
|
||||
for (name, source) in &self.config.dataset.source {
|
||||
match source {
|
||||
Source::Flac { path: dir } => {
|
||||
let source = DirDataSource::new(name, dir.clone().to_vec());
|
||||
sources.push(source);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
|
||||
let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
|
||||
.map_err(DatasetError::from)?;
|
||||
index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?);
|
||||
let mut index_writer: IndexWriter =
|
||||
index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
|
||||
|
||||
for s in sources {
|
||||
info!("Processing source {:?}", s.name);
|
||||
|
||||
for i in s.iter() {
|
||||
let (k, v) = i.map_err(DatasetError::from)?;
|
||||
|
||||
let doc = match db_index.entry_to_document(&*v) {
|
||||
Ok(Some(x)) => x,
|
||||
Ok(None) => {
|
||||
warn!("Skipping {k:?}, document is empty");
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Could not read {k:?}, skipping. {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
index_writer.add_document(doc).map_err(DatasetError::from)?;
|
||||
|
||||
if let Some(flag) = flag.as_ref()
|
||||
&& flag.is_cancelled()
|
||||
{
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Committing index");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
pub fn fts_lookup(
|
||||
&self,
|
||||
query: &str,
|
||||
top_n: usize,
|
||||
) -> Result<Vec<FtsLookupResult>, DatasetError> {
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
|
||||
if !fts_dir.exists() {
|
||||
return Err(DatasetError::NoFtsIndex);
|
||||
}
|
||||
if !fts_dir.is_dir() {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::NotADirectory,
|
||||
format!("fts index {} is not a directory", fts_dir.display()),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let db_index = DbFtsIndex::new(&fts_dir, &self.config);
|
||||
let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?;
|
||||
return Ok(results);
|
||||
}
|
||||
|
||||
/// Time at which fts was created
|
||||
pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
|
||||
if !fts_dir.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
if !fts_dir.is_dir() {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::NotADirectory,
|
||||
format!("fts index {} is not a directory", fts_dir.display()),
|
||||
));
|
||||
}
|
||||
|
||||
return path_ts_earliest(&fts_dir);
|
||||
}
|
||||
|
||||
/// Time at which data was last modified
|
||||
pub fn ts_data(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let mut ts: Option<DateTime<Utc>> = None;
|
||||
|
||||
for (label, source) in &self.config.dataset.source {
|
||||
match source {
|
||||
Source::Flac { path } => {
|
||||
let s = DirDataSource::new(label, path.clone().to_vec());
|
||||
match (ts, s.latest_change()?) {
|
||||
(_, None) => continue,
|
||||
(None, Some(new)) => ts = Some(new),
|
||||
(Some(old), Some(new)) => ts = Some(old.max(new)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(ts);
|
||||
}
|
||||
|
||||
/// Returns true if we do not have an fts index,
|
||||
/// or if our fts index is older than our data.
|
||||
pub fn needs_fts(&self) -> Result<bool, std::io::Error> {
|
||||
let ts_fts = self.ts_fts()?;
|
||||
let ts_data = self.ts_data()?;
|
||||
|
||||
match (ts_fts, ts_data) {
|
||||
(None, Some(_)) => return Ok(true),
|
||||
(None, None) | (Some(_), None) => {
|
||||
warn!("Could not determine data age");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
(Some(ts_fts), Some(ts_data)) => return Ok(ts_data > ts_fts),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user