Remnawave_python-sdk/tests/test_models_validation.py
Artem 3e6be1f4b4
feat: Add new endpoints for fetching user IPs and recap statistics
- Implemented `fetch_users_ips` and `get_fetch_users_ips_result` in `IpControlController`.
- Added `get_recap` endpoint in `SystemController`.
- Introduced new models for user IP fetching and recap statistics in `models/ip_control.py` and `models/system.py`.
- Updated existing models and enums to accommodate new features.
- Added tests for new endpoints and model validations.
2026-03-28 19:51:37 +01:00

277 lines
8.4 KiB
Python

"""Tests for model field validation and serialization."""
import pytest
from datetime import datetime, timezone
from uuid import uuid4
from remnawave.models import (
# Users
ResolveUserRequestBodyDto,
ResolveUserResponseDto,
RevokeUserRequestDto,
# System
GetRecapResponseDto,
RecapThisMonth,
RecapTotal,
# IP Control
FetchUsersIpsResponseDto,
FetchUsersIpsResultResponseDto,
FetchUsersIpsUserIp,
FetchUsersIpsUser,
FetchUsersIpsResult,
DropConnectionsRequestDto,
DropByUserUuids,
DropByIpAddresses,
TargetAllNodes,
TargetSpecificNodes,
# Infra Billing
CreateInfraBillingHistoryRecordRequestDto,
CreateInfraBillingNodeRequestDto,
# Subscription Settings
ResponseRules,
ResponseRulesSettings,
# Webhook
NodeSystemDto,
NodeSystemInfoDto,
NodeSystemStatsDto,
NodeVersionsDto,
)
from remnawave.enums import ResponseRuleVersion
class TestResolveUserRequestBodyDto:
    """Construction and alias-serialization checks for ``ResolveUserRequestBodyDto``."""

    def test_create_with_uuid(self):
        """Only ``uuid`` is supplied; ``id`` and ``username`` are expected to be ``None``."""
        user_uuid = uuid4()
        body = ResolveUserRequestBodyDto(uuid=user_uuid)
        assert body.uuid == user_uuid
        assert body.id is None
        assert body.username is None

    def test_create_with_username(self):
        """Only ``username`` is supplied; ``uuid`` is expected to be ``None``."""
        body = ResolveUserRequestBodyDto(username="testuser")
        assert body.username == "testuser"
        assert body.uuid is None

    def test_create_with_short_uuid(self):
        """The ``short_uuid`` keyword is readable back from the same attribute."""
        body = ResolveUserRequestBodyDto(short_uuid="abc123")
        assert body.short_uuid == "abc123"

    def test_serialization_alias(self):
        """Dumping with ``by_alias=True`` must produce a ``shortUuid`` key."""
        dumped = ResolveUserRequestBodyDto(short_uuid="abc123").model_dump(
            by_alias=True
        )
        assert "shortUuid" in dumped

    def test_create_with_id(self):
        """The ``id`` keyword is readable back from the same attribute."""
        body = ResolveUserRequestBodyDto(id=42)
        assert body.id == 42
class TestResolveUserResponseDto:
    """Checks that ``ResolveUserResponseDto`` accepts an API-shaped payload."""

    def test_from_api_response(self):
        """Build the DTO from camelCase keys and read it back via snake_case attributes."""
        user_uuid = uuid4()
        payload = {
            "uuid": user_uuid,
            "username": "testuser",
            "id": 1,
            "shortUuid": "abc123",
        }
        response = ResolveUserResponseDto(**payload)
        assert response.uuid == user_uuid
        assert response.username == "testuser"
        assert response.id == 1
        assert response.short_uuid == "abc123"
