perf: concurrent chunked upload and download of a single file over SFTP (#3393)

This commit is contained in:
三咲雅 misaki masa 2025-12-01 22:39:36 +08:00 committed by GitHub
parent d910104c00
commit 44e244b9d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 464 additions and 143 deletions

View file

@ -1,9 +1,7 @@
use std::io;
use std::io::{self};
use tokio::{select, sync::{mpsc, oneshot}};
use yazi_fs::provider::Attrs;
use yazi_macro::ok_or_not_found;
use yazi_shared::{strand::{StrandBuf, StrandLike}, url::{AsUrl, Url, UrlBuf, UrlLike}};
use yazi_shared::{strand::{StrandBuf, StrandLike}, url::{AsUrl, UrlBuf, UrlLike}};
use crate::provider;
@ -65,66 +63,3 @@ async fn _unique_name(mut url: UrlBuf, append: bool) -> io::Result<UrlBuf> {
Ok(url)
}
pub fn copy_with_progress<U, V, A>(
from: U,
to: V,
attrs: A,
) -> mpsc::Receiver<Result<u64, io::Error>>
where
U: AsUrl,
V: AsUrl,
A: Into<Attrs>,
{
_copy_with_progress(from.as_url(), to.as_url(), attrs.into())
}
fn _copy_with_progress(from: Url, to: Url, attrs: Attrs) -> mpsc::Receiver<Result<u64, io::Error>> {
let (prog_tx, prog_rx) = mpsc::channel(1);
let (done_tx, mut done_rx) = oneshot::channel();
tokio::spawn({
let (from, to) = (from.to_owned(), to.to_owned());
async move {
done_tx.send(provider::copy(from, to, attrs).await).ok();
}
});
tokio::spawn({
let (prog_tx, to) = (prog_tx.clone(), to.to_owned());
async move {
let mut last = 0;
let mut done = None;
loop {
select! {
res = &mut done_rx => done = Some(res.unwrap()),
_ = prog_tx.closed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {},
}
match done {
Some(Ok(len)) => {
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
}
prog_tx.send(Ok(0)).await.ok();
break;
}
Some(Err(e)) => {
prog_tx.send(Err(e)).await.ok();
break;
}
None => {}
}
let len = provider::symlink_metadata(&to).await.map(|m| m.len).unwrap_or(0);
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
last = len;
}
}
}
});
prog_rx
}

View file

@ -0,0 +1,133 @@
use std::{io::{self, SeekFrom}, sync::{Arc, atomic::{AtomicU64, Ordering}}};
use futures::{StreamExt, TryStreamExt};
use tokio::{io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}, select, sync::{mpsc, oneshot}};
use yazi_fs::provider::{Attrs, FileBuilder};
use yazi_shared::url::{Url, UrlBuf};
use crate::provider::{self, Gate};
pub(super) async fn copy_impl(from: Url<'_>, to: Url<'_>, attrs: Attrs) -> io::Result<u64> {
let src = provider::open(from).await?;
let dist = provider::create(to).await?;
let mut reader = BufReader::with_capacity(524288, src);
let mut writer = BufWriter::with_capacity(524288, dist);
let written = tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
writer.get_ref().set_attrs(attrs).await.ok();
writer.shutdown().await.ok();
Ok(written)
}
pub(super) fn copy_with_progress_impl(
from: UrlBuf,
to: UrlBuf,
attrs: Attrs,
) -> mpsc::Receiver<io::Result<u64>> {
let acc = Arc::new(AtomicU64::new(0));
let (from, to) = (Arc::new(from), Arc::new(to));
let (prog_tx, prog_rx) = mpsc::channel(10);
let (done_tx, mut done_rx) = oneshot::channel();
let (acc_, prog_tx_) = (acc.clone(), prog_tx.clone());
tokio::spawn(async move {
let (cha, mut src) = {
let f = provider::open(&*from).await?;
(f.metadata().await?, Some(f))
};
let mut dist = {
let f = provider::create(&*to).await?;
f.set_len(cha.len).await?;
Some(f)
};
let chunks = (cha.len + 10485760 - 1) / 10485760;
let result = futures::stream::iter(0..chunks)
.map(|i| {
let acc_ = acc_.clone();
let (from, to) = (from.clone(), to.clone());
let (src, dist) = (src.take(), dist.take());
async move {
let offset = i * 10485760;
let take = cha.len.saturating_sub(offset).min(10485760);
let mut src = BufReader::with_capacity(524288, match src {
Some(f) => f,
None => provider::open(&*from).await?,
});
let mut dist = BufWriter::with_capacity(524288, match dist {
Some(f) => f,
None => Gate::default().write(true).open(&*to).await?,
});
src.seek(SeekFrom::Start(offset)).await?;
dist.seek(SeekFrom::Start(offset)).await?;
let mut src = src.take(take);
let mut buf = vec![0u8; 65536];
let mut copied = 0u64;
loop {
let n = src.read(&mut buf).await?;
if n == 0 {
break;
}
dist.write_all(&buf[..n]).await?;
copied += n as u64;
acc_.fetch_add(n as u64, Ordering::SeqCst);
}
dist.flush().await?;
if i == chunks - 1 {
dist.get_ref().set_attrs(attrs).await.ok();
}
dist.shutdown().await.ok();
if copied == take {
Ok(())
} else {
Err(io::Error::other(format!(
"short copy for chunk {i}: copied {copied} bytes, expected {take}"
)))
}
}
})
.buffer_unordered(3)
.try_for_each(|_| async { Ok(()) })
.await;
let n = acc_.swap(0, Ordering::SeqCst);
if n > 0 {
prog_tx_.send(Ok(n)).await.ok();
}
if let Err(e) = result {
prog_tx_.send(Err(e)).await.ok();
} else {
prog_tx_.send(Ok(0)).await.ok();
}
done_tx.send(()).ok();
Ok::<_, io::Error>(())
});
tokio::spawn(async move {
loop {
select! {
_ = &mut done_rx => break,
_ = prog_tx.closed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {},
}
let n = acc.swap(0, Ordering::SeqCst);
if n > 0 {
prog_tx.send(Ok(n)).await.ok();
}
}
});
prog_rx
}

