mirror of
https://github.com/SagerNet/sing-box.git
synced 2026-06-29 04:01:23 +00:00
166 lines
3.9 KiB
Go
166 lines
3.9 KiB
Go
package trafficcontrol
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/sagernet/sing-box/adapter"
|
|
"github.com/sagernet/sing-box/common/compatible"
|
|
"github.com/sagernet/sing/common/cleanup"
|
|
"github.com/sagernet/sing/common/observable"
|
|
"github.com/sagernet/sing/common/x/list"
|
|
|
|
"github.com/gofrs/uuid/v5"
|
|
)
|
|
|
|
type ConnectionEventType int
|
|
|
|
const (
|
|
ConnectionEventNew ConnectionEventType = iota
|
|
ConnectionEventClosed
|
|
)
|
|
|
|
type ConnectionEvent struct {
|
|
Type ConnectionEventType
|
|
ID uuid.UUID
|
|
Metadata *TrackerMetadata
|
|
ClosedAt time.Time
|
|
}
|
|
|
|
const closedConnectionsLimit = 1000
|
|
|
|
var (
|
|
_ adapter.ConnectionTracker = (*Manager)(nil)
|
|
_ adapter.LifecycleService = (*Manager)(nil)
|
|
)
|
|
|
|
type Manager struct {
|
|
outbound adapter.OutboundManager
|
|
uploadTotal atomic.Int64
|
|
downloadTotal atomic.Int64
|
|
|
|
connections compatible.Map[uuid.UUID, Tracker]
|
|
closedConnectionsAccess sync.Mutex
|
|
closedConnections list.List[TrackerMetadata]
|
|
|
|
eventSubscriber *observable.Subscriber[ConnectionEvent]
|
|
eventObserver *observable.Observer[ConnectionEvent]
|
|
cleaner *cleanup.Cleaner
|
|
}
|
|
|
|
func NewManager(outbound adapter.OutboundManager) *Manager {
|
|
manager := &Manager{
|
|
outbound: outbound,
|
|
eventSubscriber: observable.NewSubscriber[ConnectionEvent](256),
|
|
}
|
|
manager.eventObserver = observable.NewObserver(manager.eventSubscriber, 64)
|
|
manager.cleaner = cleanup.Add(manager.Clear)
|
|
return manager
|
|
}
|
|
|
|
func (m *Manager) Name() string {
|
|
return "traffic manager"
|
|
}
|
|
|
|
func (m *Manager) Start(stage adapter.StartStage) error {
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) Close() error {
|
|
m.cleaner.Close()
|
|
return m.eventObserver.Close()
|
|
}
|
|
|
|
func (m *Manager) SubscribeEvents() (observable.Subscription[ConnectionEvent], <-chan struct{}, error) {
|
|
return m.eventObserver.Subscribe()
|
|
}
|
|
|
|
func (m *Manager) UnSubscribeEvents(subscription observable.Subscription[ConnectionEvent]) {
|
|
m.eventObserver.UnSubscribe(subscription)
|
|
}
|
|
|
|
func (m *Manager) join(tracker Tracker) {
|
|
metadata := tracker.Metadata()
|
|
m.connections.Store(metadata.ID, tracker)
|
|
m.eventSubscriber.Emit(ConnectionEvent{
|
|
Type: ConnectionEventNew,
|
|
ID: metadata.ID,
|
|
Metadata: metadata,
|
|
})
|
|
}
|
|
|
|
func (m *Manager) leave(tracker Tracker) {
|
|
metadata := tracker.Metadata()
|
|
_, loaded := m.connections.LoadAndDelete(metadata.ID)
|
|
if !loaded {
|
|
return
|
|
}
|
|
closedAt := time.Now()
|
|
metadata.ClosedAt = closedAt
|
|
metadataCopy := *metadata
|
|
m.closedConnectionsAccess.Lock()
|
|
if m.closedConnections.Len() >= closedConnectionsLimit {
|
|
m.closedConnections.PopFront()
|
|
}
|
|
m.closedConnections.PushBack(metadataCopy)
|
|
m.closedConnectionsAccess.Unlock()
|
|
m.eventSubscriber.Emit(ConnectionEvent{
|
|
Type: ConnectionEventClosed,
|
|
ID: metadata.ID,
|
|
Metadata: &metadataCopy,
|
|
ClosedAt: closedAt,
|
|
})
|
|
}
|
|
|
|
func (m *Manager) Total() (uplinkTotal int64, downlinkTotal int64) {
|
|
return m.uploadTotal.Load(), m.downloadTotal.Load()
|
|
}
|
|
|
|
func (m *Manager) ConnectionsLen() int {
|
|
return m.connections.Len()
|
|
}
|
|
|
|
func (m *Manager) Connections() []*TrackerMetadata {
|
|
var connections []*TrackerMetadata
|
|
m.connections.Range(func(_ uuid.UUID, tracker Tracker) bool {
|
|
connections = append(connections, tracker.Metadata())
|
|
return true
|
|
})
|
|
return connections
|
|
}
|
|
|
|
func (m *Manager) ClosedConnections() []*TrackerMetadata {
|
|
m.closedConnectionsAccess.Lock()
|
|
values := m.closedConnections.Array()
|
|
m.closedConnectionsAccess.Unlock()
|
|
if len(values) == 0 {
|
|
return nil
|
|
}
|
|
connections := make([]*TrackerMetadata, len(values))
|
|
for i := range values {
|
|
connections[i] = &values[i]
|
|
}
|
|
return connections
|
|
}
|
|
|
|
func (m *Manager) Connection(id uuid.UUID) Tracker {
|
|
connection, loaded := m.connections.Load(id)
|
|
if !loaded {
|
|
return nil
|
|
}
|
|
return connection
|
|
}
|
|
|
|
func (m *Manager) CloseAllConnections() {
|
|
m.connections.Range(func(_ uuid.UUID, tracker Tracker) bool {
|
|
tracker.Close()
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (m *Manager) Clear() {
|
|
m.closedConnectionsAccess.Lock()
|
|
defer m.closedConnectionsAccess.Unlock()
|
|
m.closedConnections.Init()
|
|
}
|