From cb32a0b8fc2b5ea5659179c1fdd449057c40afa2 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 22 Feb 2022 11:47:44 +0530 Subject: [PATCH] Handle remote print callback in tests --- kitty/window.py | 13 ++++++++----- kitty_tests/__init__.py | 10 +++++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/kitty/window.py b/kitty/window.py index 13d1bea5f..d4312ddf3 100644 --- a/kitty/window.py +++ b/kitty/window.py @@ -301,6 +301,13 @@ def text_sanitizer(as_ansi: bool, add_wrap_markers: bool) -> Callable[[str], str return remove_wrap_markers +def process_remote_print(msg: bytes) -> str: + from base64 import standard_b64decode + from .cli import green + text = standard_b64decode(msg).decode('utf-8', 'replace') + return text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0')) + + class EdgeWidths: left: Optional[float] top: Optional[float] @@ -845,11 +852,7 @@ class Window: get_boss().handle_remote_cmd(cmd, self) def handle_remote_print(self, msg: bytes) -> None: - from base64 import standard_b64decode - - from .cli import green - text = standard_b64decode(msg).decode('utf-8') - text = text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0')) + text = process_remote_print(msg) print(text, end='', file=sys.stderr) sys.stderr.flush() diff --git a/kitty_tests/__init__.py b/kitty_tests/__init__.py index e31a5ca5e..3ec9dad2b 100644 --- a/kitty_tests/__init__.py +++ b/kitty_tests/__init__.py @@ -7,6 +7,7 @@ import os import select import shlex import struct +import sys import termios import time from pty import CHILD, fork @@ -20,6 +21,7 @@ from kitty.options.parse import merge_result_dicts from kitty.options.types import Options, defaults from kitty.types import MouseEvent from kitty.utils import read_screen_size, write_all +from kitty.window import process_remote_print class Callbacks: @@ -87,6 +89,10 @@ class Callbacks: self.current_mouse_button = 0 return True + def handle_remote_print(self, msg): + text = process_remote_print(msg) + print(text, file=sys.__stderr__) + def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()): ans = LineBuf(ynum, xnum) @@ -178,6 +184,7 @@ class PTY: termios.tcsetattr(self.master_fd, termios.TCSADRAIN, new) self.callbacks = Callbacks() self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks) + self.received_bytes = b'' def __del__(self): if not self.is_child: @@ -205,6 +212,7 @@ class PTY: if not data: break bytes_read += len(data) + self.received_bytes += data parse_bytes(self.screen, data) return bytes_read @@ -213,7 +221,7 @@ class PTY: while not q() and time.monotonic() - st < timeout: self.process_input_from_child(timeout=timeout - (time.monotonic() - st)) if not q(): - raise TimeoutError('The condition was not met') + raise TimeoutError(f'The condition was not met. Screen contents: \n {repr(self.screen_contents())}') def set_window_size(self, rows=25, columns=80): if hasattr(self, 'screen'):