From ea48332f467b8ecac7f67d9bb0f0cf175aed6066 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 14 Mar 2020 09:52:56 +0530 Subject: [PATCH] more typing work --- kitty/boss.py | 331 +++++++++++++++++++++----------------- kitty/fast_data_types.pyi | 3 + kitty/launch.py | 5 +- kitty/rc/__init__.py | 1 - kitty/rc/resize_window.py | 4 +- kitty/tabs.py | 2 +- kitty/window.py | 10 +- 7 files changed, 202 insertions(+), 154 deletions(-) diff --git a/kitty/boss.py b/kitty/boss.py index 2ca5c1747..234d16eb4 100644 --- a/kitty/boss.py +++ b/kitty/boss.py @@ -18,9 +18,9 @@ from weakref import WeakValueDictionary from .child import cached_process_data, cwd_of_process from .cli import create_opts, parse_args from .cli_stub import CLIOptions -from .conf.utils import to_cmdline +from .conf.utils import BadLine, to_cmdline from .config import ( - SubSequenceMap, common_opts_as_dict, initial_window_size_func, + KeyAction, SubSequenceMap, common_opts_as_dict, initial_window_size_func, prepare_config_file_for_editing ) from .config_data import MINIMUM_FONT_SIZE @@ -298,7 +298,7 @@ class Boss: def _new_os_window(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None) -> int: if isinstance(args, SpecialWindowInstance): - sw = args + sw: Optional[SpecialWindowInstance] = args else: sw = self.args_to_special_window(args, cwd_from) if args else None startup_session = next(create_sessions(self.opts, special_window=sw, cwd_from=cwd_from)) @@ -344,8 +344,8 @@ class Boss: response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'} return response - def peer_message_received(self, msg): - msg = msg.decode('utf-8') + def peer_message_received(self, msg_bytes: bytes) -> Optional[bytes]: + msg = msg_bytes.decode('utf-8') cmd_prefix = '\x1bP@kitty-cmd' if msg.startswith(cmd_prefix): cmd = msg[len(cmd_prefix):-2] @@ -353,30 +353,30 @@ class Boss: if response is None: return None return (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8') + data = json.loads(msg) + if isinstance(data, dict) and data.get('cmd') == 'new_instance': + from .cli_stub import CLIOptions + startup_id = data.get('startup_id') + args, rest = parse_args(data['args'][1:], result_class=CLIOptions) + args.args = rest + opts = create_opts(args) + if not os.path.isabs(args.directory): + args.directory = os.path.join(data['cwd'], args.directory) + for session in create_sessions(opts, args, respect_cwd=True): + os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id) + if data.get('notify_on_os_window_death'): + self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, data['notify_on_os_window_death']) else: - msg = json.loads(msg) - if isinstance(msg, dict) and msg.get('cmd') == 'new_instance': - from .cli_stub import CLIOptions - startup_id = msg.get('startup_id') - args, rest = parse_args(msg['args'][1:], result_class=CLIOptions) - args.args = rest - opts = create_opts(args) - if not os.path.isabs(args.directory): - args.directory = os.path.join(msg['cwd'], args.directory) - for session in create_sessions(opts, args, respect_cwd=True): - os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id) - if msg.get('notify_on_os_window_death'): - self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, msg['notify_on_os_window_death']) - else: - log_error('Unknown message received from peer, ignoring') + log_error('Unknown message received from peer, ignoring') + return None - def handle_remote_cmd(self, cmd, window=None): + def handle_remote_cmd(self, cmd: str, window: Optional[Window] = None) -> None: response = self._handle_remote_command(cmd, window) if response is not None: if window is not None: window.send_cmd_response(response) - def _cleanup_tab_after_window_removal(self, src_tab): + def _cleanup_tab_after_window_removal(self, src_tab: Tab) -> None: if len(src_tab) < 1: tm = src_tab.tab_manager_ref() if tm is not None: @@ -386,7 +386,7 @@ class Boss: if not self.shutting_down: mark_os_window_for_close(src_tab.os_window_id) - def on_child_death(self, window_id): + def on_child_death(self, window_id: int) -> None: window = self.window_id_map.pop(window_id, None) if window is None: return @@ -416,24 +416,24 @@ class Boss: traceback.print_exc() window.action_on_close = window.action_on_removal = None - def close_window(self, window=None): - if window is None: - window = self.active_window - self.child_monitor.mark_for_close(window.id) + def close_window(self, window: Optional[Window] = None) -> None: + window = window or self.active_window + if window: + self.child_monitor.mark_for_close(window.id) - def close_tab(self, tab=None): - if tab is None: - tab = self.active_tab - for window in tab: - self.close_window(window) + def close_tab(self, tab: Optional[Tab] = None) -> None: + tab = tab or self.active_tab + if tab: + for window in tab: + self.close_window(window) - def toggle_fullscreen(self): + def toggle_fullscreen(self) -> None: toggle_fullscreen() - def toggle_maximized(self): + def toggle_maximized(self) -> None: toggle_maximized() - def start(self): + def start(self) -> None: if not getattr(self, 'io_thread_started', False): self.child_monitor.start() self.io_thread_started = True @@ -442,12 +442,12 @@ class Boss: run_update_check(self.opts.update_check_interval * 60 * 60) self.update_check_started = True - def activate_tab_at(self, os_window_id, x): + def activate_tab_at(self, os_window_id: int, x: int) -> int: tm = self.os_window_map.get(os_window_id) if tm is not None: tm.activate_tab_at(x) - def on_window_resize(self, os_window_id, w, h, dpi_changed): + def on_window_resize(self, os_window_id: int, w: int, h: int, dpi_changed: bool) -> None: if dpi_changed: self.on_dpi_change(os_window_id) else: @@ -455,7 +455,7 @@ class Boss: if tm is not None: tm.resize() - def clear_terminal(self, action, only_active): + def clear_terminal(self, action: str, only_active: bool) -> None: if only_active: windows = [] w = self.active_window @@ -475,22 +475,22 @@ class Boss: else: w.screen.erase_in_display(how, False) - def increase_font_size(self): # legacy + def increase_font_size(self) -> None: # legacy cfs = global_font_size() self.set_font_size(min(self.opts.font_size * 5, cfs + 2.0)) - def decrease_font_size(self): # legacy + def decrease_font_size(self) -> None: # legacy cfs = global_font_size() self.set_font_size(max(MINIMUM_FONT_SIZE, cfs - 2.0)) - def restore_font_size(self): # legacy + def restore_font_size(self) -> None: # legacy self.set_font_size(self.opts.font_size) - def set_font_size(self, new_size): # legacy + def set_font_size(self, new_size: float) -> None: # legacy self.change_font_size(True, None, new_size) - def change_font_size(self, all_windows, increment_operation, amt): - def calc_new_size(old_size): + def change_font_size(self, all_windows: bool, increment_operation: Optional[str], amt: float) -> None: + def calc_new_size(old_size: float) -> float: new_size = old_size if amt == 0: new_size = self.opts.font_size @@ -524,14 +524,14 @@ class Boss: if final_windows: self._change_font_size(final_windows) - def _change_font_size(self, sz_map): + def _change_font_size(self, sz_map: Dict[int, float]) -> None: for os_window_id, sz in sz_map.items(): tm = self.os_window_map.get(os_window_id) if tm is not None: os_window_font_size(os_window_id, sz) tm.resize() - def on_dpi_change(self, os_window_id): + def on_dpi_change(self, os_window_id: int) -> None: tm = self.os_window_map.get(os_window_id) if tm is not None: sz = os_window_font_size(os_window_id) @@ -540,28 +540,29 @@ class Boss: tm.update_dpi_based_sizes() tm.resize() - def _set_os_window_background_opacity(self, os_window_id, opacity): + def _set_os_window_background_opacity(self, os_window_id: int, opacity: float) -> None: change_background_opacity(os_window_id, max(0.1, min(opacity, 1.0))) - def set_background_opacity(self, opacity): + def set_background_opacity(self, opacity: str) -> None: window = self.active_window if window is None or not opacity: return if not self.opts.dynamic_background_opacity: - return self.show_error( + self.show_error( _('Cannot change background opacity'), _('You must set the dynamic_background_opacity option in kitty.conf to be able to change background opacity')) + return os_window_id = window.os_window_id if opacity[0] in '+-': old_opacity = background_opacity_of(os_window_id) if old_opacity is None: return - opacity = old_opacity + float(opacity) + fin_opacity = old_opacity + float(opacity) elif opacity == 'default': - opacity = self.opts.background_opacity + fin_opacity = self.opts.background_opacity else: - opacity = float(opacity) - self._set_os_window_background_opacity(os_window_id, opacity) + fin_opacity = float(opacity) + self._set_os_window_background_opacity(os_window_id, fin_opacity) @property def active_tab_manager(self) -> Optional[TabManager]: @@ -581,7 +582,7 @@ class Boss: if t is not None: return t.active_window - def dispatch_special_key(self, key, native_key, action, mods): + def dispatch_special_key(self, key: int, native_key: int, action: int, mods: int) -> bool: # Handles shortcuts, return True if the key was consumed key_action = get_shortcut(self.keymap, mods, key, native_key) if key_action is None: @@ -594,7 +595,7 @@ class Boss: self.current_key_press_info = key, native_key, action, mods return self.dispatch_action(key_action) - def process_sequence(self, key, native_key, action, mods): + def process_sequence(self, key: int, native_key: int, action: Any, mods: int) -> None: if not self.pending_sequences: set_in_sequence_mode(False) return @@ -617,7 +618,7 @@ class Boss: if matched_action is not None: self.dispatch_action(matched_action) - def start_resizing_window(self): + def start_resizing_window(self) -> None: w = self.active_window if w is None: return @@ -628,15 +629,16 @@ class Boss: if overlay_window is not None: overlay_window.allow_remote_control = True - def resize_layout_window(self, window, increment, is_horizontal, reset=False): + def resize_layout_window(self, window: Window, increment: float, is_horizontal: bool, reset: bool = False) -> Union[bool, None, str]: tab = window.tabref() if tab is None or not increment: return False if reset: - return tab.reset_window_sizes() + tab.reset_window_sizes() + return None return tab.resize_window_by(window.id, increment, is_horizontal) - def default_bg_changed_for(self, window_id): + def default_bg_changed_for(self, window_id: int) -> None: w = self.window_id_map.get(window_id) if w is not None: tm = self.os_window_map.get(w.os_window_id) @@ -647,7 +649,7 @@ class Boss: if t is not None: t.relayout_borders() - def dispatch_action(self, key_action): + def dispatch_action(self, key_action: KeyAction) -> bool: if key_action is not None: f = getattr(self, key_action.func, None) if f is not None: @@ -672,11 +674,11 @@ class Boss: return True return False - def combine(self, *actions): + def combine(self, *actions: KeyAction) -> None: for key_action in actions: self.dispatch_action(key_action) - def on_focus(self, os_window_id, focused): + def on_focus(self, os_window_id: int, focused: bool) -> None: tm = self.os_window_map.get(os_window_id) if tm is not None: w = tm.active_window @@ -686,19 +688,19 @@ class Boss: cocoa_set_menubar_title(w.title or '') tm.mark_tab_bar_dirty() - def update_tab_bar_data(self, os_window_id): + def update_tab_bar_data(self, os_window_id: int) -> None: tm = self.os_window_map.get(os_window_id) if tm is not None: tm.update_tab_bar_data() - def on_drop(self, os_window_id, strings): + def on_drop(self, os_window_id: int, strings: Iterable[str]) -> None: tm = self.os_window_map.get(os_window_id) if tm is not None: w = tm.active_window if w is not None: w.paste('\n'.join(strings)) - def on_os_window_closed(self, os_window_id, viewport_width, viewport_height): + def on_os_window_closed(self, os_window_id: int, viewport_width: int, viewport_height: int) -> None: self.cached_values['window-size'] = viewport_width, viewport_height tm = self.os_window_map.pop(os_window_id, None) if tm is not None: @@ -711,7 +713,7 @@ class Boss: if action is not None: action() - def notify_on_os_window_death(self, address): + def notify_on_os_window_death(self, address: str) -> None: import socket s = socket.socket(family=socket.AF_UNIX) with suppress(Exception): @@ -721,7 +723,7 @@ class Boss: s.shutdown(socket.SHUT_RDWR) s.close() - def display_scrollback(self, window, data, cmd): + def display_scrollback(self, window: Window, data: Optional[bytes], cmd: Optional[List[str]]) -> None: tab = self.active_tab if tab is not None and window.overlay_for is None: tab.new_special_window( @@ -729,14 +731,14 @@ class Boss: copy_colors_from=self.active_window ) - def edit_config_file(self, *a): + def edit_config_file(self, *a: Any) -> None: confpath = prepare_config_file_for_editing() # On macOS vim fails to handle SIGWINCH if it occurs early, so add a # small delay. cmd = [kitty_exe(), '+runpy', 'import os, sys, time; time.sleep(0.05); os.execvp(sys.argv[1], sys.argv[1:])'] + get_editor() + [confpath] self.new_os_window(*cmd) - def get_output(self, source_window, num_lines=1): + def get_output(self, source_window: Window, num_lines: Optional[int] = 1) -> str: output = '' s = source_window.screen if num_lines is None: @@ -745,7 +747,15 @@ class Boss: output += str(s.linebuf.line(i)) return output - def _run_kitten(self, kitten, args=(), input_data=None, window=None, custom_callback=None, action_on_removal=None): + def _run_kitten( + self, + kitten: str, + args: Iterable[str] = (), + input_data: Optional[Union[bytes, str]] = None, + window: Optional[Window] = None, + custom_callback: Optional[Callable] = None, + action_on_removal: Optional[Callable] = None + ) -> Optional[Window]: orig_args, args = list(args), list(args) from kittens.runner import create_kitten_handler end_kitten = create_kitten_handler(kitten, orig_args) @@ -757,7 +767,7 @@ class Boss: tab = w.tabref() if w else None if end_kitten.no_ui: end_kitten(None, getattr(w, 'id', None), self) - return + return None if w is not None and tab is not None and w.overlay_for is None: args[0:0] = [config_dir, kitten] @@ -774,7 +784,7 @@ class Boss: else: raise ValueError('Unknown type_of_input: {}'.format(type_of_input)) else: - data = input_data + data = input_data if isinstance(input_data, bytes) else input_data.encode('utf-8') copts = common_opts_as_dict(self.opts) overlay_window = tab.new_special_window( SpecialWindow( @@ -795,7 +805,11 @@ class Boss: wid = w.id overlay_window.action_on_close = partial(self.on_kitten_finish, wid, custom_callback or end_kitten) if action_on_removal is not None: - overlay_window.action_on_removal = lambda *a: action_on_removal(wid, self) + + def callback_wrapper(*a: Any) -> None: + if action_on_removal is not None: + action_on_removal(wid, self) + overlay_window.action_on_removal = callback_wrapper return overlay_window def kitten(self, kitten: str, *args: str) -> None: @@ -804,23 +818,23 @@ class Boss: kargs = shlex.split(cmdline) if cmdline else [] self._run_kitten(kitten, kargs) - def on_kitten_finish(self, target_window_id, end_kitten, source_window): + def on_kitten_finish(self, target_window_id: str, end_kitten: Callable, source_window: Window) -> None: output = self.get_output(source_window, num_lines=None) from kittens.runner import deserialize data = deserialize(output) if data is not None: end_kitten(data, target_window_id, self) - def input_unicode_character(self): + def input_unicode_character(self) -> None: self._run_kitten('unicode_input') - def set_tab_title(self): + def set_tab_title(self) -> None: tab = self.active_tab if tab: args = ['--name=tab-title', '--message', _('Enter the new title for this tab below.'), 'do_set_tab_title', str(tab.id)] self._run_kitten('ask', args) - def do_set_tab_title(self, title, tab_id): + def do_set_tab_title(self, title: str, tab_id: int) -> None: tm = self.active_tab_manager if tm is not None and title: tab_id = int(tab_id) @@ -829,19 +843,19 @@ class Boss: tab.set_title(title) break - def show_error(self, title, msg): + def show_error(self, title: str, msg: str) -> None: self._run_kitten('show_error', args=['--title', title], input_data=msg) - def create_marker(self): + def create_marker(self) -> None: w = self.active_window if w: spec = None - def done(data, target_window_id, self): + def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None: nonlocal spec spec = data['response'] - def done2(target_window_id, self): + def done2(target_window_id: int, self: Boss) -> None: w = self.window_id_map.get(target_window_id) if w is not None and spec: try: @@ -855,7 +869,7 @@ class Boss: ], custom_callback=done, action_on_removal=done2) - def kitty_shell(self, window_type): + def kitty_shell(self, window_type: str) -> None: cmd = ['@', kitty_exe(), '@'] if window_type == 'tab': self._new_tab(cmd) @@ -870,21 +884,21 @@ class Boss: else: self._new_window(cmd) - def switch_focus_to(self, window_idx): + def switch_focus_to(self, window_idx: int) -> None: tab = self.active_tab if tab: tab.set_active_window_idx(window_idx) - def open_url(self, url, program=None, cwd=None): + def open_url(self, url: str, program: Optional[Union[str, List[str]]] = None, cwd: Optional[str] = None) -> None: if url: if isinstance(program, str): program = to_cmdline(program) open_url(url, program or self.opts.open_url_with, cwd=cwd) - def open_url_lines(self, lines, program=None): + def open_url_lines(self, lines: Iterable[str], program: Optional[Union[str, List[str]]] = None) -> None: self.open_url(''.join(lines), program) - def destroy(self): + def destroy(self) -> None: self.shutting_down = True self.child_monitor.shutdown_monitor() self.set_update_check_process() @@ -895,21 +909,21 @@ class Boss: self.os_window_map = {} destroy_global_data() - def paste_to_active_window(self, text): + def paste_to_active_window(self, text: str) -> None: if text: w = self.active_window if w is not None: w.paste(text) - def paste_from_clipboard(self): + def paste_from_clipboard(self) -> None: text = get_clipboard_string() self.paste_to_active_window(text) - def paste_from_selection(self): + def paste_from_selection(self) -> None: text = get_primary_selection() if supports_primary_selection else get_clipboard_string() self.paste_to_active_window(text) - def set_primary_selection(self): + def set_primary_selection(self) -> None: w = self.active_window if w is not None and not w.destroyed: text = w.text_for_selection() @@ -918,7 +932,7 @@ class Boss: if self.opts.copy_on_select: self.copy_to_buffer(self.opts.copy_on_select) - def copy_to_buffer(self, buffer_name): + def copy_to_buffer(self, buffer_name: str) -> None: w = self.active_window if w is not None and not w.destroyed: text = w.text_for_selection() @@ -930,7 +944,7 @@ class Boss: else: self.clipboard_buffers[buffer_name] = text - def paste_from_buffer(self, buffer_name): + def paste_from_buffer(self, buffer_name: str) -> None: if buffer_name == 'clipboard': text: Optional[str] = get_clipboard_string() elif buffer_name == 'primary': @@ -940,69 +954,84 @@ class Boss: if text: self.paste_to_active_window(text) - def goto_tab(self, tab_num): + def goto_tab(self, tab_num: int) -> None: tm = self.active_tab_manager if tm is not None: tm.goto_tab(tab_num - 1) - def set_active_tab(self, tab): + def set_active_tab(self, tab: Tab) -> bool: tm = self.active_tab_manager if tm is not None: return tm.set_active_tab(tab) return False - def next_tab(self): + def next_tab(self) -> None: tm = self.active_tab_manager if tm is not None: tm.next_tab() - def previous_tab(self): + def previous_tab(self) -> None: tm = self.active_tab_manager if tm is not None: tm.next_tab(-1) prev_tab = previous_tab - def process_stdin_source(self, window=None, stdin=None): + def process_stdin_source(self, window: Optional[Window] = None, stdin: Optional[str] = None) -> Tuple[Optional[Dict[str, str]], Optional[bytes]]: w = window or self.active_window if not w: return None, None env = None + input_data = None if stdin: add_wrap_markers = stdin.endswith('_wrap') if add_wrap_markers: stdin = stdin[:-len('_wrap')] stdin = data_for_at(w, stdin, add_wrap_markers=add_wrap_markers) if stdin is not None: - pipe_data = w.pipe_data(stdin, has_wrap_markers=add_wrap_markers) if w else {} + pipe_data = w.pipe_data(stdin, has_wrap_markers=add_wrap_markers) if w else None if pipe_data: env = { 'KITTY_PIPE_DATA': '{scrolled_by}:{cursor_x},{cursor_y}:{lines},{columns}'.format(**pipe_data) } - stdin = stdin.encode('utf-8') - return env, stdin + input_data = stdin.encode('utf-8') + return env, input_data - def data_for_at(self, which, window=None, add_wrap_markers=False) -> Optional[str]: + def data_for_at(self, which: str, window: Optional[Window] = None, add_wrap_markers: bool = False) -> Optional[str]: window = window or self.active_window if not window: return None return data_for_at(window, which, add_wrap_markers=add_wrap_markers) - def special_window_for_cmd(self, cmd, window=None, stdin=None, cwd_from=None, as_overlay=False): + def special_window_for_cmd( + self, cmd: List[str], + window: Optional[Window] = None, + stdin: Optional[str] = None, + cwd_from: Optional[int] = None, + as_overlay: bool = False + ) -> SpecialWindowInstance: w = window or self.active_window - env, stdin = self.process_stdin_source(w, stdin) + env, input_data = self.process_stdin_source(w, stdin) cmdline = [] for arg in cmd: if arg == '@selection' and w: - arg = data_for_at(w, arg) - if not arg: + q = data_for_at(w, arg) + if not q: continue + arg = q cmdline.append(arg) overlay_for = w.id if w and as_overlay and w.overlay_for is None else None - return SpecialWindow(cmd, stdin, cwd_from=cwd_from, overlay_for=overlay_for, env=env) + return SpecialWindow(cmd, input_data, cwd_from=cwd_from, overlay_for=overlay_for, env=env) - def run_background_process(self, cmd, cwd=None, env=None, stdin=None, cwd_from=None): + def run_background_process( + self, + cmd: List[str], + cwd: Optional[str] = None, + env: Optional[Dict[str, str]] = None, + stdin: Optional[bytes] = None, + cwd_from: Optional[int] = None + ) -> None: import subprocess if cwd_from: with suppress(Exception): @@ -1021,12 +1050,12 @@ class Boss: else: subprocess.Popen(cmd, env=env, cwd=cwd) - def pipe(self, source, dest, exe, *args): + def pipe(self, source: str, dest: str, exe: str, *args: str) -> Window: cmd = [exe] + list(args) window = self.active_window cwd_from = window.child.pid_for_cwd if window else None - def create_window(): + def create_window() -> SpecialWindowInstance: return self.special_window_for_cmd( cmd, stdin=source, as_overlay=dest == 'overlay', cwd_from=cwd_from) @@ -1049,7 +1078,7 @@ class Boss: env, stdin = self.process_stdin_source(stdin=source, window=window) self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env) - def args_to_special_window(self, args: Iterable[str], cwd_from=None): + def args_to_special_window(self, args: Iterable[str], cwd_from: Optional[int] = None) -> SpecialWindowInstance: args = list(args) stdin = None w = self.active_window @@ -1070,7 +1099,7 @@ class Boss: cmd.append(arg) return SpecialWindow(cmd, stdin, cwd_from=cwd_from) - def _new_tab(self, args, cwd_from=None, as_neighbor=False): + def _new_tab(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None, as_neighbor: bool = False) -> Optional[Tab]: special_window = None if args: if isinstance(args, SpecialWindowInstance): @@ -1081,26 +1110,26 @@ class Boss: if tm is not None: return tm.new_tab(special_window=special_window, cwd_from=cwd_from, as_neighbor=as_neighbor) - def _create_tab(self, args, cwd_from=None): + def _create_tab(self, args: List[str], cwd_from: Optional[int] = None) -> None: as_neighbor = False if args and args[0].startswith('!'): as_neighbor = 'neighbor' in args[0][1:].split(',') args = args[1:] self._new_tab(args, as_neighbor=as_neighbor, cwd_from=cwd_from) - def new_tab(self, *args): - self._create_tab(args) + def new_tab(self, *args: str) -> None: + self._create_tab(list(args)) - def new_tab_with_cwd(self, *args): + def new_tab_with_cwd(self, *args: str) -> None: w = self.active_window_for_cwd cwd_from = w.child.pid_for_cwd if w is not None else None - self._create_tab(args, cwd_from=cwd_from) + self._create_tab(list(args), cwd_from=cwd_from) - def new_tab_with_wd(self, wd): + def new_tab_with_wd(self, wd: str) -> None: special_window = SpecialWindow(None, cwd=wd) self._new_tab(special_window) - def _new_window(self, args, cwd_from=None): + def _new_window(self, args: List[str], cwd_from: Optional[int] = None) -> Optional[Window]: tab = self.active_tab if tab is not None: allow_remote_control = False @@ -1118,32 +1147,32 @@ class Boss: else: return tab.new_window(cwd_from=cwd_from, location=location, allow_remote_control=allow_remote_control) - def new_window(self, *args): - self._new_window(args) + def new_window(self, *args: str) -> None: + self._new_window(list(args)) - def new_window_with_cwd(self, *args): + def new_window_with_cwd(self, *args: str) -> None: w = self.active_window_for_cwd if w is None: return self.new_window(*args) cwd_from = w.child.pid_for_cwd - self._new_window(args, cwd_from=cwd_from) + self._new_window(list(args), cwd_from=cwd_from) def launch(self, *args: str) -> None: from kitty.launch import parse_launch_args, launch opts, args_ = parse_launch_args(args) launch(self, opts, args_) - def move_tab_forward(self): + def move_tab_forward(self) -> None: tm = self.active_tab_manager if tm is not None: tm.move_tab(1) - def move_tab_backward(self): + def move_tab_backward(self) -> None: tm = self.active_tab_manager if tm is not None: tm.move_tab(-1) - def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int): + def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int) -> None: if isinstance(where, str): windows: List[Window] = [] if where == 'active': @@ -1160,7 +1189,7 @@ class Boss: window.screen.disable_ligatures = strategy window.refresh() - def patch_colors(self, spec, cursor_text_color, configured=False): + def patch_colors(self, spec: Dict[str, int], cursor_text_color: Union[bool, int, Color], configured: bool = False) -> None: if configured: for k, v in spec.items(): if hasattr(self.opts, k): @@ -1173,19 +1202,19 @@ class Boss: tm.tab_bar.patch_colors(spec) patch_global_colors(spec, configured) - def safe_delete_temp_file(self, path): + def safe_delete_temp_file(self, path: str) -> None: if is_path_in_temp_dir(path): with suppress(FileNotFoundError): os.remove(path) - def set_update_check_process(self, process=None): + def set_update_check_process(self, process: Optional['Popen'] = None) -> None: if self.update_check_process is not None: with suppress(Exception): if self.update_check_process.poll() is None: self.update_check_process.kill() self.update_check_process = process - def on_monitored_pid_death(self, pid, exit_status): + def on_monitored_pid_death(self, pid: int, exit_status: int) -> None: update_check_process = self.update_check_process if update_check_process is not None and pid == update_check_process.pid: self.update_check_process = None @@ -1201,22 +1230,24 @@ class Boss: except Exception as e: log_error('Failed to process update check data {!r}, with error: {}'.format(raw, e)) - def dbus_notification_callback(self, activated, *args): + def dbus_notification_callback(self, activated: bool, a: int, b: Union[int, str]) -> None: from .notify import dbus_notification_created, dbus_notification_activated if activated: - dbus_notification_activated(*args) + assert isinstance(b, str) + dbus_notification_activated(a, b) else: - dbus_notification_created(*args) + assert isinstance(b, int) + dbus_notification_created(a, b) - def show_bad_config_lines(self, bad_lines): + def show_bad_config_lines(self, bad_lines: Iterable[BadLine]) -> None: - def format_bad_line(bad_line): + def format_bad_line(bad_line: BadLine) -> str: return '{}:{} in line: {}\n'.format(bad_line.number, bad_line.exception, bad_line.line) msg = '\n'.join(map(format_bad_line, bad_lines)).rstrip() self.show_error(_('Errors in kitty.conf'), msg) - def set_colors(self, *args): + def set_colors(self, *args: str) -> None: from kitty.rc.base import parse_subcommand_cli, command_for_name, PayloadGetter from kitty.remote_control import parse_rc_args c = command_for_name('set_colors') @@ -1224,7 +1255,12 @@ class Boss: payload = c.message_to_kitty(parse_rc_args([])[0], opts, items) c.response_from_kitty(self, self.active_window, PayloadGetter(c, payload if isinstance(payload, dict) else {})) - def _move_window_to(self, window=None, target_tab_id=None, target_os_window_id=None): + def _move_window_to( + self, + window: Optional[Window] = None, + target_tab_id: Optional[Union[str, int]] = None, + target_os_window_id: Optional[Union[str, int]] = None + ) -> None: window = window or self.active_window if not window: return @@ -1238,7 +1274,12 @@ class Boss: else: target_os_window_id = target_os_window_id or current_os_window() if target_tab_id == 'new': - tm = self.os_window_map[target_os_window_id] + if not isinstance(target_os_window_id, int): + q = self.active_tab_manager + assert q is not None + tm = q + else: + tm = self.os_window_map[target_os_window_id] target_tab = tm.new_tab(empty_tab=True) else: for tab in self.all_tabs: @@ -1257,7 +1298,7 @@ class Boss: self._cleanup_tab_after_window_removal(src_tab) target_tab.make_active() - def _move_tab_to(self, tab=None, target_os_window_id=None): + def _move_tab_to(self, tab: Optional[Tab] = None, target_os_window_id: Optional[int] = None) -> None: tab = tab or self.active_tab if tab is None: return @@ -1269,7 +1310,7 @@ class Boss: self._cleanup_tab_after_window_removal(tab) target_tab.make_active() - def detach_window(self, *args): + def detach_window(self, *args: str) -> None: if not args or args[0] == 'new': return self._move_window_to(target_os_window_id='new') if args[0] == 'new-tab': @@ -1294,11 +1335,11 @@ class Boss: tab_id_map[new_idx - 1] = None lines.append(fmt.format(new_idx, 'New OS Window')) - def done(data, target_window_id, self): + def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None: nonlocal done_tab_id done_tab_id = tab_id_map[int(data['groupdicts'][0]['index'])] - def done2(target_window_id, self): + def done2(target_window_id: int, self: Boss) -> None: if not hasattr(done, 'tab_id'): return tab_id = done_tab_id @@ -1319,7 +1360,7 @@ class Boss: ), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2 ) - def detach_tab(self, *args): + def detach_tab(self, *args: str) -> None: if not args or args[0] == 'new': return self._move_tab_to() @@ -1342,12 +1383,12 @@ class Boss: os_window_id_map[new_idx - 1] = None lines.append(fmt.format(new_idx, 'New OS Window')) - def done(data, target_window_id, self): + def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None: nonlocal done_called, done_osw done_osw = os_window_id_map[int(data['groupdicts'][0]['index'])] done_called = True - def done2(target_window_id, self): + def done2(target_window_id: int, self: Boss) -> None: if not done_called: return os_window_id = done_osw @@ -1367,7 +1408,7 @@ class Boss: ), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2 ) - def set_background_image(self, path, os_windows, configured, layout): + def set_background_image(self, path: Optional[str], os_windows: Tuple[int, ...], configured: bool, layout: Optional[str]) -> None: if layout: set_background_image(path, os_windows, configured, layout) else: diff --git a/kitty/fast_data_types.pyi b/kitty/fast_data_types.pyi index 0847f2893..fb2dbd559 100644 --- a/kitty/fast_data_types.pyi +++ b/kitty/fast_data_types.pyi @@ -930,6 +930,9 @@ class LineBuf: def is_continued(self, idx: int) -> bool: pass + def line(self, num: int) -> Line: + pass + class Cursor: x: int diff --git a/kitty/launch.py b/kitty/launch.py index baa9ff797..29b36705d 100644 --- a/kitty/launch.py +++ b/kitty/launch.py @@ -248,7 +248,10 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti env.update(penv) if opts.type == 'background': - boss.run_background_process(kw['cmd'], cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin']) + cmd = kw['cmd'] + if not cmd: + raise ValueError('The cmd to run must be specified when running a background process') + boss.run_background_process(cmd, cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin']) elif opts.type in ('clipboard', 'primary'): if kw.get('stdin') is not None: func = set_clipboard_string if opts.type == 'clipboard' else set_primary_selection diff --git a/kitty/rc/__init__.py b/kitty/rc/__init__.py index 8b1378917..e69de29bb 100644 --- a/kitty/rc/__init__.py +++ b/kitty/rc/__init__.py @@ -1 +0,0 @@ - diff --git a/kitty/rc/resize_window.py b/kitty/rc/resize_window.py index d0b167b7b..51ab1d2db 100644 --- a/kitty/rc/resize_window.py +++ b/kitty/rc/resize_window.py @@ -2,7 +2,7 @@ # vim:fileencoding=utf-8 # License: GPLv3 Copyright: 2020, Kovid Goyal -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union from .base import ( MATCH_WINDOW_OPTION, ArgsType, Boss, PayloadGetType, @@ -55,7 +55,7 @@ If specified resize the window this command is run in, rather than the active wi def response_from_kitty(self, boss: Boss, window: Optional[Window], payload_get: PayloadGetType) -> ResponseType: windows = self.windows_for_match_payload(boss, window, payload_get) - resized = False + resized: Union[bool, None, str] = False if windows and windows[0]: resized = boss.resize_layout_window( windows[0], increment=payload_get('increment'), is_horizontal=payload_get('axis') == 'horizontal', diff --git a/kitty/tabs.py b/kitty/tabs.py index cbcd99ebe..e735d933a 100644 --- a/kitty/tabs.py +++ b/kitty/tabs.py @@ -275,7 +275,7 @@ class Tab: # {{{ self._set_current_layout(layout_name) self.relayout() - def resize_window_by(self, window_id: int, increment: int, is_horizontal: bool) -> Optional[str]: + def resize_window_by(self, window_id: int, increment: float, is_horizontal: bool) -> Optional[str]: increment_as_percent = self.current_layout.bias_increment_for_cell(is_horizontal) * increment if self.current_layout.modify_size_of_window(self.windows, window_id, increment_as_percent, is_horizontal): self.relayout() diff --git a/kitty/window.py b/kitty/window.py index d72861d1f..d2f0f17cb 100644 --- a/kitty/window.py +++ b/kitty/window.py @@ -42,8 +42,7 @@ if TYPE_CHECKING: from .tabs import Tab from .child import Child from typing import TypedDict - from .rc.base import RemoteCommand - Tab, Child, RemoteCommand + Tab, Child else: TypedDict = dict @@ -470,7 +469,7 @@ class Window: def request_capabilities(self, q: str) -> None: self.screen.send_escape_code_to_child(DCS, get_capabilities(q)) - def handle_remote_cmd(self, cmd: 'RemoteCommand') -> None: + def handle_remote_cmd(self, cmd: str) -> None: get_boss().handle_remote_cmd(cmd, self) def handle_remote_print(self, msg: bytes) -> None: @@ -615,7 +614,10 @@ class Window: exe = shutil.which(cmd[0], path=env['PATH']) if exe: cmd[0] = exe - get_boss().display_scrollback(self, data['text'], cmd) + bdata: Union[str, bytes, None] = data['text'] + if isinstance(bdata, str): + bdata = bdata.encode('utf-8') + get_boss().display_scrollback(self, bdata, cmd) def paste_bytes(self, text: Union[str, bytes]) -> None: # paste raw bytes without any processing