Merge pull request #44 from remnawave:development

Update SDK for Python compatibility and add IP control features
This commit is contained in:
Artem 2026-02-24 22:55:58 +01:00 committed by GitHub
commit 09b7294528
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 303 additions and 30 deletions

2
.gitignore vendored
View file

@ -99,7 +99,7 @@ ipython_config.py
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.

View file

@ -50,6 +50,7 @@ pip install git+https://github.com/remnawave/python-sdk.git@development
| Contract Version | Remnawave Panel Version |
| ---------------- | ----------------------- |
| 2.6.2 | >=2.6.2 |
| 2.6.1 | >=2.6.0 |
| 2.4.4 | >=2.4.0 |
| 2.3.2 | >=2.3.0, <2.4.0 |
@ -104,7 +105,7 @@ async def main():
remnawave = RemnawaveSDK(base_url=base_url, token=token)
# Fetch all users
response: UsersResponseDto = await remnawave.users.get_all_users_v2()
response: UsersResponseDto = await remnawave.users.get_all_users()
total_users: int = response.total
users: list[UserResponseDto] = response.users
print("Total users: ", total_users)

View file

@ -1,13 +1,13 @@
[project]
name = "remnawave"
version = "2.6.1"
description = "A Python SDK for interacting with the Remnawave API v2.6.1."
version = "2.6.2"
description = "A Python SDK for interacting with the Remnawave API v2.6.2."
authors = [
{name = "Artem",email = "dev@forestsnet.com"}
]
license = { text = "MIT" }
readme = "README.md"
requires-python = ">=3.11,<4.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"rapid-api-client (==0.6.0)",
"orjson (>=3.10.15,<4.0.0)",

View file

@ -32,6 +32,7 @@ from remnawave.controllers import (
SnippetsController,
RemnawaveSettingsController,
SubscriptionPageConfigController,
IpControlController,
)
@ -44,6 +45,7 @@ class RemnawaveSDK:
caddy_token: Optional[str] = None,
ssl_ignore: Optional[bool] = False,
custom_headers: Optional[dict] = None,
cookies: Optional[dict] = None,
):
"""
Remnawave SDK init
@ -55,6 +57,7 @@ class RemnawaveSDK:
caddy_token (Optional[str]): - Token for Caddy Auth (Headers). Defaults to None.
ssl_ignore (Optional[bool]): - Whether to ignore SSL certificate errors. Defaults to False.
custom_headers (Optional[dict]): - Custom headers to include in the requests. Defaults to None.
cookies (Optional[dict]): - Cookies for Reverse Proxy authorization. Defaults to None.
"""
self._client = client
self._token = token
@ -62,6 +65,7 @@ class RemnawaveSDK:
self.caddy_token = caddy_token
self.ssl_ignore = ssl_ignore
self.custom_headers = custom_headers
self.cookies = cookies
self._validate_params()
@ -96,6 +100,7 @@ class RemnawaveSDK:
self.snippets = SnippetsController(self._client)
self.remnawave_settings = RemnawaveSettingsController(self._client)
self.subscription_page_config = SubscriptionPageConfigController(self._client)
self.ip_control = IpControlController(self._client)
def _validate_params(self) -> None:
if self._client is None:
@ -108,13 +113,21 @@ class RemnawaveSDK:
logging.warning(
"base_url and token will be ignored if client is provided"
)
if self.cookies is not None:
logging.warning(
"cookies will be ignored if client is provided"
)
def _prepare_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=self._prepare_url(),
headers=self._prepare_headers(),
verify=not self.ssl_ignore,
)
client_kwargs = {
"base_url": self._prepare_url(),
"headers": self._prepare_headers(),
"verify": not self.ssl_ignore,
}
if self.cookies is not None:
client_kwargs["cookies"] = self.cookies
return httpx.AsyncClient(**client_kwargs)
def _prepare_headers(self) -> dict:
headers = {}

View file

