sing-box/common/trafficcontrol/manager.go
2026-06-25 17:38:53 +08:00

166 lines
3.9 KiB
Go

package trafficcontrol
import (
"sync"
"sync/atomic"
"time"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/compatible"
"github.com/sagernet/sing/common/cleanup"
"github.com/sagernet/sing/common/observable"
"github.com/sagernet/sing/common/x/list"
"github.com/gofrs/uuid/v5"
)
type ConnectionEventType int
const (
ConnectionEventNew ConnectionEventType = iota
ConnectionEventClosed
)
type ConnectionEvent struct {
Type ConnectionEventType
ID uuid.UUID
Metadata *TrackerMetadata
ClosedAt time.Time
}
const closedConnectionsLimit = 1000
var (
_ adapter.ConnectionTracker = (*Manager)(nil)
_ adapter.LifecycleService = (*Manager)(nil)
)
type Manager struct {
outbound adapter.OutboundManager
uploadTotal atomic.Int64
downloadTotal atomic.Int64
connections compatible.Map[uuid.UUID, Tracker]
closedConnectionsAccess sync.Mutex
closedConnections list.List[TrackerMetadata]
eventSubscriber *observable.Subscriber[ConnectionEvent]
eventObserver *observable.Observer[ConnectionEvent]
cleaner *cleanup.Cleaner
}
func NewManager(outbound adapter.OutboundManager) *Manager {
manager := &Manager{
outbound: outbound,
eventSubscriber: observable.NewSubscriber[ConnectionEvent](256),
}
manager.eventObserver = observable.NewObserver(manager.eventSubscriber, 64)
manager.cleaner = cleanup.Add(manager.Clear)
return manager
}
func (m *Manager) Name() string {
return "traffic manager"
}
func (m *Manager) Start(stage adapter.StartStage) error {
return nil
}
func (m *Manager) Close() error {
m.cleaner.Close()
return m.eventObserver.Close()
}
func (m *Manager) SubscribeEvents() (observable.Subscription[ConnectionEvent], <-chan struct{}, error) {
return m.eventObserver.Subscribe()
}
func (m *Manager) UnSubscribeEvents(subscription observable.Subscription[ConnectionEvent]) {
m.eventObserver.UnSubscribe(subscription)
}
func (m *Manager) join(tracker Tracker) {
metadata := tracker.Metadata()
m.connections.Store(metadata.ID, tracker)
m.eventSubscriber.Emit(ConnectionEvent{
Type: ConnectionEventNew,
ID: metadata.ID,
Metadata: metadata,
})
}
func (m *Manager) leave(tracker Tracker) {
metadata := tracker.Metadata()
_, loaded := m.connections.LoadAndDelete(metadata.ID)
if !loaded {
return
}
closedAt := time.Now()
metadata.ClosedAt = closedAt
metadataCopy := *metadata
m.closedConnectionsAccess.Lock()
if m.closedConnections.Len() >= closedConnectionsLimit {
m.closedConnections.PopFront()
}
m.closedConnections.PushBack(metadataCopy)
m.closedConnectionsAccess.Unlock()
m.eventSubscriber.Emit(ConnectionEvent{
Type: ConnectionEventClosed,
ID: metadata.ID,
Metadata: &metadataCopy,
ClosedAt: closedAt,
})
}
func (m *Manager) Total() (uplinkTotal int64, downlinkTotal int64) {
return m.uploadTotal.Load(), m.downloadTotal.Load()
}
func (m *Manager) ConnectionsLen() int {
return m.connections.Len()
}
func (m *Manager) Connections() []*TrackerMetadata {
var connections []*TrackerMetadata
m.connections.Range(func(_ uuid.UUID, tracker Tracker) bool {
connections = append(connections, tracker.Metadata())
return true
})
return connections
}
func (m *Manager) ClosedConnections() []*TrackerMetadata {
m.closedConnectionsAccess.Lock()
values := m.closedConnections.Array()
m.closedConnectionsAccess.Unlock()
if len(values) == 0 {
return nil
}
connections := make([]*TrackerMetadata, len(values))
for i := range values {
connections[i] = &values[i]
}
return connections
}
func (m *Manager) Connection(id uuid.UUID) Tracker {
connection, loaded := m.connections.Load(id)
if !loaded {
return nil
}
return connection
}
func (m *Manager) CloseAllConnections() {
m.connections.Range(func(_ uuid.UUID, tracker Tracker) bool {
tracker.Close()
return true
})
}
func (m *Manager) Clear() {
m.closedConnectionsAccess.Lock()
defer m.closedConnectionsAccess.Unlock()
m.closedConnections.Init()
}