class TestGetRecapResponseDto:
    """Checks that ``GetRecapResponseDto`` parses a nested recap payload."""

    def test_from_api_response(self):
        """Nested dicts and an ISO date string are validated into typed attributes."""
        this_month_payload = {"users": 10, "traffic": "1.5 GB"}
        total_payload = {
            "users": 100,
            "nodes": 5,
            "traffic": "500 GB",
            "nodesRam": "32 GB",
            "nodesCpuCores": 16,
            "distinctCountries": 3,
        }
        recap = GetRecapResponseDto(
            thisMonth=this_month_payload,
            total=total_payload,
            version="1.11.0",
            initDate="2025-01-01T00:00:00Z",
        )

        month = recap.this_month
        assert month.users == 10
        assert month.traffic == "1.5 GB"

        totals = recap.total
        assert totals.nodes == 5
        assert totals.nodes_ram == "32 GB"
        assert totals.nodes_cpu_cores == 16
        assert totals.distinct_countries == 3

        assert recap.version == "1.11.0"
        # The string passed as initDate must come back as a datetime instance.
        assert isinstance(recap.init_date, datetime)
class TestFetchUsersIpsModels:
    """Checks for the fetch-users-IPs job response and job result models."""

    def test_response_dto(self):
        """``jobId`` in the payload is exposed as ``job_id``."""
        response = FetchUsersIpsResponseDto(jobId="job-123")
        assert response.job_id == "job-123"

    def test_result_not_completed(self):
        """A pending job carries both flags as False and no result."""
        pending = FetchUsersIpsResultResponseDto(
            isCompleted=False,
            isFailed=False,
            result=None,
        )
        assert pending.is_completed is False
        assert pending.is_failed is False
        assert pending.result is None

    def test_result_completed(self):
        """A completed job exposes the nested node UUID, users and their IPs."""
        node_uuid = uuid4()
        ip_entry = {"ip": "1.2.3.4", "lastSeen": "2025-01-01T00:00:00Z"}
        user_entry = {"userId": "user-1", "ips": [ip_entry]}
        result_payload = {
            "success": True,
            # Passed as a string; the assertion below compares against the UUID object.
            "nodeUuid": str(node_uuid),
            "users": [user_entry],
        }
        finished = FetchUsersIpsResultResponseDto(
            isCompleted=True,
            isFailed=False,
            result=result_payload,
        )
        assert finished.is_completed is True
        assert finished.result.success is True
        assert finished.result.node_uuid == node_uuid
        assert len(finished.result.users) == 1
        first_user = finished.result.users[0]
        assert first_user.user_id == "user-1"
        assert first_user.ips[0].ip == "1.2.3.4"
class TestCreateInfraBillingHistoryRecordRequestDto:
    """Field, serialization and removed-field checks for the billing history request DTO."""

    def test_fields_match_spec(self):
        """The DTO accepts ``provider_uuid``, ``amount`` and ``billed_at`` and returns them."""
        uid = uuid4()
        now = datetime.now(tz=timezone.utc)
        dto = CreateInfraBillingHistoryRecordRequestDto(
            provider_uuid=uid,
            amount=29.99,
            billed_at=now,
        )
        assert dto.provider_uuid == uid
        assert dto.amount == 29.99
        assert dto.billed_at == now

    def test_serialization(self):
        """Dumping with ``by_alias=True`` must emit the camelCase keys."""
        uid = uuid4()
        now = datetime.now(tz=timezone.utc)
        dto = CreateInfraBillingHistoryRecordRequestDto(
            provider_uuid=uid,
            amount=10.0,
            billed_at=now,
        )
        data = dto.model_dump(by_alias=True)
        assert "providerUuid" in data
        assert "billedAt" in data
        assert "amount" in data

    def test_no_old_fields(self):
        """Ensure removed fields don't exist.

        The check inspects ``model_fields`` rather than ``hasattr`` on the
        class: Pydantic v2 removes field names from the class namespace, so
        ``hasattr(Model, "some_field")`` is False even for declared fields and
        would let this test pass regardless of the model's definition.
        """
        declared_fields = CreateInfraBillingHistoryRecordRequestDto.model_fields
        for removed_name in ("node_uuid", "payment_date", "description"):
            assert removed_name not in declared_fields
