diff --git a/web/job/check_client_ip_job.go b/web/job/check_client_ip_job.go index 3f1064ff..87d7d951 100644 --- a/web/job/check_client_ip_job.go +++ b/web/job/check_client_ip_job.go @@ -35,25 +35,6 @@ var job *CheckClientIpJob const defaultXrayAPIPort = 62789 -// ipStaleAfterSeconds controls how long a client IP kept in the -// per-client tracking table (model.InboundClientIps.Ips) is considered -// still "active" before it's evicted during the next scan. -// -// Without this eviction, an IP that connected once and then went away -// keeps sitting in the table with its old timestamp. Because the -// excess-IP selector sorts ascending ("oldest wins, newest loses") to -// protect the original/current connections, that stale entry keeps -// occupying a slot and the IP that is *actually* currently using the -// config gets classified as "new excess" and banned by fail2ban on -// every single run — producing the continuous ban loop from #4077. -// -// 30 minutes is chosen so an actively-streaming client (where xray -// emits a fresh `accepted` log line whenever it opens a new TCP) will -// always refresh its timestamp well within the window, but a client -// that has really stopped using the config will drop out of the table -// in a bounded time and free its slot. -const ipStaleAfterSeconds = int64(30 * 60) - // NewCheckClientIpJob creates a new client IP monitoring job instance. func NewCheckClientIpJob() *CheckClientIpJob { job = new(CheckClientIpJob) @@ -130,8 +111,8 @@ func (j *CheckClientIpJob) hasLimitIp() bool { settings := map[string][]model.Client{} json.Unmarshal([]byte(inbound.Settings), &settings) - clients := settings["clients"] + clients := settings["clients"] for _, client := range clients { limitIp := client.LimitIP if limitIp > 0 { @@ -144,7 +125,6 @@ func (j *CheckClientIpJob) hasLimitIp() bool { } func (j *CheckClientIpJob) processLogFile() bool { - ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`) emailRegex := regexp.MustCompile(`email: (.+)$`) timestampRegex := regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})`) @@ -153,7 +133,6 @@ func (j *CheckClientIpJob) processLogFile() bool { file, _ := os.Open(accessLogPath) defer file.Close() - // Track IPs with their last seen timestamp inboundClientIps := make(map[string]map[string]int64, 100) scanner := bufio.NewScanner(file) @@ -166,7 +145,6 @@ func (j *CheckClientIpJob) processLogFile() bool { } ip := ipMatches[1] - if ip == "127.0.0.1" || ip == "::1" { continue } @@ -175,9 +153,9 @@ func (j *CheckClientIpJob) processLogFile() bool { if len(emailMatches) < 2 { continue } + email := emailMatches[1] - // Extract timestamp from log line var timestamp int64 timestampMatches := timestampRegex.FindStringSubmatch(line) if len(timestampMatches) >= 2 { @@ -194,19 +172,19 @@ func (j *CheckClientIpJob) processLogFile() bool { if _, exists := inboundClientIps[email]; !exists { inboundClientIps[email] = make(map[string]int64) } - // Update timestamp - keep the latest + if existingTime, ok := inboundClientIps[email][ip]; !ok || timestamp > existingTime { inboundClientIps[email][ip] = timestamp } } + if err := scanner.Err(); err != nil { j.checkError(err) } shouldCleanLog := false - for email, ipTimestamps := range inboundClientIps { - // Convert to IPWithTimestamp slice + for email, ipTimestamps := range inboundClientIps { ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps)) for ip, timestamp := range ipTimestamps { ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp}) @@ -232,41 +210,38 @@ func (j *CheckClientIpJob) processLogFile() bool { // and the merge policy can be exercised by a unit test. func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]int64 { ipMap := make(map[string]int64, len(old)+len(new)) + for _, ipTime := range old { if ipTime.Timestamp < staleCutoff { continue } + ipMap[ipTime.IP] = ipTime.Timestamp } + for _, ipTime := range new { if ipTime.Timestamp < staleCutoff { continue } + if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime { ipMap[ipTime.IP] = ipTime.Timestamp } } + return ipMap } // partitionLiveIps splits the merged ip map into live (seen in the -// current scan) and historical (only in the db blob, still inside the -// staleness window). +// current scan) and historical (only in the db blob). // // only live ips count toward the per-client limit. historical ones stay // in the db so the panel keeps showing them, but they must not take a -// protected slot. the 30min cutoff alone isn't tight enough: an ip that -// stopped connecting a few minutes ago still looks fresh to -// mergeClientIps, and since the over-limit picker sorts ascending and -// keeps the oldest, those idle entries used to win the slot while the -// ip actually connecting got classified as excess and sent to fail2ban -// every tick. see #4077 / #4091. -// -// live is sorted ascending so the "protect original, ban newcomer" -// rule still holds when several ips are really connecting at once. +// protected slot. func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) { live = make([]IPWithTimestamp, 0, len(observedThisScan)) historical = make([]IPWithTimestamp, 0, len(ipMap)) + for ip, ts := range ipMap { entry := IPWithTimestamp{IP: ip, Timestamp: ts} if observedThisScan[ip] { @@ -275,8 +250,10 @@ func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) historical = append(historical, entry) } } + sort.Slice(live, func(i, j int) bool { return live[i].Timestamp < live[j].Timestamp }) sort.Slice(historical, func(i, j int) bool { return historical[i].Timestamp < historical[j].Timestamp }) + return live, historical } @@ -284,6 +261,7 @@ func (j *CheckClientIpJob) checkFail2BanInstalled() bool { cmd := "fail2ban-client" args := []string{"-h"} err := exec.Command(cmd, args...).Run() + return err == nil } @@ -297,6 +275,7 @@ func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool { if iplimitActive { logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.") } + return false } @@ -312,15 +291,18 @@ func (j *CheckClientIpJob) checkError(e error) { func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) { db := database.GetDB() InboundClientIps := &model.InboundClientIps{} + err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error if err != nil { return nil, err } + return InboundClientIps, nil } func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ipsWithTime []IPWithTimestamp) error { inboundClientIps := &model.InboundClientIps{} + jsonIps, err := json.Marshal(ipsWithTime) j.checkError(err) @@ -342,11 +324,11 @@ func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ipsWithTime [ if err != nil { return err } + return nil } func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, newIpsWithTime []IPWithTimestamp) bool { - // Get the inbound configuration inbound, err := j.getInboundByEmail(clientEmail) if err != nil { logger.Errorf("failed to fetch inbound settings for email %s: %s", clientEmail, err) @@ -360,11 +342,12 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun settings := map[string][]model.Client{} json.Unmarshal([]byte(inbound.Settings), &settings) + clients := settings["clients"] - // Find the client's IP limit var limitIp int var clientFound bool + for _, client := range clients { if client.Email == clientEmail { limitIp = client.LimitIP @@ -374,77 +357,63 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun } if !clientFound || limitIp <= 0 || !inbound.Enable { - // No limit or inbound disabled, just update and return jsonIps, _ := json.Marshal(newIpsWithTime) inboundClientIps.Ips = string(jsonIps) + db := database.GetDB() db.Save(inboundClientIps) + return false } - // Parse old IPs from database var oldIpsWithTime []IPWithTimestamp if inboundClientIps.Ips != "" { json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime) } - // Merge old and new IPs, evicting entries that haven't been - // re-observed in a while. See mergeClientIps / #4077 for why. - ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds) + ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, 0) - // only ips seen in this scan count toward the limit. see - // partitionLiveIps. observedThisScan := make(map[string]bool, len(newIpsWithTime)) for _, ipTime := range newIpsWithTime { observedThisScan[ipTime.IP] = true } + liveIps, historicalIps := partitionLiveIps(ipMap, observedThisScan) shouldCleanLog := false j.disAllowedIps = []string{} - // historical db-only ips are excluded from this count on purpose. var keptLive []IPWithTimestamp + if len(liveIps) > limitIp { shouldCleanLog = true - // protect the oldest live ip, ban newcomers. keptLive = liveIps[:limitIp] bannedLive := liveIps[limitIp:] - // Open log file only when a ban entry needs to be written. - // Use a local logger to avoid mutating the global log.* state, - // which would redirect all standard-library logging to this file - // and leave a dangling closed-file handle after the defer fires. logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { logger.Errorf("failed to open IP limit log file: %s", err) return false } defer logIpFile.Close() + ipLogger := log.New(logIpFile, "", log.LstdFlags) - // log format is load-bearing: x-ui.sh create_iplimit_jails builds - // filter.d/3x-ipl.conf with - // failregex = \[LIMIT_IP\]\s*Email\s*=\s*.+\s*\|\|\s*Disconnecting OLD IP\s*=\s*\s*\|\|\s*Timestamp\s*=\s*\d+ - // don't change the wording. for _, ipTime := range bannedLive { j.disAllowedIps = append(j.disAllowedIps, ipTime.IP) ipLogger.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp) } - // force xray to drop existing connections from banned ips j.disconnectClientTemporarily(inbound, clientEmail, clients) } else { keptLive = liveIps } - // keep kept-live + historical in the blob so the panel keeps showing - // recently seen ips. banned live ips are already in the fail2ban log - // and will reappear in the next scan if they reconnect. dbIps := make([]IPWithTimestamp, 0, len(keptLive)+len(historicalIps)) dbIps = append(dbIps, keptLive...) dbIps = append(dbIps, historicalIps...) + jsonIps, _ := json.Marshal(dbIps) inboundClientIps.Ips = string(jsonIps) @@ -465,8 +434,8 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun // disconnectClientTemporarily removes and re-adds a client to force disconnect banned connections func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, clientEmail string, clients []model.Client) { var xrayAPI xray.XrayAPI - apiPort := j.resolveXrayAPIPort() + apiPort := j.resolveXrayAPIPort() err := xrayAPI.Init(apiPort) if err != nil { logger.Warningf("[LIMIT_IP] Failed to init Xray API for disconnection: %v", err) @@ -474,11 +443,9 @@ func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, c } defer xrayAPI.Close() - // Find the client config var clientConfig map[string]any for _, client := range clients { if client.Email == clientEmail { - // Convert client to map for API clientBytes, _ := json.Marshal(client) json.Unmarshal(clientBytes, &clientConfig) break @@ -489,20 +456,17 @@ func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, c return } - // Only perform remove/re-add for protocols supported by XrayAPI.AddUser protocol := string(inbound.Protocol) switch protocol { case "vmess", "vless", "trojan", "shadowsocks": - // supported protocols, continue default: logger.Warningf("[LIMIT_IP] Temporary disconnect is not supported for protocol %s on inbound %s", protocol, inbound.Tag) return } - // For Shadowsocks, ensure the required "cipher" field is present by - // reading it from the inbound settings (e.g., settings["method"]). if string(inbound.Protocol) == "shadowsocks" { var inboundSettings map[string]any + if err := json.Unmarshal([]byte(inbound.Settings), &inboundSettings); err != nil { logger.Warningf("[LIMIT_IP] Failed to parse inbound settings for shadowsocks cipher: %v", err) } else { @@ -512,17 +476,14 @@ func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, c } } - // Remove user to disconnect all connections err = xrayAPI.RemoveUser(inbound.Tag, clientEmail) if err != nil { logger.Warningf("[LIMIT_IP] Failed to remove user %s: %v", clientEmail, err) return } - // Wait a moment for disconnection to take effect time.Sleep(100 * time.Millisecond) - // Re-add user to allow new connections err = xrayAPI.AddUser(protocol, inbound.Tag, clientConfig) if err != nil { logger.Warningf("[LIMIT_IP] Failed to re-add user %s: %v", clientEmail, err) @@ -542,6 +503,7 @@ func (j *CheckClientIpJob) resolveXrayAPIPort() int { db := database.GetDB() var template model.Setting + if err := db.Where("key = ?", "xrayTemplateConfig").First(&template).Error; err == nil { if port, parseErr := getAPIPortFromConfigData([]byte(template.Value)); parseErr == nil { return port @@ -573,6 +535,7 @@ func getAPIPortFromConfigPath(configPath string) (int, error) { func getAPIPortFromConfigData(configData []byte) (int, error) { xrayConfig := &xray.Config{} + if err := json.Unmarshal(configData, xrayConfig); err != nil { return 0, err }