from __future__ import annotations import json from typing import List, Optional, Tuple from app.db.repositories.base import BaseRepository from app.models.schemas.admin import AdminRoleLite, AdminUserSummary, AdminDashboardStats class AdminRepository(BaseRepository): def _normalize_roles(self, payload) -> List[AdminRoleLite]: roles_payload = payload or [] if isinstance(roles_payload, str): try: roles_payload = json.loads(roles_payload) except ValueError: roles_payload = [] return [ AdminRoleLite(**role) for role in roles_payload if role ] def _record_to_user(self, record) -> AdminUserSummary: roles = self._normalize_roles(record.get("roles")) return AdminUserSummary( id=record["id"], username=record["username"], email=record["email"], bio=record.get("bio"), image=record.get("image"), roles=roles, created_at=record["created_at"], updated_at=record["updated_at"], ) def _build_user_filters( self, search: Optional[str], role_id: Optional[int], ) -> Tuple[str, List[object]]: clauses: List[str] = [] params: List[object] = [] if search: placeholder = f"${len(params) + 1}" params.append(f"%{search}%") clauses.append( f"(u.username ILIKE {placeholder} OR u.email ILIKE {placeholder})", ) if role_id: placeholder = f"${len(params) + 1}" params.append(role_id) clauses.append( f"EXISTS (SELECT 1 FROM user_roles ur WHERE ur.user_id = u.id AND ur.role_id = {placeholder})", ) if not clauses: return "", params return "WHERE " + " AND ".join(clauses), params async def list_users( self, *, search: Optional[str], role_id: Optional[int], limit: int, offset: int, ) -> Tuple[List[AdminUserSummary], int]: where_sql, params = self._build_user_filters(search, role_id) base_params = list(params) count_sql = f"SELECT COUNT(*) FROM users u {where_sql}" total = await self.connection.fetchval(count_sql, *base_params) list_params = list(base_params) list_params.extend([limit, offset]) list_sql = f""" SELECT u.id, u.username, u.email, u.bio, u.image, u.created_at, u.updated_at, COALESCE( jsonb_agg( DISTINCT jsonb_build_object( 'id', r.id, 'name', r.name, 'description', r.description, 'permissions', r.permissions ) ) FILTER (WHERE r.id IS NOT NULL), '[]'::jsonb ) AS roles FROM users u LEFT JOIN user_roles ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id {where_sql} GROUP BY u.id ORDER BY u.created_at DESC LIMIT ${len(base_params) + 1} OFFSET ${len(base_params) + 2} """ rows = await self.connection.fetch(list_sql, *list_params) return [self._record_to_user(row) for row in rows], int(total or 0) async def get_user_summary(self, user_id: int) -> Optional[AdminUserSummary]: sql = """ SELECT u.id, u.username, u.email, u.bio, u.image, u.created_at, u.updated_at, COALESCE( jsonb_agg( DISTINCT jsonb_build_object( 'id', r.id, 'name', r.name, 'description', r.description, 'permissions', r.permissions ) ) FILTER (WHERE r.id IS NOT NULL), '[]'::jsonb ) AS roles FROM users u LEFT JOIN user_roles ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id WHERE u.id = $1 GROUP BY u.id """ record = await self.connection.fetchrow(sql, user_id) if not record: return None return self._record_to_user(record) async def get_dashboard_stats(self) -> AdminDashboardStats: users_count = await self.connection.fetchval("SELECT COUNT(*) FROM users") roles_count = await self.connection.fetchval("SELECT COUNT(*) FROM roles") articles_count = await self.connection.fetchval("SELECT COUNT(*) FROM articles") total_views = await self.connection.fetchval( "SELECT COALESCE(SUM(views), 0) FROM articles", ) published_today = await self.connection.fetchval( "SELECT COUNT(*) FROM articles WHERE created_at >= (NOW() - INTERVAL '1 day')", ) return AdminDashboardStats( users=int(users_count or 0), roles=int(roles_count or 0), articles=int(articles_count or 0), total_views=int(total_views or 0), published_today=int(published_today or 0), )