class TestCreateInfraBillingNodeRequestDto:
    """Checks the optional ``next_billing_at`` field of the billing node request DTO."""

    def test_next_billing_at_optional(self):
        """Omitting ``next_billing_at`` leaves it as ``None``."""
        required_only = {"node_uuid": uuid4(), "provider_uuid": uuid4()}
        request = CreateInfraBillingNodeRequestDto(**required_only)
        assert request.next_billing_at is None

    def test_next_billing_at_provided(self):
        """A supplied ``next_billing_at`` is returned unchanged."""
        billing_time = datetime.now(tz=timezone.utc)
        request = CreateInfraBillingNodeRequestDto(
            node_uuid=uuid4(),
            provider_uuid=uuid4(),
            next_billing_at=billing_time,
        )
        assert request.next_billing_at == billing_time
class TestResponseRulesSettings:
    """Checks the optional ``settings`` member of ``ResponseRules``."""

    def test_settings_field_exists(self):
        """A ``ResponseRulesSettings`` instance passed in is kept on the model."""
        rule_settings = ResponseRulesSettings(
            disable_subscription_access_by_path=True,
        )
        response_rules = ResponseRules(
            version=ResponseRuleVersion.V1,
            rules=[],
            settings=rule_settings,
        )
        assert response_rules.settings is not None
        assert response_rules.settings.disable_subscription_access_by_path is True

    def test_settings_optional(self):
        """Leaving ``settings`` out yields ``None``."""
        response_rules = ResponseRules(version=ResponseRuleVersion.V1, rules=[])
        assert response_rules.settings is None

    def test_settings_deserialization(self):
        """A camelCase settings dict validates into the snake_case attribute."""
        raw_rules = {
            "version": "1",
            "rules": [],
            "settings": {"disableSubscriptionAccessByPath": False},
        }
        response_rules = ResponseRules.model_validate(raw_rules)
        assert response_rules.settings.disable_subscription_access_by_path is False
class TestWebhookNodeDto:
    """Checks for the webhook node models: system info/stats, versions and ``NodeDto`` fields."""

    def test_system_field(self):
        """A nested ``info``/``stats`` payload validates into ``NodeSystemDto``."""
        system = NodeSystemDto.model_validate({
            "info": {
                "arch": "x64",
                "cpus": 4,
                "cpuModel": "Intel Core i7",
                "memoryTotal": 16384,
                "hostname": "node-1",
                "platform": "linux",
                "release": "5.15.0",
                "type": "Linux",
                "version": "#1 SMP",
                "networkInterfaces": ["eth0", "lo"],
            },
            "stats": {
                "memoryFree": 8192,
                "memoryUsed": 8192,
                "uptime": 3600,
                "loadAvg": [0.5, 0.3, 0.1],
                "interface": None,
            },
        })
        assert system.info.arch == "x64"
        assert system.info.cpus == 4
        assert system.info.cpu_model == "Intel Core i7"
        assert system.stats.memory_free == 8192
        assert system.stats.uptime == 3600

    def test_versions_field(self):
        """``NodeVersionsDto`` exposes the ``xray`` and ``node`` version strings."""
        versions = NodeVersionsDto.model_validate({
            "xray": "1.8.6",
            "node": "0.5.0",
        })
        assert versions.xray == "1.8.6"
        assert versions.node == "0.5.0"

    def test_node_dto_has_new_fields(self):
        """``NodeDto`` declares ``active_plugin_uuid``, ``system`` and ``versions``."""
        # Imported from the webhook module directly, as in the original test.
        from remnawave.models.webhook import NodeDto

        fields = NodeDto.model_fields
        assert "active_plugin_uuid" in fields
        assert "system" in fields
        assert "versions" in fields

    def test_node_dto_xray_uptime_is_float(self):
        """The ``xray_uptime`` field of ``NodeDto`` is annotated as plain ``float``."""
        from remnawave.models.webhook import NodeDto

        field = NodeDto.model_fields["xray_uptime"]
        # One identity check suffices; the former "== float or is float" was redundant.
        assert field.annotation is float