Auto-update fts index
Some checks failed
CI / Typos (push) Successful in 15s
CI / Clippy (push) Successful in 1m5s
CI / Build and test (push) Failing after 1m2s

This commit is contained in:
2026-02-21 15:55:10 -08:00
parent 5d8ad4665d
commit 141839ae55
36 changed files with 1119 additions and 275 deletions

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
pile-config = { workspace = true }
pile-audio = { workspace = true }
pile-toolbox = { workspace = true }
serde_json = { workspace = true }
itertools = { workspace = true }
@@ -18,3 +19,5 @@ tantivy = { workspace = true }
tracing = { workspace = true }
jsonpath-rust = { workspace = true }
chrono = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }

View File

@@ -0,0 +1,229 @@
use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Source};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use std::{io::ErrorKind, path::PathBuf, sync::Arc};
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
use thiserror::Error;
use tracing::{info, trace, warn};
use crate::{
DataSource,
index::{DbFtsIndex, FtsLookupResult},
path_ts_earliest,
source::DirDataSource,
};
/// Errors produced by [`Dataset`] operations.
#[derive(Debug, Error)]
pub enum DatasetError {
    /// Underlying filesystem / I/O failure.
    #[error("{0}")]
    IoError(#[from] std::io::Error),
    /// Error bubbled up from the tantivy full-text-search engine.
    #[error("{0}")]
    TantivyError(#[from] TantivyError),
    /// A lookup was attempted, but no fts index has been built yet.
    #[error("this dataset does not have an fts index")]
    NoFtsIndex,
}
/// A dataset rooted at a TOML config file.
///
/// Construct with [`Dataset::open`].
pub struct Dataset {
    /// Path to the dataset's TOML config file.
    pub path_config: PathBuf,
    /// Directory that contains the config file.
    pub path_parent: PathBuf,
    /// Working directory for derived data (e.g. the fts index):
    /// the config's `working_dir` (or `<path_parent>/.pile` if unset)
    /// joined with the dataset's name.
    pub path_workdir: PathBuf,
    /// The parsed config file.
    pub config: ConfigToml,
}
impl Dataset {
    /// Open a dataset described by the given config file.
    ///
    /// Reads and parses the TOML config, then derives:
    /// - `path_parent`: the directory containing the config file
    /// - `path_workdir`: the config's `working_dir` (or `<parent>/.pile`
    ///   if unset), joined with the dataset's name.
    ///
    /// # Errors
    /// Returns an I/O error if the config file cannot be read, has no
    /// parent directory, or fails to parse as TOML.
    pub fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
        let path_config = config.into();
        let path_parent = path_config
            .parent()
            .ok_or_else(|| {
                std::io::Error::new(
                    ErrorKind::NotADirectory,
                    format!("Config file {} has no parent", path_config.display()),
                )
            })?
            .to_owned();

        let config = {
            let config = std::fs::read_to_string(&path_config)?;
            match toml::from_str::<ConfigToml>(&config) {
                Ok(config) => {
                    trace!(message = "Loaded config", ?config);
                    config
                }
                Err(error) => {
                    // Surface parse failures as InvalidData with the
                    // offending path so the user can find the file.
                    return Err(std::io::Error::new(
                        ErrorKind::InvalidData,
                        format!("{} is invalid:\n{error}", path_config.display()),
                    ));
                }
            }
        };

        let path_workdir = config
            .dataset
            .working_dir
            .clone()
            .unwrap_or_else(|| path_parent.join(".pile"))
            .join(config.dataset.name.as_str());

        Ok(Self {
            path_config,
            path_parent,
            path_workdir,
            config,
        })
    }

    //
    // MARK: fts
    //

    /// Rebuild this dataset's fts index from scratch.
    ///
    /// The index is built inside a temporary directory (`.tmp-fts`) and
    /// renamed into place (`fts`) only after a successful commit, so a
    /// cancelled or failed build never leaves a partial index at the
    /// final path. Any pre-existing temporary or final index is removed
    /// first.
    ///
    /// If `flag` is provided, cancellation is checked after each document
    /// and the build aborts with [`CancelableTaskError::Cancelled`].
    pub fn fts_refresh(
        &self,
        flag: Option<CancelFlag>,
    ) -> Result<(), CancelableTaskError<DatasetError>> {
        let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
        let fts_dir = self.path_workdir.join("fts");

        // Clear out leftovers from any previous (possibly aborted) build.
        if fts_tmp_dir.is_dir() {
            // Fixed: this message previously printed `fts_dir` even though
            // it is the temporary directory that gets removed here.
            warn!("Removing temporary index in {}", fts_tmp_dir.display());
            std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
        }
        if fts_dir.is_dir() {
            warn!("Removing existing index in {}", fts_dir.display());
            std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
        }
        std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;

        // Collect every configured data source.
        let mut sources = Vec::new();
        for (name, source) in &self.config.dataset.source {
            match source {
                Source::Flac { path: dir } => {
                    sources.push(DirDataSource::new(name, dir.clone().to_vec()));
                }
            }
        }

        let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config);
        let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone())
            .map_err(DatasetError::from)?;
        index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?);
        let mut index_writer: IndexWriter = index.writer(15_000_000).map_err(DatasetError::from)?;

        for s in sources {
            info!("Processing source {:?}", s.name);
            for i in s.iter() {
                let (k, v) = i.map_err(DatasetError::from)?;
                let doc = match db_index
                    .entry_to_document(&*v)
                    .map_err(DatasetError::from)?
                {
                    Some(x) => x,
                    None => {
                        warn!("Skipping {k:?}, document is empty");
                        continue;
                    }
                };
                index_writer.add_document(doc).map_err(DatasetError::from)?;

                // Honor cancellation between documents.
                if let Some(flag) = flag.as_ref()
                    && flag.is_cancelled()
                {
                    return Err(CancelableTaskError::Cancelled);
                }
            }
        }

        info!("Committing index");
        index_writer.commit().map_err(DatasetError::from)?;
        // Publish the finished index by renaming it into place.
        std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?;
        Ok(())
    }

    /// Search this dataset's fts index, returning at most `top_n` results.
    ///
    /// # Errors
    /// Returns [`DatasetError::NoFtsIndex`] if no index has been built;
    /// other I/O and tantivy errors bubble up.
    pub fn fts_lookup(
        &self,
        query: &str,
        top_n: usize,
    ) -> Result<Vec<FtsLookupResult>, DatasetError> {
        let fts_dir = self.path_workdir.join("fts");
        if !fts_dir.exists() {
            return Err(DatasetError::NoFtsIndex);
        }
        if !fts_dir.is_dir() {
            return Err(std::io::Error::new(
                ErrorKind::NotADirectory,
                format!("fts index {} is not a directory", fts_dir.display()),
            )
            .into());
        }
        let db_index = DbFtsIndex::new(&fts_dir, &self.config);
        let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?;
        Ok(results)
    }

    /// Time at which fts was created, or [`None`] if there is no index.
    /// Uses the earliest mtime within the index directory — presumably
    /// because files written later in the build would otherwise push the
    /// apparent creation time forward.
    pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
        let fts_dir = self.path_workdir.join("fts");
        if !fts_dir.exists() {
            return Ok(None);
        }
        if !fts_dir.is_dir() {
            return Err(std::io::Error::new(
                ErrorKind::NotADirectory,
                format!("fts index {} is not a directory", fts_dir.display()),
            ));
        }
        path_ts_earliest(&fts_dir)
    }

    /// Time at which data was last modified, across all sources, or
    /// [`None`] if no source reports a timestamp.
    pub fn ts_data(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
        let mut ts: Option<DateTime<Utc>> = None;
        for (label, source) in &self.config.dataset.source {
            match source {
                Source::Flac { path } => {
                    let s = DirDataSource::new(label, path.clone().to_vec());
                    // Keep the latest timestamp seen so far.
                    match (ts, s.latest_change()?) {
                        (_, None) => continue,
                        (None, Some(new)) => ts = Some(new),
                        (Some(old), Some(new)) => ts = Some(old.max(new)),
                    };
                }
            }
        }
        Ok(ts)
    }

    /// Returns true if we do not have an fts index,
    /// or if our fts index is older than our data.
    pub fn needs_fts(&self) -> Result<bool, std::io::Error> {
        let ts_fts = self.ts_fts()?;
        let ts_data = self.ts_data()?;
        match (ts_fts, ts_data) {
            // Data exists but no index yet: definitely needs a build.
            (None, Some(_)) => Ok(true),
            // Without a data timestamp we can't tell; err on the side of
            // not rebuilding.
            (None, None) | (Some(_), None) => {
                warn!("Could not determine data age");
                Ok(false)
            }
            (Some(ts_fts), Some(ts_data)) => Ok(ts_data > ts_fts),
        }
    }
}

