From 14c29f2cf5b44fc9fbf155e49f31e629d492df63 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 23 Jul 2023 22:20:38 +0530 Subject: [PATCH] More work on porting receive kitten --- kittens/transfer/receive.go | 290 +++++++++++++++++++++++++++++++++++- 1 file changed, 285 insertions(+), 5 deletions(-) diff --git a/kittens/transfer/receive.go b/kittens/transfer/receive.go index 88191d8f1..d12eabded 100644 --- a/kittens/transfer/receive.go +++ b/kittens/transfer/receive.go @@ -5,6 +5,7 @@ package transfer import ( "bytes" "compress/zlib" + "errors" "fmt" "io" "io/fs" @@ -16,6 +17,8 @@ import ( "kitty" "kitty/tools/cli/markup" + "kitty/tools/rsync" + "kitty/tools/tui" "kitty/tools/tui/loop" "kitty/tools/utils" "kitty/tools/utils/humanize" @@ -62,6 +65,7 @@ func (ff *filesystem_file) write(data []byte) (int, error) { type remote_file struct { expected_size int64 expect_diff bool + patcher *rsync.Patcher transmit_started_at, done_at time.Time written_bytes int64 received_bytes int64 @@ -77,6 +81,7 @@ type remote_file struct { expanded_local_path string file_id string decompressor utils.StreamDecompressor + compression_type Compression remote_symlink_value string actual_file output_file } @@ -160,8 +165,10 @@ func new_remote_file(opts *Options, ftc *FileTransmissionCommand) (*remote_file, compression_capable := ftc.Ftype == FileType_regular && ftc.Size > 4096 && should_be_compressed(ftc.Name, opts.Compress) if compression_capable { ans.decompressor, err = utils.NewStreamDecompressor(zlib.NewReader) + ans.compression_type = Compression_zlib } else { ans.decompressor, err = utils.NewStreamDecompressor(nil) + ans.compression_type = Compression_none } if err != nil { return nil, err @@ -233,8 +240,82 @@ type manager struct { progress_tracker receive_progress_tracker } +type transmit_iterator = func(queue_write func(string) loop.IdType) (loop.IdType, error) + +type sigwriter struct { + wid loop.IdType + file_id, prefix, suffix string + q func(string) loop.IdType + amt int64 +} + +func (self *sigwriter) Write(b []byte) (int, error) { + frame := len(self.prefix) + len(self.suffix) + split_for_transfer(b, self.file_id, false, func(ftc *FileTransmissionCommand) { + self.q(self.prefix) + data := ftc.Serialize(false) + self.q(data) + self.wid = self.q(self.suffix) + self.amt += int64(frame + len(data)) + }) + return len(b), nil +} + +var files_done error = errors.New("files done") + +func (self *manager) request_files() transmit_iterator { + pos := 0 + return func(queue_write func(string) loop.IdType) (last_write_id loop.IdType, err error) { + if pos >= len(self.files) { + return 0, files_done + } + f := self.files[pos] + pos++ + read_signature := self.use_rsync + if read_signature && f.ftype == FileType_regular { + if s, err := os.Lstat(f.expanded_local_path); err == nil { + read_signature = s.Size() > 4096 + } else { + read_signature = false + } + } + queue_write(self.prefix) + queue_write(FileTransmissionCommand{ + Action: Action_file, Name: f.remote_path, File_id: f.file_id, Ttype: utils.IfElse( + read_signature, TransmissionType_rsync, TransmissionType_simple), Compression: f.compression_type, + }.Serialize(false)) + last_write_id = queue_write(self.suffix) + if read_signature { + fsf, err := os.Open(f.expanded_local_path) + if err != nil { + return 0, err + } + defer fsf.Close() + f.expect_diff = true + f.patcher = rsync.NewPatcher(f.expected_size) + output := sigwriter{q: queue_write, file_id: f.file_id, prefix: self.prefix, suffix: self.suffix} + s_it := f.patcher.CreateSignatureIterator(fsf, &output) + for { + err = s_it() + if err == io.EOF { + break + } else if err != nil { + return 0, err + } + } + f.sent_bytes += output.amt + queue_write(self.prefix) + queue_write(FileTransmissionCommand{Action: Action_end_data, File_id: f.file_id}.Serialize(false)) + last_write_id = queue_write(self.suffix) + } + return + } +} + type handler struct { lp *loop.Loop + progress_update_timer loop.IdType + spinner *tui.Spinner cli_opts *Options ctx *markup.Context manager manager @@ -243,6 +324,8 @@ type handler struct { transmit_started bool progress_drawn bool max_name_length int + transmit_iterator transmit_iterator + last_data_write_id loop.IdType } func (self *manager) start_transfer(send func(string) loop.IdType) { @@ -561,6 +644,47 @@ func (self *handler) confirm_paths() { self.print_check_paths() } +func (self *handler) transmit_one() { + if self.transmit_iterator == nil { + return + } + wid, err := self.transmit_iterator(self.lp.QueueWriteString) + if err != nil { + if err == files_done { + self.transmit_iterator = nil + } else { + self.print_err(err) + self.lp.Println(`Waiting to ensure terminal cancels transfer, will quit in a few seconds`) + self.abort_transfer(-1) + return + } + } else { + self.last_data_write_id = wid + } +} + +func (self *handler) start_transfer() { + self.transmit_started = true + n := len(self.manager.files) + msg := `Transmitting signature of` + if self.manager.use_rsync { + msg = `Queueing transfer of` + } + msg += ` ` + if n == 1 { + msg += `one file` + } else { + msg += fmt.Sprintf(`%d files`, n) + } + self.lp.Println(msg) + self.max_name_length = 0 + for _, f := range self.manager.files { + self.max_name_length = utils.Max(6, self.max_name_length, wcswidth.Stringwidth(f.display_name)) + } + self.transmit_iterator = self.manager.request_files() + self.transmit_one() +} + func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) { if ftc.Id != self.manager.request_id { return @@ -615,14 +739,23 @@ func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (er self.lp.QueueWriteString(FileTransmissionCommand{Action: Action_finish}.Serialize(false)) self.lp.QueueWriteString(self.manager.suffix) self.quit_after_write_code = 0 - self.refresh_progress() + self.refresh_progress(0) } else if self.transmit_started { - self.refresh_progress() + self.refresh_progress(0) } return } -func (self *handler) on_sigint() (handled bool, err error) { +func (self *handler) on_writing_finished(msg_id loop.IdType) (err error) { + if self.quit_after_write_code > -1 { + self.lp.Quit(self.quit_after_write_code) + } else if msg_id == self.last_data_write_id { + self.transmit_one() + } + return nil +} + +func (self *handler) on_interrupt() (handled bool, err error) { handled = true if self.quit_after_write_code > -1 { return @@ -646,6 +779,144 @@ func (self *handler) on_sigterm() (handled bool, err error) { return } +func (self *handler) erase_progress() { + if self.progress_drawn { + self.lp.MoveCursorVertically(-2) + self.lp.QueueWriteString("\r") + self.lp.ClearToEndOfScreen() + self.progress_drawn = false + } +} + +func (self *handler) render_progress(name string, p Progress) { + if p.is_complete { + p.bytes_so_far = p.total_bytes + } + ss, _ := self.lp.ScreenSize() + self.lp.QueueWriteString(render_progress_in_width(name, p, int(ss.WidthCells), self.ctx)) +} + +func (self *handler) draw_progress_for_current_file(af *remote_file, spinner_char string, is_complete bool) { + p := &self.manager.progress_tracker + now := time.Now() + secs := utils.IfElse(af.done_at.IsZero(), now, af.done_at) + self.render_progress(af.display_name, Progress{ + spinner_char: spinner_char, is_complete: is_complete, + bytes_so_far: af.written_bytes, total_bytes: af.expected_size, + secs_so_far: secs.Sub(af.transmit_started_at).Seconds(), + bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval), + }) +} + +func (self *handler) draw_files() { + tick := self.ctx.Green(`✔`) + var sc string + for _, df := range self.manager.progress_tracker.done_files { + sc = tick + if df.ftype == FileType_regular { + self.draw_progress_for_current_file(df, sc, true) + } else { + self.lp.QueueWriteString(fmt.Sprintf("%s %s %s", sc, df.display_name, self.ctx.Italic(self.ctx.Dim(df.ftype.String())))) + } + self.lp.Println() + self.manager.progress_tracker.done_files = nil + } + is_complete := self.quit_after_write_code > -1 + if is_complete { + sc = utils.IfElse(self.quit_after_write_code == 0, tick, self.ctx.Red(`✘`)) + } else { + sc = self.spinner.Tick() + } + p := &self.manager.progress_tracker + ss, _ := self.lp.ScreenSize() + if is_complete { + tui.RepeatChar(`─`, int(ss.WidthCells)) + } else { + af := p.active_file + if af != nil { + self.draw_progress_for_current_file(af, sc, false) + } + } + self.lp.Println() + if p.total_transferred > 0 { + self.render_progress(`Total`, Progress{ + spinner_char: sc, bytes_so_far: p.total_transferred, total_bytes: p.total_bytes_to_transfer, + secs_so_far: time.Now().Sub(p.started_at).Seconds(), is_complete: is_complete, + bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()), + }) + self.lp.Println() + } else { + self.lp.Println(`File data transfer has not yet started`) + } +} + +func (self *handler) schedule_progress_update(delay time.Duration) { + if self.progress_update_timer != 0 { + self.lp.RemoveTimer(self.progress_update_timer) + self.progress_update_timer = 0 + } + timer_id, err := self.lp.AddTimer(delay, false, self.refresh_progress) + if err == nil { + self.progress_update_timer = timer_id + } +} + +func (self *handler) draw_progress() { + if self.manager.state == state_canceled { + return + } + self.lp.AllowLineWrapping(false) + defer self.lp.AllowLineWrapping(true) + self.draw_files() + self.schedule_progress_update(self.spinner.Interval()) + self.progress_drawn = true +} + +func (self *handler) refresh_progress(loop.IdType) error { + self.lp.StartAtomicUpdate() + defer self.lp.EndAtomicUpdate() + self.erase_progress() + self.draw_progress() + return nil +} + +func (self *handler) on_text(text string, from_key_event, in_bracketed_paste bool) error { + if self.quit_after_write_code > -1 { + return nil + } + if self.check_paths_printed && !self.transmit_started { + switch strings.ToLower(text) { + case "y": + self.start_transfer() + return nil + case "n": + self.abort_transfer(-1) + self.lp.Println(`Sending cancel request to terminal`) + return nil + } + self.print_continue_msg() + } + return nil +} +func (self *handler) on_key_event(ev *loop.KeyEvent) error { + if self.quit_after_write_code > -1 { + return nil + } + if ev.MatchesPressOrRepeat("esc") { + ev.Handled = true + if self.check_paths_printed && !self.transmit_started { + self.abort_transfer(-1) + self.lp.Println(`Sending cancel request to terminal`) + } else { + self.on_interrupt() + } + } else if ev.MatchesPressOrRepeat("ctrl+c") { + ev.Handled = true + self.on_interrupt() + } + return nil +} + func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) { lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors) if err != nil { @@ -653,7 +924,7 @@ func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) } handler := handler{ - lp: lp, quit_after_write_code: -1, cli_opts: opts, + lp: lp, quit_after_write_code: -1, cli_opts: opts, spinner: tui.NewSpinner("dots"), manager: manager{ request_id: random_id(), spec: spec, dest: dest, bypass: opts.PermissionsBypass, use_rsync: opts.TransmitDeltas, failed_specs: make(map[int]string, len(spec)), spec_counts: make(map[int]int, len(spec)), @@ -682,8 +953,17 @@ func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) return "" } - lp.OnSIGINT = handler.on_sigint + lp.OnSIGINT = handler.on_interrupt lp.OnSIGTERM = handler.on_sigterm + lp.OnWriteComplete = handler.on_writing_finished + lp.OnText = handler.on_text + lp.OnKeyEvent = handler.on_key_event + lp.OnResize = func(old_sz, new_sz loop.ScreenSize) error { + if handler.progress_drawn { + handler.refresh_progress(0) + } + return nil + } ftc_code := strconv.Itoa(kitty.FileTransferCode) lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {