From d86e87ed3019772d9af78187d5e824a3d00f779c Mon Sep 17 00:00:00 2001 From: "Farhad H. P. Shirvan" <9374298+farhadh@users.noreply.github.com> Date: Tue, 12 May 2026 11:36:05 +0200 Subject: [PATCH] Fix: traffic writer restart freeze (#4265) * feat(traffic_writer): enhance traffic writer with concurrency safety and state management * Revert "feat(traffic_writer): enhance traffic writer with concurrency safety and state management" This reverts commit e6760ae39629a592dec293197768f27ff0f5a578. * feat(traffic_writer): enhance traffic writer with concurrency safety and state management * feat(web): implement panel-only start/stop methods for in-process restarts --- main.go | 8 +- web/service/traffic_writer.go | 82 ++++++++++--- web/service/traffic_writer_test.go | 190 +++++++++++++++++++++++++++++ web/web.go | 50 ++++++-- 4 files changed, 294 insertions(+), 36 deletions(-) create mode 100644 web/service/traffic_writer_test.go diff --git a/main.go b/main.go index 9bb1d0b9..90db08da 100644 --- a/main.go +++ b/main.go @@ -81,11 +81,7 @@ func runWebServer() { case syscall.SIGHUP: logger.Info("Received SIGHUP signal. Restarting servers...") - // --- FIX FOR TELEGRAM BOT CONFLICT (409): Stop bot before restart --- - service.StopBot() - // -- - - err := server.Stop() + err := server.StopPanelOnly() if err != nil { logger.Debug("Error stopping web server:", err) } @@ -96,7 +92,7 @@ func runWebServer() { server = web.NewServer() global.SetWebServer(server) - err = server.Start() + err = server.StartPanelOnly() if err != nil { log.Fatalf("Error restarting web server: %v", err) return diff --git a/web/service/traffic_writer.go b/web/service/traffic_writer.go index b15c459a..f7b3fef6 100644 --- a/web/service/traffic_writer.go +++ b/web/service/traffic_writer.go @@ -23,6 +23,7 @@ type trafficWriteRequest struct { var ( twMu sync.Mutex twQueue chan *trafficWriteRequest + twCtx context.Context twCancel context.CancelFunc twDone chan struct{} ) @@ -37,16 +38,26 @@ var ( func StartTrafficWriter() { twMu.Lock() defer twMu.Unlock() - if twQueue != nil { - return + + if twCancel != nil && twDone != nil { + select { + case <-twDone: + clearTrafficWriterState() + default: + return + } } + queue := make(chan *trafficWriteRequest, trafficWriterQueueSize) ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) + twQueue = queue + twCtx = ctx twCancel = cancel twDone = done - go runTrafficWriter(queue, ctx, done) + + go runTrafficWriter(ctx, queue, done) } // StopTrafficWriter cancels the writer context and waits for the goroutine to @@ -56,20 +67,30 @@ func StopTrafficWriter() { twMu.Lock() cancel := twCancel done := twDone - twQueue = nil - twCancel = nil - twDone = nil + if cancel == nil || done == nil { + twMu.Unlock() + return + } + cancel() twMu.Unlock() - if cancel != nil { - cancel() - } - if done != nil { - <-done + <-done + + twMu.Lock() + if twDone == done { + clearTrafficWriterState() } + twMu.Unlock() } -func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) { +func clearTrafficWriterState() { + twQueue = nil + twCtx = nil + twCancel = nil + twDone = nil +} + +func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done chan struct{}) { defer close(done) for { select { @@ -99,18 +120,43 @@ func safeApply(fn func() error) (err error) { } func submitTrafficWrite(fn func() error) error { + req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)} + twMu.Lock() queue := twQueue - twMu.Unlock() - - if queue == nil { + ctx := twCtx + done := twDone + if queue == nil || ctx == nil || done == nil { + twMu.Unlock() return safeApply(fn) } - req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)} + + select { + case <-ctx.Done(): + twMu.Unlock() + return safeApply(fn) + default: + } + + timer := time.NewTimer(trafficWriterSubmitTimeout) + defer timer.Stop() select { case queue <- req: - case <-time.After(trafficWriterSubmitTimeout): + twMu.Unlock() + case <-timer.C: + twMu.Unlock() return errors.New("traffic writer queue full") } - return <-req.done + + select { + case err := <-req.done: + return err + case <-done: + select { + case err := <-req.done: + return err + default: + return errors.New("traffic writer stopped before write completed") + } + } } diff --git a/web/service/traffic_writer_test.go b/web/service/traffic_writer_test.go new file mode 100644 index 00000000..6ecb5eb9 --- /dev/null +++ b/web/service/traffic_writer_test.go @@ -0,0 +1,190 @@ +package service + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestTrafficWriterStartStopStartAcceptsWrites(t *testing.T) { + resetTrafficWriterForTest(t) + + StartTrafficWriter() + var writes atomic.Int32 + if err := submitTrafficWrite(func() error { + writes.Add(1) + return nil + }); err != nil { + t.Fatalf("first submitTrafficWrite: %v", err) + } + + StopTrafficWriter() + StartTrafficWriter() + if err := submitTrafficWrite(func() error { + writes.Add(1) + return nil + }); err != nil { + t.Fatalf("second submitTrafficWrite: %v", err) + } + + if got := writes.Load(); got != 2 { + t.Fatalf("writes = %d, want 2", got) + } +} + +func TestTrafficWriterSubmitAfterStopRunsInline(t *testing.T) { + resetTrafficWriterForTest(t) + + StartTrafficWriter() + StopTrafficWriter() + + ran := make(chan struct{}) + errCh := make(chan error, 1) + go func() { + errCh <- submitTrafficWrite(func() error { + close(ran) + return nil + }) + }() + + select { + case <-ran: + case <-time.After(time.Second): + t.Fatal("submitTrafficWrite did not run after traffic writer stopped") + } + if err := waitTrafficWriterErr(t, errCh); err != nil { + t.Fatalf("submitTrafficWrite after stop: %v", err) + } +} + +func TestTrafficWriterStopDrainsQueuedWrite(t *testing.T) { + resetTrafficWriterForTest(t) + + StartTrafficWriter() + firstStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + firstErr := make(chan error, 1) + go func() { + firstErr <- submitTrafficWrite(func() error { + close(firstStarted) + <-releaseFirst + return nil + }) + }() + waitTrafficWriterSignal(t, firstStarted, "first write did not start") + + secondRan := make(chan struct{}) + secondErr := make(chan error, 1) + go func() { + secondErr <- submitTrafficWrite(func() error { + close(secondRan) + return nil + }) + }() + waitTrafficWriterQueued(t) + + stopDone := make(chan struct{}) + go func() { + StopTrafficWriter() + close(stopDone) + }() + + select { + case <-stopDone: + t.Fatal("StopTrafficWriter returned before in-flight write was released") + case <-time.After(50 * time.Millisecond): + } + + close(releaseFirst) + waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter did not return") + waitTrafficWriterSignal(t, secondRan, "queued write was not drained") + + if err := waitTrafficWriterErr(t, firstErr); err != nil { + t.Fatalf("first submitTrafficWrite: %v", err) + } + if err := waitTrafficWriterErr(t, secondErr); err != nil { + t.Fatalf("second submitTrafficWrite: %v", err) + } +} + +func TestTrafficWriterConcurrentStopDuringSubmitDoesNotHang(t *testing.T) { + resetTrafficWriterForTest(t) + + StartTrafficWriter() + started := make(chan struct{}) + release := make(chan struct{}) + errCh := make(chan error, 1) + go func() { + errCh <- submitTrafficWrite(func() error { + close(started) + <-release + return nil + }) + }() + waitTrafficWriterSignal(t, started, "write did not start") + + stopDone := make(chan struct{}) + go func() { + StopTrafficWriter() + close(stopDone) + }() + + close(release) + waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter hung during submit") + if err := waitTrafficWriterErr(t, errCh); err != nil { + t.Fatalf("submitTrafficWrite during stop: %v", err) + } +} + +func resetTrafficWriterForTest(t *testing.T) { + t.Helper() + StopTrafficWriter() + twMu.Lock() + clearTrafficWriterState() + twMu.Unlock() + t.Cleanup(func() { + StopTrafficWriter() + twMu.Lock() + clearTrafficWriterState() + twMu.Unlock() + }) +} + +func waitTrafficWriterQueued(t *testing.T) { + t.Helper() + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + twMu.Lock() + queued := 0 + if twQueue != nil { + queued = len(twQueue) + } + twMu.Unlock() + if queued > 0 { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("write was not queued") +} + +func waitTrafficWriterSignal(t *testing.T, ch <-chan struct{}, msg string) { + t.Helper() + select { + case <-ch: + case <-time.After(time.Second): + t.Fatal(msg) + } +} + +func waitTrafficWriterErr(t *testing.T, ch <-chan error) error { + t.Helper() + select { + case err := <-ch: + return err + case <-time.After(time.Second): + t.Fatal("timed out waiting for traffic writer result") + return nil + } +} diff --git a/web/web.go b/web/web.go index 4ba70550..2b8157ed 100644 --- a/web/web.go +++ b/web/web.go @@ -259,11 +259,13 @@ func (s *Server) initRouter() (*gin.Engine, error) { // startTask schedules background jobs (Xray checks, traffic jobs, cron // jobs) which the panel relies on for periodic maintenance and monitoring. -func (s *Server) startTask() { +func (s *Server) startTask(restartXray bool) { s.customGeoService.EnsureOnStartup() - err := s.xrayService.RestartXray(true) - if err != nil { - logger.Warning("start xray failed:", err) + if restartXray { + err := s.xrayService.RestartXray(true) + if err != nil { + logger.Warning("start xray failed:", err) + } } // Check whether xray is running every second s.cron.AddJob("@every 1s", job.NewCheckXrayRunningJob()) @@ -348,6 +350,15 @@ func (s *Server) startTask() { // Start initializes and starts the web server with configured settings, routes, and background jobs. func (s *Server) Start() (err error) { + return s.start(true, true) +} + +// StartPanelOnly initializes the panel during an in-process panel restart without cycling Xray. +func (s *Server) StartPanelOnly() (err error) { + return s.start(false, false) +} + +func (s *Server) start(restartXray bool, startTgBot bool) (err error) { // This is an anonymous function, no function name defer func() { if err != nil { @@ -427,12 +438,14 @@ func (s *Server) Start() (err error) { s.httpServer.Serve(listener) }() - s.startTask() + s.startTask(restartXray) - isTgbotenabled, err := s.settingService.GetTgbotEnabled() - if (err == nil) && (isTgbotenabled) { - tgBot := s.tgbotService.NewTgbot() - tgBot.Start(i18nFS) + if startTgBot { + isTgbotenabled, err := s.settingService.GetTgbotEnabled() + if (err == nil) && (isTgbotenabled) { + tgBot := s.tgbotService.NewTgbot() + tgBot.Start(i18nFS) + } } return nil @@ -440,13 +453,26 @@ func (s *Server) Start() (err error) { // Stop gracefully shuts down the web server, stops Xray, cron jobs, and Telegram bot. func (s *Server) Stop() error { + return s.stop(true, true) +} + +// StopPanelOnly stops only panel-owned HTTP/background resources for an in-process panel restart. +func (s *Server) StopPanelOnly() error { + return s.stop(false, false) +} + +func (s *Server) stop(stopXray bool, stopTgBot bool) error { s.cancel() - s.xrayService.StopXray() + if stopXray { + s.xrayService.StopXray() + } if s.cron != nil { s.cron.Stop() } - service.StopTrafficWriter() - if s.tgbotService.IsRunning() { + if stopXray { + service.StopTrafficWriter() + } + if stopTgBot && s.tgbotService.IsRunning() { s.tgbotService.Stop() } // Gracefully stop WebSocket hub