Parallelism

This commit is contained in:
2025-07-13 17:18:25 +00:00
parent 939ccc90fe
commit cd7d5008dc
7 changed files with 286 additions and 32 deletions

View File

@@ -140,7 +140,7 @@ impl LoggingPreset {
/// log: LogCli,
/// }
/// ```
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
pub struct LogCli {
/// Increase verbosity (can be repeated)
#[arg(short, action = clap::ArgAction::Count,global = true)]

View File

@@ -1,9 +1,10 @@
use anyhow::Result;
use anyhow::{Error, Result};
use clap::Parser;
use indicatif::ProgressIterator;
use logging::LogCli;
use std::{path::PathBuf, process::ExitCode};
use style::progress_style;
use tokio::task::JoinSet;
use tool::PickTool;
use tracing::{debug, error, trace};
@@ -15,7 +16,6 @@ pub mod tool;
mod prepare;
// count size of files to process
// parallelism
// capture/print stdout/stderr
// workdir vs root
// package & auto-build
@@ -37,7 +37,7 @@ mod prepare;
// warn when no matches
/// Pick is a utility that processes files based on pattern matching rules.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None, styles=style::get_styles())]
struct Cli {
#[command(flatten)]
@@ -59,10 +59,6 @@ fn main() -> ExitCode {
}
fn main_inner() -> Result<ExitCode> {
//
// MARK: setup
//
let mut cli = Cli::parse();
cli.manifest = std::path::absolute(&cli.manifest)?;
let cli = cli;
@@ -81,18 +77,55 @@ fn main_inner() -> Result<ExitCode> {
let queue = prepare::list_queue(&manifest, &work_dir)?;
#[expect(clippy::unwrap_used)] // Fix later
let bash = manifest.tool.bash.as_ref().unwrap();
bash.before(&cli.manifest, &manifest.config)?;
let bash = {
#[expect(clippy::unwrap_used)] // Fix later
let bash = manifest.tool.bash.clone().unwrap();
bash.before(&cli.manifest, &manifest.config)?;
for ctx in queue
.into_iter()
.progress_with_style(progress_style())
.with_message("Processing")
{
trace!("Running `{}` on {}", ctx.task, ctx.path_rel_str);
bash.run(&cli.manifest, &manifest.config, ctx)?;
}
bash
};
let max_tasks = manifest.config.threads.map(|x| x.get()).unwrap_or(1);
debug!("Starting runtime with {max_tasks} threads");
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(3)
.max_blocking_threads(max_tasks)
.build()
.unwrap();
let rt_cli = cli.clone();
let rt_manifest = manifest.clone();
let rt_bash = bash.clone();
rt.block_on(async move {
let mut js: JoinSet<Result<()>> = JoinSet::new();
for ctx in queue
.into_iter()
.progress_with_style(progress_style())
.with_message("Processing")
{
trace!("Running `{}` on {}", ctx.task, ctx.path_rel_str);
let cli = rt_cli.clone();
let manifest = rt_manifest.clone();
let bash = rt_bash.clone();
js.spawn_blocking(move || {
bash.run(&cli.manifest, &manifest.config, ctx)?;
return Ok(());
});
if js.len() >= max_tasks {
js.join_next().await.unwrap()??;
}
}
while let Some(x) = js.join_next().await {
x.unwrap()?
}
return Ok::<_, Error>(());
})?;
bash.after(&cli.manifest, &manifest.config)?;

View File

@@ -1,13 +1,16 @@
use anyhow::Result;
use indexmap::IndexMap;
use serde::Deserialize;
use std::path::{Path, PathBuf};
use std::{
num::NonZero,
path::{Path, PathBuf},
};
use crate::tool::ToolConfig;
use super::rule::FlatPickRule;
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct Manifest {
pub config: PickConfig,
@@ -32,6 +35,9 @@ pub struct PickConfig {
#[serde(default = "default_false")]
pub process_links: bool,
#[serde(default)]
pub threads: Option<NonZero<usize>>,
}
impl PickConfig {

View File

@@ -8,7 +8,7 @@ use crate::manifest::types::PickConfig;
use super::{PickTool, TaskContext};
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ToolBash {
#[serde(default)]
pub before: Option<String>,

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use serde::{Deserialize, de::DeserializeOwned};
use serde::{de::DeserializeOwned, Deserialize};
use std::{
fmt::Debug,
path::{Path, PathBuf},
@@ -41,7 +41,7 @@ pub struct TaskContext {
pub path_rel_str: String,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ToolConfig {
#[serde(default)]
pub bash: Option<ToolBash>,