More work on porting receive kitten

This commit is contained in:
Kovid Goyal 2023-07-23 22:20:38 +05:30
parent 15205be67b
commit 14c29f2cf5
No known key found for this signature in database
GPG key ID: 06BC317B515ACE7C

View file

@ -5,6 +5,7 @@ package transfer
import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io"
"io/fs"
@ -16,6 +17,8 @@ import (
"kitty"
"kitty/tools/cli/markup"
"kitty/tools/rsync"
"kitty/tools/tui"
"kitty/tools/tui/loop"
"kitty/tools/utils"
"kitty/tools/utils/humanize"
@ -62,6 +65,7 @@ func (ff *filesystem_file) write(data []byte) (int, error) {
type remote_file struct {
expected_size int64
expect_diff bool
patcher *rsync.Patcher
transmit_started_at, done_at time.Time
written_bytes int64
received_bytes int64
@ -77,6 +81,7 @@ type remote_file struct {
expanded_local_path string
file_id string
decompressor utils.StreamDecompressor
compression_type Compression
remote_symlink_value string
actual_file output_file
}
@ -160,8 +165,10 @@ func new_remote_file(opts *Options, ftc *FileTransmissionCommand) (*remote_file,
compression_capable := ftc.Ftype == FileType_regular && ftc.Size > 4096 && should_be_compressed(ftc.Name, opts.Compress)
if compression_capable {
ans.decompressor, err = utils.NewStreamDecompressor(zlib.NewReader)
ans.compression_type = Compression_zlib
} else {
ans.decompressor, err = utils.NewStreamDecompressor(nil)
ans.compression_type = Compression_none
}
if err != nil {
return nil, err
@ -233,8 +240,82 @@ type manager struct {
progress_tracker receive_progress_tracker
}
type transmit_iterator = func(queue_write func(string) loop.IdType) (loop.IdType, error)
type sigwriter struct {
wid loop.IdType
file_id, prefix, suffix string
q func(string) loop.IdType
amt int64
}
func (self *sigwriter) Write(b []byte) (int, error) {
frame := len(self.prefix) + len(self.suffix)
split_for_transfer(b, self.file_id, false, func(ftc *FileTransmissionCommand) {
self.q(self.prefix)
data := ftc.Serialize(false)
self.q(data)
self.wid = self.q(self.suffix)
self.amt += int64(frame + len(data))
})
return len(b), nil
}
var files_done error = errors.New("files done")
func (self *manager) request_files() transmit_iterator {
pos := 0
return func(queue_write func(string) loop.IdType) (last_write_id loop.IdType, err error) {
if pos >= len(self.files) {
return 0, files_done
}
f := self.files[pos]
pos++
read_signature := self.use_rsync
if read_signature && f.ftype == FileType_regular {
if s, err := os.Lstat(f.expanded_local_path); err == nil {
read_signature = s.Size() > 4096
} else {
read_signature = false
}
}
queue_write(self.prefix)
queue_write(FileTransmissionCommand{
Action: Action_file, Name: f.remote_path, File_id: f.file_id, Ttype: utils.IfElse(
read_signature, TransmissionType_rsync, TransmissionType_simple), Compression: f.compression_type,
}.Serialize(false))
last_write_id = queue_write(self.suffix)
if read_signature {
fsf, err := os.Open(f.expanded_local_path)
if err != nil {
return 0, err
}
defer fsf.Close()
f.expect_diff = true
f.patcher = rsync.NewPatcher(f.expected_size)
output := sigwriter{q: queue_write, file_id: f.file_id, prefix: self.prefix, suffix: self.suffix}
s_it := f.patcher.CreateSignatureIterator(fsf, &output)
for {
err = s_it()
if err == io.EOF {
break
} else if err != nil {
return 0, err
}
}
f.sent_bytes += output.amt
queue_write(self.prefix)
queue_write(FileTransmissionCommand{Action: Action_end_data, File_id: f.file_id}.Serialize(false))
last_write_id = queue_write(self.suffix)
}
return
}
}
type handler struct {
lp *loop.Loop
progress_update_timer loop.IdType
spinner *tui.Spinner
cli_opts *Options
ctx *markup.Context
manager manager
@ -243,6 +324,8 @@ type handler struct {
transmit_started bool
progress_drawn bool
max_name_length int
transmit_iterator transmit_iterator
last_data_write_id loop.IdType
}
func (self *manager) start_transfer(send func(string) loop.IdType) {
@ -561,6 +644,47 @@ func (self *handler) confirm_paths() {
self.print_check_paths()
}
func (self *handler) transmit_one() {
if self.transmit_iterator == nil {
return
}
wid, err := self.transmit_iterator(self.lp.QueueWriteString)
if err != nil {
if err == files_done {
self.transmit_iterator = nil
} else {
self.print_err(err)
self.lp.Println(`Waiting to ensure terminal cancels transfer, will quit in a few seconds`)
self.abort_transfer(-1)
return
}
} else {
self.last_data_write_id = wid
}
}
func (self *handler) start_transfer() {
self.transmit_started = true
n := len(self.manager.files)
msg := `Transmitting signature of`
if self.manager.use_rsync {
msg = `Queueing transfer of`
}
msg += ` `
if n == 1 {
msg += `one file`
} else {
msg += fmt.Sprintf(`%d files`, n)
}
self.lp.Println(msg)
self.max_name_length = 0
for _, f := range self.manager.files {
self.max_name_length = utils.Max(6, self.max_name_length, wcswidth.Stringwidth(f.display_name))
}
self.transmit_iterator = self.manager.request_files()
self.transmit_one()
}
func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) {
if ftc.Id != self.manager.request_id {
return
@ -615,14 +739,23 @@ func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (er
self.lp.QueueWriteString(FileTransmissionCommand{Action: Action_finish}.Serialize(false))
self.lp.QueueWriteString(self.manager.suffix)
self.quit_after_write_code = 0
self.refresh_progress()
self.refresh_progress(0)
} else if self.transmit_started {
self.refresh_progress()
self.refresh_progress(0)
}
return
}
func (self *handler) on_sigint() (handled bool, err error) {
func (self *handler) on_writing_finished(msg_id loop.IdType) (err error) {
if self.quit_after_write_code > -1 {
self.lp.Quit(self.quit_after_write_code)
} else if msg_id == self.last_data_write_id {
self.transmit_one()
}
return nil
}
func (self *handler) on_interrupt() (handled bool, err error) {
handled = true
if self.quit_after_write_code > -1 {
return
@ -646,6 +779,144 @@ func (self *handler) on_sigterm() (handled bool, err error) {
return
}
func (self *handler) erase_progress() {
if self.progress_drawn {
self.lp.MoveCursorVertically(-2)
self.lp.QueueWriteString("\r")
self.lp.ClearToEndOfScreen()
self.progress_drawn = false
}
}
func (self *handler) render_progress(name string, p Progress) {
if p.is_complete {
p.bytes_so_far = p.total_bytes
}
ss, _ := self.lp.ScreenSize()
self.lp.QueueWriteString(render_progress_in_width(name, p, int(ss.WidthCells), self.ctx))
}
func (self *handler) draw_progress_for_current_file(af *remote_file, spinner_char string, is_complete bool) {
p := &self.manager.progress_tracker
now := time.Now()
secs := utils.IfElse(af.done_at.IsZero(), now, af.done_at)
self.render_progress(af.display_name, Progress{
spinner_char: spinner_char, is_complete: is_complete,
bytes_so_far: af.written_bytes, total_bytes: af.expected_size,
secs_so_far: secs.Sub(af.transmit_started_at).Seconds(),
bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval),
})
}
func (self *handler) draw_files() {
tick := self.ctx.Green(``)
var sc string
for _, df := range self.manager.progress_tracker.done_files {
sc = tick
if df.ftype == FileType_regular {
self.draw_progress_for_current_file(df, sc, true)
} else {
self.lp.QueueWriteString(fmt.Sprintf("%s %s %s", sc, df.display_name, self.ctx.Italic(self.ctx.Dim(df.ftype.String()))))
}
self.lp.Println()
self.manager.progress_tracker.done_files = nil
}
is_complete := self.quit_after_write_code > -1
if is_complete {
sc = utils.IfElse(self.quit_after_write_code == 0, tick, self.ctx.Red(``))
} else {
sc = self.spinner.Tick()
}
p := &self.manager.progress_tracker
ss, _ := self.lp.ScreenSize()
if is_complete {
tui.RepeatChar(``, int(ss.WidthCells))
} else {
af := p.active_file
if af != nil {
self.draw_progress_for_current_file(af, sc, false)
}
}
self.lp.Println()
if p.total_transferred > 0 {
self.render_progress(`Total`, Progress{
spinner_char: sc, bytes_so_far: p.total_transferred, total_bytes: p.total_bytes_to_transfer,
secs_so_far: time.Now().Sub(p.started_at).Seconds(), is_complete: is_complete,
bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()),
})
self.lp.Println()
} else {
self.lp.Println(`File data transfer has not yet started`)
}
}
func (self *handler) schedule_progress_update(delay time.Duration) {
if self.progress_update_timer != 0 {
self.lp.RemoveTimer(self.progress_update_timer)
self.progress_update_timer = 0
}
timer_id, err := self.lp.AddTimer(delay, false, self.refresh_progress)
if err == nil {
self.progress_update_timer = timer_id
}
}
func (self *handler) draw_progress() {
if self.manager.state == state_canceled {
return
}
self.lp.AllowLineWrapping(false)
defer self.lp.AllowLineWrapping(true)
self.draw_files()
self.schedule_progress_update(self.spinner.Interval())
self.progress_drawn = true
}
func (self *handler) refresh_progress(loop.IdType) error {
self.lp.StartAtomicUpdate()
defer self.lp.EndAtomicUpdate()
self.erase_progress()
self.draw_progress()
return nil
}
func (self *handler) on_text(text string, from_key_event, in_bracketed_paste bool) error {
if self.quit_after_write_code > -1 {
return nil
}
if self.check_paths_printed && !self.transmit_started {
switch strings.ToLower(text) {
case "y":
self.start_transfer()
return nil
case "n":
self.abort_transfer(-1)
self.lp.Println(`Sending cancel request to terminal`)
return nil
}
self.print_continue_msg()
}
return nil
}
func (self *handler) on_key_event(ev *loop.KeyEvent) error {
if self.quit_after_write_code > -1 {
return nil
}
if ev.MatchesPressOrRepeat("esc") {
ev.Handled = true
if self.check_paths_printed && !self.transmit_started {
self.abort_transfer(-1)
self.lp.Println(`Sending cancel request to terminal`)
} else {
self.on_interrupt()
}
} else if ev.MatchesPressOrRepeat("ctrl+c") {
ev.Handled = true
self.on_interrupt()
}
return nil
}
func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) {
lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
if err != nil {
@ -653,7 +924,7 @@ func receive_loop(opts *Options, spec []string, dest string) (err error, rc int)
}
handler := handler{
lp: lp, quit_after_write_code: -1, cli_opts: opts,
lp: lp, quit_after_write_code: -1, cli_opts: opts, spinner: tui.NewSpinner("dots"),
manager: manager{
request_id: random_id(), spec: spec, dest: dest, bypass: opts.PermissionsBypass, use_rsync: opts.TransmitDeltas,
failed_specs: make(map[int]string, len(spec)), spec_counts: make(map[int]int, len(spec)),
@ -682,8 +953,17 @@ func receive_loop(opts *Options, spec []string, dest string) (err error, rc int)
return ""
}
lp.OnSIGINT = handler.on_sigint
lp.OnSIGINT = handler.on_interrupt
lp.OnSIGTERM = handler.on_sigterm
lp.OnWriteComplete = handler.on_writing_finished
lp.OnText = handler.on_text
lp.OnKeyEvent = handler.on_key_event
lp.OnResize = func(old_sz, new_sz loop.ScreenSize) error {
if handler.progress_drawn {
handler.refresh_progress(0)
}
return nil
}
ftc_code := strconv.Itoa(kitty.FileTransferCode)
lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {