fix: address CRITICAL security issues - permission escalation and encoding inconsistency

This commit is contained in:
qzl
2026-02-28 12:40:40 +08:00
parent 173d91086f
commit ce8cd1d31f
4 changed files with 64 additions and 17 deletions
+27
View File
@@ -8,6 +8,31 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
class PermissionBits:
VIEW = 1 # 001
INVITE = 2 # 010
EDIT = 4 # 100
@classmethod
def encode(cls, view: bool, edit: bool, invite: bool) -> int:
value = 0
if view:
value |= cls.VIEW
if edit:
value |= cls.EDIT
if invite:
value |= cls.INVITE
return value
@classmethod
def decode(cls, permission: int) -> dict[str, bool]:
return {
"view": bool(permission & cls.VIEW),
"edit": bool(permission & cls.EDIT),
"invite": bool(permission & cls.INVITE),
}
class InboxMessageType(str, Enum): class InboxMessageType(str, Enum):
FRIEND_REQUEST = "friend_request" FRIEND_REQUEST = "friend_request"
CALENDAR = "calendar" CALENDAR = "calendar"
@@ -41,6 +66,8 @@ class InboxMessageListRequest(BaseModel):
class InboxMessageAcceptRequest(BaseModel): class InboxMessageAcceptRequest(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
permission_view: bool = True permission_view: bool = True
permission_edit: bool = False permission_edit: bool = False
permission_invite: bool = False permission_invite: bool = False
+30 -13
View File
@@ -4,6 +4,8 @@ from enum import Enum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from uuid import UUID from uuid import UUID
import json
from fastapi import HTTPException from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
@@ -22,6 +24,7 @@ from v1.inbox_messages.schemas import (
InboxMessageResponse, InboxMessageResponse,
InboxMessageStatus, InboxMessageStatus,
InboxMessageType, InboxMessageType,
PermissionBits,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -84,11 +87,23 @@ class InboxMessageService(BaseService):
status_code=400, detail="Message is not a calendar invitation" status_code=400, detail="Message is not a calendar invitation"
) )
permission = self._encode_permission(request) invited_permission = self._parse_invited_permission(message.content)
requested_permission = PermissionBits.encode(
request.permission_view,
request.permission_edit,
request.permission_invite,
)
final_permission = requested_permission & invited_permission
if final_permission == 0:
raise HTTPException(
status_code=400,
detail="No valid permissions requested (must be subset of invited permissions)",
)
subscription = ScheduleSubscription( subscription = ScheduleSubscription(
item_id=message.schedule_item_id, item_id=message.schedule_item_id,
subscriber_id=user_id, subscriber_id=user_id,
permission=permission, permission=final_permission,
status=SubscriptionStatus.ACTIVE, status=SubscriptionStatus.ACTIVE,
created_by=user_id, created_by=user_id,
) )
@@ -98,7 +113,12 @@ class InboxMessageService(BaseService):
user_id, user_id,
InboxMessageStatus.ACCEPTED.value, InboxMessageStatus.ACCEPTED.value,
) )
if updated is None:
await self._session.rollback()
raise HTTPException(status_code=404, detail="Inbox message not found")
await self._session.commit() await self._session.commit()
except HTTPException:
raise
except SQLAlchemyError: except SQLAlchemyError:
await self._session.rollback() await self._session.rollback()
logger.exception( logger.exception(
@@ -110,8 +130,6 @@ class InboxMessageService(BaseService):
status_code=503, detail="Inbox message store unavailable" status_code=503, detail="Inbox message store unavailable"
) )
if updated is None:
raise HTTPException(status_code=404, detail="Inbox message not found")
return self._to_response(updated) return self._to_response(updated)
async def dismiss_invitation(self, message_id: UUID) -> InboxMessageResponse: async def dismiss_invitation(self, message_id: UUID) -> InboxMessageResponse:
@@ -160,15 +178,14 @@ class InboxMessageService(BaseService):
created_at=message.created_at, created_at=message.created_at,
) )
def _encode_permission(self, request: InboxMessageAcceptRequest) -> int: def _parse_invited_permission(self, content: str | None) -> int:
permission = 0 if not content:
if request.permission_view: return 0
permission |= 1 try:
if request.permission_edit: data = json.loads(content)
permission |= 2 return int(data.get("permission", 0))
if request.permission_invite: except (json.JSONDecodeError, ValueError, TypeError):
permission |= 4 return 0
return permission
def _status_value(self, status: object) -> str: def _status_value(self, status: object) -> str:
if isinstance(status, Enum): if isinstance(status, Enum):
+4 -2
View File
@@ -5,7 +5,7 @@ from enum import Enum
from typing import ClassVar from typing import ClassVar
from uuid import UUID from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, EmailStr, Field
class AttachmentType(str, Enum): class AttachmentType(str, Enum):
@@ -99,7 +99,9 @@ class ScheduleItemListRequest(BaseModel):
class ScheduleItemShareRequest(BaseModel): class ScheduleItemShareRequest(BaseModel):
email: str = Field(..., description="Email of user to share with") model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
email: EmailStr = Field(..., description="Email of user to share with")
permission_view: bool = Field(True, description="Grant view permission") permission_view: bool = Field(True, description="Grant view permission")
permission_edit: bool = Field(False, description="Grant edit permission") permission_edit: bool = Field(False, description="Grant edit permission")
permission_invite: bool = Field(False, description="Grant invite permission") permission_invite: bool = Field(False, description="Grant invite permission")
@@ -23,6 +23,7 @@ def _build_message(
status: InboxMessageModelStatus = InboxMessageModelStatus.PENDING, status: InboxMessageModelStatus = InboxMessageModelStatus.PENDING,
message_type: InboxMessageModelType = InboxMessageModelType.CALENDAR, message_type: InboxMessageModelType = InboxMessageModelType.CALENDAR,
schedule_item_id: UUID | None = None, schedule_item_id: UUID | None = None,
content: str = '{"permission": 7}',
) -> InboxMessage: ) -> InboxMessage:
message = MagicMock(spec=InboxMessage) message = MagicMock(spec=InboxMessage)
message.id = message_id message.id = message_id
@@ -30,7 +31,7 @@ def _build_message(
message.sender_id = uuid4() message.sender_id = uuid4()
message.message_type = message_type message.message_type = message_type
message.schedule_item_id = schedule_item_id message.schedule_item_id = schedule_item_id
message.content = "calendar invite" message.content = content
message.is_read = False message.is_read = False
message.status = status message.status = status
message.created_at = datetime(2026, 2, 28, 10, 0, 0, tzinfo=timezone.utc) message.created_at = datetime(2026, 2, 28, 10, 0, 0, tzinfo=timezone.utc)
@@ -107,7 +108,7 @@ async def test_accept_invitation_creates_subscription() -> None:
assert isinstance(subscription, ScheduleSubscription) assert isinstance(subscription, ScheduleSubscription)
assert subscription.item_id == item_id assert subscription.item_id == item_id
assert subscription.subscriber_id == user_id assert subscription.subscriber_id == user_id
assert subscription.permission == 3 assert subscription.permission == 5 # view(1) + edit(4) = 5
assert subscription.status == SubscriptionStatus.ACTIVE assert subscription.status == SubscriptionStatus.ACTIVE
repo.update_status.assert_awaited_once_with(message_id, user_id, "accepted") repo.update_status.assert_awaited_once_with(message_id, user_id, "accepted")
session.commit.assert_awaited_once() session.commit.assert_awaited_once()