Remove unused code

This commit is contained in:
Kovid Goyal 2024-09-12 20:33:41 +05:30
parent 7a3edb0367
commit 41308a0b91
No known key found for this signature in database
GPG key ID: 06BC317B515ACE7C

View file

@ -2,7 +2,6 @@
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import atexit
import errno
import fcntl
import math
import os
@ -25,7 +24,6 @@ from typing import (
)
from .constants import (
appname,
clear_handled_signals,
config_dir,
is_macos,
@ -410,71 +408,6 @@ def random_unix_socket() -> 'Socket':
return ans
def single_instance_unix(name: str) -> bool:
import socket
for path in unix_socket_paths(name):
socket_path = path.rpartition('.')[0] + '.sock'
fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_CLOEXEC)
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError as err:
if err.errno in (errno.EAGAIN, errno.EACCES):
# Client
s = socket.socket(family=socket.AF_UNIX)
s.connect(socket_path)
single_instance.socket = s
return False
raise
s = socket.socket(family=socket.AF_UNIX)
try:
s.bind(socket_path)
except OSError as err:
if err.errno in (errno.EADDRINUSE, errno.EEXIST):
os.unlink(socket_path)
s.bind(socket_path)
else:
raise
single_instance.socket = s # prevent garbage collection from closing the socket
atexit.register(remove_socket_file, s, socket_path)
s.listen()
s.set_inheritable(False)
return True
return False
class SingleInstance:
socket: Optional['Socket'] = None
def __call__(self, group_id: Optional[str] = None) -> bool:
import socket
name = f'{appname}-ipc-{os.geteuid()}'
if group_id:
name += f'-{group_id}'
s = socket.socket(family=socket.AF_UNIX)
# First try with abstract UDS
addr = '\0' + name
try:
s.bind(addr)
except OSError as err:
if err.errno == errno.ENOENT:
return single_instance_unix(name)
if err.errno == errno.EADDRINUSE:
s.connect(addr)
self.socket = s
return False
raise
s.listen()
self.socket = s # prevent garbage collection from closing the socket
s.set_inheritable(False)
atexit.register(remove_socket_file, s)
return True
single_instance = SingleInstance()
def parse_address_spec(spec: str) -> tuple[AddressFamily, Union[tuple[str, int], str], Optional[str]]:
import socket
try:
@ -1240,12 +1173,10 @@ def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
def lock_file(f: BinaryIO) -> None:
if not f.writable():
raise ValueError('Cannot lock files not opened in writable mode')
import fcntl
fcntl.lockf(f, fcntl.LOCK_EX)
def unlock_file(f: BinaryIO) -> None:
if not f.writable():
raise ValueError('Cannot unlock files not opened in writable mode')
import fcntl
fcntl.lockf(f, fcntl.LOCK_UN)