2025-12-04 10:04:21 +08:00

162 lines
5.4 KiB
Python

from __future__ import annotations
import json
from typing import List, Optional, Tuple
from app.db.repositories.base import BaseRepository
from app.models.schemas.admin import AdminRoleLite, AdminUserSummary, AdminDashboardStats
class AdminRepository(BaseRepository):
def _normalize_roles(self, payload) -> List[AdminRoleLite]:
roles_payload = payload or []
if isinstance(roles_payload, str):
try:
roles_payload = json.loads(roles_payload)
except ValueError:
roles_payload = []
return [
AdminRoleLite(**role)
for role in roles_payload
if role
]
def _record_to_user(self, record) -> AdminUserSummary:
roles = self._normalize_roles(record.get("roles"))
return AdminUserSummary(
id=record["id"],
username=record["username"],
email=record["email"],
bio=record.get("bio"),
image=record.get("image"),
roles=roles,
created_at=record["created_at"],
updated_at=record["updated_at"],
)
def _build_user_filters(
self,
search: Optional[str],
role_id: Optional[int],
) -> Tuple[str, List[object]]:
clauses: List[str] = []
params: List[object] = []
if search:
placeholder = f"${len(params) + 1}"
params.append(f"%{search}%")
clauses.append(
f"(u.username ILIKE {placeholder} OR u.email ILIKE {placeholder})",
)
if role_id:
placeholder = f"${len(params) + 1}"
params.append(role_id)
clauses.append(
f"EXISTS (SELECT 1 FROM user_roles ur WHERE ur.user_id = u.id AND ur.role_id = {placeholder})",
)
if not clauses:
return "", params
return "WHERE " + " AND ".join(clauses), params
async def list_users(
self,
*,
search: Optional[str],
role_id: Optional[int],
limit: int,
offset: int,
) -> Tuple[List[AdminUserSummary], int]:
where_sql, params = self._build_user_filters(search, role_id)
base_params = list(params)
count_sql = f"SELECT COUNT(*) FROM users u {where_sql}"
total = await self.connection.fetchval(count_sql, *base_params)
list_params = list(base_params)
list_params.extend([limit, offset])
list_sql = f"""
SELECT
u.id,
u.username,
u.email,
u.bio,
u.image,
u.created_at,
u.updated_at,
COALESCE(
jsonb_agg(
DISTINCT jsonb_build_object(
'id', r.id,
'name', r.name,
'description', r.description,
'permissions', r.permissions
)
) FILTER (WHERE r.id IS NOT NULL),
'[]'::jsonb
) AS roles
FROM users u
LEFT JOIN user_roles ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
{where_sql}
GROUP BY u.id
ORDER BY u.created_at DESC
LIMIT ${len(base_params) + 1}
OFFSET ${len(base_params) + 2}
"""
rows = await self.connection.fetch(list_sql, *list_params)
return [self._record_to_user(row) for row in rows], int(total or 0)
async def get_user_summary(self, user_id: int) -> Optional[AdminUserSummary]:
sql = """
SELECT
u.id,
u.username,
u.email,
u.bio,
u.image,
u.created_at,
u.updated_at,
COALESCE(
jsonb_agg(
DISTINCT jsonb_build_object(
'id', r.id,
'name', r.name,
'description', r.description,
'permissions', r.permissions
)
) FILTER (WHERE r.id IS NOT NULL),
'[]'::jsonb
) AS roles
FROM users u
LEFT JOIN user_roles ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = $1
GROUP BY u.id
"""
record = await self.connection.fetchrow(sql, user_id)
if not record:
return None
return self._record_to_user(record)
async def get_dashboard_stats(self) -> AdminDashboardStats:
users_count = await self.connection.fetchval("SELECT COUNT(*) FROM users")
roles_count = await self.connection.fetchval("SELECT COUNT(*) FROM roles")
articles_count = await self.connection.fetchval("SELECT COUNT(*) FROM articles")
total_views = await self.connection.fetchval(
"SELECT COALESCE(SUM(views), 0) FROM articles",
)
published_today = await self.connection.fetchval(
"SELECT COUNT(*) FROM articles WHERE created_at >= (NOW() - INTERVAL '1 day')",
)
return AdminDashboardStats(
users=int(users_count or 0),
roles=int(roles_count or 0),
articles=int(articles_count or 0),
total_views=int(total_views or 0),
published_today=int(published_today or 0),
)