lazily-evaluated extractors
x
This commit is contained in:
124
Cargo.lock
generated
124
Cargo.lock
generated
@@ -135,15 +135,6 @@ dependencies = [
|
||||
"crunchy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.10.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.11.0"
|
||||
@@ -362,16 +353,6 @@ version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.2.0"
|
||||
@@ -425,25 +406,15 @@ dependencies = [
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
|
||||
dependencies = [
|
||||
"block-buffer 0.10.4",
|
||||
"crypto-common 0.1.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8bf3682cdec91817be507e4aa104314898b95b84d74f3d43882210101a545b6"
|
||||
dependencies = [
|
||||
"block-buffer 0.11.0",
|
||||
"block-buffer",
|
||||
"const-oid",
|
||||
"crypto-common 0.2.0",
|
||||
"crypto-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -556,16 +527,6 @@ dependencies = [
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.14.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
|
||||
dependencies = [
|
||||
"typenum",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.17"
|
||||
@@ -753,19 +714,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonpath-rust"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "633a7320c4bb672863a3782e89b9094ad70285e097ff6832cddd0ec615beadfa"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_derive",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.5.0"
|
||||
@@ -985,49 +933,6 @@ version = "1.0.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
|
||||
|
||||
[[package]]
|
||||
name = "pest"
|
||||
version = "2.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0848c601009d37dfa3430c4666e147e49cdcf1b92ecd3e63657d8a5f19da662"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"ucd-trie",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pest_derive"
|
||||
version = "2.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11f486f1ea21e6c10ed15d5a7c77165d0ee443402f0780849d1768e7d9d6fe77"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_generator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pest_generator"
|
||||
version = "2.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8040c4647b13b210a963c1ed407c1ff4fdfa01c31d6d2a098218702e6664f94f"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_meta",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pest_meta"
|
||||
version = "2.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89815c69d36021a140146f26659a81d6c2afa33d216d736dd4be5381a7362220"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"sha2 0.10.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pile"
|
||||
version = "0.0.1"
|
||||
@@ -1052,9 +957,7 @@ dependencies = [
|
||||
name = "pile-config"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"itertools",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"smartstring",
|
||||
"toml",
|
||||
]
|
||||
@@ -1065,12 +968,12 @@ version = "0.0.1"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"itertools",
|
||||
"jsonpath-rust",
|
||||
"pile-config",
|
||||
"pile-flac",
|
||||
"pile-toolbox",
|
||||
"rayon",
|
||||
"serde_json",
|
||||
"smartstring",
|
||||
"tantivy",
|
||||
"thiserror",
|
||||
"toml",
|
||||
@@ -1086,7 +989,7 @@ dependencies = [
|
||||
"itertools",
|
||||
"mime",
|
||||
"paste",
|
||||
"sha2 0.11.0-rc.5",
|
||||
"sha2",
|
||||
"smartstring",
|
||||
"strum",
|
||||
"thiserror",
|
||||
@@ -1373,17 +1276,6 @@ dependencies = [
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"digest 0.10.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.11.0-rc.5"
|
||||
@@ -1392,7 +1284,7 @@ checksum = "7c5f3b1e2dc8aad28310d8410bd4d7e180eca65fca176c52ab00d364475d0024"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"digest 0.11.0",
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1900,12 +1792,6 @@ version = "1.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
|
||||
|
||||
[[package]]
|
||||
name = "ucd-trie"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.24"
|
||||
|
||||
@@ -87,7 +87,6 @@ serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.149"
|
||||
base64 = "0.22.1"
|
||||
toml = "1.0.3"
|
||||
jsonpath-rust = "1.0.4"
|
||||
sha2 = "0.11.0-rc.5"
|
||||
|
||||
# Misc helpers
|
||||
|
||||
@@ -9,8 +9,6 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
smartstring = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use itertools::Itertools;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||
#[serde(untagged)]
|
||||
@@ -12,107 +10,6 @@ pub enum FieldSpecPost {
|
||||
NotEmpty { notempty: bool },
|
||||
}
|
||||
|
||||
impl FieldSpecPost {
|
||||
pub fn apply(&self, val: &Value) -> Option<Value> {
|
||||
Some(match self {
|
||||
Self::NotEmpty { notempty: false } => val.clone(),
|
||||
Self::NotEmpty { notempty: true } => match val {
|
||||
Value::Null => return None,
|
||||
Value::String(x) if x.is_empty() => return None,
|
||||
Value::Array(x) if x.is_empty() => return None,
|
||||
x => x.clone(),
|
||||
},
|
||||
|
||||
Self::SetCase { case: Case::Lower } => match val {
|
||||
Value::Null => return None,
|
||||
Value::Bool(_) | Value::Number(_) => val.clone(),
|
||||
Value::String(x) => Value::String(x.to_lowercase()),
|
||||
|
||||
Value::Array(x) => {
|
||||
Value::Array(x.iter().map(|x| self.apply(x)).collect::<Option<_>>()?)
|
||||
}
|
||||
|
||||
Value::Object(x) => Value::Object(
|
||||
x.iter()
|
||||
.map(|x| (x.0.to_lowercase(), self.apply(x.1)))
|
||||
.map(|x| x.1.map(|y| (x.0, y)))
|
||||
.collect::<Option<_>>()?,
|
||||
),
|
||||
},
|
||||
|
||||
Self::SetCase { case: Case::Upper } => match val {
|
||||
Value::Null => return None,
|
||||
Value::Bool(_) | Value::Number(_) => val.clone(),
|
||||
Value::String(x) => Value::String(x.to_uppercase()),
|
||||
|
||||
Value::Array(x) => {
|
||||
Value::Array(x.iter().map(|x| self.apply(x)).collect::<Option<_>>()?)
|
||||
}
|
||||
|
||||
Value::Object(x) => Value::Object(
|
||||
x.iter()
|
||||
.map(|x| (x.0.to_uppercase(), self.apply(x.1)))
|
||||
.map(|x| x.1.map(|y| (x.0, y)))
|
||||
.collect::<Option<_>>()?,
|
||||
),
|
||||
},
|
||||
|
||||
Self::TrimSuffix { trim_suffix } => match val {
|
||||
Value::Null => return None,
|
||||
Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()),
|
||||
|
||||
Value::String(x) => {
|
||||
Value::String(x.strip_suffix(trim_suffix).unwrap_or(x).to_owned())
|
||||
}
|
||||
|
||||
Value::Array(x) => {
|
||||
Value::Array(x.iter().map(|x| self.apply(x)).collect::<Option<_>>()?)
|
||||
}
|
||||
|
||||
Value::Object(x) => Value::Object(
|
||||
x.iter()
|
||||
.map(|x| {
|
||||
(
|
||||
x.0.strip_suffix(trim_suffix).unwrap_or(x.0).to_owned(),
|
||||
self.apply(x.1),
|
||||
)
|
||||
})
|
||||
.map(|x| x.1.map(|y| (x.0, y)))
|
||||
.collect::<Option<_>>()?,
|
||||
),
|
||||
},
|
||||
|
||||
Self::TrimPrefix { trim_prefix } => match val {
|
||||
Value::Null => return None,
|
||||
Value::Object(_) => return None,
|
||||
Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()),
|
||||
|
||||
Value::String(x) => {
|
||||
Value::String(x.strip_prefix(trim_prefix).unwrap_or(x).to_owned())
|
||||
}
|
||||
|
||||
Value::Array(x) => {
|
||||
Value::Array(x.iter().map(|x| self.apply(x)).collect::<Option<_>>()?)
|
||||
}
|
||||
},
|
||||
|
||||
Self::Join { join } => match val {
|
||||
Value::Null => return None,
|
||||
Value::Object(_) => return None,
|
||||
Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()),
|
||||
Value::String(x) => Value::String(x.clone()),
|
||||
Value::Array(x) => Value::String(
|
||||
x.iter()
|
||||
.map(|x| self.apply(x))
|
||||
.collect::<Option<Vec<_>>>()?
|
||||
.into_iter()
|
||||
.join(join),
|
||||
),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Case {
|
||||
|
||||
@@ -12,14 +12,13 @@ pile-config = { workspace = true }
|
||||
pile-toolbox = { workspace = true }
|
||||
pile-flac = { workspace = true }
|
||||
|
||||
|
||||
serde_json = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
tantivy = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
jsonpath-rust = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
smartstring = { workspace = true }
|
||||
|
||||
@@ -8,7 +8,11 @@ use rayon::{
|
||||
use std::{
|
||||
io::ErrorKind,
|
||||
path::PathBuf,
|
||||
sync::{Arc, mpsc::Receiver},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
mpsc::Receiver,
|
||||
},
|
||||
thread::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
@@ -145,14 +149,13 @@ impl Dataset {
|
||||
let mut total = 0u64;
|
||||
while let Ok(batch) = read_rx.recv() {
|
||||
let batch = batch.map_err(DatasetError::from)?;
|
||||
let len = batch.len() as u64;
|
||||
|
||||
if let Some(flag) = &flag
|
||||
&& flag.is_cancelled()
|
||||
{
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
let this = AtomicU64::new(0);
|
||||
let start = Instant::now();
|
||||
write_pool
|
||||
.install(|| {
|
||||
@@ -170,6 +173,7 @@ impl Dataset {
|
||||
}
|
||||
})
|
||||
.map(|(key, doc)| {
|
||||
this.fetch_add(1, Ordering::Relaxed);
|
||||
index_writer
|
||||
.add_document(doc)
|
||||
.map_err(|err| (key, err))
|
||||
@@ -180,9 +184,10 @@ impl Dataset {
|
||||
})
|
||||
.map_err(|(_key, err)| DatasetError::from(err))?;
|
||||
|
||||
total += len;
|
||||
let this = this.load(Ordering::Relaxed);
|
||||
total += this;
|
||||
let time_ms = start.elapsed().as_millis();
|
||||
debug!("Added a batch of {len} in {time_ms} ms ({total} total)");
|
||||
debug!("Added a batch of {this} in {time_ms} ms ({total} total)");
|
||||
}
|
||||
|
||||
if let Some(flag) = flag.as_ref()
|
||||
@@ -334,6 +339,13 @@ fn start_read_task(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !batch.is_empty() {
|
||||
match read_tx.send(Ok(batch)) {
|
||||
Ok(()) => {}
|
||||
Err(_) => return,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
return (read_task, read_rx);
|
||||
|
||||
64
crates/pile-dataset/src/extract/flac.rs
Normal file
64
crates/pile-dataset/src/extract/flac.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
use pile_config::Label;
|
||||
use pile_flac::{FlacBlock, FlacReader};
|
||||
use std::{collections::HashMap, fs::File, io::BufReader, sync::OnceLock};
|
||||
|
||||
use crate::{FileItem, PileValue, extract::Extractor};
|
||||
|
||||
pub struct FlacExtractor<'a> {
|
||||
item: &'a FileItem,
|
||||
output: OnceLock<HashMap<Label, PileValue<'a, FileItem>>>,
|
||||
}
|
||||
|
||||
impl<'a> FlacExtractor<'a> {
|
||||
pub fn new(item: &'a FileItem) -> Self {
|
||||
Self {
|
||||
item,
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_inner(&self) -> Result<&HashMap<Label, PileValue<'a, FileItem>>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let file = File::open(&self.item.path)?;
|
||||
let reader = FlacReader::new(BufReader::new(file));
|
||||
|
||||
let mut output: HashMap<Label, Vec<_>> = HashMap::new();
|
||||
for block in reader {
|
||||
if let FlacBlock::VorbisComment(comment) = block.unwrap() {
|
||||
for (k, v) in comment.comment.comments {
|
||||
match Label::new(k.to_string().to_lowercase()) {
|
||||
Some(k) => output.entry(k).or_default().push(PileValue::String(v)),
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
|
||||
// We should only have one comment block,
|
||||
// stop reading when we find it
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let output = output
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, PileValue::Array(v)))
|
||||
.collect();
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
impl Extractor<FileItem> for FlacExtractor<'_> {
|
||||
fn field<'a>(
|
||||
&'a self,
|
||||
name: &Label,
|
||||
) -> Result<Option<&'a PileValue<'a, FileItem>>, std::io::Error> {
|
||||
Ok(self.get_inner()?.get(&name))
|
||||
}
|
||||
|
||||
fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner()?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
77
crates/pile-dataset/src/extract/fs.rs
Normal file
77
crates/pile-dataset/src/extract/fs.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, path::Component, sync::OnceLock};
|
||||
|
||||
use crate::{FileItem, Key, PileValue, extract::Extractor};
|
||||
|
||||
pub struct FsExtractor<'a> {
|
||||
item: &'a FileItem,
|
||||
output: OnceLock<HashMap<Label, PileValue<'a, FileItem>>>,
|
||||
}
|
||||
|
||||
impl<'a> FsExtractor<'a> {
|
||||
pub fn new(item: &'a FileItem) -> Self {
|
||||
Self {
|
||||
item,
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_inner(&self) -> Result<&HashMap<Label, PileValue<'a, FileItem>>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let output = HashMap::from([
|
||||
(
|
||||
Label::new("extension").unwrap(),
|
||||
self.item
|
||||
.path
|
||||
.extension()
|
||||
.map(|x| x.to_str())
|
||||
.flatten()
|
||||
.map(|x| PileValue::String(x.into()))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
(
|
||||
Label::new("path").unwrap(),
|
||||
self.item
|
||||
.path
|
||||
.to_string()
|
||||
.map(|x| PileValue::String(x.into()))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
(
|
||||
Label::new("segments").unwrap(),
|
||||
self.item
|
||||
.path
|
||||
.components()
|
||||
.map(|x| match x {
|
||||
Component::CurDir => Some(".".to_owned()),
|
||||
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
|
||||
Component::ParentDir => Some("..".to_owned()),
|
||||
Component::RootDir => Some("/".to_owned()),
|
||||
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
|
||||
})
|
||||
.map(|x| x.map(|x| PileValue::String(x.into())))
|
||||
.collect::<Option<Vec<_>>>()
|
||||
.map(|x| PileValue::Array(x))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
]);
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
impl Extractor<FileItem> for FsExtractor<'_> {
|
||||
fn field<'a>(
|
||||
&'a self,
|
||||
name: &Label,
|
||||
) -> Result<Option<&'a PileValue<'a, FileItem>>, std::io::Error> {
|
||||
Ok(self.get_inner()?.get(&name))
|
||||
}
|
||||
|
||||
fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner()?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
18
crates/pile-dataset/src/extract/map.rs
Normal file
18
crates/pile-dataset/src/extract/map.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use pile_config::Label;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
|
||||
pub struct MapExtractor<'a, I: Item> {
|
||||
pub(super) inner: HashMap<Label, PileValue<'a, I>>,
|
||||
}
|
||||
|
||||
impl<I: Item> Extractor<I> for MapExtractor<'_, I> {
|
||||
fn field<'a>(&'a self, name: &Label) -> Result<Option<&'a PileValue<'a, I>>, std::io::Error> {
|
||||
Ok(self.inner.get(&name))
|
||||
}
|
||||
|
||||
fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.inner.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
67
crates/pile-dataset/src/extract/mod.rs
Normal file
67
crates/pile-dataset/src/extract/mod.rs
Normal file
@@ -0,0 +1,67 @@
|
||||
mod flac;
|
||||
use std::{collections::HashMap, rc::Rc};
|
||||
|
||||
pub use flac::*;
|
||||
|
||||
mod fs;
|
||||
pub use fs::*;
|
||||
|
||||
mod map;
|
||||
pub use map::*;
|
||||
use pile_config::Label;
|
||||
|
||||
/// An attachment that extracts metadata from an [Item].
|
||||
///
|
||||
/// Metadata is exposed as an immutable map of {label: value},
|
||||
/// much like a json object.
|
||||
pub trait Extractor<I: crate::Item> {
|
||||
/// Get the field at `name` from `item`.
|
||||
/// - returns `None` if `name` is not a valid field
|
||||
/// - returns `Some(Null)` if `name` is not available
|
||||
fn field<'a>(
|
||||
&'a self,
|
||||
name: &pile_config::Label,
|
||||
) -> Result<Option<&'a crate::PileValue<'a, I>>, std::io::Error>;
|
||||
|
||||
/// Return all fields in this extrator.
|
||||
/// `Self::field` must return [Some] for all these keys
|
||||
/// and [None] for all others.
|
||||
fn fields(&self) -> Result<Vec<Label>, std::io::Error>;
|
||||
}
|
||||
|
||||
pub struct MetaExtractor<'a, I: crate::Item> {
|
||||
inner: MapExtractor<'a, I>,
|
||||
}
|
||||
|
||||
impl<'a> MetaExtractor<'a, crate::FileItem> {
|
||||
pub fn new(item: &'a crate::FileItem) -> Self {
|
||||
let inner = MapExtractor {
|
||||
inner: HashMap::from([
|
||||
(
|
||||
Label::new("flac").unwrap(),
|
||||
crate::PileValue::Extractor(Rc::new(FlacExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("fs").unwrap(),
|
||||
crate::PileValue::Extractor(Rc::new(FsExtractor::new(item))),
|
||||
),
|
||||
]),
|
||||
};
|
||||
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl Extractor<crate::FileItem> for MetaExtractor<'_, crate::FileItem> {
|
||||
fn field<'a>(
|
||||
&'a self,
|
||||
name: &pile_config::Label,
|
||||
) -> Result<Option<&'a crate::PileValue<'a, crate::FileItem>>, std::io::Error> {
|
||||
self.inner.field(name)
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
return Ok(vec![Label::new("flac").unwrap(), Label::new("fs").unwrap()]);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
use jsonpath_rust::JsonPath;
|
||||
use pile_config::{ConfigToml, DatasetFts, Label};
|
||||
use serde_json::Value;
|
||||
use std::{path::PathBuf, sync::LazyLock};
|
||||
use itertools::Itertools;
|
||||
use pile_config::{Case, ConfigToml, DatasetFts, FieldSpecPost, Label};
|
||||
use std::{path::PathBuf, rc::Rc, sync::LazyLock};
|
||||
use tantivy::{
|
||||
DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError,
|
||||
collector::Collector,
|
||||
@@ -10,7 +9,7 @@ use tantivy::{
|
||||
};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use crate::{Item, Key};
|
||||
use crate::{Item, Key, PileValue, extract::MetaExtractor};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FtsLookupResult {
|
||||
@@ -84,10 +83,17 @@ impl DbFtsIndex {
|
||||
doc.add_text(self.schema.get_field("_meta_source")?, item.source_name());
|
||||
doc.add_text(self.schema.get_field("_meta_key")?, key);
|
||||
|
||||
let json = item.json()?;
|
||||
let item = match item.as_file() {
|
||||
Some(x) => x,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let extractor = MetaExtractor::new(item);
|
||||
let extractor = PileValue::Extractor(Rc::new(extractor));
|
||||
|
||||
let mut empty = true;
|
||||
for name in self.fts_cfg().fields.keys() {
|
||||
let x = self.get_field(&json, name)?;
|
||||
let x = self.get_field(&extractor, name)?;
|
||||
|
||||
let val = match x {
|
||||
Some(x) => x,
|
||||
@@ -109,9 +115,9 @@ impl DbFtsIndex {
|
||||
// MARK: read
|
||||
//
|
||||
|
||||
pub fn get_field(
|
||||
pub fn get_field<I: Item>(
|
||||
&self,
|
||||
json: &Value,
|
||||
extractor: &PileValue<'_, I>,
|
||||
field_name: &Label,
|
||||
) -> Result<Option<String>, std::io::Error> {
|
||||
let field = match self.cfg.schema.get(field_name) {
|
||||
@@ -124,41 +130,23 @@ impl DbFtsIndex {
|
||||
|
||||
// Try paths in order, using the first value we find
|
||||
'outer: for path in field.path.as_slice() {
|
||||
let val = match json.query(path) {
|
||||
Ok(mut x) => {
|
||||
if x.len() > 1 {
|
||||
warn!(
|
||||
message = "Path returned more than one value, this is not supported. Skipping.",
|
||||
?path,
|
||||
field = field_name.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let segments = path
|
||||
.split('.')
|
||||
.map(|x| Label::new(x).expect(&format!("wtf {x}")))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match x.pop() {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
|
||||
Err(error) => {
|
||||
warn!(
|
||||
message = "Invalid path, skipping",
|
||||
?path,
|
||||
field = field_name.to_string(),
|
||||
?error
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let val = match extractor.query(&segments)? {
|
||||
Some(x) => x,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let mut val = match val {
|
||||
Value::Null => {
|
||||
PileValue::Null => {
|
||||
trace!(
|
||||
message = "Skipping field, is null",
|
||||
field = field_name.to_string(),
|
||||
path,
|
||||
value = ?val
|
||||
// value = ?val
|
||||
);
|
||||
continue;
|
||||
}
|
||||
@@ -166,7 +154,7 @@ impl DbFtsIndex {
|
||||
};
|
||||
|
||||
for post in &field.post {
|
||||
val = match post.apply(&val) {
|
||||
val = match apply(&post, &val) {
|
||||
Some(x) => x,
|
||||
None => return Ok(None),
|
||||
};
|
||||
@@ -175,7 +163,7 @@ impl DbFtsIndex {
|
||||
loop {
|
||||
val = match val {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
Value::Array(ref mut x) => {
|
||||
PileValue::Array(ref mut x) => {
|
||||
if x.len() == 1 {
|
||||
x.pop().unwrap()
|
||||
} else if x.len() > 1 {
|
||||
@@ -183,7 +171,7 @@ impl DbFtsIndex {
|
||||
message = "Skipping field, is array with more than one element",
|
||||
field = field_name.to_string(),
|
||||
path,
|
||||
value = ?val
|
||||
//value = ?val
|
||||
);
|
||||
continue 'outer;
|
||||
} else {
|
||||
@@ -191,32 +179,30 @@ impl DbFtsIndex {
|
||||
message = "Skipping field, is empty array",
|
||||
field = field_name.to_string(),
|
||||
path,
|
||||
value = ?val
|
||||
//value = ?val
|
||||
);
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
Value::Null => {
|
||||
PileValue::Null => {
|
||||
trace!(
|
||||
message = "Skipping field, is null",
|
||||
field = field_name.to_string(),
|
||||
path,
|
||||
value = ?val
|
||||
//value = ?val
|
||||
);
|
||||
continue 'outer;
|
||||
}
|
||||
Value::Object(_) => {
|
||||
PileValue::Extractor(_) => {
|
||||
trace!(
|
||||
message = "Skipping field, is object",
|
||||
field = field_name.to_string(),
|
||||
path,
|
||||
value = ?val
|
||||
//value = ?val
|
||||
);
|
||||
continue 'outer;
|
||||
}
|
||||
Value::Bool(x) => return Ok(Some(x.to_string())),
|
||||
Value::Number(x) => return Ok(Some(x.to_string())),
|
||||
Value::String(x) => return Ok(Some(x)),
|
||||
PileValue::String(x) => return Ok(Some(x.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -310,3 +296,80 @@ impl DbFtsIndex {
|
||||
return Ok(out);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn apply<'a, I: Item>(
|
||||
post: &FieldSpecPost,
|
||||
val: &PileValue<'a, I>,
|
||||
) -> Option<PileValue<'a, I>> {
|
||||
Some(match post {
|
||||
FieldSpecPost::NotEmpty { notempty: false } => val.clone(),
|
||||
FieldSpecPost::NotEmpty { notempty: true } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::String(x) if x.is_empty() => return None,
|
||||
PileValue::Array(x) if x.is_empty() => return None,
|
||||
x => x.clone(),
|
||||
},
|
||||
|
||||
FieldSpecPost::SetCase { case: Case::Lower } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::Extractor(_) => return None,
|
||||
PileValue::String(x) => PileValue::String(x.to_lowercase().into()),
|
||||
|
||||
PileValue::Array(x) => {
|
||||
PileValue::Array(x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?)
|
||||
}
|
||||
},
|
||||
|
||||
FieldSpecPost::SetCase { case: Case::Upper } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::Extractor(_) => return None,
|
||||
PileValue::String(x) => PileValue::String(x.to_uppercase().into()),
|
||||
|
||||
PileValue::Array(x) => {
|
||||
PileValue::Array(x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?)
|
||||
}
|
||||
},
|
||||
|
||||
FieldSpecPost::TrimSuffix { trim_suffix } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::Extractor(_) => return None,
|
||||
|
||||
PileValue::String(x) => {
|
||||
PileValue::String(x.strip_suffix(trim_suffix).unwrap_or(x).into())
|
||||
}
|
||||
|
||||
PileValue::Array(x) => {
|
||||
PileValue::Array(x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?)
|
||||
}
|
||||
},
|
||||
|
||||
FieldSpecPost::TrimPrefix { trim_prefix } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::Extractor(_) => return None,
|
||||
|
||||
PileValue::String(x) => {
|
||||
PileValue::String(x.strip_prefix(trim_prefix).unwrap_or(x).into())
|
||||
}
|
||||
|
||||
PileValue::Array(x) => {
|
||||
PileValue::Array(x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?)
|
||||
}
|
||||
},
|
||||
|
||||
FieldSpecPost::Join { join } => match val {
|
||||
PileValue::Null => return None,
|
||||
PileValue::Extractor(_) => return None,
|
||||
|
||||
PileValue::String(x) => PileValue::String(x.clone()),
|
||||
PileValue::Array(x) => PileValue::String(
|
||||
x.iter()
|
||||
.map(|x| apply(post, x))
|
||||
.map(|x| x.map(|x| x.as_str().map(|x| x.to_owned())).flatten())
|
||||
.collect::<Option<Vec<_>>>()?
|
||||
.into_iter()
|
||||
.join(join)
|
||||
.into(),
|
||||
),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
62
crates/pile-dataset/src/item.rs
Normal file
62
crates/pile-dataset/src/item.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use pile_config::Label;
|
||||
use std::{fmt::Debug, path::PathBuf};
|
||||
|
||||
//
|
||||
// MARK: key
|
||||
//
|
||||
|
||||
pub trait Key: Debug + Clone + Send + Sync + 'static {
|
||||
/// Convert this key to a string, returning `None`
|
||||
/// if we encounter any kind of error.
|
||||
fn to_string(&self) -> Option<String>;
|
||||
|
||||
fn from_string(str: &str) -> Option<Self>;
|
||||
}
|
||||
|
||||
impl Key for PathBuf {
|
||||
fn from_string(str: &str) -> Option<Self> {
|
||||
str.parse().ok()
|
||||
}
|
||||
|
||||
fn to_string(&self) -> Option<String> {
|
||||
self.to_str().map(|x| x.to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: item
|
||||
//
|
||||
|
||||
/// A pointer to raw data
|
||||
pub trait Item: Debug + Send + Sync + 'static {
|
||||
type Key: Key;
|
||||
|
||||
fn source_name(&self) -> &str;
|
||||
fn key(&self) -> &Self::Key;
|
||||
|
||||
fn as_file(&self) -> Option<&FileItem>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FileItem {
|
||||
/// Path to this file.
|
||||
/// Must be relative to source root dir.
|
||||
pub path: PathBuf,
|
||||
pub source_name: Label,
|
||||
}
|
||||
|
||||
impl Item for FileItem {
|
||||
type Key = PathBuf;
|
||||
|
||||
fn source_name(&self) -> &str {
|
||||
&self.source_name
|
||||
}
|
||||
|
||||
fn key(&self) -> &Self::Key {
|
||||
&self.path
|
||||
}
|
||||
|
||||
fn as_file(&self) -> Option<&FileItem> {
|
||||
Some(&self)
|
||||
}
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
use std::{fmt::Debug, fs::File, io::BufReader, path::PathBuf};
|
||||
|
||||
use pile_config::Label;
|
||||
use pile_flac::{FlacBlock, FlacReader};
|
||||
use serde_json::{Map, Value};
|
||||
|
||||
use crate::Item;
|
||||
|
||||
pub struct FlacItem {
|
||||
pub(crate) path: PathBuf,
|
||||
pub(crate) source_name: Label,
|
||||
}
|
||||
|
||||
impl Debug for FlacItem {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("FlacItem")
|
||||
.field("path", &self.path)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for FlacItem {
|
||||
type Key = PathBuf;
|
||||
|
||||
fn source_name(&self) -> &str {
|
||||
&self.source_name
|
||||
}
|
||||
|
||||
fn key(&self) -> &Self::Key {
|
||||
&self.path
|
||||
}
|
||||
|
||||
fn json(&self) -> Result<serde_json::Value, std::io::Error> {
|
||||
let file = File::open(&self.path)?;
|
||||
let reader = FlacReader::new(BufReader::new(file));
|
||||
|
||||
let mut output = Map::new();
|
||||
|
||||
for block in reader {
|
||||
if let FlacBlock::VorbisComment(comment) = block.unwrap() {
|
||||
for (k, v) in comment.comment.comments {
|
||||
let k = k.to_string();
|
||||
let v = Value::String(v.into());
|
||||
let e = output.get_mut(&k);
|
||||
|
||||
match e {
|
||||
None => {
|
||||
output.insert(k.clone(), Value::Array(vec![v]));
|
||||
}
|
||||
Some(e) => {
|
||||
// We always insert an array
|
||||
#[expect(clippy::unwrap_used)]
|
||||
e.as_array_mut().unwrap().push(v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We should only have one comment block,
|
||||
// stop reading when we find it
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(serde_json::Value::Object(output));
|
||||
}
|
||||
}
|
||||
@@ -1,2 +0,0 @@
|
||||
mod flac;
|
||||
pub use flac::*;
|
||||
@@ -7,6 +7,12 @@ pub use misc::*;
|
||||
mod dataset;
|
||||
pub use dataset::*;
|
||||
|
||||
mod item;
|
||||
pub use item::*;
|
||||
|
||||
mod value;
|
||||
pub use value::*;
|
||||
|
||||
pub mod extract;
|
||||
pub mod index;
|
||||
pub mod item;
|
||||
pub mod source;
|
||||
|
||||
@@ -4,7 +4,7 @@ use pile_config::Label;
|
||||
use std::path::PathBuf;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::{DataSource, Item, item::FlacItem, path_ts_latest};
|
||||
use crate::{DataSource, Item, item::FileItem, path_ts_latest};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DirDataSource {
|
||||
@@ -33,7 +33,7 @@ impl DataSource for DirDataSource {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
return Ok(Some(Box::new(FlacItem {
|
||||
return Ok(Some(Box::new(FileItem {
|
||||
source_name: self.name.clone(),
|
||||
path: key.to_owned(),
|
||||
})));
|
||||
@@ -61,7 +61,7 @@ impl DataSource for DirDataSource {
|
||||
let item: Box<dyn Item<Key = Self::Key>> =
|
||||
match path.extension().map(|x| x.to_str()).flatten() {
|
||||
None => return None,
|
||||
Some("flac") => Box::new(FlacItem {
|
||||
Some("flac") => Box::new(FileItem {
|
||||
source_name: self.name.clone(),
|
||||
path: path.clone(),
|
||||
}),
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::{error::Error, fmt::Debug, path::PathBuf};
|
||||
use std::error::Error;
|
||||
|
||||
use crate::{Item, Key};
|
||||
|
||||
/// A read-only set of [Item]s.
|
||||
pub trait DataSource {
|
||||
@@ -23,37 +25,3 @@ pub trait DataSource {
|
||||
/// Return the time of the latest change to the data in this source
|
||||
fn latest_change(&self) -> Result<Option<DateTime<Utc>>, Self::Error>;
|
||||
}
|
||||
|
||||
/// A single entry in a data source, identified by a [Key].
pub trait Item: Debug + Send + Sync + 'static {
    /// The type used to identify this item within its source.
    type Key: Key;

    /// Get this item's unstructured schema
    ///
    /// TODO: don't use json, use a lazily-evaluated type that supports binary
    fn json(&self) -> Result<serde_json::Value, std::io::Error>;

    /// The name of the data source this item came from.
    fn source_name(&self) -> &str;

    /// This item's key within its source.
    fn key(&self) -> &Self::Key;
}
|
||||
|
||||
//
|
||||
// MARK: key
|
||||
//
|
||||
|
||||
/// A value that identifies an [Item] within a data source.
///
/// Keys round-trip through strings via [Key::to_string] and
/// [Key::from_string].
pub trait Key: Debug + Clone + Send + Sync + 'static {
    /// Convert this key to a string, returning `None`
    /// if we encounter any kind of error.
    ///
    /// NOTE(review): this name shadows the blanket `ToString::to_string`
    /// on key types that also implement `Display`; consider renaming
    /// (e.g. `to_key_string`) to avoid ambiguity at call sites.
    fn to_string(&self) -> Option<String>;

    /// Parse a key from the string form produced by [Key::to_string],
    /// returning `None` if `str` is not a valid key.
    fn from_string(str: &str) -> Option<Self>;
}
|
||||
|
||||
impl Key for PathBuf {
|
||||
fn from_string(str: &str) -> Option<Self> {
|
||||
str.parse().ok()
|
||||
}
|
||||
|
||||
fn to_string(&self) -> Option<String> {
|
||||
self.to_str().map(|x| x.to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
84
crates/pile-dataset/src/value.rs
Normal file
84
crates/pile-dataset/src/value.rs
Normal file
@@ -0,0 +1,84 @@
|
||||
use std::rc::Rc;
|
||||
|
||||
use pile_config::Label;
|
||||
use serde_json::{Map, Value};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
|
||||
use crate::{Item, extract::Extractor};
|
||||
|
||||
/// An immutable, lazily-computed value similar to [serde_json::Value].
///
/// The `Extractor` variant defers producing its fields until they are
/// queried, so building a `PileValue` does not have to read the
/// underlying item's data up front.
pub enum PileValue<'a, I: crate::Item> {
    /// The absence of a value, like JSON `null`.
    Null,

    /// A string
    String(SmartString<LazyCompact>),

    /// An array of values
    Array(Vec<PileValue<'a, I>>),

    /// A lazily-computed map of {label: value}
    ///
    /// Stored behind `Rc` so cloning this value is a cheap
    /// reference-count bump rather than re-running the extractor.
    Extractor(Rc<dyn Extractor<I> + 'a>),
}
|
||||
|
||||
// Manual `Clone` impl: `#[derive(Clone)]` would add an `I: Clone`
// bound even though no variant stores an `I` by value — the
// `Extractor` variant only holds an `Rc`, whose clone is a cheap
// reference-count bump.
impl<I: Item> Clone for PileValue<'_, I> {
    fn clone(&self) -> Self {
        match self {
            Self::Null => Self::Null,
            Self::String(x) => Self::String(x.clone()),
            Self::Array(x) => Self::Array(x.clone()),
            Self::Extractor(x) => Self::Extractor(x.clone()),
        }
    }
}
|
||||
|
||||
impl<'a, I: Item> PileValue<'a, I> {
|
||||
pub fn query(&'a self, query: &[Label]) -> Result<Option<&'a Self>, std::io::Error> {
|
||||
let mut out = Some(self);
|
||||
|
||||
for q in query {
|
||||
out = match &out {
|
||||
None => return Ok(None),
|
||||
Some(Self::Null) => None,
|
||||
Some(Self::Array(_)) => None,
|
||||
Some(Self::String(_)) => None,
|
||||
Some(Self::Extractor(e)) => e.field(&q)?,
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(out);
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> Option<&str> {
|
||||
match self {
|
||||
Self::String(x) => Some(&x),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_json(&self) -> Result<Value, std::io::Error> {
|
||||
Ok(match self {
|
||||
Self::Null => Value::Null,
|
||||
Self::String(x) => Value::String(x.to_string()),
|
||||
|
||||
Self::Array(x) => Value::Array(
|
||||
x.iter()
|
||||
.map(|x| x.to_json())
|
||||
.collect::<Result<Vec<_>, _>>()?,
|
||||
),
|
||||
|
||||
Self::Extractor(e) => {
|
||||
let keys = e.fields()?;
|
||||
let map = keys
|
||||
.iter()
|
||||
.map(|k| {
|
||||
let v = e.field(k)?.unwrap();
|
||||
let v = v.to_json()?;
|
||||
Ok((k.to_string(), v))
|
||||
})
|
||||
.collect::<Result<Map<String, Value>, std::io::Error>>()?;
|
||||
|
||||
Value::Object(map)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user