View file

@ -1,5 +1,5 @@
yazi_macro::mod_pub!(sftp);
yazi_macro::mod_flat!(calculator dir_entry gate provider providers read_dir rw_file);
yazi_macro::mod_flat!(calculator copier dir_entry gate provider providers read_dir rw_file);
pub(super) fn init() { sftp::init(); }

View file

@ -1,6 +1,6 @@
use std::io;
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::mpsc;
use yazi_fs::{cha::Cha, provider::{Attrs, Provider, local::Local}};
use yazi_shared::{path::{AsPath, PathBufDyn}, url::{AsUrl, UrlBuf, UrlCow}};
@ -51,18 +51,29 @@ where
(false, false) if from.scheme().covariant(to.scheme()) => {
Providers::new(from).await?.copy(to.loc(), attrs).await
}
(true, false) | (false, true) | (false, false) => super::copy_impl(from, to, attrs).await,
}
}
pub async fn copy_with_progress<U, V, A>(
from: U,
to: V,
attrs: A,
) -> io::Result<mpsc::Receiver<Result<u64, io::Error>>>
where
U: AsUrl,
V: AsUrl,
A: Into<Attrs>,
{
let (from, to) = (from.as_url(), to.as_url());
match (from.kind().is_local(), to.kind().is_local()) {
(true, true) => Local::new(from).await?.copy_with_progress(to.loc(), attrs),
(false, false) if from.scheme().covariant(to.scheme()) => {
Providers::new(from).await?.copy_with_progress(to.loc(), attrs)
}
(true, false) | (false, true) | (false, false) => {
let src = Providers::new(from).await?.open().await?;
let dist = Providers::new(to).await?.create().await?;
let mut reader = BufReader::with_capacity(524288, src);
let mut writer = BufWriter::with_capacity(524288, dist);
let written = tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
writer.get_ref().set_attrs(attrs).await.ok();
writer.shutdown().await.ok();
Ok(written)
Ok(super::copy_with_progress_impl(from.to_owned(), to.to_owned(), attrs.into()))
}
}
}
@ -128,6 +139,13 @@ where
identical(a, b).await.unwrap_or(false)
}
pub async fn open<U>(url: U) -> io::Result<RwFile>
where
U: AsUrl,
{
Providers::new(url.as_url()).await?.open().await
}
pub async fn read_dir<U>(url: U) -> io::Result<ReadDir>
where
U: AsUrl,

