diff --git a/yazi-fs/src/provider/local/copier.rs b/yazi-fs/src/provider/local/copier.rs new file mode 100644 index 00000000..bab205af --- /dev/null +++ b/yazi-fs/src/provider/local/copier.rs @@ -0,0 +1,99 @@ +use std::{io, path::PathBuf}; + +use tokio::{select, sync::{mpsc, oneshot}}; + +use crate::provider::Attrs; + +pub(super) async fn copy_impl(from: PathBuf, to: PathBuf, attrs: Attrs) -> io::Result { + #[cfg(any(target_os = "linux", target_os = "android"))] + { + use std::os::unix::fs::OpenOptionsExt; + + tokio::task::spawn_blocking(move || { + let mut opts = std::fs::OpenOptions::new(); + if let Some(mode) = attrs.mode { + opts.mode(mode.bits() as _); + } + + let mut reader = std::fs::File::open(from)?; + let mut writer = opts.write(true).create(true).truncate(true).open(to)?; + let written = std::io::copy(&mut reader, &mut writer)?; + + if let Some(mode) = attrs.mode { + writer.set_permissions(mode.into()).ok(); + } + writer.set_times(attrs.into()).ok(); + + Ok(written) + }) + .await? + } + + #[cfg(not(any(target_os = "linux", target_os = "android")))] + { + tokio::task::spawn_blocking(move || { + let written = std::fs::copy(from, &to)?; + + if let Ok(file) = std::fs::File::options().write(true).open(to) { + file.set_times(attrs.into()).ok(); + } + + Ok(written) + }) + .await? + } +} + +pub(super) fn copy_with_progress_impl( + from: PathBuf, + to: PathBuf, + attrs: Attrs, +) -> mpsc::Receiver> { + let (prog_tx, prog_rx) = mpsc::channel(10); + let (done_tx, mut done_rx) = oneshot::channel(); + + tokio::spawn({ + let to = to.to_owned(); + async move { + done_tx.send(copy_impl(from, to, attrs).await).ok(); + } + }); + + tokio::spawn({ + let prog_tx = prog_tx.clone(); + async move { + let mut last = 0; + let mut done = None; + loop { + select! { + res = &mut done_rx => done = Some(res.unwrap()), + _ = prog_tx.closed() => break, + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {}, + } + + match done { + Some(Ok(len)) => { + if len > last { + prog_tx.send(Ok(len - last)).await.ok(); + } + prog_tx.send(Ok(0)).await.ok(); + break; + } + Some(Err(e)) => { + prog_tx.send(Err(e)).await.ok(); + break; + } + None => {} + } + + let len = tokio::fs::symlink_metadata(&to).await.map(|m| m.len()).unwrap_or(0); + if len > last { + prog_tx.send(Ok(len - last)).await.ok(); + last = len; + } + } + } + }); + + prog_rx +} diff --git a/yazi-fs/src/provider/local/local.rs b/yazi-fs/src/provider/local/local.rs index 8c51fed0..a3e25b17 100644 --- a/yazi-fs/src/provider/local/local.rs +++ b/yazi-fs/src/provider/local/local.rs @@ -1,5 +1,6 @@ -use std::{io, path::{Path, PathBuf}, sync::Arc}; +use std::{io, path::Path, sync::Arc}; +use tokio::sync::mpsc; use yazi_shared::{path::{AsPath, PathBufDyn}, scheme::SchemeKind, url::{Url, UrlBuf, UrlCow}}; use crate::{cha::Cha, path::absolute_url, provider::{Attrs, Provider}}; @@ -35,7 +36,17 @@ impl<'a> Provider for Local<'a> { { let to = to.as_path().to_os_owned()?; let from = self.path.to_owned(); - Self::copy_impl(from, to, attrs).await + super::copy_impl(from, to, attrs).await + } + + fn copy_with_progress(&self, to: P, attrs: A) -> io::Result>> + where + P: AsPath, + A: Into, + { + let to = to.as_path().to_os_owned()?; + let from = self.path.to_owned(); + Ok(super::copy_with_progress_impl(from, to, attrs.into())) } #[inline] @@ -204,46 +215,6 @@ impl<'a> Provider for Local<'a> { } impl<'a> Local<'a> { - async fn copy_impl(from: PathBuf, to: PathBuf, attrs: Attrs) -> io::Result { - #[cfg(any(target_os = "linux", target_os = "android"))] - { - use std::os::unix::fs::OpenOptionsExt; - - tokio::task::spawn_blocking(move || { - let mut opts = std::fs::OpenOptions::new(); - if let Some(mode) = attrs.mode { - opts.mode(mode.bits() as _); - } - - let mut reader = std::fs::File::open(from)?; - let mut writer = opts.write(true).create(true).truncate(true).open(to)?; - let written = std::io::copy(&mut reader, &mut writer)?; - - if let Some(mode) = attrs.mode { - writer.set_permissions(mode.into()).ok(); - } - writer.set_times(attrs.into()).ok(); - - Ok(written) - }) - .await? - } - - #[cfg(not(any(target_os = "linux", target_os = "android")))] - { - tokio::task::spawn_blocking(move || { - let written = std::fs::copy(from, &to)?; - - if let Ok(file) = std::fs::File::options().write(true).open(to) { - file.set_times(attrs.into()).ok(); - } - - Ok(written) - }) - .await? - } - } - #[inline] pub async fn read(&self) -> io::Result> { tokio::fs::read(self.path).await } diff --git a/yazi-fs/src/provider/local/mod.rs b/yazi-fs/src/provider/local/mod.rs index 2589b059..e69473f2 100644 --- a/yazi-fs/src/provider/local/mod.rs +++ b/yazi-fs/src/provider/local/mod.rs @@ -1 +1 @@ -yazi_macro::mod_flat!(calculator casefold dir_entry gate identical local read_dir); +yazi_macro::mod_flat!(calculator casefold copier dir_entry gate identical local read_dir); diff --git a/yazi-fs/src/provider/traits.rs b/yazi-fs/src/provider/traits.rs index 3a92b04d..cc7c43f4 100644 --- a/yazi-fs/src/provider/traits.rs +++ b/yazi-fs/src/provider/traits.rs @@ -1,13 +1,13 @@ use std::io; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::{io::{AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt}, sync::mpsc}; use yazi_macro::ok_or_not_found; use yazi_shared::{path::{AsPath, PathBufDyn}, strand::StrandCow, url::{AsUrl, Url, UrlBuf}}; use crate::{cha::{Cha, ChaType}, provider::Attrs}; pub trait Provider: Sized { - type File: AsyncRead + AsyncWrite + Unpin; + type File: AsyncRead + AsyncSeek + AsyncWrite + Unpin; type Gate: FileBuilder; type ReadDir: DirReader + 'static; type UrlCow; @@ -23,6 +23,15 @@ pub trait Provider: Sized { where P: AsPath; + fn copy_with_progress( + &self, + to: P, + attrs: A, + ) -> io::Result>> + where + P: AsPath, + A: Into; + fn create(&self) -> impl Future> { async move { self.gate().write(true).create(true).truncate(true).open(self.url()).await } } @@ -207,7 +216,7 @@ pub trait FileHolder { // --- FileBuilder pub trait FileBuilder: Sized + Default { - type File: AsyncRead + AsyncWrite + Unpin; + type File: AsyncRead + AsyncSeek + AsyncWrite + Unpin; fn append(&mut self, append: bool) -> &mut Self; diff --git a/yazi-scheduler/src/file/file.rs b/yazi-scheduler/src/file/file.rs index d1a17619..075749cb 100644 --- a/yazi-scheduler/src/file/file.rs +++ b/yazi-scheduler/src/file/file.rs @@ -7,7 +7,7 @@ use yazi_config::YAZI; use yazi_fs::{Cwd, FsHash128, FsUrl, cha::Cha, ok_or_not_found, path::{skip_url, url_relative_to}, provider::{Attrs, DirReader, FileHolder, Provider, local::Local}}; use yazi_macro::ok_or_not_found; use yazi_shared::{path::PathCow, timestamp_us, url::{AsUrl, UrlBuf, UrlCow, UrlLike}}; -use yazi_vfs::{VfsCha, copy_with_progress, maybe_exists, provider::{self, DirEntry}, unique_name}; +use yazi_vfs::{VfsCha, maybe_exists, provider::{self, DirEntry}, unique_name}; use super::{FileInDelete, FileInHardlink, FileInLink, FileInPaste, FileInTrash}; use crate::{LOW, NORMAL, TaskIn, TaskOp, TaskOps, file::{FileInDownload, FileInUpload, FileInUploadDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutPaste, FileOutPasteDo, FileOutTrash, FileOutUpload, FileOutUploadDo}}; @@ -96,7 +96,7 @@ impl File { pub(crate) async fn paste_do(&self, mut task: FileInPaste) -> Result<(), FileOutPasteDo> { ok_or_not_found!(provider::remove_file(&task.to).await); - let mut it = copy_with_progress(&task.from, &task.to, task.cha.unwrap()); + let mut it = provider::copy_with_progress(&task.from, &task.to, task.cha.unwrap()).await?; while let Some(res) = it.recv().await { match res { @@ -350,7 +350,7 @@ impl File { let cache = task.url.cache().context("Cannot determine cache path")?; let cache_tmp = Self::tmp(&cache).await.context("Cannot determine temporary download cache")?; - let mut it = copy_with_progress(&task.url, &cache_tmp, cha); + let mut it = provider::copy_with_progress(&task.url, &cache_tmp, cha).await?; while let Some(res) = it.recv().await { match res { Ok(0) => { @@ -456,12 +456,13 @@ impl File { } let tmp = Self::tmp(&task.url).await.context("Cannot determine temporary upload path")?; - let mut it = copy_with_progress(&task.cache, &tmp, Attrs { + let mut it = provider::copy_with_progress(&task.cache, &tmp, Attrs { mode: Some(task.cha.mode), atime: None, btime: None, mtime: None, - }); + }) + .await?; while let Some(res) = it.recv().await { match res { diff --git a/yazi-sftp/src/fs/file.rs b/yazi-sftp/src/fs/file.rs index 71b53c77..dd075b86 100644 --- a/yazi-sftp/src/fs/file.rs +++ b/yazi-sftp/src/fs/file.rs @@ -1,21 +1,28 @@ -use std::{io, pin::Pin, sync::Arc, task::{Context, Poll, ready}, time::Duration}; +use std::{io::{self, SeekFrom}, pin::Pin, sync::Arc, task::{Context, Poll, ready}, time::Duration}; -use tokio::{io::{AsyncRead, AsyncWrite, ReadBuf}, time::{Timeout, timeout}}; +use tokio::{io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}, time::{Timeout, timeout}}; -use crate::{Error, Operator, Packet, Receiver, Session, fs::Attrs}; +use crate::{Error, Operator, Packet, Receiver, Session, fs::Attrs, requests}; pub struct File { session: Arc, - handle: String, - closed: bool, - cursor: u64, + handle: String, + cursor: u64, + closed: bool, + close_rx: Option>, read_rx: Option, + seek_rx: Option, write_rx: Option<(Receiver, usize)>, flush_rx: Option>, } +enum SeekState { + NonBlocking(u64), + Blocking(i64, Timeout), +} + impl Unpin for File {} impl Drop for File { @@ -30,12 +37,14 @@ impl File { pub(crate) fn new(session: &Arc, handle: impl Into) -> Self { Self { session: session.clone(), - handle: handle.into(), - closed: false, - cursor: 0, + handle: handle.into(), + closed: false, + cursor: 0, + close_rx: None, read_rx: None, + seek_rx: None, write_rx: None, flush_rx: None, } @@ -81,6 +90,78 @@ impl AsyncRead for File { } } +impl AsyncSeek for File { + fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + if self.seek_rx.is_some() { + return Err(io::Error::other( + "other file operation is pending, call poll_complete before start_seek", + )); + } + + self.seek_rx = Some(match position { + SeekFrom::Start(n) => SeekState::NonBlocking(n), + SeekFrom::Current(n) => self + .cursor + .checked_add_signed(n) + .map(SeekState::NonBlocking) + .ok_or_else(|| io::Error::other("seeking to a negative or overflowed position"))?, + SeekFrom::End(n) => SeekState::Blocking( + n, + timeout( + Duration::from_secs(10), + self.session.send_sync(requests::Fstat::new(&self.handle))?, + ), + ), + }); + + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = unsafe { self.get_unchecked_mut() }; + + let Some(state) = &mut me.seek_rx else { + return Poll::Ready(Ok(me.cursor)); + }; + + fn imp(cx: &mut Context<'_>, state: &mut SeekState) -> Poll> { + use Poll::Ready; + let (n, rx) = match state { + SeekState::NonBlocking(n) => return Ready(Ok(*n)), + SeekState::Blocking(n, rx) => (n, rx), + }; + + let Ok(result) = ready!(unsafe { Pin::new_unchecked(rx) }.poll(cx)) else { + return Ready(Err(Error::Timeout.into())); + }; + + let packet = match result { + Ok(Packet::Attrs(packet)) => packet, + Ok(_) => return Ready(Err(Error::Packet("not an Attrs").into())), + Err(e) => return Ready(Err(Error::from(e).into())), + }; + + let Some(size) = packet.attrs.size else { + return Ready(Err(io::Error::other("could not get file size for seeking from end"))); + }; + + Ready( + size + .checked_add_signed(*n) + .ok_or_else(|| io::Error::other("seeking to a negative or overflowed position")), + ) + } + + let result = ready!(imp(cx, state)); + if let Ok(n) = result { + me.cursor = n; + } + + me.seek_rx = None; + Poll::Ready(result) + } +} + impl AsyncWrite for File { fn poll_write( self: Pin<&mut Self>, diff --git a/yazi-shared/src/path/path.rs b/yazi-shared/src/path/path.rs index 0d33535f..8c1cce7e 100644 --- a/yazi-shared/src/path/path.rs +++ b/yazi-shared/src/path/path.rs @@ -182,6 +182,13 @@ impl<'p> PathDyn<'p> { pub fn to_string_lossy(self) -> Cow<'p, str> { String::from_utf8_lossy(self.encoded_bytes()) } + pub fn to_unix_owned(self) -> Result { + match self { + Self::Os(_) => Err(PathDynError::AsUnix), + Self::Unix(p) => Ok(p.to_owned()), + } + } + pub fn try_ends_with(self, child: T) -> Result where T: AsStrand, diff --git a/yazi-vfs/src/fns.rs b/yazi-vfs/src/fns.rs index a1f3cb6c..d83bd2ae 100644 --- a/yazi-vfs/src/fns.rs +++ b/yazi-vfs/src/fns.rs @@ -1,9 +1,7 @@ -use std::io; +use std::io::{self}; -use tokio::{select, sync::{mpsc, oneshot}}; -use yazi_fs::provider::Attrs; use yazi_macro::ok_or_not_found; -use yazi_shared::{strand::{StrandBuf, StrandLike}, url::{AsUrl, Url, UrlBuf, UrlLike}}; +use yazi_shared::{strand::{StrandBuf, StrandLike}, url::{AsUrl, UrlBuf, UrlLike}}; use crate::provider; @@ -65,66 +63,3 @@ async fn _unique_name(mut url: UrlBuf, append: bool) -> io::Result { Ok(url) } - -pub fn copy_with_progress( - from: U, - to: V, - attrs: A, -) -> mpsc::Receiver> -where - U: AsUrl, - V: AsUrl, - A: Into, -{ - _copy_with_progress(from.as_url(), to.as_url(), attrs.into()) -} - -fn _copy_with_progress(from: Url, to: Url, attrs: Attrs) -> mpsc::Receiver> { - let (prog_tx, prog_rx) = mpsc::channel(1); - let (done_tx, mut done_rx) = oneshot::channel(); - - tokio::spawn({ - let (from, to) = (from.to_owned(), to.to_owned()); - async move { - done_tx.send(provider::copy(from, to, attrs).await).ok(); - } - }); - - tokio::spawn({ - let (prog_tx, to) = (prog_tx.clone(), to.to_owned()); - async move { - let mut last = 0; - let mut done = None; - loop { - select! { - res = &mut done_rx => done = Some(res.unwrap()), - _ = prog_tx.closed() => break, - _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {}, - } - - match done { - Some(Ok(len)) => { - if len > last { - prog_tx.send(Ok(len - last)).await.ok(); - } - prog_tx.send(Ok(0)).await.ok(); - break; - } - Some(Err(e)) => { - prog_tx.send(Err(e)).await.ok(); - break; - } - None => {} - } - - let len = provider::symlink_metadata(&to).await.map(|m| m.len).unwrap_or(0); - if len > last { - prog_tx.send(Ok(len - last)).await.ok(); - last = len; - } - } - } - }); - - prog_rx -} diff --git a/yazi-vfs/src/provider/copier.rs b/yazi-vfs/src/provider/copier.rs new file mode 100644 index 00000000..610006bf --- /dev/null +++ b/yazi-vfs/src/provider/copier.rs @@ -0,0 +1,133 @@ +use std::{io::{self, SeekFrom}, sync::{Arc, atomic::{AtomicU64, Ordering}}}; + +use futures::{StreamExt, TryStreamExt}; +use tokio::{io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}, select, sync::{mpsc, oneshot}}; +use yazi_fs::provider::{Attrs, FileBuilder}; +use yazi_shared::url::{Url, UrlBuf}; + +use crate::provider::{self, Gate}; + +pub(super) async fn copy_impl(from: Url<'_>, to: Url<'_>, attrs: Attrs) -> io::Result { + let src = provider::open(from).await?; + let dist = provider::create(to).await?; + + let mut reader = BufReader::with_capacity(524288, src); + let mut writer = BufWriter::with_capacity(524288, dist); + let written = tokio::io::copy(&mut reader, &mut writer).await?; + + writer.flush().await?; + writer.get_ref().set_attrs(attrs).await.ok(); + writer.shutdown().await.ok(); + Ok(written) +} + +pub(super) fn copy_with_progress_impl( + from: UrlBuf, + to: UrlBuf, + attrs: Attrs, +) -> mpsc::Receiver> { + let acc = Arc::new(AtomicU64::new(0)); + let (from, to) = (Arc::new(from), Arc::new(to)); + let (prog_tx, prog_rx) = mpsc::channel(10); + let (done_tx, mut done_rx) = oneshot::channel(); + + let (acc_, prog_tx_) = (acc.clone(), prog_tx.clone()); + tokio::spawn(async move { + let (cha, mut src) = { + let f = provider::open(&*from).await?; + (f.metadata().await?, Some(f)) + }; + + let mut dist = { + let f = provider::create(&*to).await?; + f.set_len(cha.len).await?; + Some(f) + }; + + let chunks = (cha.len + 10485760 - 1) / 10485760; + let result = futures::stream::iter(0..chunks) + .map(|i| { + let acc_ = acc_.clone(); + let (from, to) = (from.clone(), to.clone()); + let (src, dist) = (src.take(), dist.take()); + async move { + let offset = i * 10485760; + let take = cha.len.saturating_sub(offset).min(10485760); + + let mut src = BufReader::with_capacity(524288, match src { + Some(f) => f, + None => provider::open(&*from).await?, + }); + let mut dist = BufWriter::with_capacity(524288, match dist { + Some(f) => f, + None => Gate::default().write(true).open(&*to).await?, + }); + + src.seek(SeekFrom::Start(offset)).await?; + dist.seek(SeekFrom::Start(offset)).await?; + + let mut src = src.take(take); + let mut buf = vec![0u8; 65536]; + let mut copied = 0u64; + loop { + let n = src.read(&mut buf).await?; + if n == 0 { + break; + } + + dist.write_all(&buf[..n]).await?; + copied += n as u64; + acc_.fetch_add(n as u64, Ordering::SeqCst); + } + + dist.flush().await?; + if i == chunks - 1 { + dist.get_ref().set_attrs(attrs).await.ok(); + } + dist.shutdown().await.ok(); + + if copied == take { + Ok(()) + } else { + Err(io::Error::other(format!( + "short copy for chunk {i}: copied {copied} bytes, expected {take}" + ))) + } + } + }) + .buffer_unordered(3) + .try_for_each(|_| async { Ok(()) }) + .await; + + let n = acc_.swap(0, Ordering::SeqCst); + if n > 0 { + prog_tx_.send(Ok(n)).await.ok(); + } + + if let Err(e) = result { + prog_tx_.send(Err(e)).await.ok(); + } else { + prog_tx_.send(Ok(0)).await.ok(); + } + + done_tx.send(()).ok(); + Ok::<_, io::Error>(()) + }); + + tokio::spawn(async move { + loop { + select! { + _ = &mut done_rx => break, + _ = prog_tx.closed() => break, + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {}, + } + + let n = acc.swap(0, Ordering::SeqCst); + if n > 0 { + prog_tx.send(Ok(n)).await.ok(); + } + } + }); + + prog_rx +} diff --git a/yazi-vfs/src/provider/mod.rs b/yazi-vfs/src/provider/mod.rs index 913efe06..375515cc 100644 --- a/yazi-vfs/src/provider/mod.rs +++ b/yazi-vfs/src/provider/mod.rs @@ -1,5 +1,5 @@ yazi_macro::mod_pub!(sftp); -yazi_macro::mod_flat!(calculator dir_entry gate provider providers read_dir rw_file); +yazi_macro::mod_flat!(calculator copier dir_entry gate provider providers read_dir rw_file); pub(super) fn init() { sftp::init(); } diff --git a/yazi-vfs/src/provider/provider.rs b/yazi-vfs/src/provider/provider.rs index b60e1547..d151dea0 100644 --- a/yazi-vfs/src/provider/provider.rs +++ b/yazi-vfs/src/provider/provider.rs @@ -1,6 +1,6 @@ use std::io; -use tokio::io::{AsyncWriteExt, BufReader, BufWriter}; +use tokio::sync::mpsc; use yazi_fs::{cha::Cha, provider::{Attrs, Provider, local::Local}}; use yazi_shared::{path::{AsPath, PathBufDyn}, url::{AsUrl, UrlBuf, UrlCow}}; @@ -51,18 +51,29 @@ where (false, false) if from.scheme().covariant(to.scheme()) => { Providers::new(from).await?.copy(to.loc(), attrs).await } + (true, false) | (false, true) | (false, false) => super::copy_impl(from, to, attrs).await, + } +} + +pub async fn copy_with_progress( + from: U, + to: V, + attrs: A, +) -> io::Result>> +where + U: AsUrl, + V: AsUrl, + A: Into, +{ + let (from, to) = (from.as_url(), to.as_url()); + + match (from.kind().is_local(), to.kind().is_local()) { + (true, true) => Local::new(from).await?.copy_with_progress(to.loc(), attrs), + (false, false) if from.scheme().covariant(to.scheme()) => { + Providers::new(from).await?.copy_with_progress(to.loc(), attrs) + } (true, false) | (false, true) | (false, false) => { - let src = Providers::new(from).await?.open().await?; - let dist = Providers::new(to).await?.create().await?; - - let mut reader = BufReader::with_capacity(524288, src); - let mut writer = BufWriter::with_capacity(524288, dist); - let written = tokio::io::copy(&mut reader, &mut writer).await?; - - writer.flush().await?; - writer.get_ref().set_attrs(attrs).await.ok(); - writer.shutdown().await.ok(); - Ok(written) + Ok(super::copy_with_progress_impl(from.to_owned(), to.to_owned(), attrs.into())) } } } @@ -128,6 +139,13 @@ where identical(a, b).await.unwrap_or(false) } +pub async fn open(url: U) -> io::Result +where + U: AsUrl, +{ + Providers::new(url.as_url()).await?.open().await +} + pub async fn read_dir(url: U) -> io::Result where U: AsUrl, diff --git a/yazi-vfs/src/provider/providers.rs b/yazi-vfs/src/provider/providers.rs index d0ec7f90..59e4b00e 100644 --- a/yazi-vfs/src/provider/providers.rs +++ b/yazi-vfs/src/provider/providers.rs @@ -1,5 +1,6 @@ use std::io; +use tokio::sync::mpsc; use yazi_fs::{cha::Cha, provider::{Attrs, Provider}}; use yazi_shared::{path::{AsPath, PathBufDyn}, url::{Url, UrlBuf, UrlCow}}; @@ -47,6 +48,17 @@ impl<'a> Provider for Providers<'a> { } } + fn copy_with_progress(&self, to: P, attrs: A) -> io::Result>> + where + P: AsPath, + A: Into, + { + match self { + Self::Local(p) => p.copy_with_progress(to, attrs), + Self::Sftp(p) => p.copy_with_progress(to, attrs), + } + } + async fn create(&self) -> io::Result { Ok(match self { Self::Local(p) => p.create().await?.into(), diff --git a/yazi-vfs/src/provider/rw_file.rs b/yazi-vfs/src/provider/rw_file.rs index 8f8cb1df..47fce3c7 100644 --- a/yazi-vfs/src/provider/rw_file.rs +++ b/yazi-vfs/src/provider/rw_file.rs @@ -1,6 +1,6 @@ use std::{io, pin::Pin}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use yazi_fs::provider::Attrs; pub enum RwFile { @@ -17,6 +17,14 @@ impl From for RwFile { } impl RwFile { + // FIXME: path + pub async fn metadata(&self) -> io::Result { + Ok(match self { + Self::Tokio(f) => yazi_fs::cha::Cha::new("// FIXME", f.metadata().await?), + Self::Sftp(f) => super::sftp::Cha::try_from(("// FIXME".as_bytes(), &f.fstat().await?))?.0, + }) + } + pub async fn set_attrs(&self, attrs: Attrs) -> io::Result<()> { match self { Self::Tokio(f) => { @@ -37,6 +45,15 @@ impl RwFile { Ok(()) } + + pub async fn set_len(&self, size: u64) -> io::Result<()> { + Ok(match self { + Self::Tokio(f) => f.set_len(size).await?, + Self::Sftp(f) => { + f.fsetstat(&yazi_sftp::fs::Attrs { size: Some(size), ..Default::default() }).await? + } + }) + } } impl AsyncRead for RwFile { @@ -53,6 +70,27 @@ impl AsyncRead for RwFile { } } +impl AsyncSeek for RwFile { + #[inline] + fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + match &mut *self { + Self::Tokio(f) => Pin::new(f).start_seek(position), + Self::Sftp(f) => Pin::new(f).start_seek(position), + } + } + + #[inline] + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + Self::Tokio(f) => Pin::new(f).poll_complete(cx), + Self::Sftp(f) => Pin::new(f).poll_complete(cx), + } + } +} + impl AsyncWrite for RwFile { #[inline] fn poll_write( diff --git a/yazi-vfs/src/provider/sftp/sftp.rs b/yazi-vfs/src/provider/sftp/sftp.rs index 9cc6143b..2be85fdc 100644 --- a/yazi-vfs/src/provider/sftp/sftp.rs +++ b/yazi-vfs/src/provider/sftp/sftp.rs @@ -1,10 +1,10 @@ use std::{io, sync::Arc}; -use tokio::io::{AsyncWriteExt, BufReader, BufWriter}; +use tokio::{io::{AsyncWriteExt, BufReader, BufWriter}, sync::mpsc::Receiver}; use yazi_config::vfs::{ProviderSftp, Vfs}; use yazi_fs::provider::{DirReader, FileHolder, Provider}; use yazi_sftp::fs::{Attrs, Flags}; -use yazi_shared::{path::{AsPath, PathBufDyn}, pool::InternStr, url::{Url, UrlBuf, UrlCow, UrlLike}}; +use yazi_shared::{loc::LocBuf, path::{AsPath, PathBufDyn}, pool::InternStr, scheme::SchemeKind, url::{Url, UrlBuf, UrlCow, UrlLike}}; use super::Cha; use crate::provider::sftp::Conn; @@ -88,6 +88,23 @@ impl<'a> Provider for Sftp<'a> { Ok(written) } + fn copy_with_progress(&self, to: P, attrs: A) -> io::Result>> + where + P: AsPath, + A: Into, + { + let to = UrlBuf::Sftp { + loc: LocBuf::::saturated( + to.as_path().to_unix_owned()?, + SchemeKind::Sftp, + ), + domain: self.name.intern(), + }; + let from = self.url.to_owned(); + + Ok(crate::provider::copy_with_progress_impl(from, to, attrs.into())) + } + async fn create_dir(&self) -> io::Result<()> { Ok(self.op().await?.mkdir(self.path, Attrs::default()).await?) }