From 141839ae5502ae21ffb26030874f906c98aeb347 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Sat, 21 Feb 2026 15:55:10 -0800 Subject: [PATCH] Auto-update fts index --- Cargo.lock | 4 +- Cargo.toml | 3 +- crates/pile-audio/src/common/picturetype.rs | 2 +- crates/pile-audio/src/common/vorbiscomment.rs | 70 ++++-- crates/pile-audio/src/flac/blockread.rs | 53 +++- .../pile-audio/src/flac/blocks/application.rs | 5 +- crates/pile-audio/src/flac/blocks/cuesheet.rs | 6 +- crates/pile-audio/src/flac/blocks/header.rs | 2 +- crates/pile-audio/src/flac/blocks/mod.rs | 2 +- crates/pile-audio/src/flac/blocks/padding.rs | 6 +- crates/pile-audio/src/flac/blocks/picture.rs | 73 +++--- .../pile-audio/src/flac/blocks/seektable.rs | 6 +- crates/pile-audio/src/flac/errors.rs | 4 +- crates/pile-audio/src/flac/mod.rs | 1 + crates/pile-audio/src/flac/proc/metastrip.rs | 7 +- crates/pile-audio/src/flac/proc/pictures.rs | 1 + crates/pile-audio/src/nodes/extractcovers.rs | 112 +++++++++ crates/pile-audio/src/nodes/extracttags.rs | 164 +++++++++++++ crates/pile-audio/src/nodes/mod.rs | 36 +++ crates/pile-audio/src/nodes/striptags.rs | 181 ++++++++++++++ crates/pile-config/src/config.toml | 4 +- crates/pile-config/src/lib.rs | 3 +- crates/pile-config/src/misc.rs | 2 +- crates/pile-config/src/post.rs | 6 +- crates/pile-dataset/Cargo.toml | 3 + crates/pile-dataset/src/dataset.rs | 229 ++++++++++++++++++ crates/pile-dataset/src/index/index_fts.rs | 35 +-- crates/pile-dataset/src/item/flac.rs | 18 +- crates/pile-dataset/src/lib.rs | 3 + crates/pile-dataset/src/misc.rs | 100 ++++++-- crates/pile-dataset/src/source/dir.rs | 41 +++- crates/pile-dataset/src/traits.rs | 13 +- crates/pile/Cargo.toml | 1 - crates/pile/src/command/check.rs | 28 ++- crates/pile/src/command/index.rs | 98 +------- crates/pile/src/command/lookup.rs | 72 ++---- 36 files changed, 1119 insertions(+), 275 deletions(-) create mode 100644 crates/pile-audio/src/nodes/extractcovers.rs create mode 100644 crates/pile-audio/src/nodes/extracttags.rs create mode 100644 crates/pile-audio/src/nodes/mod.rs create mode 100644 crates/pile-audio/src/nodes/striptags.rs create mode 100644 crates/pile-dataset/src/dataset.rs diff --git a/Cargo.lock b/Cargo.lock index f2d957b..cefcf80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,7 +1041,6 @@ dependencies = [ "pile-toolbox", "serde", "signal-hook", - "tantivy", "tokio", "toml", "tracing", @@ -1084,8 +1083,11 @@ dependencies = [ "jsonpath-rust", "pile-audio", "pile-config", + "pile-toolbox", "serde_json", "tantivy", + "thiserror", + "toml", "tracing", "walkdir", ] diff --git a/Cargo.toml b/Cargo.toml index fa4f37f..efcd09f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ str_to_string = "deny" string_add = "deny" implicit_clone = "deny" use_debug = "allow" -verbose_file_reads = "deny" +verbose_file_reads = "allow" large_types_passed_by_value = "deny" wildcard_dependencies = "deny" negative_feature_names = "deny" @@ -58,7 +58,6 @@ unwrap_used = "warn" expect_used = "warn" type_complexity = "allow" - # # MARK: dependencies # diff --git a/crates/pile-audio/src/common/picturetype.rs b/crates/pile-audio/src/common/picturetype.rs index a760b32..5f8783b 100644 --- a/crates/pile-audio/src/common/picturetype.rs +++ b/crates/pile-audio/src/common/picturetype.rs @@ -17,7 +17,7 @@ impl Display for PictureTypeError { impl std::error::Error for PictureTypeError {} /// A picture type according to the ID3v2 APIC frame -#[allow(missing_docs)] +#[expect(missing_docs)] #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PictureType { Other, diff --git a/crates/pile-audio/src/common/vorbiscomment.rs b/crates/pile-audio/src/common/vorbiscomment.rs index d3bb04a..5370354 100644 --- a/crates/pile-audio/src/common/vorbiscomment.rs +++ b/crates/pile-audio/src/common/vorbiscomment.rs @@ -13,7 +13,7 @@ use crate::flac::blocks::{FlacMetablockDecode, FlacMetablockEncode, FlacPictureB use super::tagtype::TagType; #[derive(Debug)] -#[allow(missing_docs)] +#[expect(missing_docs)] pub enum VorbisCommentDecodeError { /// We encountered an IoError while processing a block IoError(std::io::Error), @@ -74,7 +74,7 @@ impl From for VorbisCommentDecodeError { } #[derive(Debug)] -#[allow(missing_docs)] +#[expect(missing_docs)] pub enum VorbisCommentEncodeError { /// We encountered an IoError while processing a block IoError(std::io::Error), @@ -132,39 +132,52 @@ impl VorbisComment { let mut block = [0u8; 4]; let vendor = { - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| VorbisCommentDecodeError::MalformedData)?; + .map_err(|_err| VorbisCommentDecodeError::MalformedData)?; let length = u32::from_le_bytes(block); - let mut text = vec![0u8; length.try_into().unwrap()]; - #[expect(clippy::map_err_ignore)] + #[expect(clippy::expect_used)] + let mut text = vec![ + 0u8; + length + .try_into() + .expect("vendor length does not fit into usize") + ]; + d.read_exact(&mut text) - .map_err(|_| VorbisCommentDecodeError::MalformedData)?; + .map_err(|_err| VorbisCommentDecodeError::MalformedData)?; String::from_utf8(text)? }; - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| VorbisCommentDecodeError::MalformedData)?; - let n_comments: usize = u32::from_le_bytes(block).try_into().unwrap(); + .map_err(|_err| VorbisCommentDecodeError::MalformedData)?; + + #[expect(clippy::expect_used)] + let n_comments: usize = u32::from_le_bytes(block) + .try_into() + .expect("comment count does not fit into usize"); let mut comments = Vec::new(); let mut pictures = Vec::new(); for _ in 0..n_comments { let comment = { - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| VorbisCommentDecodeError::MalformedData)?; + .map_err(|_err| VorbisCommentDecodeError::MalformedData)?; let length = u32::from_le_bytes(block); - let mut text = vec![0u8; length.try_into().unwrap()]; - #[expect(clippy::map_err_ignore)] + #[expect(clippy::expect_used)] + let mut text = vec![ + 0u8; + length + .try_into() + .expect("comment length does not fit into usize") + ]; + d.read_exact(&mut text) - .map_err(|_| VorbisCommentDecodeError::MalformedData)?; + .map_err(|_err| VorbisCommentDecodeError::MalformedData)?; String::from_utf8(text)? }; @@ -218,9 +231,10 @@ impl VorbisComment { impl VorbisComment { /// Get the number of bytes that `encode()` will write. + #[expect(clippy::expect_used)] pub fn get_len(&self) -> u32 { let mut sum: u32 = 0; - sum += u32::try_from(self.vendor.len()).unwrap() + 4; + sum += u32::try_from(self.vendor.len()).expect("vendor length does not fit into u32") + 4; sum += 4; for (tagtype, value) in &self.comments { @@ -244,7 +258,8 @@ impl VorbisComment { .to_uppercase(); let str = format!("{tagtype_str}={value}"); - sum += 4 + u32::try_from(str.len()).unwrap(); + sum += + 4 + u32::try_from(str.len()).expect("comment string length does not fit into u32"); } for p in &self.pictures { @@ -271,13 +286,18 @@ impl VorbisComment { } /// Try to encode this vorbis comment + #[expect(clippy::expect_used)] pub fn encode(&self, target: &mut impl Write) -> Result<(), VorbisCommentEncodeError> { - target.write_all(&u32::try_from(self.vendor.len()).unwrap().to_le_bytes())?; + target.write_all( + &u32::try_from(self.vendor.len()) + .expect("vendor length does not fit into u32") + .to_le_bytes(), + )?; target.write_all(self.vendor.as_bytes())?; target.write_all( &u32::try_from(self.comments.len() + self.pictures.len()) - .unwrap() + .expect("total comment count does not fit into u32") .to_le_bytes(), )?; @@ -302,7 +322,11 @@ impl VorbisComment { .to_uppercase(); let str = format!("{tagtype_str}={value}"); - target.write_all(&u32::try_from(str.len()).unwrap().to_le_bytes())?; + target.write_all( + &u32::try_from(str.len()) + .expect("comment string length does not fit into u32") + .to_le_bytes(), + )?; target.write_all(str.as_bytes())?; } @@ -318,7 +342,11 @@ impl VorbisComment { &base64::prelude::BASE64_STANDARD.encode(&pic_data) ); - target.write_all(&u32::try_from(pic_string.len()).unwrap().to_le_bytes())?; + target.write_all( + &u32::try_from(pic_string.len()) + .expect("picture string length does not fit into u32") + .to_le_bytes(), + )?; target.write_all(pic_string.as_bytes())?; } diff --git a/crates/pile-audio/src/flac/blockread.rs b/crates/pile-audio/src/flac/blockread.rs index 2c7394a..aeec046 100644 --- a/crates/pile-audio/src/flac/blockread.rs +++ b/crates/pile-audio/src/flac/blockread.rs @@ -88,7 +88,7 @@ enum FlacBlockType { } #[derive(Debug)] -#[allow(missing_docs)] +#[expect(missing_docs)] pub enum FlacBlock { Streaminfo(FlacStreaminfoBlock), Picture(FlacPictureBlock), @@ -212,9 +212,16 @@ impl FlacBlockReader { } 'outer: while last_read_size != 0 { - match self.current_block.as_mut().unwrap() { + #[expect(clippy::expect_used)] + match self + .current_block + .as_mut() + .expect("current_block is Some, checked above") + { FlacBlockType::MagicBits { data, left_to_read } => { - last_read_size = buf.read(&mut data[4 - *left_to_read..4]).unwrap(); + last_read_size = buf + .read(&mut data[4 - *left_to_read..4]) + .map_err(FlacDecodeError::from)?; *left_to_read -= last_read_size; if *left_to_read == 0 { @@ -235,7 +242,9 @@ impl FlacBlockReader { data, left_to_read, } => { - last_read_size = buf.read(&mut data[4 - *left_to_read..4]).unwrap(); + last_read_size = buf + .read(&mut data[4 - *left_to_read..4]) + .map_err(FlacDecodeError::from)?; *left_to_read -= last_read_size; if *left_to_read == 0 { @@ -253,13 +262,24 @@ impl FlacBlockReader { } FlacBlockType::MetaBlock { header, data } => { - last_read_size = buf - .by_ref() - .take(u64::from(header.length) - u64::try_from(data.len()).unwrap()) - .read_to_end(data) - .unwrap(); + #[expect(clippy::expect_used)] + { + last_read_size = buf + .by_ref() + .take( + u64::from(header.length) + - u64::try_from(data.len()) + .expect("data length does not fit into u64"), + ) + .read_to_end(data) + .map_err(FlacDecodeError::from)?; + } - if data.len() == usize::try_from(header.length).unwrap() { + #[expect(clippy::expect_used)] + if data.len() + == usize::try_from(header.length) + .expect("header length does not fit into usize") + { // If we picked this block type, add it to the queue if self.selector.should_pick_meta(header.block_type) { let b = FlacBlock::decode(header.block_type, data)?; @@ -283,7 +303,11 @@ impl FlacBlockReader { // Limit the number of bytes we read at once, so we don't re-clone // large amounts of data if `buf` contains multiple sync sequences. // 5kb is a pretty reasonable frame size. - last_read_size = buf.by_ref().take(5_000).read_to_end(data).unwrap(); + last_read_size = buf + .by_ref() + .take(5_000) + .read_to_end(data) + .map_err(FlacDecodeError::from)?; if last_read_size == 0 { continue 'outer; } @@ -335,9 +359,10 @@ impl FlacBlockReader { // Backtrack to the first bit AFTER this new sync sequence buf.seek(std::io::SeekFrom::Current( - -i64::try_from(data.len() - i).unwrap(), + -i64::try_from(data.len() - i) + .expect("seek offset does not fit into i64"), )) - .unwrap(); + .map_err(FlacDecodeError::from)?; self.current_block = Some(FlacBlockType::AudioData { data: { @@ -406,6 +431,7 @@ mod tests { flac::tests::{FlacBlockOutput, FlacTestCase, VorbisCommentTestValue, manifest}, }; + #[expect(clippy::unwrap_used)] fn read_file( test_case: &FlacTestCase, fragment_size_range: Option>, @@ -447,6 +473,7 @@ mod tests { return Ok(out_blocks); } + #[expect(clippy::unwrap_used)] fn test_identical( test_case: &FlacTestCase, fragment_size_range: Option>, diff --git a/crates/pile-audio/src/flac/blocks/application.rs b/crates/pile-audio/src/flac/blocks/application.rs index cd2f378..b5c0ecc 100644 --- a/crates/pile-audio/src/flac/blocks/application.rs +++ b/crates/pile-audio/src/flac/blocks/application.rs @@ -50,8 +50,11 @@ impl FlacMetablockDecode for FlacApplicationBlock { } impl FlacMetablockEncode for FlacApplicationBlock { + #[expect(clippy::expect_used)] fn get_len(&self) -> u32 { - (self.data.len() + 4).try_into().unwrap() + (self.data.len() + 4) + .try_into() + .expect("application block size does not fit into u32") } fn encode( diff --git a/crates/pile-audio/src/flac/blocks/cuesheet.rs b/crates/pile-audio/src/flac/blocks/cuesheet.rs index 5a013ac..107a964 100644 --- a/crates/pile-audio/src/flac/blocks/cuesheet.rs +++ b/crates/pile-audio/src/flac/blocks/cuesheet.rs @@ -25,8 +25,12 @@ impl FlacMetablockDecode for FlacCuesheetBlock { } impl FlacMetablockEncode for FlacCuesheetBlock { + #[expect(clippy::expect_used)] fn get_len(&self) -> u32 { - self.data.len().try_into().unwrap() + self.data + .len() + .try_into() + .expect("cuesheet size does not fit into u32") } fn encode( diff --git a/crates/pile-audio/src/flac/blocks/header.rs b/crates/pile-audio/src/flac/blocks/header.rs index f60def6..48e3a36 100644 --- a/crates/pile-audio/src/flac/blocks/header.rs +++ b/crates/pile-audio/src/flac/blocks/header.rs @@ -4,7 +4,7 @@ use std::fmt::Debug; use crate::flac::errors::{FlacDecodeError, FlacEncodeError}; /// A type of flac metadata block -#[allow(missing_docs)] +#[expect(missing_docs)] #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum FlacMetablockType { Streaminfo, diff --git a/crates/pile-audio/src/flac/blocks/mod.rs b/crates/pile-audio/src/flac/blocks/mod.rs index e70c479..1287f77 100644 --- a/crates/pile-audio/src/flac/blocks/mod.rs +++ b/crates/pile-audio/src/flac/blocks/mod.rs @@ -1,4 +1,4 @@ -//! Read and write impelementations for all flac block types +//! Read and write implementations for all flac block types // Not metadata blocks mod header; diff --git a/crates/pile-audio/src/flac/blocks/padding.rs b/crates/pile-audio/src/flac/blocks/padding.rs index 4ed2012..1315ea7 100644 --- a/crates/pile-audio/src/flac/blocks/padding.rs +++ b/crates/pile-audio/src/flac/blocks/padding.rs @@ -12,13 +12,17 @@ pub struct FlacPaddingBlock { } impl FlacMetablockDecode for FlacPaddingBlock { + #[expect(clippy::expect_used)] fn decode(data: &[u8]) -> Result { if data.iter().any(|x| *x != 0u8) { return Err(FlacDecodeError::MalformedBlock); } Ok(Self { - size: data.len().try_into().unwrap(), + size: data + .len() + .try_into() + .expect("padding size does not fit into u32"), }) } } diff --git a/crates/pile-audio/src/flac/blocks/picture.rs b/crates/pile-audio/src/flac/blocks/picture.rs index d590ba6..af4750e 100644 --- a/crates/pile-audio/src/flac/blocks/picture.rs +++ b/crates/pile-audio/src/flac/blocks/picture.rs @@ -63,16 +63,17 @@ impl FlacMetablockDecode for FlacPictureBlock { // Image format let mime = { - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; - let mime_length = u32::from_be_bytes(block).try_into().unwrap(); + #[expect(clippy::expect_used)] + let mime_length = u32::from_be_bytes(block) + .try_into() + .expect("mime length does not fit into usize"); let mut mime = vec![0u8; mime_length]; - #[expect(clippy::map_err_ignore)] d.read_exact(&mut mime) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; String::from_utf8(mime) .ok() @@ -82,16 +83,17 @@ impl FlacMetablockDecode for FlacPictureBlock { // Image description let description = { - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; - let desc_length = u32::from_be_bytes(block).try_into().unwrap(); + #[expect(clippy::expect_used)] + let desc_length = u32::from_be_bytes(block) + .try_into() + .expect("description length does not fit into usize"); let mut desc = vec![0u8; desc_length]; - #[expect(clippy::map_err_ignore)] d.read_exact(&mut desc) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; String::from_utf8(desc)? }; @@ -122,16 +124,17 @@ impl FlacMetablockDecode for FlacPictureBlock { // Image data length let img_data = { - #[expect(clippy::map_err_ignore)] d.read_exact(&mut block) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; - let data_length = u32::from_be_bytes(block).try_into().unwrap(); + #[expect(clippy::expect_used)] + let data_length = u32::from_be_bytes(block) + .try_into() + .expect("image data length does not fit into usize"); let mut img_data = vec![0u8; data_length]; - #[expect(clippy::map_err_ignore)] d.read_exact(&mut img_data) - .map_err(|_| FlacDecodeError::MalformedBlock)?; + .map_err(|_err| FlacDecodeError::MalformedBlock)?; img_data }; @@ -150,13 +153,14 @@ impl FlacMetablockDecode for FlacPictureBlock { } impl FlacMetablockEncode for FlacPictureBlock { + #[expect(clippy::expect_used)] fn get_len(&self) -> u32 { (4 + (4 + self.mime.to_string().len()) + (4 + self.description.len()) + 4 + 4 + 4 + 4 + (4 + self.img_data.len())) .try_into() - .unwrap() + .expect("picture block size does not fit into u32") } fn encode( @@ -176,20 +180,35 @@ impl FlacMetablockEncode for FlacPictureBlock { target.write_all(&self.picture_type.to_idx().to_be_bytes())?; - let mime = self.mime.to_string(); - target.write_all(&u32::try_from(mime.len()).unwrap().to_be_bytes())?; - target.write_all(self.mime.to_string().as_bytes())?; - drop(mime); + #[expect(clippy::expect_used)] + { + let mime = self.mime.to_string(); + target.write_all( + &u32::try_from(mime.len()) + .expect("mime length does not fit into u32") + .to_be_bytes(), + )?; + target.write_all(self.mime.to_string().as_bytes())?; + drop(mime); - target.write_all(&u32::try_from(self.description.len()).unwrap().to_be_bytes())?; - target.write_all(self.description.as_bytes())?; + target.write_all( + &u32::try_from(self.description.len()) + .expect("description length does not fit into u32") + .to_be_bytes(), + )?; + target.write_all(self.description.as_bytes())?; - target.write_all(&self.width.to_be_bytes())?; - target.write_all(&self.height.to_be_bytes())?; - target.write_all(&self.bit_depth.to_be_bytes())?; - target.write_all(&self.color_count.to_be_bytes())?; + target.write_all(&self.width.to_be_bytes())?; + target.write_all(&self.height.to_be_bytes())?; + target.write_all(&self.bit_depth.to_be_bytes())?; + target.write_all(&self.color_count.to_be_bytes())?; - target.write_all(&u32::try_from(self.img_data.len()).unwrap().to_be_bytes())?; + target.write_all( + &u32::try_from(self.img_data.len()) + .expect("image data length does not fit into u32") + .to_be_bytes(), + )?; + } target.write_all(&self.img_data)?; return Ok(()); diff --git a/crates/pile-audio/src/flac/blocks/seektable.rs b/crates/pile-audio/src/flac/blocks/seektable.rs index 6edf16e..67065ef 100644 --- a/crates/pile-audio/src/flac/blocks/seektable.rs +++ b/crates/pile-audio/src/flac/blocks/seektable.rs @@ -25,8 +25,12 @@ impl FlacMetablockDecode for FlacSeektableBlock { } impl FlacMetablockEncode for FlacSeektableBlock { + #[expect(clippy::expect_used)] fn get_len(&self) -> u32 { - self.data.len().try_into().unwrap() + self.data + .len() + .try_into() + .expect("seektable size does not fit into u32") } fn encode( diff --git a/crates/pile-audio/src/flac/errors.rs b/crates/pile-audio/src/flac/errors.rs index 36a2980..8da733c 100644 --- a/crates/pile-audio/src/flac/errors.rs +++ b/crates/pile-audio/src/flac/errors.rs @@ -6,7 +6,7 @@ use crate::common::{ use std::string::FromUtf8Error; use thiserror::Error; -#[allow(missing_docs)] +#[expect(missing_docs)] #[derive(Debug, Error)] pub enum FlacDecodeError { /// FLAC does not start with 0x66 0x4C 0x61 0x43 @@ -46,7 +46,7 @@ pub enum FlacDecodeError { PictureTypeError(#[from] PictureTypeError), } -#[allow(missing_docs)] +#[expect(missing_docs)] #[derive(Debug, Error)] pub enum FlacEncodeError { /// We encountered an i/o error while processing diff --git a/crates/pile-audio/src/flac/mod.rs b/crates/pile-audio/src/flac/mod.rs index 302a102..1751247 100644 --- a/crates/pile-audio/src/flac/mod.rs +++ b/crates/pile-audio/src/flac/mod.rs @@ -213,6 +213,7 @@ mod tests { } /// A list of test files and their expected output + #[expect(clippy::unwrap_used)] pub fn manifest() -> [FlacTestCase; 23] { [ FlacTestCase::Error { diff --git a/crates/pile-audio/src/flac/proc/metastrip.rs b/crates/pile-audio/src/flac/proc/metastrip.rs index 2f51f75..4fa60f3 100644 --- a/crates/pile-audio/src/flac/proc/metastrip.rs +++ b/crates/pile-audio/src/flac/proc/metastrip.rs @@ -88,7 +88,11 @@ impl FlacMetaStrip { // We don't need to store audioframes in our last_block buffer, // since they do not have an `is_last` flag. if matches!(self.last_block, Some(FlacBlock::AudioFrame(_))) { - let x = self.last_block.take().unwrap(); + #[expect(clippy::expect_used)] + let x = self + .last_block + .take() + .expect("last_block is Some(AudioFrame), just matched"); x.encode(false, true, target)?; } @@ -107,6 +111,7 @@ mod tests { tests::manifest, }; + #[expect(clippy::unwrap_used)] fn test_strip( test_case: &FlacTestCase, fragment_size_range: Option>, diff --git a/crates/pile-audio/src/flac/proc/pictures.rs b/crates/pile-audio/src/flac/proc/pictures.rs index 9661f49..b77d957 100644 --- a/crates/pile-audio/src/flac/proc/pictures.rs +++ b/crates/pile-audio/src/flac/proc/pictures.rs @@ -90,6 +90,7 @@ mod tests { tests::{FlacBlockOutput, FlacTestCase, manifest}, }; + #[expect(clippy::unwrap_used)] fn test_pictures( test_case: &FlacTestCase, fragment_size_range: Option>, diff --git a/crates/pile-audio/src/nodes/extractcovers.rs b/crates/pile-audio/src/nodes/extractcovers.rs new file mode 100644 index 0000000..2a76d4c --- /dev/null +++ b/crates/pile-audio/src/nodes/extractcovers.rs @@ -0,0 +1,112 @@ +use crate::flac::proc::pictures::FlacPictureReader; +use std::{collections::BTreeMap, sync::Arc}; +use tracing::{debug, trace}; + +pub struct ExtractCovers {} + +impl NodeBuilder for ExtractCovers { + fn build<'ctx>(&self) -> Box> { + Box::new(Self {}) + } +} + +// Inputs: "data" - Bytes +#[async_trait] +impl<'ctx> Node<'ctx> for ExtractCovers { + async fn run( + &self, + ctx: &CopperContext<'ctx>, + this_node: ThisNodeInfo, + params: NodeParameters, + mut input: BTreeMap>, + ) -> Result, RunNodeError> { + // + // Extract parameters + // + params.err_if_not_empty()?; + + // + // Extract arguments + // + let data = input.remove(&PortName::new("data")); + if data.is_none() { + return Err(RunNodeError::MissingInput { + port: PortName::new("data"), + }); + } + if let Some((port, _)) = input.pop_first() { + return Err(RunNodeError::UnrecognizedInput { port }); + } + + trace!( + message = "Inputs ready, preparing reader", + node_id = ?this_node.id + ); + + let mut reader = match data.unwrap() { + None => { + return Err(RunNodeError::RequiredInputNull { + port: PortName::new("data"), + }); + } + + Some(PipeData::Blob { source, .. }) => source.build(ctx).await?, + + _ => { + return Err(RunNodeError::BadInputType { + port: PortName::new("data"), + }); + } + }; + + // + // Setup is done, extract covers + // + debug!( + message = "Extracting covers", + node_id = ?this_node.id + ); + let mut picreader = FlacPictureReader::new(); + + while let Some(data) = reader.next_fragment().await? { + picreader + .push_data(&data) + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + } + + picreader + .finish() + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + + // + // Send the first cover we find + // + + let mut output = BTreeMap::new(); + + if let Some(picture) = picreader.pop_picture() { + debug!( + message = "Found a cover, sending", + node_id = ?this_node.id, + picture = ?picture + ); + + output.insert( + PortName::new("cover_data"), + PipeData::Blob { + source: BytesProcessorBuilder::new(RawBytesSource::Array { + mime: picture.mime.clone(), + data: Arc::new(picture.img_data), + }), + }, + ); + } else { + debug!( + message = "Did not find a cover, sending None", + node_id = ?this_node.id + ); + } + + return Ok(output); + } +} diff --git a/crates/pile-audio/src/nodes/extracttags.rs b/crates/pile-audio/src/nodes/extracttags.rs new file mode 100644 index 0000000..370c17a --- /dev/null +++ b/crates/pile-audio/src/nodes/extracttags.rs @@ -0,0 +1,164 @@ +use crate::{ + common::tagtype::TagType, + flac::blockread::{FlacBlock, FlacBlockReader, FlacBlockSelector}, +}; +use async_trait::async_trait; +use copper_piper::{ + base::{Node, NodeBuilder, NodeParameterValue, PortName, RunNodeError, ThisNodeInfo}, + data::PipeData, + helpers::NodeParameters, + CopperContext, +}; +use std::{collections::BTreeMap, sync::Arc}; +use tracing::{debug, trace}; + +/// Extract tags from audio metadata +pub struct ExtractTags {} + +impl NodeBuilder for ExtractTags { + fn build<'ctx>(&self) -> Box> { + Box::new(Self {}) + } +} + +// Inputs: "data" - Bytes +// Outputs: variable, depends on tags +#[async_trait] +impl<'ctx> Node<'ctx> for ExtractTags { + async fn run( + &self, + ctx: &CopperContext<'ctx>, + this_node: ThisNodeInfo, + mut params: NodeParameters, + mut input: BTreeMap>, + ) -> Result, RunNodeError> { + // + // Extract parameters + // + + let tags = { + let mut tags: BTreeMap = BTreeMap::new(); + let val = params.pop_val("tags")?; + + match val { + NodeParameterValue::List(list) => { + for t in list { + match t { + NodeParameterValue::String(s) => { + tags.insert(PortName::new(s.as_str()), s.as_str().into()); + } + _ => { + return Err(RunNodeError::BadParameterType { + parameter: "tags".into(), + }) + } + } + } + } + _ => { + return Err(RunNodeError::BadParameterType { + parameter: "tags".into(), + }) + } + }; + + tags + }; + + params.err_if_not_empty()?; + + // + // Extract arguments + // + let data = input.remove(&PortName::new("data")); + if data.is_none() { + return Err(RunNodeError::MissingInput { + port: PortName::new("data"), + }); + } + if let Some((port, _)) = input.pop_first() { + return Err(RunNodeError::UnrecognizedInput { port }); + } + + trace!( + message = "Inputs ready, preparing reader", + node_id = ?this_node.id + ); + + let mut reader = match data.unwrap() { + None => { + return Err(RunNodeError::RequiredInputNull { + port: PortName::new("data"), + }) + } + + Some(PipeData::Blob { source, .. }) => source.build(ctx).await?, + + _ => { + return Err(RunNodeError::BadInputType { + port: PortName::new("data"), + }) + } + }; + + // + // Setup is done, extract tags + // + debug!( + message = "Extracting tags", + node_id = ?this_node.id + ); + + let mut block_reader = FlacBlockReader::new(FlacBlockSelector { + pick_vorbiscomment: true, + ..Default::default() + }); + + while let Some(data) = reader.next_fragment().await? { + block_reader + .push_data(&data) + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + } + + block_reader + .finish() + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + + // + // Return tags + // + + let mut output = BTreeMap::new(); + + while block_reader.has_block() { + let b = block_reader.pop_block().unwrap(); + match b { + FlacBlock::VorbisComment(comment) => { + for (port, tag_type) in tags.iter() { + if let Some((_, tag_value)) = + comment.comment.comments.iter().find(|(t, _)| t == tag_type) + { + let x = output.insert( + port.clone(), + PipeData::Text { + value: tag_value.clone(), + }, + ); + + // Each insertion should be new + assert!(x.is_none()); + } + } + } + + // `reader` filters blocks for us + _ => unreachable!(), + } + + // We should only have one comment block + assert!(!block_reader.has_block()); + } + + return Ok(output); + } +} diff --git a/crates/pile-audio/src/nodes/mod.rs b/crates/pile-audio/src/nodes/mod.rs new file mode 100644 index 0000000..3007874 --- /dev/null +++ b/crates/pile-audio/src/nodes/mod.rs @@ -0,0 +1,36 @@ +//! Pipeline nodes for processing audio files +use copper_piper::base::{NodeDispatcher, RegisterNodeError}; +use std::collections::BTreeMap; + +mod extractcovers; +mod extracttags; +mod striptags; + +/// Register all nodes in this module into the given dispatcher +pub fn register(dispatcher: &mut NodeDispatcher) -> Result<(), RegisterNodeError> { + dispatcher + .register_node( + "StripTags", + BTreeMap::new(), + Box::new(striptags::StripTags {}), + ) + .unwrap(); + + dispatcher + .register_node( + "ExtractCovers", + BTreeMap::new(), + Box::new(extractcovers::ExtractCovers {}), + ) + .unwrap(); + + dispatcher + .register_node( + "ExtractTags", + BTreeMap::new(), + Box::new(extracttags::ExtractTags {}), + ) + .unwrap(); + + return Ok(()); +} diff --git a/crates/pile-audio/src/nodes/striptags.rs b/crates/pile-audio/src/nodes/striptags.rs new file mode 100644 index 0000000..24ec31f --- /dev/null +++ b/crates/pile-audio/src/nodes/striptags.rs @@ -0,0 +1,181 @@ +//! Strip all tags from an audio file + +use crate::flac::proc::metastrip::FlacMetaStrip; +use async_trait::async_trait; +use copper_piper::{ + base::{Node, NodeBuilder, NodeId, PortName, RunNodeError, ThisNodeInfo}, + data::PipeData, + helpers::{ + processor::{StreamProcessor, StreamProcessorBuilder}, + NodeParameters, + }, + CopperContext, +}; +use copper_util::MimeType; +use smartstring::{LazyCompact, SmartString}; +use std::{collections::BTreeMap, sync::Arc}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::debug; + +/// Strip all metadata from an audio file +pub struct StripTags {} + +impl NodeBuilder for StripTags { + fn build<'ctx>(&self) -> Box> { + Box::new(Self {}) + } +} + +// Input: "data" - Blob +// Output: "out" - Blob +#[async_trait] +impl<'ctx> Node<'ctx> for StripTags { + async fn run( + &self, + _ctx: &CopperContext<'ctx>, + this_node: ThisNodeInfo, + params: NodeParameters, + mut input: BTreeMap>, + ) -> Result, RunNodeError> { + // + // Extract parameters + // + params.err_if_not_empty()?; + + // + // Extract arguments + // + let data = input.remove(&PortName::new("data")); + if data.is_none() { + return Err(RunNodeError::MissingInput { + port: PortName::new("data"), + }); + } + if let Some((port, _)) = input.pop_first() { + return Err(RunNodeError::UnrecognizedInput { port }); + } + + let source = match data.unwrap() { + None => { + return Err(RunNodeError::RequiredInputNull { + port: PortName::new("data"), + }) + } + + Some(PipeData::Blob { source, .. }) => source, + + _ => { + return Err(RunNodeError::BadInputType { + port: PortName::new("data"), + }) + } + }; + + debug!( + message = "Setup done, stripping tags", + node_id = ?this_node.id + ); + + let mut output = BTreeMap::new(); + + output.insert( + PortName::new("out"), + PipeData::Blob { + source: source.add_processor(Arc::new(TagStripProcessor { + node_id: this_node.id.clone(), + node_type: this_node.node_type.clone(), + })), + }, + ); + + return Ok(output); + } +} + +#[derive(Debug, Clone)] +struct TagStripProcessor { + node_id: NodeId, + node_type: SmartString, +} + +impl StreamProcessorBuilder for TagStripProcessor { + fn build(&self) -> Box { + Box::new(self.clone()) + } +} + +#[async_trait] +impl StreamProcessor for TagStripProcessor { + fn mime(&self) -> &MimeType { + return &MimeType::Flac; + } + + fn name(&self) -> &'static str { + "TagStripProcessor" + } + + fn source_node_id(&self) -> &NodeId { + &self.node_id + } + + /// Return the type of the node that created this processor + fn source_node_type(&self) -> &str { + &self.node_type + } + + async fn run( + &self, + mut source: Receiver>>, + sink: Sender>>, + max_buffer_size: usize, + ) -> Result<(), RunNodeError> { + // + // Strip tags + // + + let mut strip = FlacMetaStrip::new(); + let mut out_bytes = Vec::new(); + + while let Some(data) = source.recv().await { + strip + .push_data(&data) + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + + strip + .read_data(&mut out_bytes) + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + + if out_bytes.len() >= max_buffer_size { + let x = std::mem::take(&mut out_bytes); + + match sink.send(Arc::new(x)).await { + Ok(()) => {} + + // Not an error, our receiver was dropped. + // Exit early if that happens! + Err(_) => return Ok(()), + }; + } + } + + strip + .finish() + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + + while strip.has_data() { + strip + .read_data(&mut out_bytes) + .map_err(|e| RunNodeError::Other(Arc::new(e)))?; + } + + match sink.send(Arc::new(out_bytes)).await { + Ok(()) => {} + + // Not an error, our receiver was dropped. + // Exit early if that happens! + Err(_) => return Ok(()), + }; + + return Ok(()); + } +} diff --git a/crates/pile-config/src/config.toml b/crates/pile-config/src/config.toml index 505321f..689a6c1 100644 --- a/crates/pile-config/src/config.toml +++ b/crates/pile-config/src/config.toml @@ -8,7 +8,7 @@ name = "dataset" # # working_dir = ".pile" -# Data sources avaliable in this dataset +# Data sources available in this dataset source."music" = { type = "flac", path = ["music", "music-2"] } @@ -19,7 +19,7 @@ source."music" = { type = "flac", path = ["music", "music-2"] } # # "field-name" = { # # The type of data this field contains. -# # only text is supportedin this verison. +# # only text is supported in this version. # type = "text", # # # An array of jsonpaths (rfc9535) used to extract this field from each source entry. diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index 67525c6..8aaa828 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -10,8 +10,9 @@ pub use misc::*; pub static INIT_DB_TOML: &str = include_str!("./config.toml"); #[test] +#[expect(clippy::unwrap_used)] fn init_db_toml_valid() { - toml::from_str::(INIT_DB_TOML).unwrap(); + toml::from_str::(INIT_DB_TOML).expect("INIT_DB_TOML should be valid TOML"); } #[derive(Debug, Clone, Deserialize)] diff --git a/crates/pile-config/src/misc.rs b/crates/pile-config/src/misc.rs index 831c2f9..b0adb99 100644 --- a/crates/pile-config/src/misc.rs +++ b/crates/pile-config/src/misc.rs @@ -22,7 +22,7 @@ impl OneOrMany { pub fn as_slice(&self) -> &[T] { match self { - Self::One(x) => slice::from_ref(&x), + Self::One(x) => slice::from_ref(x), Self::Many(x) => &x[..], } } diff --git a/crates/pile-config/src/post.rs b/crates/pile-config/src/post.rs index 5b43116..326acb1 100644 --- a/crates/pile-config/src/post.rs +++ b/crates/pile-config/src/post.rs @@ -62,7 +62,7 @@ impl FieldSpecPost { Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()), Value::String(x) => { - Value::String(x.strip_suffix(trim_suffix).unwrap_or(&x).to_owned()) + Value::String(x.strip_suffix(trim_suffix).unwrap_or(x).to_owned()) } Value::Array(x) => { @@ -73,7 +73,7 @@ impl FieldSpecPost { x.iter() .map(|x| { ( - x.0.strip_suffix(trim_suffix).unwrap_or(&x.0).to_owned(), + x.0.strip_suffix(trim_suffix).unwrap_or(x.0).to_owned(), self.apply(x.1), ) }) @@ -88,7 +88,7 @@ impl FieldSpecPost { Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()), Value::String(x) => { - Value::String(x.strip_prefix(trim_prefix).unwrap_or(&x).to_owned()) + Value::String(x.strip_prefix(trim_prefix).unwrap_or(x).to_owned()) } Value::Array(x) => { diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 566f693..43399af 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] pile-config = { workspace = true } pile-audio = { workspace = true } +pile-toolbox = { workspace = true } serde_json = { workspace = true } itertools = { workspace = true } @@ -18,3 +19,5 @@ tantivy = { workspace = true } tracing = { workspace = true } jsonpath-rust = { workspace = true } chrono = { workspace = true } +toml = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs new file mode 100644 index 0000000..522a927 --- /dev/null +++ b/crates/pile-dataset/src/dataset.rs @@ -0,0 +1,229 @@ +use chrono::{DateTime, Utc}; +use pile_config::{ConfigToml, Source}; +use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use std::{io::ErrorKind, path::PathBuf, sync::Arc}; +use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; +use thiserror::Error; +use tracing::{info, trace, warn}; + +use crate::{ + DataSource, + index::{DbFtsIndex, FtsLookupResult}, + path_ts_earliest, + source::DirDataSource, +}; + +#[derive(Debug, Error)] +pub enum DatasetError { + #[error("{0}")] + IoError(#[from] std::io::Error), + + #[error("{0}")] + TantivyError(#[from] TantivyError), + + #[error("this dataset does not have an fts index")] + NoFtsIndex, +} + +pub struct Dataset { + pub path_config: PathBuf, + pub path_parent: PathBuf, + pub path_workdir: PathBuf, + + pub config: ConfigToml, +} + +impl Dataset { + pub fn open(config: impl Into) -> Result { + let path_config = config.into(); + let path_parent = path_config + .parent() + .ok_or(std::io::Error::new( + ErrorKind::NotADirectory, + format!("Config file {} has no parent", path_config.display()), + ))? + .to_owned(); + + let config = { + let config = std::fs::read_to_string(&path_config)?; + let config: Result = toml::from_str(&config); + + match config { + Ok(config) => { + trace!(message = "Loaded config", ?config); + config + } + + Err(error) => { + return Err(std::io::Error::new( + ErrorKind::InvalidData, + format!("{} is invalid:\n{error}", path_config.display()), + )); + } + } + }; + + let path_workdir = config + .dataset + .working_dir + .clone() + .unwrap_or(path_parent.join(".pile")) + .join(config.dataset.name.as_str()); + + return Ok(Self { + path_config, + path_parent, + path_workdir, + config, + }); + } + + // + // MARK: fts + // + + /// Refresh this dataset's fts index + pub fn fts_refresh( + &self, + flag: Option, + ) -> Result<(), CancelableTaskError> { + let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); + let fts_dir = self.path_workdir.join("fts"); + + if fts_tmp_dir.is_dir() { + warn!("Removing temporary index in {}", fts_dir.display()); + std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; + } + if fts_dir.is_dir() { + warn!("Removing existing index in {}", fts_dir.display()); + std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; + } + + std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; + + let mut sources = Vec::new(); + for (name, source) in &self.config.dataset.source { + match source { + Source::Flac { path: dir } => { + let source = DirDataSource::new(name, dir.clone().to_vec()); + sources.push(source); + } + } + } + + let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config); + let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) + .map_err(DatasetError::from)?; + index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?); + let mut index_writer: IndexWriter = index.writer(15_000_000).map_err(DatasetError::from)?; + + for s in sources { + info!("Processing source {:?}", s.name); + + for i in s.iter() { + let (k, v) = i.map_err(DatasetError::from)?; + + let doc = match db_index + .entry_to_document(&*v) + .map_err(DatasetError::from)? + { + Some(x) => x, + None => { + warn!("Skipping {k:?}, document is empty"); + continue; + } + }; + index_writer.add_document(doc).map_err(DatasetError::from)?; + + if let Some(flag) = flag.as_ref() + && flag.is_cancelled() + { + return Err(CancelableTaskError::Cancelled); + } + } + } + + info!("Committing index"); + index_writer.commit().map_err(DatasetError::from)?; + std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?; + + return Ok(()); + } + + pub fn fts_lookup( + &self, + query: &str, + top_n: usize, + ) -> Result, DatasetError> { + let fts_dir = self.path_workdir.join("fts"); + + if !fts_dir.exists() { + return Err(DatasetError::NoFtsIndex); + } + if !fts_dir.is_dir() { + return Err(std::io::Error::new( + ErrorKind::NotADirectory, + format!("fts index {} is not a directory", fts_dir.display()), + ) + .into()); + } + + let db_index = DbFtsIndex::new(&fts_dir, &self.config); + let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?; + return Ok(results); + } + + /// Time at which fts was created + pub fn ts_fts(&self) -> Result>, std::io::Error> { + let fts_dir = self.path_workdir.join("fts"); + + if !fts_dir.exists() { + return Ok(None); + } + if !fts_dir.is_dir() { + return Err(std::io::Error::new( + ErrorKind::NotADirectory, + format!("fts index {} is not a directory", fts_dir.display()), + )); + } + + return path_ts_earliest(&fts_dir); + } + + /// Time at which data was last modified + pub fn ts_data(&self) -> Result>, std::io::Error> { + let mut ts: Option> = None; + + for (label, source) in &self.config.dataset.source { + match source { + Source::Flac { path } => { + let s = DirDataSource::new(label, path.clone().to_vec()); + match (ts, s.latest_change()?) { + (_, None) => continue, + (None, Some(new)) => ts = Some(new), + (Some(old), Some(new)) => ts = Some(old.max(new)), + }; + } + } + } + + return Ok(ts); + } + + /// Returns true if we do not have an fts index, + /// or if our fts index is older than our data. + pub fn needs_fts(&self) -> Result { + let ts_fts = self.ts_fts()?; + let ts_data = self.ts_data()?; + + match (ts_fts, ts_data) { + (None, Some(_)) => return Ok(true), + (None, None) | (Some(_), None) => { + warn!("Could not determine data age"); + return Ok(false); + } + + (Some(ts_fts), Some(ts_data)) => return Ok(ts_data > ts_fts), + } + } +} diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index ce7e89c..6649f86 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -27,8 +27,8 @@ pub struct DbFtsIndex { impl DbFtsIndex { fn fts_cfg(&self) -> &DatasetFts { - static DEFAULT: LazyLock = LazyLock::new(|| DatasetFts::default()); - &self.cfg.fts.as_ref().unwrap_or(&DEFAULT) + static DEFAULT: LazyLock = LazyLock::new(DatasetFts::default); + self.cfg.fts.as_ref().unwrap_or(&DEFAULT) } } @@ -47,9 +47,9 @@ impl DbFtsIndex { let fields = &cfg.fts.as_ref().unwrap_or(&default).fields; for (name, field) in fields { if field.tokenize { - schema_builder.add_text_field(&name, schema::TEXT); + schema_builder.add_text_field(name, schema::TEXT); } else { - schema_builder.add_text_field(&name, schema::STRING); + schema_builder.add_text_field(name, schema::STRING); } } @@ -70,19 +70,23 @@ impl DbFtsIndex { ) -> Result, TantivyError> { let mut doc = TantivyDocument::default(); - { - let f_ptr = self.schema.get_field("_meta_source")?; - doc.add_text(f_ptr, item.source_name()); - } + let key = match item.key().to_string() { + Some(x) => x, + None => { + warn!( + message = "Item key cannot be converted to a string, skipping", + key = ?item.key(), + ); + return Ok(None); + } + }; - { - let f_ptr = self.schema.get_field("_meta_key")?; - doc.add_text(f_ptr, item.key().to_string()); - } + doc.add_text(self.schema.get_field("_meta_source")?, item.source_name()); + doc.add_text(self.schema.get_field("_meta_key")?, key); - let json = item.json().unwrap(); + let json = item.json()?; let mut empty = true; - for (name, _field) in &self.fts_cfg().fields { + for name in self.fts_cfg().fields.keys() { let val = match self.get_field(&json, name)? { Some(x) => x, None => continue, @@ -118,7 +122,7 @@ impl DbFtsIndex { // Try paths in order, using the first value we find 'outer: for path in field.path.as_slice() { - let val = match json.query(&path) { + let val = match json.query(path) { Ok(mut x) => { if x.len() > 1 { warn!( @@ -168,6 +172,7 @@ impl DbFtsIndex { loop { val = match val { + #[expect(clippy::unwrap_used)] Value::Array(ref mut x) => { if x.len() == 1 { x.pop().unwrap() diff --git a/crates/pile-dataset/src/item/flac.rs b/crates/pile-dataset/src/item/flac.rs index 3185c9c..f73ee36 100644 --- a/crates/pile-dataset/src/item/flac.rs +++ b/crates/pile-dataset/src/item/flac.rs @@ -6,13 +6,14 @@ use std::{ }; use pile_audio::flac::blockread::{FlacBlock, FlacBlockReader, FlacBlockSelector}; +use pile_config::Label; use serde_json::{Map, Value}; use crate::Item; pub struct FlacItem { pub(crate) path: PathBuf, - pub(crate) source_name: String, + pub(crate) source_name: Label, } impl Debug for FlacItem { @@ -47,8 +48,10 @@ impl Item for FlacItem { let mut data = Vec::new(); file.read_to_end(&mut data)?; - block_reader.push_data(&data).unwrap(); - block_reader.finish().unwrap(); + block_reader + .push_data(&data) + .map_err(std::io::Error::other)?; + block_reader.finish().map_err(std::io::Error::other)?; // // Return tags @@ -56,9 +59,8 @@ impl Item for FlacItem { let mut output = Map::new(); - while block_reader.has_block() { - let b = block_reader.pop_block().unwrap(); - match b { + while let Some(block) = block_reader.pop_block() { + match block { FlacBlock::VorbisComment(comment) => { for (k, v) in comment.comment.comments { let k = k.to_string(); @@ -67,9 +69,11 @@ impl Item for FlacItem { match e { None => { - output.insert(k.to_string(), Value::Array(vec![v])); + output.insert(k.clone(), Value::Array(vec![v])); } Some(e) => { + // We always insert an array + #[expect(clippy::unwrap_used)] e.as_array_mut().unwrap().push(v); } } diff --git a/crates/pile-dataset/src/lib.rs b/crates/pile-dataset/src/lib.rs index 23b1e2d..c0a9666 100644 --- a/crates/pile-dataset/src/lib.rs +++ b/crates/pile-dataset/src/lib.rs @@ -4,6 +4,9 @@ pub use traits::*; mod misc; pub use misc::*; +mod dataset; +pub use dataset::*; + pub mod index; pub mod item; pub mod source; diff --git a/crates/pile-dataset/src/misc.rs b/crates/pile-dataset/src/misc.rs index 48f9525..c86aa61 100644 --- a/crates/pile-dataset/src/misc.rs +++ b/crates/pile-dataset/src/misc.rs @@ -2,41 +2,65 @@ use chrono::{DateTime, Utc}; use std::fs; use std::path::Path; -/// Returns the age of a path as a chrono DateTime. -/// - If the path doesn't exist, returns None +/// Returns the age of a path as a [DateTime]. +/// - If the path doesn't exist, returns [None] /// - If it's a file, returns the modified time /// - If it's a directory, returns the LATEST modified time of all files within -pub fn path_age(path: impl AsRef) -> Option> { +pub fn path_ts_latest(path: impl AsRef) -> Result>, std::io::Error> { let path = path.as_ref(); - - // Check if path exists if !path.exists() { - return None; + return Ok(None); } - let metadata = fs::metadata(path).ok()?; + let metadata = fs::metadata(path)?; if metadata.is_file() { - // For files, return the modified time - let modified = metadata.modified().ok()?; - Some(modified.into()) + let modified = metadata.modified()?; + Ok(Some(modified.into())) } else if metadata.is_dir() { - // For directories, find the latest modified time of all files find_latest_modified(path) } else { - None + Ok(None) } } -fn find_latest_modified(dir: &Path) -> Option> { +/// Returns the age of a path as a [DateTime]. +/// - If the path doesn't exist, returns [None] +/// - If it's a file, returns the modified time +/// - If it's a directory, returns the EARLIEST modified time of all files within +pub fn path_ts_earliest(path: impl AsRef) -> Result>, std::io::Error> { + let path = path.as_ref(); + if !path.exists() { + return Ok(None); + } + + let metadata = fs::metadata(path)?; + + if metadata.is_file() { + let modified = metadata.modified()?; + Ok(Some(modified.into())) + } else if metadata.is_dir() { + find_earliest_modified(path) + } else { + Ok(None) + } +} + +fn find_latest_modified(dir: &Path) -> Result>, std::io::Error> { let mut latest: Option> = None; - // Read directory entries - let entries = fs::read_dir(dir).ok()?; + // Include the directory's own modification time + let dir_metadata = fs::metadata(dir)?; + if let Ok(modified) = dir_metadata.modified() { + let dt: DateTime = modified.into(); + latest = Some(dt); + } + + let entries = fs::read_dir(dir)?; for entry in entries.flatten() { let path = entry.path(); - let metadata = entry.metadata().ok()?; + let metadata = entry.metadata()?; if metadata.is_file() { if let Ok(modified) = metadata.modified() { @@ -46,16 +70,50 @@ fn find_latest_modified(dir: &Path) -> Option> { _ => dt, }); } - } else if metadata.is_dir() { - // Recursively check subdirectories - if let Some(dir_latest) = find_latest_modified(&path) { + } else if metadata.is_dir() + && let Some(dir_latest) = find_latest_modified(&path)? { latest = Some(match latest { Some(prev) if prev > dir_latest => prev, _ => dir_latest, }); } - } } - latest + return Ok(latest); +} + +fn find_earliest_modified(dir: &Path) -> Result>, std::io::Error> { + let mut earliest: Option> = None; + + // Include the directory's own modification time + let dir_metadata = fs::metadata(dir)?; + if let Ok(modified) = dir_metadata.modified() { + let dt: DateTime = modified.into(); + earliest = Some(dt); + } + + let entries = fs::read_dir(dir)?; + + for entry in entries.flatten() { + let path = entry.path(); + let metadata = entry.metadata()?; + + if metadata.is_file() { + if let Ok(modified) = metadata.modified() { + let dt: DateTime = modified.into(); + earliest = Some(match earliest { + Some(prev) if prev < dt => prev, + _ => dt, + }); + } + } else if metadata.is_dir() + && let Some(dir_earliest) = find_earliest_modified(&path)? { + earliest = Some(match earliest { + Some(prev) if prev < dir_earliest => prev, + _ => dir_earliest, + }); + } + } + + return Ok(earliest); } diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs index 0fb6101..0f89624 100644 --- a/crates/pile-dataset/src/source/dir.rs +++ b/crates/pile-dataset/src/source/dir.rs @@ -1,19 +1,21 @@ +use chrono::{DateTime, Utc}; use itertools::Itertools; -use std::{io::ErrorKind, path::PathBuf}; +use pile_config::Label; +use std::path::PathBuf; use walkdir::WalkDir; -use crate::{DataSource, Item, item::FlacItem}; +use crate::{DataSource, Item, item::FlacItem, path_ts_latest}; #[derive(Debug)] pub struct DirDataSource { - pub name: String, + pub name: Label, pub dirs: Vec, } impl DirDataSource { - pub fn new(name: impl Into, dirs: Vec) -> Self { + pub fn new(name: &Label, dirs: Vec) -> Self { Self { - name: name.into(), + name: name.clone(), dirs, } } @@ -40,16 +42,12 @@ impl DataSource for DirDataSource { return self .dirs .iter() - .map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d))) - .flatten() - .into_iter() - .filter_ok(|(_, entry)| entry.file_type().is_file()) + .flat_map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d))) + .filter_ok(|(_, entry)| !entry.file_type().is_dir()) .map(|x| match x { Err(err) => { let msg = format!("other walkdir error: {err:?}"); - Err(err - .into_io_error() - .unwrap_or(std::io::Error::new(ErrorKind::Other, msg))) + Err(err.into_io_error().unwrap_or(std::io::Error::other(msg))) } Ok((_, entry)) => { let path = entry.into_path(); @@ -63,4 +61,23 @@ impl DataSource for DirDataSource { } }); } + + fn latest_change(&self) -> Result>, Self::Error> { + let mut ts: Option> = None; + + for path in &self.dirs { + if !path.exists() { + continue; + } + + let new = path_ts_latest(path)?; + match (ts, new) { + (_, None) => continue, + (None, Some(new)) => ts = Some(new), + (Some(old), Some(new)) => ts = Some(old.max(new)), + }; + } + + return Ok(ts); + } } diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-dataset/src/traits.rs index a71a116..1f5fb47 100644 --- a/crates/pile-dataset/src/traits.rs +++ b/crates/pile-dataset/src/traits.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use std::{error::Error, fmt::Debug, path::PathBuf}; /// A read-only set of [Item]s. @@ -15,6 +16,9 @@ pub trait DataSource { fn iter( &self, ) -> impl Iterator>), Self::Error>>; + + /// Return the time of the latest change to the data in this source + fn latest_change(&self) -> Result>, Self::Error>; } pub trait Item: Debug + Send + Sync + 'static { @@ -34,7 +38,10 @@ pub trait Item: Debug + Send + Sync + 'static { // pub trait Key: Debug + Clone + Send + Sync + 'static { - fn to_string(&self) -> String; + /// Convert this key to a string, returning `None` + /// if we encounter any kind of error. + fn to_string(&self) -> Option; + fn from_string(str: &str) -> Option; } @@ -43,7 +50,7 @@ impl Key for PathBuf { str.parse().ok() } - fn to_string(&self) -> String { - self.to_str().expect("path is not a string").to_owned() + fn to_string(&self) -> Option { + self.to_str().map(|x| x.to_owned()) } } diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index a7c6d27..e07d268 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -24,4 +24,3 @@ tracing-indicatif = { workspace = true } signal-hook = { workspace = true } anstyle = { workspace = true } toml = { workspace = true } -tantivy = { workspace = true } diff --git a/crates/pile/src/command/check.rs b/crates/pile/src/command/check.rs index 608cfdb..7bcbd6e 100644 --- a/crates/pile/src/command/check.rs +++ b/crates/pile/src/command/check.rs @@ -1,9 +1,10 @@ use anyhow::{Context, Result, anyhow}; use clap::Args; use pile_config::ConfigToml; +use pile_dataset::Dataset; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use std::{fmt::Debug, path::PathBuf}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::{CliCmd, GlobalContext}; @@ -34,7 +35,6 @@ impl CliCmd for CheckCommand { Ok(config) => { info!("Config in {} is valid", self.config.display()); debug!("{config:#?}"); - return Ok(0); } Err(error) => { @@ -42,5 +42,29 @@ impl CliCmd for CheckCommand { return Ok(1); } } + + let ds = Dataset::open(&self.config) + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; + + let ts_fts = ds.ts_fts().context("while determining fts age")?; + let ts_data = ds.ts_data().context("while determining data age")?; + + match (ts_fts, ts_data) { + (None, Some(_)) => warn!("Could not determine fts age"), + (None, None) | (Some(_), None) => warn!("Could not determine data age"), + (Some(ts_fts), Some(ts_data)) => { + let delta_secs = ((ts_data - ts_fts).as_seconds_f64() * 100.0).round() / 100.0; + if ts_data > ts_fts { + info!( + message = "FTS index is out-of-date and should be regenerated", + ?delta_secs, + ); + } else { + debug!(message = "FTS index is up-to-date", ?delta_secs); + } + } + } + + return Ok(0); } } diff --git a/crates/pile/src/command/index.rs b/crates/pile/src/command/index.rs index 88b51bf..d474f0c 100644 --- a/crates/pile/src/command/index.rs +++ b/crates/pile/src/command/index.rs @@ -1,11 +1,8 @@ -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use clap::Args; -use pile_config::{ConfigToml, Source}; -use pile_dataset::{DataSource, index::DbFtsIndex, source::DirDataSource}; +use pile_dataset::Dataset; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use std::{fmt::Debug, path::PathBuf}; -use tantivy::{Executor, Index, IndexWriter}; -use tracing::{error, info, trace, warn}; use crate::{CliCmd, GlobalContext}; @@ -22,88 +19,17 @@ impl CliCmd for IndexCommand { _ctx: GlobalContext, flag: CancelFlag, ) -> Result> { - let parent = self - .config - .parent() - .with_context(|| format!("Config file {} has no parent", self.config.display()))?; + let ds = Dataset::open(&self.config) + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - if !self.config.exists() { - return Err(anyhow!("{} does not exist", self.config.display()).into()); - } - - let config = { - let config = std::fs::read_to_string(&self.config) - .with_context(|| format!("while reading {}", self.config.display()))?; - let config: Result = toml::from_str(&config); - - match config { - Ok(config) => { - trace!(message = "Loaded config", ?config); - config - } - - Err(error) => { - error!("{} is invalid:\n{error}", self.config.display()); - return Ok(1); - } - } - }; - - let working_dir = config - .dataset - .working_dir - .clone() - .unwrap_or(parent.join(".pile")) - .join(&config.dataset.name.as_str()); - let fts_dir = working_dir.join("fts"); - - if fts_dir.is_dir() { - warn!("Removing existing index in {}", fts_dir.display()); - std::fs::remove_dir_all(&fts_dir) - .with_context(|| format!("while removing {}", fts_dir.display()))?; - } - - std::fs::create_dir_all(&fts_dir) - .with_context(|| format!("while creating dir {}", fts_dir.display()))?; - - let mut sources = Vec::new(); - for (name, source) in &config.dataset.source { - match source { - Source::Flac { path: dir } => { - let source = DirDataSource::new(name.as_str(), dir.clone().to_vec()); - sources.push(source); - } - } - } - - let db_index = DbFtsIndex::new(&fts_dir, &config); - let mut index = Index::create_in_dir(&fts_dir, db_index.schema.clone()).unwrap(); - index.set_executor(Executor::multi_thread(10, "build-fts").unwrap()); - let mut index_writer: IndexWriter = index.writer(15_000_000).unwrap(); - - for s in sources { - info!("Processing source {:?}", s.name); - - for i in s.iter() { - let (k, v) = i.unwrap(); - - let doc = match db_index.entry_to_document(&*v).unwrap() { - Some(x) => x, - None => { - warn!("Skipping {k:?}, document is empty"); - continue; - } - }; - index_writer.add_document(doc).unwrap(); - - if flag.is_cancelled() { - return Err(CancelableTaskError::Cancelled); - } - } - } - - info!("Committing index"); - index_writer.commit().unwrap(); + ds.fts_refresh(Some(flag)).map_err(|x| { + x.map_err(|x| { + anyhow::Error::from(x).context(format!( + "while opening dataset for {}", + self.config.display() + )) + }) + })?; return Ok(0); } diff --git a/crates/pile/src/command/lookup.rs b/crates/pile/src/command/lookup.rs index 2a7f8ec..071a55b 100644 --- a/crates/pile/src/command/lookup.rs +++ b/crates/pile/src/command/lookup.rs @@ -1,11 +1,9 @@ -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use clap::Args; -use pile_config::ConfigToml; -use pile_dataset::index::DbFtsIndex; +use pile_dataset::Dataset; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use std::{fmt::Debug, path::PathBuf, sync::Arc}; -use tantivy::collector::TopDocs; -use tracing::{error, trace}; +use std::{fmt::Debug, path::PathBuf}; +use tracing::info; use crate::{CliCmd, GlobalContext}; @@ -24,57 +22,37 @@ pub struct LookupCommand { /// Path to dataset config #[arg(long, short = 'c', default_value = "./pile.toml")] config: PathBuf, + + /// If provided, do not refresh fts + #[arg(long)] + no_refresh: bool, } impl CliCmd for LookupCommand { + #[expect(clippy::print_stdout)] async fn run( self, _ctx: GlobalContext, - _flag: CancelFlag, + flag: CancelFlag, ) -> Result> { - let parent = self - .config - .parent() - .with_context(|| format!("Config file {} has no parent", self.config.display()))?; + let ds = Dataset::open(&self.config) + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - if !self.config.exists() { - return Err(anyhow!("{} does not exist", self.config.display()).into()); + if ds.needs_fts().context("while checking dataset fts")? { + info!("FTS index is missing or out-of-date, regenerating"); + ds.fts_refresh(Some(flag)).map_err(|x| { + x.map_err(|x| { + anyhow::Error::from(x).context(format!( + "while opening dataset for {}", + self.config.display() + )) + }) + })?; } - let config = { - let config = std::fs::read_to_string(&self.config) - .with_context(|| format!("while reading {}", self.config.display()))?; - let config: Result = toml::from_str(&config); - - match config { - Ok(config) => { - trace!(message = "Loaded config", ?config); - config - } - - Err(error) => { - error!("{} is invalid:\n{error}", self.config.display()); - return Ok(1); - } - } - }; - - let working_dir = config - .dataset - .working_dir - .clone() - .unwrap_or(parent.join(".pile")) - .join(&config.dataset.name.as_str()); - let fts_dir = working_dir.join("fts"); - - if !fts_dir.is_dir() { - return Err(anyhow!("fts index does not exist").into()); - } - - let db_index = DbFtsIndex::new(&fts_dir, &config); - let results = db_index - .lookup(self.query, Arc::new(TopDocs::with_limit(self.topn))) - .unwrap(); + let results = ds + .fts_lookup(&self.query, self.topn) + .context("while running fts lookup")?; if self.score { for res in results {