View file

@ -1,5 +1,6 @@
use std::io;
use tokio::sync::mpsc;
use yazi_fs::{cha::Cha, provider::{Attrs, Provider}};
use yazi_shared::{path::{AsPath, PathBufDyn}, url::{Url, UrlBuf, UrlCow}};
@ -47,6 +48,17 @@ impl<'a> Provider for Providers<'a> {
}
}
fn copy_with_progress<P, A>(&self, to: P, attrs: A) -> io::Result<mpsc::Receiver<io::Result<u64>>>
where
P: AsPath,
A: Into<Attrs>,
{
match self {
Self::Local(p) => p.copy_with_progress(to, attrs),
Self::Sftp(p) => p.copy_with_progress(to, attrs),
}
}
async fn create(&self) -> io::Result<Self::File> {
Ok(match self {
Self::Local(p) => p.create().await?.into(),

View file

@ -1,6 +1,6 @@
use std::{io, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
use yazi_fs::provider::Attrs;
pub enum RwFile {
@ -17,6 +17,14 @@ impl From<yazi_sftp::fs::File> for RwFile {
}
impl RwFile {
// FIXME: path
pub async fn metadata(&self) -> io::Result<yazi_fs::cha::Cha> {
Ok(match self {
Self::Tokio(f) => yazi_fs::cha::Cha::new("// FIXME", f.metadata().await?),
Self::Sftp(f) => super::sftp::Cha::try_from(("// FIXME".as_bytes(), &f.fstat().await?))?.0,
})
}
pub async fn set_attrs(&self, attrs: Attrs) -> io::Result<()> {
match self {
Self::Tokio(f) => {
@ -37,6 +45,15 @@ impl RwFile {
Ok(())
}
pub async fn set_len(&self, size: u64) -> io::Result<()> {
Ok(match self {
Self::Tokio(f) => f.set_len(size).await?,
Self::Sftp(f) => {
f.fsetstat(&yazi_sftp::fs::Attrs { size: Some(size), ..Default::default() }).await?
}
})
}
}
impl AsyncRead for RwFile {
@ -53,6 +70,27 @@ impl AsyncRead for RwFile {
}
}
impl AsyncSeek for RwFile {
#[inline]
fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
match &mut *self {
Self::Tokio(f) => Pin::new(f).start_seek(position),
Self::Sftp(f) => Pin::new(f).start_seek(position),
}
}
#[inline]
fn poll_complete(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<u64>> {
match &mut *self {
Self::Tokio(f) => Pin::new(f).poll_complete(cx),
Self::Sftp(f) => Pin::new(f).poll_complete(cx),
}
}
}
impl AsyncWrite for RwFile {
#[inline]
fn poll_write(

View file

@ -1,10 +1,10 @@
use std::{io, sync::Arc};
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::{io::{AsyncWriteExt, BufReader, BufWriter}, sync::mpsc::Receiver};
use yazi_config::vfs::{ProviderSftp, Vfs};
use yazi_fs::provider::{DirReader, FileHolder, Provider};
use yazi_sftp::fs::{Attrs, Flags};
use yazi_shared::{path::{AsPath, PathBufDyn}, pool::InternStr, url::{Url, UrlBuf, UrlCow, UrlLike}};
use yazi_shared::{loc::LocBuf, path::{AsPath, PathBufDyn}, pool::InternStr, scheme::SchemeKind, url::{Url, UrlBuf, UrlCow, UrlLike}};
use super::Cha;
use crate::provider::sftp::Conn;
@ -88,6 +88,23 @@ impl<'a> Provider for Sftp<'a> {
Ok(written)
}
fn copy_with_progress<P, A>(&self, to: P, attrs: A) -> io::Result<Receiver<io::Result<u64>>>
where
P: AsPath,
A: Into<yazi_fs::provider::Attrs>,
{
let to = UrlBuf::Sftp {
loc: LocBuf::<typed_path::UnixPathBuf>::saturated(
to.as_path().to_unix_owned()?,
SchemeKind::Sftp,
),
domain: self.name.intern(),
};
let from = self.url.to_owned();
Ok(crate::provider::copy_with_progress_impl(from, to, attrs.into()))
}
async fn create_dir(&self) -> io::Result<()> {
Ok(self.op().await?.mkdir(self.path, Attrs::default()).await?)
}