feat: timeouts for SFTP operations (#3384)

This commit is contained in:
三咲雅 misaki masa 2025-11-29 15:02:25 +08:00 committed by GitHub
parent 81ccdd8b64
commit ade1025a74
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 90 additions and 34 deletions

View file

@ -59,6 +59,10 @@ impl From<tokio::sync::oneshot::error::RecvError> for Error {
fn from(_: tokio::sync::oneshot::error::RecvError) -> Self { Self::custom("channel closed") }
}
impl From<tokio::time::error::Elapsed> for Error {
fn from(_: tokio::time::error::Elapsed) -> Self { Self::Timeout }
}
impl std::error::Error for Error {}
impl std::fmt::Display for Error {

View file

@ -1,8 +1,8 @@
use std::{io, pin::Pin, sync::Arc, task::{Context, Poll, ready}, time::Duration};
use tokio::{io::{AsyncRead, AsyncWrite, ReadBuf}, sync::oneshot, time::{Timeout, timeout}};
use tokio::{io::{AsyncRead, AsyncWrite, ReadBuf}, time::{Timeout, timeout}};
use crate::{Error, Operator, Packet, Session, fs::Attrs};
use crate::{Error, Operator, Packet, Receiver, Session, fs::Attrs};
pub struct File {
session: Arc<Session>,
@ -10,10 +10,10 @@ pub struct File {
closed: bool,
cursor: u64,
close_rx: Option<Timeout<oneshot::Receiver<Packet<'static>>>>,
read_rx: Option<oneshot::Receiver<Packet<'static>>>,
write_rx: Option<(oneshot::Receiver<Packet<'static>>, usize)>,
flush_rx: Option<Timeout<oneshot::Receiver<Packet<'static>>>>,
close_rx: Option<Timeout<Receiver>>,
read_rx: Option<Receiver>,
write_rx: Option<(Receiver, usize)>,
flush_rx: Option<Timeout<Receiver>>,
}
impl Unpin for File {}

View file

@ -9,6 +9,7 @@ mod macros;
mod operator;
mod packet;
mod path;
mod receiver;
mod ser;
mod session;
@ -18,5 +19,6 @@ pub(crate) use id::*;
pub use operator::*;
pub use packet::*;
pub use path::*;
pub use receiver::*;
pub(crate) use ser::*;
pub use session::*;

View file

@ -1,15 +1,14 @@
use std::{ops::Deref, sync::Arc};
use russh::{ChannelStream, client::Msg};
use tokio::sync::oneshot;
use typed_path::UnixPathBuf;
use crate::{AsSftpPath, Error, Packet, Session, SftpPath, fs::{Attrs, File, Flags, ReadDir}, requests, responses};
use crate::{AsSftpPath, Error, Receiver, Session, SftpPath, fs::{Attrs, File, Flags, ReadDir}, requests, responses};
pub struct Operator(Arc<Session>);
impl Deref for Operator {
type Target = Session;
type Target = Arc<Session>;
fn deref(&self) -> &Self::Target { &self.0 }
}
@ -36,25 +35,15 @@ impl Operator {
Ok(File::new(&self.0, handle.handle))
}
pub fn close(&self, handle: &str) -> Result<oneshot::Receiver<Packet<'static>>, Error> {
pub fn close(&self, handle: &str) -> Result<Receiver, Error> {
self.send_sync(requests::Close::new(handle))
}
pub fn read(
&self,
handle: &str,
offset: u64,
len: u32,
) -> Result<oneshot::Receiver<Packet<'static>>, Error> {
pub fn read(&self, handle: &str, offset: u64, len: u32) -> Result<Receiver, Error> {
self.send_sync(requests::Read::new(handle, offset, len))
}
pub fn write(
&self,
handle: &str,
offset: u64,
data: &[u8],
) -> Result<oneshot::Receiver<Packet<'static>>, Error> {
pub fn write(&self, handle: &str, offset: u64, data: &[u8]) -> Result<Receiver, Error> {
self.send_sync(requests::Write::new(handle, offset, data))
}
@ -183,7 +172,7 @@ impl Operator {
status.into()
}
pub fn fsync(&self, handle: &str) -> Result<oneshot::Receiver<Packet<'static>>, Error> {
pub fn fsync(&self, handle: &str) -> Result<Receiver, Error> {
if self.extensions.lock().get("fsync@openssh.com").is_none_or(|s| s != "1") {
return Err(Error::Unsupported);
}

47
yazi-sftp/src/receiver.rs Normal file
View file

@ -0,0 +1,47 @@
use std::{pin::Pin, sync::Arc, task::Poll};
use tokio::sync::oneshot;
use crate::{Packet, Session};
pub struct Receiver {
rx: oneshot::Receiver<Packet<'static>>,
received: bool,
session: Arc<Session>,
id: u32,
}
impl Drop for Receiver {
fn drop(&mut self) {
if !self.received {
self.session.callback.lock().remove(&self.id);
}
}
}
impl Receiver {
pub(crate) fn new(
session: &Arc<Session>,
id: u32,
rx: oneshot::Receiver<Packet<'static>>,
) -> Self {
Self { rx, received: false, session: session.clone(), id }
}
}
impl Future for Receiver {
type Output = Result<Packet<'static>, oneshot::error::RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let me = self.get_mut();
match Pin::new(&mut me.rx).poll(cx) {
Poll::Ready(Ok(packet)) => {
me.received = true;
Poll::Ready(Ok(packet))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -27,6 +27,15 @@ impl Status {
pub fn is_ok(&self) -> bool { self.code == StatusCode::Ok }
pub fn is_eof(&self) -> bool { self.code == StatusCode::Eof }
pub(crate) fn connection_lost(id: u32) -> Self {
Self {
id,
code: StatusCode::ConnectionLost,
message: "connection lost".to_owned(),
language: "en".to_owned(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]

View file

@ -1,16 +1,16 @@
use std::{any::TypeId, collections::HashMap, io::{self, ErrorKind}, sync::Arc};
use std::{any::TypeId, collections::HashMap, io::{self, ErrorKind}, sync::Arc, time::Duration};
use parking_lot::Mutex;
use russh::{ChannelStream, client::Msg};
use serde::Serialize;
use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, select, sync::{mpsc, oneshot}};
use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, select, sync::{mpsc, oneshot}, time::timeout};
use crate::{Error, Id, Packet, responses};
use crate::{Error, Id, Packet, Receiver, responses};
pub struct Session {
tx: mpsc::UnboundedSender<Vec<u8>>,
id: Id,
callback: Mutex<HashMap<u32, oneshot::Sender<Packet<'static>>>>,
pub(super) callback: Mutex<HashMap<u32, oneshot::Sender<Packet<'static>>>>,
pub(super) extensions: Mutex<HashMap<String, String>>,
}
@ -43,6 +43,7 @@ impl Session {
}
}
let me_ = me.clone();
let (mut reader, mut writer) = tokio::io::split(stream);
tokio::spawn(async move {
while let Some(data) = rx.recv().await {
@ -51,6 +52,9 @@ impl Session {
{
rx.close();
writer.shutdown().await.ok();
for (id, cb) in me_.callback.lock().drain() {
cb.send(responses::Status::connection_lost(id).into()).ok();
}
break;
}
}
@ -84,12 +88,12 @@ impl Session {
me
}
pub async fn send<'a, I, O>(&self, input: I) -> Result<O, Error>
pub async fn send<'a, I, O>(self: &Arc<Self>, input: I) -> Result<O, Error>
where
I: Into<Packet<'a>> + Serialize,
O: TryFrom<Packet<'static>, Error = Error> + 'static,
{
match self.send_sync(input)?.await? {
match timeout(Duration::from_secs(30), self.send_sync(input)?).await?? {
Packet::Status(status) if TypeId::of::<O>() != TypeId::of::<responses::Status>() => {
Err(Error::Status(status))
}
@ -97,7 +101,7 @@ impl Session {
}
}
pub fn send_sync<'a, I>(&self, input: I) -> Result<oneshot::Receiver<Packet<'static>>, Error>
pub fn send_sync<'a, I>(self: &Arc<Self>, input: I) -> Result<Receiver, Error>
where
I: Into<Packet<'a>> + Serialize,
{
@ -106,12 +110,13 @@ impl Session {
request = request.with_id(self.id.next());
}
let id = request.id();
let (tx, rx) = oneshot::channel();
self.callback.lock().insert(request.id(), tx);
self.tx.send(crate::to_bytes(request)?)?;
Ok(rx)
self.callback.lock().insert(id, tx);
self.tx.send(crate::to_bytes(request)?)?;
Ok(Receiver::new(self, id, rx))
}
pub fn is_closed(&self) -> bool { self.tx.is_closed() }
pub fn is_closed(self: &Arc<Self>) -> bool { self.tx.is_closed() }
}