Fix CancelableTask

This commit is contained in:
2026-03-10 17:01:49 -07:00
parent fd66a4995a
commit a05cf9da01
6 changed files with 29 additions and 30 deletions

11
Cargo.lock generated
View File

@@ -2385,7 +2385,6 @@ dependencies = [
"pile-toolbox", "pile-toolbox",
"serde", "serde",
"serde_json", "serde_json",
"signal-hook",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"toml", "toml",
@@ -3037,16 +3036,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b57709da74f9ff9f4a27dce9526eec25ca8407c45a7887243b031a58935fb8e"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.8" version = "1.4.8"

View File

@@ -117,7 +117,6 @@ kamadak-exif = "0.6.1"
thiserror = "2.0.18" thiserror = "2.0.18"
anyhow = "1.0.102" anyhow = "1.0.102"
itertools = "0.14.0" itertools = "0.14.0"
signal-hook = "0.4.3"
rand = "0.10.0" rand = "0.10.0"
strum = { version = "0.27.2", features = ["derive"] } strum = { version = "0.27.2", features = ["derive"] }
walkdir = "2.5.0" walkdir = "2.5.0"

View File

@@ -80,8 +80,10 @@ impl CancelFlag {
#[inline] #[inline]
pub async fn await_cancel(&self) { pub async fn await_cancel(&self) {
if self.is_cancelled() {
return;
}
self.notify.notified().await; self.notify.notified().await;
assert!(self.is_cancelled());
} }
#[inline] #[inline]

View File

@@ -22,7 +22,6 @@ serde = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
indicatif = { workspace = true } indicatif = { workspace = true }
tracing-indicatif = { workspace = true } tracing-indicatif = { workspace = true }
signal-hook = { workspace = true }
anstyle = { workspace = true } anstyle = { workspace = true }
toml = { workspace = true } toml = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }

View File

@@ -94,8 +94,7 @@ async fn main_inner() -> Result<i32> {
let ctx = GlobalContext { mp }; let ctx = GlobalContext { mp };
let task = cli.cmd.start(ctx).context("while starting task")?; let task = cli.cmd.start(ctx).context("while starting task")?;
let signal_task = let signal_task = start_signal_task(task.flag().clone());
start_signal_task(task.flag().clone()).context("while starting signal task")?;
match task.join().await { match task.join().await {
Ok(CancelableTaskResult::Finished(Ok(code))) => Ok(code), Ok(CancelableTaskResult::Finished(Ok(code))) => Ok(code),

View File

@@ -1,24 +1,35 @@
use anyhow::{Context, Result};
use pile_toolbox::cancelabletask::CancelFlag; use pile_toolbox::cancelabletask::CancelFlag;
use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::warn; use tracing::warn;
/// Start an async task that listens for OS signals, /// Start an async task that listens for OS signals,
/// setting `should_exit` to `true` when an exit signal /// setting `should_exit` to `true` when an exit signal
/// is caught. /// is caught.
pub fn start_signal_task(flag: CancelFlag) -> Result<JoinHandle<()>> { #[expect(clippy::expect_used)]
let mut signals = Signals::new(TERM_SIGNALS).context("Failed to initialize signal handler")?; pub fn start_signal_task(flag: CancelFlag) -> JoinHandle<()> {
tokio::spawn(async move {
let task = tokio::task::spawn_blocking(move || { #[cfg(unix)]
for sig in signals.forever() { {
if TERM_SIGNALS.contains(&sig) { use tokio::signal::unix::{SignalKind, signal};
warn!("Received signal {sig}, trying to exit cleanly"); let mut sigterm =
flag.cancel(); signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler");
break; tokio::select! {
r = tokio::signal::ctrl_c() => {
if let Err(e) = r { tracing::error!("ctrl_c error: {e}"); }
warn!("Received SIGINT, exiting cleanly");
}
_ = sigterm.recv() => {
warn!("Received SIGTERM, exiting cleanly");
}
} }
} }
}); #[cfg(not(unix))]
{
return Ok(task); if let Err(e) = tokio::signal::ctrl_c().await {
tracing::error!("ctrl_c error: {e}");
}
warn!("Received Ctrl+C, exiting cleanly");
}
flag.cancel();
})
} }