View File

@@ -27,8 +27,8 @@ pub struct DbFtsIndex {
impl DbFtsIndex {
fn fts_cfg(&self) -> &DatasetFts {
static DEFAULT: LazyLock<DatasetFts> = LazyLock::new(|| DatasetFts::default());
&self.cfg.fts.as_ref().unwrap_or(&DEFAULT)
static DEFAULT: LazyLock<DatasetFts> = LazyLock::new(DatasetFts::default);
self.cfg.fts.as_ref().unwrap_or(&DEFAULT)
}
}
@@ -47,9 +47,9 @@ impl DbFtsIndex {
let fields = &cfg.fts.as_ref().unwrap_or(&default).fields;
for (name, field) in fields {
if field.tokenize {
schema_builder.add_text_field(&name, schema::TEXT);
schema_builder.add_text_field(name, schema::TEXT);
} else {
schema_builder.add_text_field(&name, schema::STRING);
schema_builder.add_text_field(name, schema::STRING);
}
}
@@ -70,19 +70,23 @@ impl DbFtsIndex {
) -> Result<Option<TantivyDocument>, TantivyError> {
let mut doc = TantivyDocument::default();
{
let f_ptr = self.schema.get_field("_meta_source")?;
doc.add_text(f_ptr, item.source_name());
}
let key = match item.key().to_string() {
Some(x) => x,
None => {
warn!(
message = "Item key cannot be converted to a string, skipping",
key = ?item.key(),
);
return Ok(None);
}
};
{
let f_ptr = self.schema.get_field("_meta_key")?;
doc.add_text(f_ptr, item.key().to_string());
}
doc.add_text(self.schema.get_field("_meta_source")?, item.source_name());
doc.add_text(self.schema.get_field("_meta_key")?, key);
let json = item.json().unwrap();
let json = item.json()?;
let mut empty = true;
for (name, _field) in &self.fts_cfg().fields {
for name in self.fts_cfg().fields.keys() {
let val = match self.get_field(&json, name)? {
Some(x) => x,
None => continue,
@@ -118,7 +122,7 @@ impl DbFtsIndex {
// Try paths in order, using the first value we find
'outer: for path in field.path.as_slice() {
let val = match json.query(&path) {
let val = match json.query(path) {
Ok(mut x) => {
if x.len() > 1 {
warn!(
@@ -168,6 +172,7 @@ impl DbFtsIndex {
loop {
val = match val {
#[expect(clippy::unwrap_used)]
Value::Array(ref mut x) => {
if x.len() == 1 {
x.pop().unwrap()

View File

@@ -6,13 +6,14 @@ use std::{
};
use pile_audio::flac::blockread::{FlacBlock, FlacBlockReader, FlacBlockSelector};
use pile_config::Label;
use serde_json::{Map, Value};
use crate::Item;
pub struct FlacItem {
pub(crate) path: PathBuf,
pub(crate) source_name: String,
pub(crate) source_name: Label,
}
impl Debug for FlacItem {
@@ -47,8 +48,10 @@ impl Item for FlacItem {
let mut data = Vec::new();
file.read_to_end(&mut data)?;
block_reader.push_data(&data).unwrap();
block_reader.finish().unwrap();
block_reader
.push_data(&data)
.map_err(std::io::Error::other)?;
block_reader.finish().map_err(std::io::Error::other)?;
//
// Return tags
@@ -56,9 +59,8 @@ impl Item for FlacItem {
let mut output = Map::new();
while block_reader.has_block() {
let b = block_reader.pop_block().unwrap();
match b {
while let Some(block) = block_reader.pop_block() {
match block {
FlacBlock::VorbisComment(comment) => {
for (k, v) in comment.comment.comments {
let k = k.to_string();
@@ -67,9 +69,11 @@ impl Item for FlacItem {
match e {
None => {
output.insert(k.to_string(), Value::Array(vec![v]));
output.insert(k.clone(), Value::Array(vec![v]));
}
Some(e) => {
// We always insert an array
#[expect(clippy::unwrap_used)]
e.as_array_mut().unwrap().push(v);
}
}

View File

@@ -4,6 +4,9 @@ pub use traits::*;
mod misc;
pub use misc::*;
mod dataset;
pub use dataset::*;
pub mod index;
pub mod item;
pub mod source;

View File

@@ -2,41 +2,65 @@ use chrono::{DateTime, Utc};
use std::fs;
use std::path::Path;
/// Returns the age of a path as a chrono DateTime.
/// - If the path doesn't exist, returns None
/// Returns the age of a path as a [DateTime].
/// - If the path doesn't exist, returns [None]
/// - If it's a file, returns the modified time
/// - If it's a directory, returns the LATEST modified time of all files within
pub fn path_age(path: impl AsRef<Path>) -> Option<DateTime<Utc>> {
pub fn path_ts_latest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let path = path.as_ref();
// Check if path exists
if !path.exists() {
return None;
return Ok(None);
}
let metadata = fs::metadata(path).ok()?;
let metadata = fs::metadata(path)?;
if metadata.is_file() {
// For files, return the modified time
let modified = metadata.modified().ok()?;
Some(modified.into())
let modified = metadata.modified()?;
Ok(Some(modified.into()))
} else if metadata.is_dir() {
// For directories, find the latest modified time of all files
find_latest_modified(path)
} else {
None
Ok(None)
}
}
fn find_latest_modified(dir: &Path) -> Option<DateTime<Utc>> {
/// Returns the age of a path as a [DateTime].
/// - If the path doesn't exist, returns [None]
/// - If it's a file, returns the modified time
/// - If it's a directory, returns the EARLIEST modified time of all files within
pub fn path_ts_earliest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let path = path.as_ref();
if !path.exists() {
return Ok(None);
}
let metadata = fs::metadata(path)?;
if metadata.is_file() {
let modified = metadata.modified()?;
Ok(Some(modified.into()))
} else if metadata.is_dir() {
find_earliest_modified(path)
} else {
Ok(None)
}
}
fn find_latest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut latest: Option<DateTime<Utc>> = None;
// Read directory entries
let entries = fs::read_dir(dir).ok()?;
// Include the directory's own modification time
let dir_metadata = fs::metadata(dir)?;
if let Ok(modified) = dir_metadata.modified() {
let dt: DateTime<Utc> = modified.into();
latest = Some(dt);
}
let entries = fs::read_dir(dir)?;
for entry in entries.flatten() {
let path = entry.path();
let metadata = entry.metadata().ok()?;
let metadata = entry.metadata()?;
if metadata.is_file() {
if let Ok(modified) = metadata.modified() {
@@ -46,16 +70,50 @@ fn find_latest_modified(dir: &Path) -> Option<DateTime<Utc>> {
_ => dt,
});
}
} else if metadata.is_dir() {
// Recursively check subdirectories
if let Some(dir_latest) = find_latest_modified(&path) {
} else if metadata.is_dir()
&& let Some(dir_latest) = find_latest_modified(&path)? {
latest = Some(match latest {
Some(prev) if prev > dir_latest => prev,
_ => dir_latest,
});
}
}
}
latest
return Ok(latest);
}
fn find_earliest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut earliest: Option<DateTime<Utc>> = None;
// Include the directory's own modification time
let dir_metadata = fs::metadata(dir)?;
if let Ok(modified) = dir_metadata.modified() {
let dt: DateTime<Utc> = modified.into();
earliest = Some(dt);
}
let entries = fs::read_dir(dir)?;
for entry in entries.flatten() {
let path = entry.path();
let metadata = entry.metadata()?;
if metadata.is_file() {
if let Ok(modified) = metadata.modified() {
let dt: DateTime<Utc> = modified.into();
earliest = Some(match earliest {
Some(prev) if prev < dt => prev,
_ => dt,
});
}
} else if metadata.is_dir()
&& let Some(dir_earliest) = find_earliest_modified(&path)? {
earliest = Some(match earliest {
Some(prev) if prev < dir_earliest => prev,
_ => dir_earliest,
});
}
}
return Ok(earliest);
}

View File

@@ -1,19 +1,21 @@
use chrono::{DateTime, Utc};
use itertools::Itertools;
use std::{io::ErrorKind, path::PathBuf};
use pile_config::Label;
use std::path::PathBuf;
use walkdir::WalkDir;
use crate::{DataSource, Item, item::FlacItem};
use crate::{DataSource, Item, item::FlacItem, path_ts_latest};
#[derive(Debug)]
pub struct DirDataSource {
pub name: String,
pub name: Label,
pub dirs: Vec<PathBuf>,
}
impl DirDataSource {
pub fn new(name: impl Into<String>, dirs: Vec<PathBuf>) -> Self {
pub fn new(name: &Label, dirs: Vec<PathBuf>) -> Self {
Self {
name: name.into(),
name: name.clone(),
dirs,
}
}
@@ -40,16 +42,12 @@ impl DataSource for DirDataSource {
return self
.dirs
.iter()
.map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d)))
.flatten()
.into_iter()
.filter_ok(|(_, entry)| entry.file_type().is_file())
.flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d)))
.filter_ok(|(_, entry)| !entry.file_type().is_dir())
.map(|x| match x {
Err(err) => {
let msg = format!("other walkdir error: {err:?}");
Err(err
.into_io_error()
.unwrap_or(std::io::Error::new(ErrorKind::Other, msg)))
Err(err.into_io_error().unwrap_or(std::io::Error::other(msg)))
}
Ok((_, entry)) => {
let path = entry.into_path();
@@ -63,4 +61,23 @@ impl DataSource for DirDataSource {
}
});
}
fn latest_change(&self) -> Result<Option<DateTime<Utc>>, Self::Error> {
let mut ts: Option<DateTime<Utc>> = None;
for path in &self.dirs {
if !path.exists() {
continue;
}
let new = path_ts_latest(path)?;
match (ts, new) {
(_, None) => continue,
(None, Some(new)) => ts = Some(new),
(Some(old), Some(new)) => ts = Some(old.max(new)),
};
}
return Ok(ts);
}
}

View File

@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use std::{error::Error, fmt::Debug, path::PathBuf};
/// A read-only set of [Item]s.
@@ -15,6 +16,9 @@ pub trait DataSource {
fn iter(
&self,
) -> impl Iterator<Item = Result<(Self::Key, Box<dyn Item<Key = Self::Key>>), Self::Error>>;
/// Return the time of the latest change to the data in this source
fn latest_change(&self) -> Result<Option<DateTime<Utc>>, Self::Error>;
}
pub trait Item: Debug + Send + Sync + 'static {
@@ -34,7 +38,10 @@ pub trait Item: Debug + Send + Sync + 'static {
//
pub trait Key: Debug + Clone + Send + Sync + 'static {
fn to_string(&self) -> String;
/// Convert this key to a string, returning `None`
/// if we encounter any kind of error.
fn to_string(&self) -> Option<String>;
fn from_string(str: &str) -> Option<Self>;
}
@@ -43,7 +50,7 @@ impl Key for PathBuf {
str.parse().ok()
}
fn to_string(&self) -> String {
self.to_str().expect("path is not a string").to_owned()
fn to_string(&self) -> Option<String> {
self.to_str().map(|x| x.to_owned())
}
}