usbip: fix review findings

This commit is contained in:
世界 2026-04-25 04:26:26 +08:00
parent ae25958a7c
commit 524d580cf4
No known key found for this signature in database
GPG key ID: CD109927C34A63C4
11 changed files with 335 additions and 116 deletions

View file

@ -120,8 +120,10 @@ func (c *ClientService) runBusIDLoop(ctx context.Context, busid, description str
if ctx.Err() != nil {
return
}
c.setBusIDActive(busid, true)
controller, err := c.attemptAttach(ctx, busid)
if err != nil {
c.setBusIDActive(busid, false)
c.logger.Error("attach ", description, " (", busid, "): ", err)
if !sleepCtx(ctx, clientReconnectDelay) {
return
@ -129,7 +131,6 @@ func (c *ClientService) runBusIDLoop(ctx context.Context, busid, description str
continue
}
c.logger.Info("attached ", busid, " through IOUSBHostControllerInterface")
c.setBusIDActive(busid, true)
waitDarwinController(ctx, controller)
c.setBusIDActive(busid, false)
if ctx.Err() != nil {

View file

@ -124,8 +124,10 @@ func (c *ClientService) runBusIDLoop(ctx context.Context, busid, description str
if ctx.Err() != nil {
return
}
c.setBusIDActive(busid, true)
port, done, err := c.attemptAttach(ctx, busid)
if err != nil {
c.setBusIDActive(busid, false)
c.logger.Error("attach ", description, " (", busid, "): ", err)
if !sleepCtx(ctx, clientReconnectDelay) {
return
@ -133,7 +135,6 @@ func (c *ClientService) runBusIDLoop(ctx context.Context, busid, description str
continue
}
c.logger.Info("attached ", busid, " → vhci port ", port)
c.setBusIDActive(busid, true)
c.waitPortSession(ctx, port, busid, done)
c.setBusIDActive(busid, false)
c.trackPort(port, false)

View file

@ -3,6 +3,7 @@
package usbip
import (
"bytes"
"context"
"io"
"net"
@ -24,6 +25,23 @@ import (
"golang.org/x/sys/unix"
)
type testLogWriter struct {
access sync.Mutex
buffer bytes.Buffer
}
func (w *testLogWriter) Write(p []byte) (int, error) {
w.access.Lock()
defer w.access.Unlock()
return w.buffer.Write(p)
}
func (w *testLogWriter) String() string {
w.access.Lock()
defer w.access.Unlock()
return w.buffer.String()
}
const (
darwinFakeBusID = "test-1"
darwinFakeVendorID = 0x1209
@ -67,23 +85,28 @@ func requireRoot(t *testing.T) {
}
}
func newTestLogger() log.ContextLogger {
if os.Getenv("CODEX_USBIP_TEST_LOG") != "" {
factory := log.NewDefaultFactory(
context.Background(),
log.Formatter{
BaseTime: time.Now(),
DisableColors: true,
},
os.Stderr,
"",
nil,
false,
)
factory.SetLevel(log.LevelTrace)
return factory.NewLogger("usbip")
}
return log.NewNOPFactory().NewLogger("usbip")
func newTestLogger(t testing.TB) log.ContextLogger {
t.Helper()
writer := new(testLogWriter)
factory := log.NewDefaultFactory(
context.Background(),
log.Formatter{
BaseTime: time.Now(),
DisableColors: true,
},
writer,
"",
nil,
false,
)
factory.SetLevel(log.LevelTrace)
t.Cleanup(func() {
if output := writer.String(); t.Failed() && output != "" {
t.Logf("USB/IP log:\n%s", output)
}
_ = factory.Close()
})
return factory.NewLogger("usbip")
}
func loopbackListenAddr() *badoption.Addr {
@ -107,7 +130,7 @@ func requireDarwinUserHCI(t *testing.T) {
defer left.Close()
defer right.Close()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), left, darwinFakeDeviceEntry().Info)
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), left, darwinFakeDeviceEntry().Info)
hostController, err := darwinCreateUSBHostController(controller, 1, SpeedFull)
controller.cancel()
if err != nil {
@ -121,7 +144,7 @@ func TestDarwinVirtualControllerReadsCompliantSubmitResponsePayload(t *testing.T
clientConn, serverConn := net.Pipe()
defer serverConn.Close()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), clientConn, DeviceInfoTruncated{
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), clientConn, DeviceInfoTruncated{
BusNum: 1,
DevNum: 1,
})
@ -182,7 +205,7 @@ func TestDarwinHandleIsoTransferPreservesASAPFlag(t *testing.T) {
clientConn, serverConn := net.Pipe()
defer serverConn.Close()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), clientConn, DeviceInfoTruncated{
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), clientConn, DeviceInfoTruncated{
BusNum: 1,
DevNum: 2,
})
@ -240,7 +263,7 @@ func TestDarwinHandleIsoTransferPreservesASAPFlag(t *testing.T) {
func TestDarwinSubmitInTransferRejectsOversizedPayload(t *testing.T) {
t.Parallel()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), nil, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), nil, DeviceInfoTruncated{})
buffer := []byte{0xaa, 0xbb}
status, length := controller.completeSubmitInTransfer(unsafe.Pointer(&buffer[0]), SubmitResponse{
@ -264,7 +287,7 @@ func TestWaitDarwinControllerClosesOnContextCancel(t *testing.T) {
clientConn, serverConn := net.Pipe()
defer serverConn.Close()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), clientConn, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), clientConn, DeviceInfoTruncated{})
go controller.readLoop()
ctx, cancel := context.WithCancel(context.Background())
@ -336,7 +359,7 @@ func (f *fakeDarwinEndpointStateMachine) complete(darwinCITransfer, int, int) er
func TestDarwinHandleDoorbellSkipsNoResponseCompletion(t *testing.T) {
t.Parallel()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), nil, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), nil, DeviceInfoTruncated{})
message := darwinCIMessage{
control: (1 << 15) | (1 << 14) | 0x3c,
data0: (uint32(2) << 8) | 1,
@ -356,7 +379,7 @@ func TestDarwinHandleDoorbellSkipsNoResponseCompletion(t *testing.T) {
func TestDarwinHandleDoorbellContinuesAfterNoResponseTransfer(t *testing.T) {
t.Parallel()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), nil, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), nil, DeviceInfoTruncated{})
noResponseMessage := darwinCIMessage{
control: (1 << 15) | (1 << 14) | 0x3c,
data0: (uint32(2) << 8) | 1,
@ -387,7 +410,7 @@ func TestDarwinHandleDoorbellContinuesAfterNoResponseTransfer(t *testing.T) {
func TestDarwinControllerCloseWaitsForEventLoopTeardown(t *testing.T) {
t.Parallel()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), nil, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), nil, DeviceInfoTruncated{})
processStarted := make(chan struct{})
releaseProcess := make(chan struct{})
endpointClosed := make(chan struct{})
@ -441,7 +464,7 @@ func TestDarwinControllerCloseWaitsForEventLoopTeardown(t *testing.T) {
func TestDarwinControllerCloseWithNilConn(t *testing.T) {
t.Parallel()
controller := newDarwinVirtualController(context.Background(), newTestLogger(), nil, DeviceInfoTruncated{})
controller := newDarwinVirtualController(context.Background(), newTestLogger(t), nil, DeviceInfoTruncated{})
done := make(chan struct{})
go func() {
controller.Close()
@ -771,7 +794,7 @@ func TestDarwinUSBIPClientSmoke(t *testing.T) {
fakeServer := startDarwinFakeUSBIPServer(t)
defer fakeServer.Close()
serviceInstance, err := NewClientService(context.Background(), newTestLogger(), "usbip-client-darwin-test", option.USBIPClientServiceOptions{
serviceInstance, err := NewClientService(context.Background(), newTestLogger(t), "usbip-client-darwin-test", option.USBIPClientServiceOptions{
ServerOptions: option.ServerOptions{
Server: fakeServer.address.AddrString(),
ServerPort: fakeServer.address.Port,
@ -809,7 +832,7 @@ func TestDarwinUSBIPServerSmoke(t *testing.T) {
t.Skip("safe vendor-specific USB capture target unavailable")
}
serviceInstance, err := NewServerService(context.Background(), newTestLogger(), "usbip-server-darwin-test", option.USBIPServerServiceOptions{
serviceInstance, err := NewServerService(context.Background(), newTestLogger(t), "usbip-server-darwin-test", option.USBIPServerServiceOptions{
ListenOptions: option.ListenOptions{
Listen: loopbackListenAddr(),
ListenPort: pickFreeTCPPort(t),

View file

@ -166,6 +166,10 @@ func WriteSubmitCommand(w io.Writer, command SubmitCommand) error {
if err != nil {
return err
}
err = validateUSBIPPayloadBuffer(command.Header.Direction, command.Buffer, command.TransferBufferLength, true)
if err != nil {
return err
}
packetCount := normalizeUSBIPIsoPacketCount(command.NumberOfPackets, command.IsoPackets)
err = validateUSBIPIsoPacketCount(packetCount)
if err != nil {
@ -197,6 +201,10 @@ func WriteSubmitResponse(w io.Writer, response SubmitResponse) error {
if err != nil {
return err
}
err = validateUSBIPPayloadBuffer(response.Header.Direction, response.Buffer, response.ActualLength, false)
if err != nil {
return err
}
packetCount := normalizeUSBIPIsoPacketCount(response.NumberOfPackets, response.IsoPackets)
err = validateUSBIPIsoPacketCount(packetCount)
if err != nil {
@ -356,6 +364,19 @@ func validateUSBIPBufferLength(length int32) error {
return nil
}
func validateUSBIPPayloadBuffer(direction uint32, buffer []byte, length int32, command bool) error {
if shouldCarryUSBIPBuffer(direction, command) {
if len(buffer) != int(length) {
return E.New("USB/IP payload length mismatch: header length ", length, ", buffer length ", len(buffer))
}
return nil
}
if len(buffer) > 0 {
return E.New("USB/IP unexpected payload buffer: ", len(buffer))
}
return nil
}
func validateUSBIPIsoPacketCount(count int32) error {
if count < nonIsoPacketCount {
return E.New("USB/IP iso packet count is negative: ", count)

View file

@ -56,7 +56,6 @@ func TestUSBIPSubmitCommandRoundTripInOmitsCommandPayload(t *testing.T) {
},
TransferBufferLength: 4,
NumberOfPackets: nonIsoPacketCount,
Buffer: []byte{9, 8, 7, 6},
}
var buffer bytes.Buffer
@ -122,7 +121,6 @@ func TestUSBIPSubmitResponseRoundTripOutOmitsResponsePayload(t *testing.T) {
Status: 0,
ActualLength: 3,
NumberOfPackets: nonIsoPacketCount,
Buffer: []byte{1, 2, 3},
}
var buffer bytes.Buffer
@ -317,6 +315,33 @@ func TestUSBIPRejectsInvalidDataPlaneLengths(t *testing.T) {
})
require.ErrorContains(t, err, "too large")
err = WriteSubmitCommand(&buffer, SubmitCommand{
Header: DataHeader{Command: CmdSubmit, Direction: USBIPDirOut},
TransferBufferLength: 4,
Buffer: []byte("abc"),
})
require.ErrorContains(t, err, "payload length mismatch")
err = WriteSubmitResponse(&buffer, SubmitResponse{
Header: DataHeader{Command: RetSubmit, Direction: USBIPDirIn},
ActualLength: 4,
Buffer: []byte("abc"),
})
require.ErrorContains(t, err, "payload length mismatch")
err = WriteSubmitCommand(&buffer, SubmitCommand{
Header: DataHeader{Command: CmdSubmit, Direction: USBIPDirIn},
TransferBufferLength: 4,
Buffer: []byte("abcd"),
})
require.ErrorContains(t, err, "unexpected payload buffer")
err = WriteSubmitResponse(&buffer, SubmitResponse{
Header: DataHeader{Command: RetSubmit, Direction: USBIPDirOut},
Buffer: []byte("abcd"),
})
require.ErrorContains(t, err, "unexpected payload buffer")
var raw [28]byte
binary.BigEndian.PutUint32(raw[4:8], 0xffffffff)
_, err = ReadSubmitCommandBody(bytes.NewReader(raw[:]), DataHeader{Command: CmdSubmit, Direction: USBIPDirOut})

View file

@ -387,7 +387,7 @@ func startRealUSBIPServer(t *testing.T, devices []option.USBIPDeviceMatch) (*Ser
t.Helper()
requireUSBIPHost(t)
serviceInstance, err := NewServerService(context.Background(), newTestLogger(), "usbip-server-test", option.USBIPServerServiceOptions{
serviceInstance, err := NewServerService(context.Background(), newTestLogger(t), "usbip-server-test", option.USBIPServerServiceOptions{
ListenOptions: option.ListenOptions{
Listen: loopbackListenAddr(),
ListenPort: pickFreeTCPPort(t),
@ -409,7 +409,7 @@ func startRealUSBIPClient(t *testing.T, destination M.Socksaddr, devices []optio
t.Helper()
requireVHCI(t)
serviceInstance, err := NewClientService(context.Background(), newTestLogger(), "usbip-client-test", option.USBIPClientServiceOptions{
serviceInstance, err := NewClientService(context.Background(), newTestLogger(t), "usbip-client-test", option.USBIPClientServiceOptions{
ServerOptions: option.ServerOptions{
Server: destination.AddrString(),
ServerPort: destination.Port,

View file

@ -3,6 +3,7 @@
package usbip
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -26,6 +27,23 @@ import (
"golang.org/x/sys/unix"
)
type testLogWriter struct {
access sync.Mutex
buffer bytes.Buffer
}
func (w *testLogWriter) Write(p []byte) (int, error) {
w.access.Lock()
defer w.access.Unlock()
return w.buffer.Write(p)
}
func (w *testLogWriter) String() string {
w.access.Lock()
defer w.access.Unlock()
return w.buffer.String()
}
type testDialer struct{}
func (testDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) {
@ -69,17 +87,19 @@ func (wrappingDialer) ListenPacket(context.Context, M.Socksaddr) (net.PacketConn
}
type testDeviceStore struct {
access sync.Mutex
devices map[string]sysfsDevice
statuses map[string]int
sockfds map[string]int
access sync.Mutex
devices map[string]sysfsDevice
statuses map[string]int
sockfds map[string]int
sockfdWrites map[string][]int
}
func newTestDeviceStore(devices ...sysfsDevice) *testDeviceStore {
store := &testDeviceStore{
devices: make(map[string]sysfsDevice),
statuses: make(map[string]int),
sockfds: make(map[string]int),
devices: make(map[string]sysfsDevice),
statuses: make(map[string]int),
sockfds: make(map[string]int),
sockfdWrites: make(map[string][]int),
}
store.setDevices(devices...)
return store
@ -148,6 +168,7 @@ func (s *testDeviceStore) writeUsbipSockfd(busid string, fd int) error {
s.access.Lock()
defer s.access.Unlock()
s.sockfds[busid] = fd
s.sockfdWrites[busid] = append(s.sockfdWrites[busid], fd)
return nil
}
@ -157,6 +178,46 @@ func (s *testDeviceStore) lastSockfd(busid string) int {
return s.sockfds[busid]
}
func (s *testDeviceStore) hasPositiveSockfd(busid string) bool {
s.access.Lock()
defer s.access.Unlock()
for _, fd := range s.sockfdWrites[busid] {
if fd > 0 {
return true
}
}
return false
}
type testUSBEventListener struct {
closeOnce sync.Once
waitOnce sync.Once
closed chan struct{}
waitEntered chan struct{}
}
func newTestUSBEventListener() *testUSBEventListener {
return &testUSBEventListener{
closed: make(chan struct{}),
waitEntered: make(chan struct{}),
}
}
func (l *testUSBEventListener) Close() error {
l.closeOnce.Do(func() {
close(l.closed)
})
return nil
}
func (l *testUSBEventListener) WaitUSBEvent() error {
l.waitOnce.Do(func() {
close(l.waitEntered)
})
<-l.closed
return context.Canceled
}
func newTestUSBIPOps(t *testing.T) usbipOps {
t.Helper()
@ -232,23 +293,28 @@ func newTestUSBIPOps(t *testing.T) usbipOps {
}
}
func newTestLogger() log.ContextLogger {
if os.Getenv("CODEX_USBIP_TEST_LOG") != "" {
factory := log.NewDefaultFactory(
context.Background(),
log.Formatter{
BaseTime: time.Now(),
DisableColors: true,
},
os.Stderr,
"",
nil,
false,
)
factory.SetLevel(log.LevelTrace)
return factory.NewLogger("usbip")
}
return log.NewNOPFactory().NewLogger("usbip")
func newTestLogger(t testing.TB) log.ContextLogger {
t.Helper()
writer := new(testLogWriter)
factory := log.NewDefaultFactory(
context.Background(),
log.Formatter{
BaseTime: time.Now(),
DisableColors: true,
},
writer,
"",
nil,
false,
)
factory.SetLevel(log.LevelTrace)
t.Cleanup(func() {
if output := writer.String(); t.Failed() && output != "" {
t.Logf("USB/IP log:\n%s", output)
}
_ = factory.Close()
})
return factory.NewLogger("usbip")
}
func newTestDevice(busid string, vendorID, productID uint16, serial string, speed uint32) sysfsDevice {
@ -523,7 +589,7 @@ func TestClientApplyRemoteExportsKeepsActiveBusIDWorker(t *testing.T) {
canceled := false
client := &ClientService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
allWorkers: map[string]*clientBusIDWorker{"1-1": {cancel: func() { canceled = true }}},
activeBusIDs: map[string]struct{}{"1-1": {}},
ops: newTestUSBIPOps(t),
@ -595,7 +661,7 @@ func TestClientShouldRetryBusIDRefreshesImportAllState(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
ops: newTestUSBIPOps(t),
@ -606,7 +672,7 @@ func TestClientShouldRetryBusIDRefreshesImportAllState(t *testing.T) {
canceled := false
client := &ClientService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: serverAddr,
allWorkers: map[string]*clientBusIDWorker{"1-1": {cancel: func() { canceled = true }}},
@ -628,7 +694,7 @@ func TestClientShouldRetryBusIDKeepsRetryOnRefreshFailure(t *testing.T) {
canceled := false
client := &ClientService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: failingDialer{err: expectedErr},
serverAddr: M.ParseSocksaddrHostPort("127.0.0.1", 3240),
allWorkers: map[string]*clientBusIDWorker{"1-1": {cancel: func() { canceled = true }}},
@ -686,8 +752,9 @@ func TestLinuxHelpers(t *testing.T) {
require.Equal(t, "hs", vhciHubForSpeed(SpeedHigh))
require.Equal(t, "ss", vhciHubForSpeed(SpeedSuper))
require.True(t, isUSBUEvent([]byte("ACTION=add\x00SUBSYSTEM=usb\x00")))
require.False(t, isUSBUEvent([]byte("ACTION=add\x00SUBSYSTEM=net\x00")))
require.True(t, isUSBDeviceUEvent([]byte("add@/devices/platform/dummy_hcd.0/usb19/19-1\x00ACTION=add\x00SUBSYSTEM=usb\x00DEVTYPE=usb_device\x00")))
require.False(t, isUSBDeviceUEvent([]byte("add@/devices/platform/dummy_hcd.0/usb19/19-1/19-1:1.0\x00ACTION=add\x00SUBSYSTEM=usb\x00DEVTYPE=usb_interface\x00")))
require.False(t, isUSBDeviceUEvent([]byte("ACTION=add\x00SUBSYSTEM=net\x00DEVTYPE=usb_device\x00")))
}
func TestUSBIPConnHandoffDirectTCP(t *testing.T) {
@ -716,7 +783,7 @@ func TestUSBIPConnHandoffDirectTCP(t *testing.T) {
require.False(t, handoff.relay())
require.Equal(t, "direct", handoff.mode())
requireStreamSocketFD(t, handoff.kernelFD())
done := handoff.startRelay(context.Background(), newTestLogger(), "test", "direct")
done := handoff.startRelay(context.Background(), newTestLogger(t), "test", "direct")
_, err = conn.Write([]byte("closed"))
require.Error(t, err)
@ -747,7 +814,7 @@ func TestUSBIPConnHandoffRelaySocketpairCopies(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := handoff.startRelay(ctx, newTestLogger(), "test", "relay")
done := handoff.startRelay(ctx, newTestLogger(t), "test", "relay")
_, err = right.Write([]byte("ping"))
require.NoError(t, err)
@ -772,7 +839,7 @@ func TestServerStartRequiresHostDriver(t *testing.T) {
expectedErr := errors.New("host driver unavailable")
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
ops: usbipOps{
@ -790,7 +857,7 @@ func TestClientStartRequiresVHCI(t *testing.T) {
expectedErr := errors.New("vhci unavailable")
client := &ClientService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
ops: usbipOps{
ensureVHCI: func() error { return expectedErr },
},
@ -835,7 +902,7 @@ func TestServerReconcileExportsBindsMatchesAndSkipsHub(t *testing.T) {
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{VendorID: 0x1d6b, ProductID: 0x0002}},
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
@ -896,7 +963,7 @@ func TestServerBindOneRetriesAfterStaleHostMatch(t *testing.T) {
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
ops: ops,
@ -952,7 +1019,7 @@ func TestServerReconcileExportsSkipsVHCIDevices(t *testing.T) {
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{VendorID: 0x1d6b, ProductID: 0x0002}},
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
@ -1007,7 +1074,7 @@ func TestServerReconcileExportsReleasesRemovedExports(t *testing.T) {
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1", managed: true, originalDriver: "usbhid"}},
ops: ops,
}
@ -1035,7 +1102,7 @@ func TestServerReleaseExportLeavesCooptedSocketUntouched(t *testing.T) {
}
server := &ServerService{
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
ops: ops,
}
@ -1074,7 +1141,7 @@ func TestServerReleaseExportRetainsTrackingOnFailure(t *testing.T) {
}
server := &ServerService{
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": export},
ops: ops,
}
@ -1136,7 +1203,7 @@ func TestServerCloseSerializesRollbackWithActiveReconcile(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{BusID: "1-1"}},
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
@ -1189,7 +1256,7 @@ func TestServerReconcileAndBroadcastSkipsAfterCancel(t *testing.T) {
ops := newTestUSBIPOps(t)
server := &ServerService{
ctx: ctx,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
@ -1199,6 +1266,63 @@ func TestServerReconcileAndBroadcastSkipsAfterCancel(t *testing.T) {
require.NoError(t, server.reconcileAndBroadcast(true))
}
func TestServerUEventLoopReconcilesWhenListenerStarts(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
listener := newTestUSBEventListener()
done := make(chan struct{})
t.Cleanup(func() {
cancel()
_ = listener.Close()
select {
case <-done:
case <-time.After(time.Second):
t.Error("timed out waiting for uevent loop")
}
})
device := newTestDevice("1-1", 0x1d6b, 0x0002, "startup", SpeedHigh)
store := newTestDeviceStore(device)
store.setStatus("1-1", usbipStatusAvailable)
ops := newTestUSBIPOps(t)
ops.newUEventListener = func() (usbEventListener, error) {
return listener, nil
}
ops.listUSBDevices = store.listUSBDevices
ops.currentDriver = func(string) (string, error) {
return "", nil
}
ops.hostMatchBusID = func(string, bool) error {
return nil
}
ops.hostBind = func(string) error {
return nil
}
ops.readUsbipStatus = store.readUsbipStatus
ops.readSysfsDevice = store.readSysfsDevice
server := &ServerService{
ctx: ctx,
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{VendorID: 0x1d6b, ProductID: 0x0002}},
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
ops: ops,
}
go func() {
defer close(done)
server.ueventLoop()
}()
require.Eventually(t, func() bool {
_, ok := server.getExport("1-1")
return ok
}, 3*time.Second, 10*time.Millisecond)
}
func TestServerBuildDevListEntriesFiltersUnavailableAndRefreshFailures(t *testing.T) {
t.Parallel()
@ -1213,7 +1337,7 @@ func TestServerBuildDevListEntriesFiltersUnavailableAndRefreshFailures(t *testin
ops.readSysfsDevice = store.readSysfsDevice
server := &ServerService{
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{
"1-1": {busid: "1-1"},
"1-2": {busid: "1-2"},
@ -1276,7 +1400,7 @@ func TestServerHandleImportWithOpaqueConnRelay(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
@ -1361,7 +1485,7 @@ func TestServerHandleImportRelayClosesHandoffOnSockfdFailure(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
ops: ops,
@ -1431,7 +1555,7 @@ func TestServerHandleImportRelayClosesHandoffOnReplyFailure(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
ops: ops,
@ -1474,7 +1598,7 @@ func TestServerDispatchConnHandlesControlPingAndChanged(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
ops: newTestUSBIPOps(t),
@ -1523,7 +1647,7 @@ func TestServerRegisterControlConnQueuesSnapshotBeforeBroadcast(t *testing.T) {
t.Parallel()
server := &ServerService{
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
@ -1567,7 +1691,7 @@ func TestServerReconcileBroadcastsStatusOnlyDeviceDelta(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{BusID: "1-1"}},
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
@ -1645,7 +1769,7 @@ func TestServerControlSnapshotPreservesPendingDelta(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{BusID: "1-1"}},
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
@ -1714,7 +1838,7 @@ func TestServerControlLeaseEnablesImportExt(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
@ -1774,7 +1898,7 @@ func TestServerControlLeaseEnablesImportExt(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "1-1", info.BusIDString())
require.NoError(t, importConn.Close())
require.Positive(t, store.lastSockfd("1-1"))
require.True(t, store.hasPositiveSockfd("1-1"))
reuseConn, err := net.Dial("tcp", serverAddr.String())
require.NoError(t, err)
@ -1810,7 +1934,7 @@ func TestClientAttemptAttachUsesImportReplyAndVHCIAttach(t *testing.T) {
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
ops: serverOps,
@ -1836,7 +1960,7 @@ func TestClientAttemptAttachUsesImportReplyAndVHCIAttach(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: serverAddr,
ops: clientOps,
@ -1966,7 +2090,7 @@ func TestClientAttemptAttachUsesImportExtLease(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: ops,
@ -2057,7 +2181,7 @@ func TestClientAttemptAttachWithOpaqueConnRelay(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: wrappingDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: ops,
@ -2177,7 +2301,7 @@ func TestClientAttemptAttachRelayClosesHandoffOnVHCIAttachFailure(t *testing.T)
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: wrappingDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: ops,
@ -2256,7 +2380,7 @@ func TestClientFetchDevListRejectsUnexpectedReplyVersion(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: newTestUSBIPOps(t),
@ -2311,7 +2435,7 @@ func TestClientFetchDevListReturnsOnContextCancelWhileServerStalls(t *testing.T)
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: newTestUSBIPOps(t),
@ -2374,7 +2498,7 @@ func TestClientSyncRemoteStateAndResetControlStateRebuildsV2Map(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
matches: []option.USBIPDeviceMatch{{BusID: "unused"}},
@ -2457,7 +2581,7 @@ func TestClientAttemptAttachRejectsUnexpectedReplyVersion(t *testing.T) {
client := &ClientService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: M.SocksaddrFromNet(listener.Addr()),
ops: ops,
@ -2489,7 +2613,7 @@ func TestClientRunControlSessionSyncsAssignmentsOnChanged(t *testing.T) {
server := &ServerService{
ctx: serverCtx,
cancel: serverCancel,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
ops: serverOps,
@ -2505,7 +2629,7 @@ func TestClientRunControlSessionSyncsAssignmentsOnChanged(t *testing.T) {
client := &ClientService{
ctx: clientCtx,
cancel: clientCancel,
logger: newTestLogger(),
logger: newTestLogger(t),
dialer: testDialer{},
serverAddr: serverAddr,
matches: []option.USBIPDeviceMatch{match},
@ -2557,7 +2681,7 @@ func TestUSBIPLinuxSmoke(t *testing.T) {
server := &ServerService{
ctx: context.Background(),
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
ops: systemUSBIPOps,

View file

@ -41,7 +41,7 @@ func TestDarwinServerAbortPendingSubmitsMarksAndAbortsEndpoints(t *testing.T) {
device := &fakeDarwinServerDataDevice{}
session := &darwinServerDataSession{
logger: newTestLogger(),
logger: newTestLogger(t),
device: device,
pending: make(map[uint32]darwinServerPendingSubmit),
}
@ -65,7 +65,7 @@ func TestDarwinServerServeAbortsPendingSubmitOnClose(t *testing.T) {
ioStarted: make(chan struct{}),
abortNotify: make(chan struct{}),
}
session := newDarwinServerDataSession(context.Background(), newTestLogger(), serverConn, device)
session := newDarwinServerDataSession(context.Background(), newTestLogger(t), serverConn, device)
done := make(chan error, 1)
go func() {
done <- session.serve()
@ -134,7 +134,7 @@ func TestDarwinServerReconcileAndBroadcastSkipsAfterCancel(t *testing.T) {
cancel()
server := &ServerService{
ctx: ctx,
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),
@ -165,7 +165,7 @@ func TestDarwinServerUSBEventWatcherTriggersReconcile(t *testing.T) {
var fakeWatch *fakeDarwinUSBHostDeviceWatch
server := &ServerService{
ctx: ctx,
logger: newTestLogger(),
logger: newTestLogger(t),
matches: []option.USBIPDeviceMatch{{BusID: busid}},
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
@ -232,7 +232,7 @@ func TestDarwinServerRegisterControlConnQueuesSnapshotBeforeBroadcast(t *testing
t.Parallel()
server := &ServerService{
logger: newTestLogger(),
logger: newTestLogger(t),
exports: make(map[string]serverExport),
controlSubs: make(map[uint64]*serverControlConn),
controlState: make(map[string]DeviceInfoV2),

View file

@ -617,6 +617,9 @@ func (s *ServerService) ueventLoop() {
case <-done:
}
}()
if err := s.reconcileAndBroadcast(true); err != nil {
s.logger.Warn("reconcile exports: ", err)
}
for {
err = listener.WaitUSBEvent()
if err != nil {

View file

@ -8,6 +8,8 @@ import (
"golang.org/x/sys/unix"
)
const ueventReceiveBufferSize = 1 << 20
type ueventListener struct {
fd int
}
@ -17,6 +19,7 @@ func newUEventListener() (*ueventListener, error) {
if err != nil {
return nil, err
}
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, ueventReceiveBufferSize)
addr := &unix.SockaddrNetlink{
Family: unix.AF_NETLINK,
Groups: 1,
@ -34,20 +37,27 @@ func (l *ueventListener) Close() error {
}
func (l *ueventListener) WaitUSBEvent() error {
var buf [4096]byte
var buf [16384]byte
for {
n, _, err := unix.Recvfrom(l.fd, buf[:], 0)
n, from, err := unix.Recvfrom(l.fd, buf[:], 0)
if err == unix.ENOBUFS {
return nil
}
if err != nil {
return err
}
if isUSBUEvent(buf[:n]) {
if source, ok := from.(*unix.SockaddrNetlink); ok && source.Pid != 0 {
continue
}
if isUSBDeviceUEvent(buf[:n]) {
return nil
}
}
}
var usbSubsystemMarker = []byte("\x00SUBSYSTEM=usb\x00")
var usbDeviceTypeMarker = []byte("\x00DEVTYPE=usb_device\x00")
func isUSBUEvent(raw []byte) bool {
return bytes.Contains(raw, usbSubsystemMarker)
func isUSBDeviceUEvent(raw []byte) bool {
return bytes.Contains(raw, usbSubsystemMarker) && bytes.Contains(raw, usbDeviceTypeMarker)
}

View file

@ -271,21 +271,32 @@ static void box_load_interfaces(BoxUSBHostDevice *box) {
IOObjectRelease(iterator);
}
static IOUSBHostPipe *box_find_pipe_for_endpoint(BoxUSBHostDevice *box, uint8_t endpoint) {
for (IOUSBHostInterface *interface in box.interfaces) {
NSError *error = nil;
IOUSBHostPipe *pipe = [interface copyPipeWithAddress:endpoint error:&error];
if (pipe != nil) {
return pipe;
}
}
return nil;
}
static IOUSBHostPipe *box_pipe_for_endpoint(BoxUSBHostDevice *box, uint8_t endpoint) {
NSNumber *key = @(endpoint);
IOUSBHostPipe *cached = box.pipes[key];
if (cached != nil) {
return cached;
}
for (IOUSBHostInterface *interface in box.interfaces) {
NSError *error = nil;
IOUSBHostPipe *pipe = [interface copyPipeWithAddress:endpoint error:&error];
if (pipe != nil) {
box.pipes[key] = pipe;
return pipe;
}
IOUSBHostPipe *pipe = box_find_pipe_for_endpoint(box, endpoint);
if (pipe == nil) {
box_load_interfaces(box);
pipe = box_find_pipe_for_endpoint(box, endpoint);
}
return nil;
if (pipe != nil) {
box.pipes[key] = pipe;
}
return pipe;
}
static IOUSBHostInterface *box_interface_for_number(BoxUSBHostDevice *box, uint8_t interface_number) {