@ -26,6 +26,7 @@ from .external_squads import ExternalSquadsController
from .snippets import SnippetsController
from .remnawave_settings import RemnawaveSettingsController
from .subscription_page import SubscriptionPageConfigController
from .ip_control import IpControlController
__all__ = [
"APITokensManagementController",
@ -55,5 +56,6 @@ __all__ = [
"ExternalSquadsController",
"SnippetsController",
"RemnawaveSettingsController",
"SubscriptionPageConfigController"
"SubscriptionPageConfigController",
"IpControlController",
]

View file

@ -0,0 +1,55 @@
from typing import Annotated
from rapid_api_client import Path
from rapid_api_client.annotations import PydanticBody
from remnawave.models import (
DropConnectionsRequestDto,
DropConnectionsResponseDto,
FetchIpsResponseDto,
FetchIpsResultResponseDto,
)
from remnawave.rapid import BaseController, get, post
class IpControlController(BaseController):
@post("/ip-control/fetch-ips/{uuid}", response_class=FetchIpsResponseDto)
async def fetch_user_ips(
self,
uuid: Annotated[str, Path(description="UUID of the user")],
) -> FetchIpsResponseDto:
"""Request IP List for User.
Starts a background job that queries all connected nodes for the IPs
used by the given user. The returned ``job_id`` must be passed to
:meth:`get_fetch_ips_result` to retrieve the actual list once the job
is complete.
"""
...
@get("/ip-control/fetch-ips/result/{jobId}", response_class=FetchIpsResultResponseDto)
async def get_fetch_ips_result(
self,
jobId: Annotated[str, Path(description="Job ID returned by fetch_user_ips")],
) -> FetchIpsResultResponseDto:
"""Get IP List Result by Job ID.
Poll this endpoint after calling :meth:`fetch_user_ips`. When
``is_completed`` is ``True`` the ``result`` field contains per-node
IP lists. When ``is_failed`` is ``True`` the job encountered an
error.
"""
...
@post("/ip-control/drop-connections", response_class=DropConnectionsResponseDto)
async def drop_connections(
self,
body: Annotated[DropConnectionsRequestDto, PydanticBody()],
) -> DropConnectionsResponseDto:
"""Drop active connections.
Sends a drop-connections event to the target nodes. You can specify
the connections to drop either by user UUIDs or by IP addresses, and
you can target all connected nodes or a specific subset.
"""
...

View file

@ -382,6 +382,7 @@ from .webhook import (
BaseUserDto,
UserDto,
NodeDto,
WebhookNodeConfigProfileDto,
ConfigProfileInboundDto,
InfraProviderDto,
LoginAttemptDto,
@ -467,6 +468,25 @@ from .subscription_page import (
UpdateSubscriptionPageConfigRequestDto,
UpdateSubscriptionPageConfigResponseDto,
)
from .ip_control import (
# Request DTOs
DropConnectionsRequestDto,
DropByUserUuids,
DropByIpAddresses,
TargetAllNodes,
TargetSpecificNodes,
# Response DTOs
FetchIpsResponseDto,
FetchIpsResultResponseDto,
DropConnectionsResponseDto,
# Data models
FetchIpsJobData,
FetchIpsProgressData,
FetchIpsNodeResult,
FetchIpsResult,
FetchIpsResultData,
DropConnectionsResponseData,
)
__all__ = [
# Auth models
@ -829,6 +849,7 @@ __all__ = [
"ConfigProfileInboundDto",
"InfraProviderDto",
"NodeDto",
"WebhookNodeConfigProfileDto",
"NodeEventDto",
# ERROR EVENTS
@ -916,4 +937,20 @@ __all__ = [
"SubscriptionPageConfigDto",
"UpdateSubscriptionPageConfigRequestDto",
"UpdateSubscriptionPageConfigResponseDto",
# IP Control models
"DropConnectionsRequestDto",
"DropByUserUuids",
"DropByIpAddresses",
"TargetAllNodes",
"TargetSpecificNodes",
"FetchIpsResponseDto",
"FetchIpsResultResponseDto",
"DropConnectionsResponseDto",
"FetchIpsJobData",
"FetchIpsProgressData",
"FetchIpsNodeResult",
"FetchIpsResult",
"FetchIpsResultData",
"DropConnectionsResponseData",
]

View file

@ -30,15 +30,15 @@ class ExternalSquadTemplateDto(BaseModel):
class ExternalSquadSubscriptionSettingsDto(BaseModel):
"""External squad subscription settings"""
profile_title: str = Field(alias="profileTitle")
support_link: str = Field(alias="supportLink")
profile_update_interval: int = Field(alias="profileUpdateInterval", ge=1)
is_profile_webpage_url_enabled: bool = Field(alias="isProfileWebpageUrlEnabled")
serve_json_at_base_subscription: bool = Field(alias="serveJsonAtBaseSubscription")
is_show_custom_remarks: bool = Field(alias="isShowCustomRemarks")
profile_title: Optional[str] = Field(None, alias="profileTitle")
support_link: Optional[str] = Field(None, alias="supportLink")
profile_update_interval: Optional[int] = Field(None, alias="profileUpdateInterval", ge=1)
is_profile_webpage_url_enabled: Optional[bool] = Field(None, alias="isProfileWebpageUrlEnabled")
serve_json_at_base_subscription: Optional[bool] = Field(None, alias="serveJsonAtBaseSubscription")
is_show_custom_remarks: Optional[bool] = Field(None, alias="isShowCustomRemarks")
happ_announce: Optional[str] = Field(None, alias="happAnnounce")
happ_routing: Optional[str] = Field(None, alias="happRouting")
randomize_hosts: bool = Field(alias="randomizeHosts")
randomize_hosts: Optional[bool] = Field(None, alias="randomizeHosts")
class ExternalSquadHostOverridesDto(BaseModel):
"""External squad host overrides"""

View file

@ -0,0 +1,142 @@
from typing import Annotated, List, Literal, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field
# ─────────────────────────────────────────────────────────────────────────────
# Fetch IPs step 1: start the job
# ─────────────────────────────────────────────────────────────────────────────
class FetchIpsJobData(BaseModel):
"""Returned job ID after requesting IP fetch"""
job_id: str = Field(alias="jobId")
class FetchIpsResponseDto(FetchIpsJobData):
"""Response for POST /api/ip-control/fetch-ips/{uuid}"""
pass
# ─────────────────────────────────────────────────────────────────────────────
# Fetch IPs step 2: poll the job result
# ─────────────────────────────────────────────────────────────────────────────
class FetchIpsProgressData(BaseModel):
"""Progress information for an IP-fetch job"""
total: int
completed: int
percent: float
class FetchIpsNodeResult(BaseModel):
"""Per-node IP list for a user"""
node_uuid: UUID = Field(alias="nodeUuid")
node_name: str = Field(alias="nodeName")
country_code: str = Field(alias="countryCode")
ips: List[str]
class FetchIpsResult(BaseModel):
"""Full result payload when the job is completed"""
success: bool
user_uuid: UUID = Field(alias="userUuid")
user_id: str = Field(alias="userId")
nodes: List[FetchIpsNodeResult]
class FetchIpsResultData(BaseModel):
"""Job state + optional result"""
is_completed: bool = Field(alias="isCompleted")
is_failed: bool = Field(alias="isFailed")
progress: FetchIpsProgressData
result: Optional[FetchIpsResult] = None
class FetchIpsResultResponseDto(FetchIpsResultData):
"""Response for GET /api/ip-control/fetch-ips/result/{jobId}"""
pass
# ─────────────────────────────────────────────────────────────────────────────
# Drop Connections request discriminated unions for dropBy / targetNodes
# ─────────────────────────────────────────────────────────────────────────────
class DropByUserUuids(BaseModel):
"""Drop connections for specific user UUIDs"""
by: Literal["userUuids"] = "userUuids"
user_uuids: List[UUID] = Field(
...,
serialization_alias="userUuids",
min_length=1,
description="List of user UUIDs whose connections should be dropped",
)
class DropByIpAddresses(BaseModel):
"""Drop connections from specific IP addresses"""
by: Literal["ipAddresses"] = "ipAddresses"
ip_addresses: List[str] = Field(
...,
serialization_alias="ipAddresses",
min_length=1,
description="List of IP addresses to disconnect",
)
# Discriminated union use `by` field as the discriminator
DropBy = Annotated[
Union[DropByUserUuids, DropByIpAddresses],
Field(discriminator="by"),
]
class TargetAllNodes(BaseModel):
"""Send the drop-connections event to all connected nodes"""
target: Literal["allNodes"] = "allNodes"
class TargetSpecificNodes(BaseModel):
"""Send the drop-connections event to specific nodes only"""
target: Literal["specificNodes"] = "specificNodes"
node_uuids: List[UUID] = Field(
...,
serialization_alias="nodeUuids",
min_length=1,
description="List of node UUIDs to target",
)
# Discriminated union use `target` field as the discriminator
TargetNodes = Annotated[
Union[TargetAllNodes, TargetSpecificNodes],
Field(discriminator="target"),
]
class DropConnectionsRequestDto(BaseModel):
"""Request body for POST /api/ip-control/drop-connections"""
drop_by: DropBy = Field(
...,
serialization_alias="dropBy",
description="Selector for whose connections to drop",
)
target_nodes: TargetNodes = Field(
...,
serialization_alias="targetNodes",
description="Selector for which nodes to send the drop event to",
)
# ─────────────────────────────────────────────────────────────────────────────
# Drop Connections response
# ─────────────────────────────────────────────────────────────────────────────
class DropConnectionsResponseData(BaseModel):
"""Payload confirming the drop-connections event was sent"""
event_sent: bool = Field(alias="eventSent")
class DropConnectionsResponseDto(DropConnectionsResponseData):
"""Response for POST /api/ip-control/drop-connections"""
pass

View file

@ -109,8 +109,8 @@ class UpdateUserRequestDto(BaseModel):
class UserTrafficDto(BaseModel):
"""User traffic information"""
used_traffic_bytes: float = Field(alias="usedTrafficBytes")
lifetime_used_traffic_bytes: float = Field(alias="lifetimeUsedTrafficBytes")
used_traffic_bytes: int = Field(alias="usedTrafficBytes")
lifetime_used_traffic_bytes: int = Field(alias="lifetimeUsedTrafficBytes")
online_at: Optional[datetime] = Field(None, alias="onlineAt")
first_connected_at: Optional[datetime] = Field(None, alias="firstConnectedAt")
last_connected_node_uuid: Optional[UUID] = Field(None, alias="lastConnectedNodeUuid")
@ -119,6 +119,7 @@ class UserTrafficDto(BaseModel):
class UserResponseDto(BaseModel):
"""User response DTO - обновленная структура с userTraffic"""
uuid: UUID
id: int
short_uuid: str = Field(alias="shortUuid")
username: str
status: UserStatus = Field(default=UserStatus.ACTIVE)
@ -148,12 +149,12 @@ class UserResponseDto(BaseModel):
user_traffic: UserTrafficDto = Field(alias="userTraffic")
@property
def used_traffic_bytes(self) -> float:
def used_traffic_bytes(self) -> int:
"""Backward compatibility property"""
return self.user_traffic.used_traffic_bytes
@property
def lifetime_used_traffic_bytes(self) -> float:
def lifetime_used_traffic_bytes(self) -> int:
"""Backward compatibility property"""
return self.user_traffic.lifetime_used_traffic_bytes

View file

@ -37,6 +37,7 @@ class UserTrafficDto(BaseModel):
class BaseUserDto(BaseModel):
uuid: UUID
id: int
short_uuid: str
username: str
status: TUsersStatus
@ -59,10 +60,13 @@ class BaseUserDto(BaseModel):
tag: Optional[str] = None
telegram_id: Optional[int] = None
email: Optional[str] = None
external_squad_uuid: Optional[UUID] = None
hwid_device_limit: Optional[int] = None
last_triggered_threshold: int
subscription_url: str
created_at: datetime
updated_at: datetime
@ -211,6 +215,14 @@ class InfraProviderDto(BaseModel):
model_config = {"alias_generator": to_camel, "populate_by_name": True}
class WebhookNodeConfigProfileDto(BaseModel):
"""Nested config profile for node webhook events"""
active_config_profile_uuid: Optional[UUID] = None
active_inbounds: List[ConfigProfileInboundDto] = Field(default_factory=list)
model_config = {"alias_generator": to_camel, "populate_by_name": True}
class NodeDto(BaseModel):
uuid: UUID
name: str
@ -230,13 +242,15 @@ class NodeDto(BaseModel):
is_traffic_tracking_active: bool
traffic_reset_day: Optional[int] = None
traffic_limit_bytes: Optional[int] = None
traffic_used_bytes: Optional[int] = None
traffic_limit_bytes: Optional[float] = None
traffic_used_bytes: Optional[float] = None
notify_percent: Optional[int] = None
view_position: int
country_code: str
consumption_multiplier: int
consumption_multiplier: float
tags: List[str] = Field(default_factory=list)
cpu_count: Optional[int] = None
cpu_model: Optional[str] = None
@ -245,14 +259,22 @@ class NodeDto(BaseModel):
created_at: datetime
updated_at: datetime
active_config_profile_uuid: Optional[UUID] = None
active_inbounds: List[ConfigProfileInboundDto] = Field(default_factory=list)
config_profile: WebhookNodeConfigProfileDto
provider_uuid: Optional[UUID] = None
provider: Optional[InfraProviderDto] = None
model_config = {"alias_generator": to_camel, "populate_by_name": True}
# Backward-compat shims for code that used the flat fields directly
@property
def active_config_profile_uuid(self) -> Optional[UUID]:
return self.config_profile.active_config_profile_uuid
@property
def active_inbounds(self) -> List[ConfigProfileInboundDto]:
return self.config_profile.active_inbounds
class NodeEventDto(BaseModel):
event_name: TNodeEvents