diff --git a/kittens/transfer/main.py b/kittens/transfer/main.py index 5e7db6395..03f5182f6 100644 --- a/kittens/transfer/main.py +++ b/kittens/transfer/main.py @@ -9,9 +9,6 @@ from typing import List, Tuple from kitty.cli import parse_args from kitty.cli_stub import TransferCLIOptions -from .receive import receive_main -from .send import send_main - usage = 'source_files_or_directories destination_path' help_text = 'Transfer files over the TTY device' @@ -83,16 +80,7 @@ def read_bypass(loc: str) -> str: def main(args: List[str]) -> None: - cli_opts, items = parse_transfer_args(args) - if cli_opts.permissions_bypass: - cli_opts.permissions_bypass = read_bypass(cli_opts.permissions_bypass).strip() - - if not items: - raise SystemExit('Usage: kitty +kitten transfer file_or_directory ...') - if cli_opts.direction == 'send': - send_main(cli_opts, items) - else: - receive_main(cli_opts, items) + raise SystemExit('This should be run as kitten transfer') if __name__ == '__main__': diff --git a/kittens/transfer/receive.py b/kittens/transfer/receive.py deleted file mode 100644 index fc6ee0f8c..000000000 --- a/kittens/transfer/receive.py +++ /dev/null @@ -1,664 +0,0 @@ -#!/usr/bin/env python -# License: GPLv3 Copyright: 2021, Kovid Goyal - -import os -import posixpath -from asyncio import TimerHandle -from collections import deque -from contextlib import suppress -from enum import auto -from itertools import count -from time import monotonic -from typing import IO, Deque, Dict, Iterator, List, Optional, Union - -from kitty.cli_stub import TransferCLIOptions -from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth -from kitty.file_transmission import ( - Action, - Compression, - FileTransmissionCommand, - FileType, - IdentityDecompressor, - NameReprEnum, - TransmissionType, - ZlibDecompressor, - encode_bypass, - split_for_transfer, -) -from kitty.typing import KeyEventType, ScreenSize -from kitty.utils import sanitize_control_codes - -from ..tui.handler import Handler -from ..tui.loop import Loop, debug -from ..tui.operations import styled, without_line_wrap -from ..tui.spinners import Spinner -from ..tui.utils import human_size -from .send import Transfer -from .utils import expand_home, print_rsync_stats, random_id, render_progress_in_width, safe_divide, should_be_compressed - -debug -file_counter = count(1) - - -class State(NameReprEnum): - waiting_for_permission = auto() - waiting_for_file_metadata = auto() - transferring = auto() - canceled = auto() - - -class File: - - def __init__(self, ftc: FileTransmissionCommand): - self.expected_size = ftc.size - self.expect_diff = False - self.transmit_started_at = self.done_at = 0. - self.written_bytes = 0 - self.received_bytes = 0 - self.sent_bytes = 0 - self.ftype = ftc.ftype - self.mtime = ftc.mtime - self.spec_id = int(ftc.file_id) - self.permissions = ftc.permissions - self.remote_path = ftc.name - self.display_name = sanitize_control_codes(self.remote_path) - self.remote_id = ftc.status - self.remote_target = ftc.data.decode('utf-8') - self.parent = ftc.parent - self.expanded_local_path = '' - self.file_id = str(next(file_counter)) - compression_capable = self.ftype is FileType.regular and self.expected_size > 4096 and should_be_compressed(self.remote_path) - self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if compression_capable else IdentityDecompressor() - self.remote_symlink_value = b'' - self.actual_file: Union[None, PatchFile, IO[bytes]] = None - - def __repr__(self) -> str: - return f'File(rpath={self.remote_path!r}, lpath={self.expanded_local_path!r})' - - def close(self) -> None: - if self.actual_file is not None: - self.actual_file.close() - self.actual_file = None - - def write_data(self, data: bytes, is_last: bool) -> int: - self.received_bytes += len(data) - data = self.decompressor(data, is_last) - if self.ftype is FileType.symlink: - self.remote_symlink_value += data - return 0 - if self.ftype is FileType.regular: - if self.actual_file is None: - parent = os.path.dirname(self.expanded_local_path) - if parent: - os.makedirs(parent, exist_ok=True) - self.actual_file = PatchFile(self.expanded_local_path) if self.expect_diff else open(self.expanded_local_path, 'wb') - base = self.actual_file.tell() - if data: - self.actual_file.write(data) - ans = self.actual_file.tell() - base - if is_last: - self.actual_file.close() - self.actual_file = None - return ans - return 0 - - def apply_metadata(self) -> None: - if self.ftype is FileType.symlink: - with suppress(NotImplementedError): - os.chmod(self.expanded_local_path, self.permissions, follow_symlinks=False) - os.utime(self.expanded_local_path, ns=(self.mtime, self.mtime), follow_symlinks=False) - else: - os.chmod(self.expanded_local_path, self.permissions) - os.utime(self.expanded_local_path, ns=(self.mtime, self.mtime)) - - -class TreeNode: - - def __init__(self, file: File, local_name: str, parent: Optional['TreeNode'] = None): - self.entry = file - self.entry.expanded_local_path = local_name - self.parent = parent - self.added_files: Dict[int, TreeNode] = {} - - def add_child(self, file: File) -> 'TreeNode': - q = self.added_files.get(id(file)) - if q is not None: - return q - c = TreeNode(file, os.path.join(self.entry.expanded_local_path, os.path.basename(file.remote_path)), self) - self.added_files[id(file)] = c - return c - - def __iter__(self) -> Iterator['TreeNode']: - for c in self.added_files.values(): - yield c - yield from c - - -def make_tree(all_files: List[File], local_base: str) -> TreeNode: - fid_map = {f.remote_id: f for f in all_files} - node_map: Dict[str, TreeNode] = {} - root_node = TreeNode(File(FileTransmissionCommand(file_id='-1')), local_base) - - def ensure_parent(f: File) -> TreeNode: - if not f.parent: - return root_node - parent = node_map.get(f.parent) - if parent is None: - fp = fid_map[f.parent] - gp = ensure_parent(fp) - parent = gp.add_child(fp) - return parent - - for f in all_files: - p = ensure_parent(f) - p.add_child(f) - return root_node - - -def files_for_receive(cli_opts: TransferCLIOptions, dest: str, files: List[File], remote_home: str, specs: List[str]) -> Iterator[File]: - spec_map: Dict[int, List[File]] = {i: [] for i in range(len(specs))} - for f in files: - spec_map[f.spec_id].append(f) - spec_paths = [spec_map[i][0].remote_path for i in range(len(specs))] - if cli_opts.mode == 'mirror': - try: - common_path = posixpath.commonpath(spec_paths) - except ValueError: - common_path = '' - home = remote_home.rstrip('/') - if common_path and common_path.startswith(home + '/'): - spec_paths = [posixpath.join('~', posixpath.relpath(x, home)) for x in spec_paths] - for spec_id, files_for_spec in spec_map.items(): - spec = spec_paths[spec_id] - tree = make_tree(files_for_spec, os.path.dirname(expand_home(spec))) - for x in tree: - yield x.entry - else: - number_of_source_files = sum(map(len, spec_map.values())) - dest_is_dir = dest[-1] in (os.sep, os.altsep) or number_of_source_files > 1 or os.path.isdir(dest) - for spec_id, files_for_spec in spec_map.items(): - if dest_is_dir: - dest_path = os.path.join(dest, posixpath.basename(files_for_spec[0].remote_path)) - tree = make_tree(files_for_spec, os.path.dirname(expand_home(dest_path))) - for x in tree: - yield x.entry - else: - f = files_for_spec[0] - f.expanded_local_path = dest - yield f - - -class ProgressTracker: - - def __init__(self) -> None: - self.total_size_of_all_files = 0 - self.total_bytes_to_transfer = 0 - self.active_file: Optional[File] = None - self.total_transferred = 0 - self.transfers: Deque[Transfer] = deque() - self.transfered_stats_amt = 0 - self.transfered_stats_interval = 0. - self.started_at = 0. - self.done_files: List[File] = [] - - def change_active_file(self, nf: File) -> None: - now = monotonic() - self.active_file = nf - nf.transmit_started_at = now - - def start_transfer(self) -> None: - self.transfers.append(Transfer()) - self.started_at = monotonic() - - def file_written(self, af: File, amt: int, is_done: bool) -> None: - if self.active_file is not af: - self.change_active_file(af) - af.written_bytes += amt - self.total_transferred += amt - self.transfers.append(Transfer(amt)) - now = self.transfers[-1].at - while len(self.transfers) > 2 and self.transfers[0].is_too_old(now): - self.transfers.popleft() - self.transfered_stats_interval = now - self.transfers[0].at - self.transfered_stats_amt = sum(t.amt for t in self.transfers) - if is_done: - af.done_at = monotonic() - self.done_files.append(af) - - -class Manager: - - def __init__( - self, request_id: str, spec: List[str], dest: str, - bypass: Optional[str] = None, use_rsync: bool = False - ): - self.request_id = request_id - self.spec = spec - self.failed_specs: Dict[int, str] = {} - self.spec_counts = dict.fromkeys(range(len(self.spec)), 0) - self.dest = dest - self.remote_home = '' - self.bypass = encode_bypass(request_id, bypass) if bypass else '' - self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};' - self.suffix = '\x1b\\' - self.state = State.waiting_for_permission - self.files: List[File] = [] - self.progress_tracker = ProgressTracker() - self.transfer_done = False - self.use_rsync = use_rsync - - @property - def finish_code(self) -> str: - return FileTransmissionCommand(action=Action.finish).serialize() - - def start_transfer(self) -> Iterator[str]: - yield FileTransmissionCommand(action=Action.receive, bypass=self.bypass, size=len(self.spec)).serialize() - for i, x in enumerate(self.spec): - yield FileTransmissionCommand(action=Action.file, file_id=str(i), name=x).serialize() - self.progress_tracker.start_transfer() - - def finalize_transfer(self) -> str: - self.transfer_done = True - rid_map = {f.remote_id: f for f in self.files} - for f in self.files: - if f.ftype is FileType.directory: - try: - os.makedirs(f.expanded_local_path, exist_ok=True) - except OSError as err: - return f'Failed to create directory with error: {err}' - elif f.ftype is FileType.link: - target = rid_map.get(f.remote_target) - if target is None: - return f'Hard link with remote id: {f.remote_target} not found' - try: - os.makedirs(os.path.dirname(f.expanded_local_path), exist_ok=True) - with suppress(FileNotFoundError): - os.remove(f.expanded_local_path) - os.link(target.expanded_local_path, f.expanded_local_path) - except OSError as err: - return f'Failed to create hardlink with error: {err}' - elif f.ftype is FileType.symlink: - if f.remote_target: - target = rid_map.get(f.remote_target) - if target is None: - return f'Symbolic link with remote id: {f.remote_target} not found' - lt = target.expanded_local_path - if not f.remote_symlink_value.startswith(b'/'): - lt = os.path.relpath(lt, os.path.dirname(f.expanded_local_path)) - else: - lt = f.remote_symlink_value.decode('utf-8') - with suppress(FileNotFoundError): - os.remove(f.expanded_local_path) - try: - os.symlink(lt, f.expanded_local_path) - except OSError as err: - return f'Failed to create symlink with error: {err}' - with suppress(OSError): - f.apply_metadata() - return '' - - def request_files(self) -> Iterator[str]: - for f in self.files: - if f.ftype is FileType.directory or (f.ftype is FileType.link and f.remote_target): - continue - read_signature = self.use_rsync - if read_signature and f.ftype is FileType.regular: - try: - sr = os.stat(f.expanded_local_path, follow_symlinks=False) - except OSError: - read_signature = False - else: - read_signature = sr.st_size > 4096 - yield FileTransmissionCommand( - action=Action.file, name=f.remote_path, file_id=f.file_id, ttype=TransmissionType.rsync if read_signature else TransmissionType.simple, - compression=Compression.zlib if isinstance(f.decompressor, ZlibDecompressor) else Compression.none - ).serialize() - if read_signature: - f.expect_diff = True - fs = signature_of_file(f.expanded_local_path) - for chunk in fs: - f.sent_bytes += len(chunk) - for data in split_for_transfer(chunk, file_id=f.file_id): - yield data.serialize() - yield FileTransmissionCommand(file_id=f.file_id, action=Action.end_data).serialize() - - def collect_files(self, cli_opts: TransferCLIOptions) -> None: - self.files = list(files_for_receive(cli_opts, self.dest, self.files, self.remote_home, self.spec)) - self.files_to_be_transferred = {f.file_id: f for f in self.files if f.ftype not in (FileType.directory, FileType.link)} - self.progress_tracker.total_size_of_all_files = sum(max(0, f.expected_size) for f in self.files_to_be_transferred.values()) - self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files - - def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> str: - if self.state is State.waiting_for_permission: - if ftc.action is Action.status: - if ftc.status == 'OK': - self.state = State.waiting_for_file_metadata - else: - return 'Permission for transfer denied' - else: - return f'Unexpected response from terminal: {ftc}' - elif self.state is State.waiting_for_file_metadata: - if ftc.action is Action.status: - if ftc.file_id: - try: - fid = int(ftc.file_id) - except Exception: - return f'Unexpected response from terminal: {ftc}' - if fid < 0 or fid >= len(self.spec): - return f'Unexpected response from terminal: {ftc}' - self.failed_specs[fid] = ftc.status - else: - if ftc.status == 'OK': - self.state = State.transferring - self.remote_home = ftc.name - return '' - else: - return ftc.status - elif ftc.action is Action.file: - try: - fid = int(ftc.file_id) - except Exception: - return f'Unexpected response from terminal: {ftc}' - if fid < 0 or fid >= len(self.spec): - return f'Unexpected response from terminal: {ftc}' - self.spec_counts[fid] += 1 - self.files.append(File(ftc)) - else: - return f'Unexpected response from terminal: {ftc}' - elif self.state is State.transferring: - if ftc.action in (Action.data, Action.end_data): - f = self.files_to_be_transferred.get(ftc.file_id) - if f is None: - return f'Got data for unknown file id: {ftc.file_id}' - is_last = ftc.action is Action.end_data - try: - amt_written = f.write_data(ftc.data, is_last) - except Exception as err: - return str(err) - self.progress_tracker.file_written(f, amt_written, is_last) - if is_last: - del self.files_to_be_transferred[ftc.file_id] - if not self.files_to_be_transferred: - return self.finalize_transfer() - return '' - - -class Receive(Handler): - use_alternate_screen = False - - def __init__(self, cli_opts: TransferCLIOptions, spec: List[str], dest: str = ''): - self.cli_opts = cli_opts - self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass, use_rsync=cli_opts.transmit_deltas) - self.quit_after_write_code: Optional[int] = None - self.check_paths_printed = False - self.transmit_started = False - self.max_name_length = 0 - self.spinner = Spinner() - self.progress_update_call: Optional[TimerHandle] = None - self.progress_drawn = False - self.transmit_iterator: Optional[Iterator[str]] = None - - def send_payload(self, payload: str) -> None: - self.write(self.manager.prefix) - self.write(payload) - self.write(self.manager.suffix) - - def initialize(self) -> None: - self.cmd.set_cursor_visible(False) - self.print('Scanning files…') - for x in self.manager.start_transfer(): - self.send_payload(x) - - def finalize(self) -> None: - self.cmd.set_cursor_visible(True) - - def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None: - if ftc.id != self.manager.request_id: - return - if ftc.status == 'CANCELED' and ftc.action is Action.status: - self.quit_loop(1) - return - if self.quit_after_write_code is not None or self.manager.state is State.canceled: - return - transfer_started = self.manager.state is State.transferring - err = self.manager.on_file_transfer_response(ftc) - if err: - self.print_err(err) - self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds') - self.abort_transfer() - return - if not transfer_started and self.manager.state is State.transferring: - if self.manager.failed_specs: - self.print_err('Failed to process some sources') - for spec_id, msg in self.manager.failed_specs.items(): - spec = self.manager.spec[spec_id] - self.print(f'{spec}: {msg}') - self.quit_loop(1) - return - if 0 in self.manager.spec_counts.values(): - self.print_err('No matches found for: ' + ', '.join(self.manager.spec[k] for k, v in self.manager.spec_counts.items() if v == 0)) - self.quit_loop(1) - return - self.manager.collect_files(self.cli_opts) - if self.cli_opts.confirm_paths: - self.confirm_paths() - else: - self.start_transfer() - if self.manager.transfer_done: - self.send_payload(self.manager.finish_code) - self.quit_after_write_code = 0 - self.refresh_progress() - elif self.transmit_started: - self.refresh_progress() - - def confirm_paths(self) -> None: - self.print_check_paths() - - def print_check_paths(self) -> None: - if self.check_paths_printed: - return - self.check_paths_printed = True - self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.') - for df in self.manager.files: - self.cmd.styled(df.ftype.short_text, fg=df.ftype.color) - self.print(end=' ') - self.print(df.display_name, '→', end=' ') - self.cmd.styled(df.expanded_local_path, fg='red' if os.path.lexists(df.expanded_local_path) else None) - self.print() - self.print(f'Transferring {len(self.manager.files)} file(s) of total size: {human_size(self.manager.progress_tracker.total_size_of_all_files)}') - self.print() - self.print_continue_msg() - - def on_text(self, text: str, in_bracketed_paste: bool = False) -> None: - if self.quit_after_write_code is not None: - return - if self.check_paths_printed and not self.transmit_started: - if text.lower() == 'y': - self.start_transfer() - return - if text.lower() == 'n': - self.abort_transfer() - self.print('Sending cancel request to terminal') - return - self.print_continue_msg() - - def on_key(self, key_event: KeyEventType) -> None: - if self.quit_after_write_code is not None: - return - if key_event.matches('esc'): - if self.check_paths_printed and not self.transmit_started: - self.abort_transfer() - self.print('Sending cancel request to terminal') - else: - self.on_interrupt() - - def print_continue_msg(self) -> None: - self.print( - 'Press', styled('y', fg='green', bold=True, fg_intense=True), 'to continue or', - styled('n', fg='red', bold=True, fg_intense=True), 'to abort') - - def start_transfer(self) -> None: - self.transmit_started = True - n = len(self.manager.files) - msg = 'Transmitting signature of' if self.manager.use_rsync else 'Queueing transfer of' - msg += ' one file' if n == 1 else f'{n} files' - self.print(msg) - names = (f.display_name for f in self.manager.files) - self.max_name_length = max(6, max(map(wcswidth, names))) - self.transmit_iterator = self.manager.request_files() - self.transmit_one() - - def transmit_one(self) -> None: - if self.transmit_iterator is None: - return - try: - data = next(self.transmit_iterator) - except StopIteration: - self.transmit_iterator = None - except Exception as err: - self.print_err(str(err)) - self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds') - self.abort_transfer() - else: - self.send_payload(data) - - def print_err(self, msg: str) -> None: - self.cmd.styled(msg, fg='red') - self.print() - - def on_term(self) -> None: - if self.quit_after_write_code is not None: - return - self.print_err('Terminate requested, cancelling transfer, transferred files are in undefined state') - self.abort_transfer(delay=2) - - def on_interrupt(self) -> None: - if self.quit_after_write_code is not None: - return - if self.manager.state is State.canceled: - self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received') - return - self.print_err('Interrupt requested, cancelling transfer, transferred files are in undefined state') - self.abort_transfer() - - def abort_transfer(self, delay: float = 5) -> None: - self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize()) - self.manager.state = State.canceled - self.asyncio_loop.call_later(delay, self.quit_loop, 1) - - def render_progress( - self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0, - secs_so_far: float = 0., bytes_per_sec: float = 0., is_complete: bool = False - ) -> None: - if is_complete: - bytes_so_far = total_bytes - self.write(render_progress_in_width( - name, width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char, - bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far, - bytes_per_sec=bytes_per_sec, is_complete=is_complete - )) - - def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None: - p = self.manager.progress_tracker - now = monotonic() - self.render_progress( - af.display_name, spinner_char=spinner_char, is_complete=is_complete, - bytes_so_far=af.written_bytes, total_bytes=af.expected_size, - secs_so_far=(af.done_at or now) - af.transmit_started_at, - bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) - ) - - def erase_progress(self) -> None: - if self.progress_drawn: - self.cmd.move_cursor_by(2, 'up') - self.write('\r') - self.cmd.clear_to_end_of_screen() - self.progress_drawn = False - - def refresh_progress(self) -> None: - self.erase_progress() - self.draw_progress() - - def schedule_progress_update(self, delay: float = 0.1) -> None: - if self.progress_update_call is None: - self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) - elif self.asyncio_loop.time() + delay < self.progress_update_call.when(): - self.progress_update_call.cancel() - self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) - - @Handler.atomic_update - def draw_progress(self) -> None: - if self.manager.state is State.canceled: - return - with without_line_wrap(self.write): - for df in self.manager.progress_tracker.done_files: - sc = styled('✔', fg='green') - if df.ftype is FileType.regular: - self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True) - else: - self.write(f'{sc} {df.display_name} {styled(df.ftype.name, dim=True, italic=True)}') - self.print() - del self.manager.progress_tracker.done_files[:] - is_complete = self.quit_after_write_code is not None - if is_complete: - sc = styled('✔', fg='green') if self.quit_after_write_code == 0 else styled('✘', fg='red') - else: - sc = self.spinner() - p = self.manager.progress_tracker - now = monotonic() - if is_complete: - self.cmd.repeat('─', self.screen_size.width) - else: - af = p.active_file - if af is not None: - self.draw_progress_for_current_file(af, spinner_char=sc) - self.print() - if p.total_transferred > 0: - self.render_progress( - 'Total', spinner_char=sc, - bytes_so_far=p.total_transferred, total_bytes=p.total_bytes_to_transfer, - secs_so_far=now - p.started_at, is_complete=is_complete, - bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) - ) - else: - self.print('File data transfer has not yet started', end='') - self.print() - self.schedule_progress_update(self.spinner.interval) - self.progress_drawn = True - - def on_writing_finished(self) -> None: - if self.quit_after_write_code is not None: - self.quit_loop(self.quit_after_write_code) - elif self.transmit_iterator is not None: - self.transmit_one() - - def on_resize(self, screen_size: ScreenSize) -> None: - super().on_resize(screen_size) - if self.progress_drawn: - self.refresh_progress() - - -def receive_main(cli_opts: TransferCLIOptions, args: List[str]) -> None: - dest = '' - if cli_opts.mode == 'mirror': - if len(args) < 1: - raise SystemExit('Must specify at least one file to transfer') - spec = list(args) - else: - if len(args) < 2: - raise SystemExit('Must specify at least one source and a destination file to transfer') - spec, dest = args[:-1], args[-1] - - loop = Loop() - handler = Receive(cli_opts, spec, dest) - loop.loop(handler) - for f in handler.manager.files: - f.close() - tsf = dsz = ssz = 0 - for f in handler.manager.files: - if f.expect_diff: - tsf += f.expected_size - dsz += f.received_bytes - ssz += f.sent_bytes - if tsf and dsz + ssz: - print_rsync_stats(tsf, dsz, ssz) diff --git a/kittens/transfer/send.py b/kittens/transfer/send.py deleted file mode 100644 index 6970dcd5e..000000000 --- a/kittens/transfer/send.py +++ /dev/null @@ -1,751 +0,0 @@ -#!/usr/bin/env python -# License: GPLv3 Copyright: 2021, Kovid Goyal - - -import os -import stat -from asyncio import TimerHandle -from collections import deque -from enum import auto -from itertools import count -from time import monotonic -from typing import IO, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Union - -from kitty.cli_stub import TransferCLIOptions -from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth -from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, NameReprEnum, TransmissionType, encode_bypass, split_for_transfer -from kitty.typing import KeyEventType, ScreenSize -from kitty.utils import sanitize_control_codes - -from ..tui.handler import Handler -from ..tui.loop import Loop, debug -from ..tui.operations import styled, without_line_wrap -from ..tui.spinners import Spinner -from ..tui.utils import human_size -from .utils import ( - IdentityCompressor, - ZlibCompressor, - abspath, - expand_home, - home_path, - print_rsync_stats, - random_id, - render_progress_in_width, - safe_divide, - should_be_compressed, -) - -debug - - -def get_remote_path(local_path: str, remote_base: str) -> str: - if not remote_base: - return local_path.replace(os.sep, '/') - if remote_base.endswith('/'): - return os.path.join(remote_base, os.path.basename(local_path)) - return remote_base - - -class FileState(NameReprEnum): - waiting_for_start = auto() - waiting_for_data = auto() - transmitting = auto() - finished = auto() - acknowledged = auto() - - -class File: - - def __init__( - self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result, - remote_base: str, file_type: FileType, - ) -> None: - self.state = FileState.waiting_for_start - self.local_path = local_path - self.display_name = sanitize_control_codes(local_path) - self.expanded_local_path = expanded_local_path - self.permissions = stat.S_IMODE(stat_result.st_mode) - self.mtime = stat_result.st_mtime_ns - self.file_size = self.bytes_to_transmit = stat_result.st_size - self.file_hash = stat_result.st_dev, stat_result.st_ino - self.remote_path = get_remote_path(self.local_path, remote_base) - self.remote_path = self.remote_path.replace(os.sep, '/') - self.file_id = hex(file_id)[2:] - self.hard_link_target = '' - self.symbolic_link_target = '' - self.stat_result = stat_result - self.file_type = file_type - self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096 - self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path) - self.remote_final_path = '' - self.remote_initial_size = -1 - self.err_msg = '' - self.actual_file: Optional[IO[bytes]] = None - self.transmitted_bytes = 0 - self.reported_progress = 0 - self.transmit_started_at = self.transmit_ended_at = self.done_at = 0. - self.signature_loader: Optional[LoadSignature] = None - self.delta_loader: Optional[Iterator[memoryview]] = None - - def start_delta_calculation(self) -> None: - sl = self.signature_loader - assert sl is not None - self.state = FileState.transmitting - self.delta_loader = delta_for_file(self.expanded_local_path, sl.signature) - - def __repr__(self) -> str: - return f'File(name={self.display_name}, ft={self.file_type}, state={self.state})' - - def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]: - if self.file_type is FileType.symlink: - self.state = FileState.finished - ans = self.symbolic_link_target.encode('utf-8') - return ans, len(ans) - if self.file_type is FileType.link: - self.state = FileState.finished - ans = self.hard_link_target.encode('utf-8') - return ans, len(ans) - is_last = False - if self.delta_loader is not None: - try: - chunk: Union[bytes, memoryview] = next(self.delta_loader) - except StopIteration: - is_last = True - self.delta_loader = None - chunk = b'' - else: - if self.actual_file is None: - self.actual_file = open(self.expanded_local_path, 'rb') - chunk = self.actual_file.read(sz) - is_last = not chunk or self.actual_file.tell() >= self.file_size - uncompressed_sz = len(chunk) - cchunk = self.compressor.compress(chunk) - if is_last and not isinstance(self.compressor, IdentityCompressor): - cchunk += self.compressor.flush() - if is_last: - self.state = FileState.finished - if self.actual_file is not None: - self.actual_file.close() - self.actual_file = None - return cchunk, uncompressed_sz - - def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand: - self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple - self.compression = Compression.zlib if self.compression_capable else Compression.none - self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor() - return FileTransmissionCommand( - action=Action.file, compression=self.compression, ftype=self.file_type, - name=self.remote_path, permissions=self.permissions, mtime=self.mtime, - file_id=self.file_id, ttype=self.ttype - ) - - -def process(cli_opts: TransferCLIOptions, paths: Iterable[str], remote_base: str, counter: Iterator[int]) -> Iterator[File]: - for x in paths: - expanded = expand_home(x) - try: - s = os.stat(expanded, follow_symlinks=False) - except OSError as err: - raise SystemExit(f'Failed to stat {x} with error: {err}') from err - if stat.S_ISDIR(s.st_mode): - yield File(x, expanded, next(counter), s, remote_base, FileType.directory) - new_remote_base = remote_base - if new_remote_base: - new_remote_base = new_remote_base.rstrip('/') + '/' + os.path.basename(x) + '/' - else: - new_remote_base = x.replace(os.sep, '/').rstrip('/') + '/' - yield from process(cli_opts, [os.path.join(x, y) for y in os.listdir(expanded)], new_remote_base, counter) - elif stat.S_ISLNK(s.st_mode): - yield File(x, expanded, next(counter), s, remote_base, FileType.symlink) - elif stat.S_ISREG(s.st_mode): - yield File(x, expanded, next(counter), s, remote_base, FileType.regular) - - -def process_mirrored_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]: - paths = [abspath(x) for x in args] - try: - common_path = os.path.commonpath(paths) - except ValueError: - common_path = '' - home = home_path().rstrip(os.sep) - if common_path and common_path.startswith(home + os.sep): - paths = [os.path.join('~', os.path.relpath(x, home)) for x in paths] - yield from process(cli_opts, paths, '', count(1)) - - -def process_normal_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]: - if len(args) < 2: - raise SystemExit('Must specify at least one local path and one remote path') - args = list(args) - remote_base = args.pop().replace(os.sep, '/') - if len(args) > 1 and not remote_base.endswith('/'): - remote_base += '/' - paths = [abspath(x) for x in args] - yield from process(cli_opts, paths, remote_base, count(1)) - - -def files_for_send(cli_opts: TransferCLIOptions, args: List[str]) -> Tuple[File, ...]: - if cli_opts.mode == 'mirror': - files = list(process_mirrored_files(cli_opts, args)) - else: - files = list(process_normal_files(cli_opts, args)) - groups: Dict[Tuple[int, int], List[File]] = {} - - # detect hard links - for f in files: - groups.setdefault(f.file_hash, []).append(f) - for group in groups.values(): - if len(group) > 1: - for lf in group[1:]: - lf.file_type = FileType.link - lf.hard_link_target = group[0].file_id - - # detect symlinks to other transferred files - for f in tuple(files): - if f.file_type is FileType.symlink: - try: - link_dest = os.readlink(f.local_path) - except OSError: - files.remove(f) - continue - f.symbolic_link_target = f'path:{link_dest}' - is_abs = os.path.isabs(link_dest) - q = link_dest if is_abs else os.path.join(os.path.dirname(f.local_path), link_dest) - try: - st = os.stat(q) - except OSError: - pass - else: - fh = st.st_dev, st.st_ino - if fh in groups: - g = tuple(x for x in groups[fh] if os.path.samestat(st, x.stat_result)) - if g: - t = g[0] - prefix = 'fid_abs' if is_abs else 'fid' - f.symbolic_link_target = f'{prefix}:{t.file_id}' - return tuple(files) - - -class SendState(NameReprEnum): - waiting_for_permission = auto() - permission_granted = auto() - permission_denied = auto() - canceled = auto() - - -class Transfer: - - def __init__(self, amt: int = 0): - self.amt = amt - self.at = monotonic() - - def is_too_old(self, now: float) -> bool: - return now - self.at > 30 - - -class ProgressTracker: - - def __init__(self, total_size_of_all_files: int): - self.total_size_of_all_files = total_size_of_all_files - self.total_bytes_to_transfer = total_size_of_all_files - self.active_file: Optional[File] = None - self.total_transferred = 0 - self.transfers: Deque[Transfer] = deque() - self.transfered_stats_amt = 0 - self.transfered_stats_interval = 0. - self.started_at = 0. - self.signature_bytes = 0 - self.total_reported_progress = 0 - - def change_active_file(self, nf: File) -> None: - now = monotonic() - self.active_file = nf - nf.transmit_started_at = now - - def start_transfer(self) -> None: - self.transfers.append(Transfer()) - self.started_at = monotonic() - - def on_transmit(self, amt: int) -> None: - if self.active_file is not None: - self.active_file.transmitted_bytes += amt - self.total_transferred += amt - self.transfers.append(Transfer(amt)) - now = self.transfers[-1].at - while len(self.transfers) > 2 and self.transfers[0].is_too_old(now): - self.transfers.popleft() - self.transfered_stats_interval = now - self.transfers[0].at - self.transfered_stats_amt = sum(t.amt for t in self.transfers) - - def on_file_progress(self, af: File, delta: int) -> None: - if delta > 0: - self.total_reported_progress += delta - - def on_file_done(self, af: File) -> None: - af.done_at = monotonic() - - -class SendManager: - - def __init__( - self, request_id: str, files: Tuple[File, ...], - bypass: Optional[str] = None, use_rsync: bool = False, - file_progress: Callable[[File, int], None] = lambda f, i: None, - file_done: Callable[[File], None] = lambda f: None, - ): - self.use_rsync = use_rsync - self.files = files - self.bypass = encode_bypass(request_id, bypass) if bypass else '' - self.fid_map = {f.file_id: f for f in self.files} - self.request_id = request_id - self.state = SendState.waiting_for_permission - self.all_acknowledged = self.all_started = self.has_transmitting = self.has_rsync = False - self.active_idx: Optional[int] = None - self.current_chunk_uncompressed_sz: Optional[int] = None - self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};' - self.suffix = '\x1b\\' - self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0)) - self.file_done = file_done - self.file_progress = file_progress - self.last_progress_file: Optional[File] = None - - @property - def active_file(self) -> Optional[File]: - if self.active_idx is not None: - ans = self.files[self.active_idx] - if ans.state is FileState.transmitting: - return ans - return None - - def activate_next_ready_file(self) -> Optional[File]: - if self.active_idx is not None: - paf = self.files[self.active_idx] - paf.transmit_ended_at = monotonic() - for i, f in enumerate(self.files): - if f.state is FileState.transmitting: - self.active_idx = i - self.update_collective_statuses() - self.progress.change_active_file(f) - return f - self.active_idx = None - self.update_collective_statuses() - return None - - def update_collective_statuses(self) -> None: - found_not_started = found_not_done = False - has_rsync = has_transmitting = False - for f in self.files: - if f.state is not FileState.acknowledged: - found_not_done = True - if f.state is FileState.waiting_for_start: - found_not_started = True - elif f.state is FileState.transmitting: - has_transmitting = True - if f.ttype is TransmissionType.rsync: - has_rsync = True - - self.all_acknowledged = not found_not_done - self.all_started = not found_not_started - self.has_rsync = has_rsync - self.has_transmitting = has_transmitting - - def start_transfer(self) -> str: - return FileTransmissionCommand(action=Action.send, bypass=self.bypass).serialize() - - def next_chunks(self) -> Iterator[str]: - if self.active_file is None: - self.activate_next_ready_file() - af = self.active_file - if af is None: - return - chunk = b'' - self.current_chunk_uncompressed_sz = 0 - while af.state is not FileState.finished and not chunk: - chunk, usz = af.next_chunk() - self.current_chunk_uncompressed_sz += usz - is_last = af.state is FileState.finished - if len(chunk): - for ftc in split_for_transfer(chunk, file_id=af.file_id, mark_last=is_last): - yield ftc.serialize() - elif is_last: - yield FileTransmissionCommand(action=Action.end_data, file_id=af.file_id, data=b'').serialize() - - def send_file_metadata(self) -> Iterator[str]: - for f in self.files: - yield f.metadata_command(self.use_rsync).serialize() - - def on_file_status_update(self, ftc: FileTransmissionCommand) -> None: - file = self.fid_map.get(ftc.file_id) - if file is None: - return - if ftc.status == 'STARTED': - file.remote_final_path = ftc.name - file.remote_initial_size = ftc.size - if file.file_type is FileType.directory: - file.state = FileState.finished - else: - file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting - if file.state is FileState.waiting_for_data: - file.signature_loader = LoadSignature() - self.update_collective_statuses() - elif ftc.status == 'PROGRESS': - self.last_progress_file = file - change = ftc.size - file.reported_progress - file.reported_progress = ftc.size - self.progress.on_file_progress(file, change) - self.file_progress(file, change) - else: - if ftc.name and not file.remote_final_path: - file.remote_final_path = ftc.name - file.state = FileState.acknowledged - if ftc.status == 'OK': - if ftc.size > 0: - change = ftc.size - file.reported_progress - file.reported_progress = ftc.size - self.progress.on_file_progress(file, change) - self.file_progress(file, change) - else: - file.err_msg = ftc.status - self.progress.on_file_done(file) - self.file_done(file) - if self.active_idx is not None and file is self.files[self.active_idx]: - self.active_idx = None - self.update_collective_statuses() - - def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None: - file = self.fid_map.get(ftc.file_id) - if file is None or file.state is not FileState.waiting_for_data: - return - sl = file.signature_loader - assert sl is not None - sl.add_chunk(ftc.data) - self.progress.signature_bytes += len(ftc.data) - if ftc.action is Action.end_data: - sl.commit() - file.start_delta_calculation() - self.update_collective_statuses() - - def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None: - if ftc.action is Action.status: - if ftc.file_id: - self.on_file_status_update(ftc) - else: - self.state = SendState.permission_granted if ftc.status == 'OK' else SendState.permission_denied - elif ftc.action in (Action.data, Action.end_data): - if ftc.file_id: - self.on_signature_data_received(ftc) - - -class Send(Handler): - use_alternate_screen = False - - def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]): - Handler.__init__(self) - self.manager = SendManager( - random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_progress, self.on_file_done) - self.cli_opts = cli_opts - self.transmit_started = False - self.file_metadata_sent = False - self.quit_after_write_code: Optional[int] = None - self.check_paths_printed = False - names = tuple(x.display_name for x in self.manager.files) - self.max_name_length = max(6, max(map(wcswidth, names))) - self.spinner = Spinner() - self.progress_drawn = True - self.done_files: List[File] = [] - self.done_file_ids: Set[str] = set() - self.failed_files: List[File] = [] - self.transmit_ok_checked = False - self.progress_update_call: Optional[TimerHandle] = None - - def send_payload(self, payload: str) -> None: - self.write(self.manager.prefix) - self.write(payload) - self.write(self.manager.suffix) - - def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None: - if ftc.id != self.manager.request_id: - return - if ftc.status == 'CANCELED' and ftc.action is Action.status: - self.quit_loop(1) - return - if self.quit_after_write_code is not None or self.manager.state is SendState.canceled: - return - before = self.manager.state - self.manager.on_file_transfer_response(ftc) - if before == SendState.waiting_for_permission: - if self.manager.state == SendState.permission_denied: - self.cmd.styled('Permission denied for this transfer', fg='red') - self.print() - self.quit_loop(1) - return - if self.manager.state == SendState.permission_granted: - self.cmd.styled('Permission granted for this transfer', fg='green') - self.print() - self.send_file_metadata() - self.asyncio_loop.call_soon(self.loop_tick) - - def start_transfer(self) -> None: - if self.manager.active_file is None: - self.manager.activate_next_ready_file() - if self.manager.active_file is not None: - self.transmit_started = True - self.manager.progress.start_transfer() - self.transmit_next_chunk() - self.draw_progress() - - def print_check_paths(self) -> None: - if self.check_paths_printed: - return - self.check_paths_printed = True - self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.') - for df in self.manager.files: - self.cmd.styled(df.file_type.short_text, fg=df.file_type.color) - self.print(end=' ') - self.print(df.display_name, '→', end=' ') - self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None) - self.print() - self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.progress.total_bytes_to_transfer)}') - self.print() - self.print_continue_msg() - - def print_continue_msg(self) -> None: - self.print( - 'Press', styled('y', fg='green', bold=True, fg_intense=True), 'to continue or', - styled('n', fg='red', bold=True, fg_intense=True), 'to abort') - - def on_text(self, text: str, in_bracketed_paste: bool = False) -> None: - if self.quit_after_write_code is not None: - return - if self.check_paths_printed and not self.transmit_started: - if text.lower() == 'y': - self.start_transfer() - if self.manager.all_acknowledged: - self.refresh_progress() - self.transfer_finished() - return - if text.lower() == 'n': - del self.failed_files[:] - self.abort_transfer() - self.print('Sending cancel request to terminal') - return - self.print_continue_msg() - - def on_key(self, key_event: KeyEventType) -> None: - if self.quit_after_write_code is not None: - return - if key_event.matches('esc'): - if self.check_paths_printed and not self.transmit_started: - del self.failed_files[:] - self.abort_transfer() - self.print('Sending cancel request to terminal') - else: - self.on_interrupt() - - def check_for_transmit_ok(self) -> None: - if self.transmit_ok_checked: - return self.start_transfer() - if self.manager.state is not SendState.permission_granted: - return - if self.cli_opts.confirm_paths: - if self.manager.all_started: - self.print_check_paths() - return - self.transmit_ok_checked = True - self.start_transfer() - - def transmit_next_chunk(self) -> None: - found_chunk = False - for chunk in self.manager.next_chunks(): - self.send_payload(chunk) - found_chunk = True - if not found_chunk: - if self.manager.all_acknowledged: - self.transfer_finished() - - def transfer_finished(self) -> None: - self.send_payload(FileTransmissionCommand(action=Action.finish).serialize()) - self.quit_after_write_code = 1 if self.failed_files else 0 - - def on_writing_finished(self) -> None: - chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None - if chunk_transmitted: - self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0) - self.manager.current_chunk_uncompressed_sz = None - if self.quit_after_write_code is not None: - self.quit_loop(self.quit_after_write_code) - return - if self.manager.state is SendState.permission_granted and (not self.transmit_started or chunk_transmitted): - self.asyncio_loop.call_soon(self.loop_tick) - - def loop_tick(self) -> None: - if self.manager.state is SendState.waiting_for_permission: - return - if self.transmit_started: - self.transmit_next_chunk() - self.refresh_progress() - else: - self.check_for_transmit_ok() - - def initialize(self) -> None: - self.send_payload(self.manager.start_transfer()) - if self.cli_opts.permissions_bypass: - # dont wait for permission, not needed with a bypass and - # avoids a roundtrip - self.send_file_metadata() - self.cmd.set_cursor_visible(False) - - def finalize(self) -> None: - self.cmd.set_cursor_visible(True) - - def send_file_metadata(self) -> None: - if not self.file_metadata_sent: - for payload in self.manager.send_file_metadata(): - self.send_payload(payload) - self.file_metadata_sent = True - - def on_term(self) -> None: - if self.quit_after_write_code is not None: - return - self.cmd.styled('Terminate requested, cancelling transfer, transferred files are in undefined state', fg='red') - self.print() - self.abort_transfer(delay=2) - - def on_interrupt(self) -> None: - if self.quit_after_write_code is not None: - return - if self.manager.state is SendState.canceled: - self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received') - return - self.cmd.styled('Interrupt requested, cancelling transfer, transferred files are in undefined state', fg='red') - self.print() - self.abort_transfer() - - def abort_transfer(self, delay: float = 5) -> None: - self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize()) - self.manager.state = SendState.canceled - self.asyncio_loop.call_later(delay, self.quit_loop, 1) - - def render_progress( - self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0, - secs_so_far: float = 0., bytes_per_sec: float = 0., is_complete: bool = False - ) -> None: - if is_complete: - bytes_so_far = total_bytes - self.write(render_progress_in_width( - name, width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char, - bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far, - bytes_per_sec=bytes_per_sec, is_complete=is_complete - )) - - def erase_progress(self) -> None: - if self.progress_drawn: - self.cmd.move_cursor_by(2, 'up') - self.write('\r') - self.cmd.clear_to_end_of_screen() - self.progress_drawn = False - - def schedule_progress_update(self, delay: float = 0.1) -> None: - if self.progress_update_call is None: - self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) - elif self.asyncio_loop.time() + delay < self.progress_update_call.when(): - self.progress_update_call.cancel() - self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) - - def on_file_progress(self, file: File, change: int) -> None: - self.schedule_progress_update() - - def on_file_done(self, file: File) -> None: - self.done_files.append(file) - if file.err_msg: - self.failed_files.append(file) - self.schedule_progress_update() - - @Handler.atomic_update - def draw_progress(self) -> None: - with without_line_wrap(self.write): - for df in self.done_files: - sc = styled('✔', fg='green') if not df.err_msg else styled('✘', fg='red') - if df.file_type is FileType.regular: - self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True) - else: - self.write(f'{sc} {df.display_name} {styled(df.file_type.name, dim=True, italic=True)}') - self.print() - self.done_file_ids.add(df.file_id) - del self.done_files[:] - is_complete = self.quit_after_write_code is not None - if is_complete: - sc = styled('✔', fg='green') if self.quit_after_write_code == 0 else styled('✘', fg='red') - else: - sc = self.spinner() - p = self.manager.progress - now = monotonic() - if is_complete: - self.cmd.repeat('─', self.screen_size.width) - else: - af = self.manager.last_progress_file - if af is None or af.file_id in self.done_file_ids: - if self.manager.has_rsync and not self.manager.has_transmitting: - self.print(sc, 'Transferring rsync signatures...', end='') - else: - self.print(sc, 'Transferring metadata...', end='') - else: - self.draw_progress_for_current_file(af, spinner_char=sc) - self.print() - if p.total_reported_progress > 0: - self.render_progress( - 'Total', spinner_char=sc, - bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer, - secs_so_far=now - p.started_at, is_complete=is_complete, - bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) - ) - else: - self.print('File data transfer has not yet started', end='') - self.print() - self.schedule_progress_update(self.spinner.interval) - self.progress_drawn = True - - def on_resize(self, screen_size: ScreenSize) -> None: - super().on_resize(screen_size) - if self.progress_drawn: - self.refresh_progress() - - def refresh_progress(self) -> None: - if not self.transmit_started: - return - self.erase_progress() - self.draw_progress() - - def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None: - p = self.manager.progress - now = monotonic() - self.render_progress( - af.display_name, spinner_char=spinner_char, is_complete=is_complete, - bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit, - secs_so_far=(af.done_at or now) - af.transmit_started_at, - bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) - ) - - -def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None: - print('Scanning files…') - files = files_for_send(cli_opts, args) - print(f'Found {len(files)} files and directories, requesting transfer permission…') - loop = Loop() - handler = Send(cli_opts, files) - loop.loop(handler) - p = handler.manager.progress - if handler.manager.has_rsync and p.total_transferred + p.signature_bytes: - tsf = 0 - for f in files: - if f.ttype is TransmissionType.rsync: - tsf += f.file_size - if tsf: - print_rsync_stats(tsf, p.total_transferred, p.signature_bytes) - if handler.failed_files: - print(f'Transfer of {len(handler.failed_files)} out of {len(handler.manager.files)} files failed') - for ff in handler.failed_files: - print(styled(ff.display_name, fg='red')) - print(' ', ff.err_msg) - - raise SystemExit(loop.return_code)