fix: avoid blocking on read-half of the watch channel (#3721)

This commit is contained in:
三咲雅 misaki masa 2026-02-26 20:36:22 +08:00 committed by GitHub
parent 05aeae3b09
commit 884c265de1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 57 additions and 47 deletions

4
Cargo.lock generated
View file

@ -5859,7 +5859,6 @@ dependencies = [
"anyhow",
"bitflags 2.11.0",
"crossterm 0.29.0",
"dirs",
"globset",
"hashbrown 0.16.1",
"indexmap 2.13.0",
@ -6119,7 +6118,6 @@ name = "yazi-proxy"
version = "26.2.2"
dependencies = [
"tokio",
"yazi-binding",
"yazi-config",
"yazi-macro",
"yazi-parser",
@ -6256,7 +6254,7 @@ dependencies = [
name = "yazi-watcher"
version = "26.2.2"
dependencies = [
"anyhow",
"futures",
"hashbrown 0.16.1",
"notify",
"parking_lot",

View file

@ -22,7 +22,6 @@ yazi-tty = { path = "../yazi-tty", version = "26.2.2" }
anyhow = { workspace = true }
bitflags = { workspace = true }
crossterm = { workspace = true }
dirs = { workspace = true }
globset = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }

View file

@ -12,7 +12,6 @@ repository = "https://github.com/sxyazi/yazi"
workspace = true
[dependencies]
yazi-binding = { path = "../yazi-binding", version = "26.2.2" }
yazi-config = { path = "../yazi-config", version = "26.2.2" }
yazi-macro = { path = "../yazi-macro", version = "26.2.2" }
yazi-parser = { path = "../yazi-parser", version = "26.2.2" }

View file

@ -0,0 +1,27 @@
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Notify;
#[derive(Clone, Debug, Default)]
pub struct LastValue<T> {
inner: Arc<(Notify, Mutex<Option<T>>)>,
}
impl<T> LastValue<T> {
pub fn set(&self, data: T) {
*self.inner.1.lock() = Some(data);
self.inner.0.notify_waiters();
}
pub async fn get(&self) -> T {
loop {
let notified = self.inner.0.notified();
if let Some(data) = self.inner.1.lock().take() {
return data;
}
notified.await;
}
}
}

View file

@ -1,6 +1,6 @@
yazi_macro::mod_pub!(data errors event loc path pool scheme shell strand translit url wtf8);
yazi_macro::mod_flat!(alias bytes chars completion_token condition debounce env id layer localset natsort os predictor ro_cell source sync_cell terminal tests throttle time utf8);
yazi_macro::mod_flat!(alias bytes chars completion_token condition debounce env id last_value layer localset natsort os predictor ro_cell source sync_cell terminal tests throttle time utf8);
pub fn init() {
LOCAL_SET.with(tokio::task::LocalSet::new);

View file

@ -21,7 +21,7 @@ yazi-shared = { path = "../yazi-shared", version = "26.2.2" }
yazi-vfs = { path = "../yazi-vfs", version = "26.2.2" }
# External dependencies
anyhow = { workspace = true }
futures = { workspace = true }
hashbrown = { workspace = true }
notify = { version = "8.2.0", default-features = false, features = [ "macos_fsevent" ] }
parking_lot = { workspace = true }

View file

@ -5,7 +5,7 @@ use notify::{PollWatcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
use tokio::{pin, sync::mpsc::{self, UnboundedReceiver}};
use tokio_stream::{StreamExt, wrappers::UnboundedReceiverStream};
use tracing::error;
use yazi_fs::{File, FilesOp, cha::Cha, mounts::PARTITIONS, provider};
use yazi_fs::{File, FilesOp, mounts::PARTITIONS, provider::{self, Provider}};
use yazi_shared::url::{UrlBuf, UrlLike};
use yazi_vfs::VfsFile;
@ -74,13 +74,15 @@ impl Local {
}
}
pub(crate) fn soundless(path: &Path) -> bool {
pub(crate) async fn soundless(path: &Path) -> bool {
if cfg!(target_os = "netbsd") || yazi_adapter::WSL.get() {
return true;
}
let Ok(meta) = path.metadata() else { return true };
PARTITIONS.read().soundless(Cha::new(path.file_name().unwrap_or_default(), meta))
match provider::local::Local::regular(path).metadata().await {
Ok(cha) => PARTITIONS.read().soundless(cha),
Err(_) => true,
}
}
async fn changed(rx: UnboundedReceiver<UrlBuf>) {

View file

@ -34,13 +34,13 @@ impl<'a> Watchee<'a> {
})
}
pub(super) fn new<U>(url: U) -> Self
pub(super) async fn new<U>(url: U) -> Self
where
U: Into<UrlCow<'a>>,
{
let url = url.into();
if let Some(path) = url.as_local() {
let b = Local::soundless(path);
let b = Local::soundless(path).await;
Self::Local(url, b)
} else {
Self::Remote(url)

View file

@ -1,25 +1,25 @@
use futures::{StreamExt, stream};
use hashbrown::HashSet;
use tokio::sync::watch;
use tracing::error;
use yazi_fs::FsUrl;
use yazi_shared::url::{UrlBuf, UrlCow, UrlLike};
use yazi_shared::{LastValue, url::{UrlBuf, UrlCow, UrlLike}};
use crate::{Reporter, WATCHED, Watchee, backend::Backend};
pub struct Watcher {
tx: watch::Sender<HashSet<UrlBuf>>,
last: LastValue<HashSet<UrlBuf>>,
reporter: Reporter,
}
impl Watcher {
pub fn serve() -> Self {
let (tx, rx) = watch::channel(Default::default());
let last = LastValue::default();
let backend = Backend::serve();
let reporter = backend.reporter.clone();
tokio::spawn(Self::watched(rx, backend));
Self { tx, reporter }
tokio::spawn(Self::watched(last.clone(), backend));
Self { last, reporter }
}
pub fn watch<'a, I>(&mut self, urls: I)
@ -28,18 +28,18 @@ impl Watcher {
I::Item: Into<UrlCow<'a>>,
{
let it = urls.into_iter();
let mut set = HashSet::with_capacity(it.size_hint().0);
let mut urls = HashSet::with_capacity(it.size_hint().0);
for url in it.map(Into::into) {
if !url.is_absolute() {
continue;
} else if let Some(cache) = url.cache() {
set.insert(cache.into());
urls.insert(cache.into());
}
set.insert(url.into_owned());
urls.insert(url.into_owned());
}
self.tx.send(set).ok();
self.last.set(urls);
}
pub fn report<'a, I>(&self, urls: I)
@ -50,46 +50,31 @@ impl Watcher {
self.reporter.report(urls);
}
async fn watched(mut rx: watch::Receiver<HashSet<UrlBuf>>, mut backend: Backend) {
async fn watched(last: LastValue<HashSet<UrlBuf>>, mut backend: Backend) {
loop {
let (rx_, to_unwatch, to_watch) = Self::diff(rx).await;
rx = rx_;
let (to_unwatch, to_watch) = Self::diff(last.get().await).await;
if !to_unwatch.is_empty() || !to_watch.is_empty() {
backend = Self::sync(backend, to_unwatch, to_watch).await;
backend = backend.sync().await;
}
if rx.changed().await.is_err() {
break;
}
}
}
async fn diff(
mut rx: watch::Receiver<HashSet<UrlBuf>>,
) -> (watch::Receiver<HashSet<UrlBuf>>, Vec<Watchee<'static>>, Vec<Watchee<'static>>) {
tokio::task::spawn_blocking(move || {
let new = rx.borrow_and_update();
let new_: HashSet<_> = new.iter().map(Watchee::new).collect();
async fn diff(urls: HashSet<UrlBuf>) -> (Vec<Watchee<'static>>, HashSet<Watchee<'static>>) {
let mut new: HashSet<_> = stream::iter(urls).then(Watchee::new).collect().await;
let old = WATCHED.read();
let old = WATCHED.read();
let to_unwatch = old.difference(&new).map(Watchee::to_static).collect();
new.retain(|watchee| !old.contains(watchee));
let to_unwatch = old.difference(&new_).map(Watchee::to_static).collect();
let to_watch = new_.difference(&old).map(Watchee::to_static).collect();
drop(new_);
drop(new);
(rx, to_unwatch, to_watch)
})
.await
.unwrap()
(to_unwatch, new)
}
async fn sync(
mut backend: Backend,
to_unwatch: Vec<Watchee<'static>>,
to_watch: Vec<Watchee<'static>>,
to_watch: HashSet<Watchee<'static>>,
) -> Backend {
tokio::task::spawn_blocking(move || {
for watchee in to_unwatch {