2025-12-04 10:04:21 +08:00

625 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/db/repositories/articles.py
from typing import List, Optional, Sequence, Tuple
from asyncpg import Connection, Record
from pathlib import Path
from app.db.errors import EntityDoesNotExist
from app.db.queries.queries import queries
from app.db.repositories.base import BaseRepository
from app.db.repositories.profiles import ProfilesRepository
from app.db.repositories.tags import TagsRepository
from app.models.domain.articles import Article
from app.models.domain.users import User
AUTHOR_USERNAME_ALIAS = "author_username"
SLUG_ALIAS = "slug"
class ArticlesRepository(BaseRepository): # noqa: WPS214
def __init__(self, conn: Connection) -> None:
super().__init__(conn)
self._profiles_repo = ProfilesRepository(conn)
self._tags_repo = TagsRepository(conn)
# ===== 内部工具 =====
async def _ensure_article_flag_columns(self) -> None:
"""
给 articles 表补充置顶/推荐/权重字段,兼容旧库。
多次执行使用 IF NOT EXISTS不会抛错。
"""
await self.connection.execute(
"""
ALTER TABLE articles
ADD COLUMN IF NOT EXISTS is_top BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_featured BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS sort_weight INT NOT NULL DEFAULT 0;
""",
)
def _try_delete_cover_file(self, cover: Optional[str]) -> None:
"""
清空 cover 时顺带删除 static/uploads 下的旧封面文件。
"""
if not cover:
return
try:
path_str = cover.lstrip("/")
if not path_str.startswith("static/uploads/"):
return
p = Path(path_str)
if not p.is_absolute():
p = Path(".") / p
if p.is_file():
p.unlink()
except Exception:
# 可以按需加日志,这里静默
pass
# ===== CRUD =====
async def create_article( # noqa: WPS211
self,
*,
slug: str,
title: str,
description: str,
body: str,
author: User,
tags: Optional[Sequence[str]] = None,
cover: Optional[str] = None,
) -> Article:
await self._ensure_article_flag_columns()
async with self.connection.transaction():
article_row = await queries.create_new_article(
self.connection,
slug=slug,
title=title,
description=description,
body=body,
author_username=author.username,
cover=cover,
)
if tags:
await self._tags_repo.create_tags_that_dont_exist(tags=tags)
await self._link_article_with_tags(slug=slug, tags=tags)
return await self._get_article_from_db_record(
article_row=article_row,
slug=slug,
author_username=article_row[AUTHOR_USERNAME_ALIAS],
requested_user=author,
)
async def update_article( # noqa: WPS211
self,
*,
article: Article,
slug: Optional[str] = None,
title: Optional[str] = None,
body: Optional[str] = None,
description: Optional[str] = None,
cover: Optional[str] = None,
cover_provided: bool = False,
) -> Article:
"""
cover_provided:
- True 表示本次请求体里包含 cover 字段(可能是字符串/""/null
- False 表示前端没动 cover保持不变
"""
await self._ensure_article_flag_columns()
updated_article = article.copy(deep=True)
updated_article.slug = slug or updated_article.slug
updated_article.title = title or article.title
updated_article.body = body or article.body
updated_article.description = description or article.description
old_cover = article.cover
if cover_provided:
# 约定None / "" 视为清空封面
updated_article.cover = cover or None
async with self.connection.transaction():
updated_row = await queries.update_article(
self.connection,
slug=article.slug,
author_username=article.author.username,
new_slug=updated_article.slug,
new_title=updated_article.title,
new_body=updated_article.body,
new_description=updated_article.description,
new_cover=updated_article.cover,
)
updated_article.updated_at = updated_row["updated_at"]
# 如果这次真的更新了 cover并且旧值存在且发生变化则尝试删除旧文件
if cover_provided and old_cover and old_cover != updated_article.cover:
self._try_delete_cover_file(old_cover)
return updated_article
async def delete_article(self, *, article: Article) -> None:
await self._ensure_article_flag_columns()
async with self.connection.transaction():
await queries.delete_article(
self.connection,
slug=article.slug,
author_username=article.author.username,
)
if article.cover:
self._try_delete_cover_file(article.cover)
async def filter_articles( # noqa: WPS211
self,
*,
tag: Optional[str] = None,
tags: Optional[Sequence[str]] = None,
author: Optional[str] = None,
favorited: Optional[str] = None,
search: Optional[str] = None,
limit: int = 20,
offset: int = 0,
requested_user: Optional[User] = None,
tag_mode: str = "and",
) -> List[Article]:
await self._ensure_article_flag_columns()
tag_list: List[str] = []
if tags:
tag_list.extend([t.strip() for t in tags if str(t).strip()])
if tag:
tag_list.append(tag.strip())
# 去重,保留顺序
seen = set()
tag_list = [t for t in tag_list if not (t in seen or seen.add(t))]
tag_mode = (tag_mode or "and").lower()
if tag_mode not in ("and", "or"):
tag_mode = "and"
params: List[object] = []
joins: List[str] = ["LEFT JOIN users u ON u.id = a.author_id"]
where_clauses: List[str] = []
having_clause = ""
if author:
params.append(author)
where_clauses.append(f"u.username = ${len(params)}")
if favorited:
params.append(favorited)
joins.append(
f"""JOIN favorites f
ON f.article_id = a.id
AND f.user_id = (SELECT id FROM users WHERE username = ${len(params)})""",
)
if tag_list:
params.append(tag_list)
joins.append(
f"JOIN articles_to_tags att ON att.article_id = a.id AND att.tag = ANY(${len(params)})",
)
# AND 逻辑:命中全部 tag
if tag_mode == "and":
having_clause = f"HAVING COUNT(DISTINCT att.tag) >= {len(tag_list)}"
if search:
params.append(f"%{search}%")
where_clauses.append(
f"(a.title ILIKE ${len(params)} OR a.description ILIKE ${len(params)} OR a.slug ILIKE ${len(params)})",
)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
limit_idx = len(params) + 1
offset_idx = len(params) + 2
params.extend([limit, offset])
group_cols = ", ".join(
[
"a.id",
"a.slug",
"a.title",
"a.description",
"a.body",
"a.cover",
"a.views",
"a.created_at",
"a.updated_at",
"a.is_top",
"a.is_featured",
"a.sort_weight",
"u.username",
],
)
sql = f"""
SELECT
a.id,
a.slug,
a.title,
a.description,
a.body,
a.cover,
a.views,
a.created_at,
a.updated_at,
a.is_top,
a.is_featured,
a.sort_weight,
u.username AS {AUTHOR_USERNAME_ALIAS}
FROM articles a
{' '.join(joins)}
{where_sql}
GROUP BY {group_cols}
{having_clause}
ORDER BY a.is_top DESC, a.sort_weight DESC, a.updated_at DESC, a.created_at DESC
LIMIT ${limit_idx}
OFFSET ${offset_idx}
"""
articles_rows = await self.connection.fetch(sql, *params)
return [
await self._get_article_from_db_record(
article_row=article_row,
slug=article_row[SLUG_ALIAS],
author_username=article_row[AUTHOR_USERNAME_ALIAS],
requested_user=requested_user,
)
for article_row in articles_rows
]
async def list_articles_by_slugs(
self,
*,
slugs: Sequence[str],
requested_user: Optional[User] = None,
) -> List[Article]:
"""
按给定顺序批量获取文章;缺失的 slug 会被忽略。
"""
if not slugs:
return []
await self._ensure_article_flag_columns()
unique_slugs: List[str] = []
for slug in slugs:
if slug not in unique_slugs:
unique_slugs.append(slug)
rows = await self.connection.fetch(
f"""
SELECT
a.id,
a.slug,
a.title,
a.description,
a.body,
a.cover,
a.views,
a.is_top,
a.is_featured,
a.sort_weight,
a.created_at,
a.updated_at,
u.username AS {AUTHOR_USERNAME_ALIAS}
FROM articles a
LEFT JOIN users u ON u.id = a.author_id
WHERE a.slug = ANY($1::text[])
ORDER BY array_position($1::text[], a.slug)
""",
unique_slugs,
)
articles: List[Article] = []
for row in rows:
articles.append(
await self._get_article_from_db_record(
article_row=row,
slug=row[SLUG_ALIAS],
author_username=row[AUTHOR_USERNAME_ALIAS],
requested_user=requested_user,
),
)
return articles
async def get_articles_for_user_feed(
self,
*,
user: User,
limit: int = 20,
offset: int = 0,
) -> List[Article]:
await self._ensure_article_flag_columns()
articles_rows = await queries.get_articles_for_feed(
self.connection,
follower_username=user.username,
limit=limit,
offset=offset,
)
return [
await self._get_article_from_db_record(
article_row=article_row,
slug=article_row[SLUG_ALIAS],
author_username=article_row[AUTHOR_USERNAME_ALIAS],
requested_user=user,
)
for article_row in articles_rows
]
async def get_article_by_slug(
self,
*,
slug: str,
requested_user: Optional[User] = None,
) -> Article:
await self._ensure_article_flag_columns()
article_row = await queries.get_article_by_slug(self.connection, slug=slug)
if article_row:
return await self._get_article_from_db_record(
article_row=article_row,
slug=article_row[SLUG_ALIAS],
author_username=article_row[AUTHOR_USERNAME_ALIAS],
requested_user=requested_user,
)
raise EntityDoesNotExist(f"article with slug {slug} does not exist")
async def get_tags_for_article_by_slug(self, *, slug: str) -> List[str]:
tag_rows = await queries.get_tags_for_article_by_slug(
self.connection,
slug=slug,
)
return [row["tag"] for row in tag_rows]
async def get_favorites_count_for_article_by_slug(self, *, slug: str) -> int:
return (
await queries.get_favorites_count_for_article(self.connection, slug=slug)
)["favorites_count"]
async def is_article_favorited_by_user(self, *, slug: str, user: User) -> bool:
return (
await queries.is_article_in_favorites(
self.connection,
username=user.username,
slug=slug,
)
)["favorited"]
async def add_article_into_favorites(self, *, article: Article, user: User) -> None:
await queries.add_article_to_favorites(
self.connection,
username=user.username,
slug=article.slug,
)
async def remove_article_from_favorites(
self,
*,
article: Article,
user: User,
) -> None:
await queries.remove_article_from_favorites(
self.connection,
username=user.username,
slug=article.slug,
)
async def _get_article_from_db_record(
self,
*,
article_row: Record,
slug: str,
author_username: str,
requested_user: Optional[User],
) -> Article:
cover = article_row.get("cover") if "cover" in article_row else None
views = article_row.get("views", 0)
is_top = bool(article_row.get("is_top", False))
is_featured = bool(article_row.get("is_featured", False))
sort_weight = int(article_row.get("sort_weight", 0) or 0)
return Article(
id_=article_row["id"],
slug=slug,
title=article_row["title"],
description=article_row["description"],
body=article_row["body"],
cover=cover,
is_top=is_top,
is_featured=is_featured,
sort_weight=sort_weight,
views=views,
author=await self._profiles_repo.get_profile_by_username(
username=author_username,
requested_user=requested_user,
),
tags=await self.get_tags_for_article_by_slug(slug=slug),
favorites_count=await self.get_favorites_count_for_article_by_slug(
slug=slug,
),
favorited=await self.is_article_favorited_by_user(
slug=slug,
user=requested_user,
)
if requested_user
else False,
created_at=article_row["created_at"],
updated_at=article_row["updated_at"],
)
async def increment_article_views(self, *, slug: str) -> int:
result = await queries.increment_article_views(self.connection, slug=slug)
return result["views"]
async def _link_article_with_tags(self, *, slug: str, tags: Sequence[str]) -> None:
"""
把 tag 列表绑定到文章。
"""
for tag in tags:
await queries.add_tags_to_article(
self.connection,
slug=slug,
tag=tag,
)
async def list_articles_for_admin(
self,
*,
search: Optional[str] = None,
author: Optional[str] = None,
limit: int = 20,
offset: int = 0,
) -> Tuple[List[Article], int]:
await self._ensure_article_flag_columns()
clauses: List[str] = []
params: List[object] = []
if author:
placeholder = f"${len(params) + 1}"
params.append(author)
clauses.append(f"u.username = {placeholder}")
if search:
placeholder = f"${len(params) + 1}"
params.append(f"%{search}%")
clauses.append(
f"(a.title ILIKE {placeholder} OR a.slug ILIKE {placeholder} OR a.description ILIKE {placeholder})",
)
where_sql = ""
if clauses:
where_sql = "WHERE " + " AND ".join(clauses)
count_sql = f"""
SELECT COUNT(*)
FROM articles a
LEFT JOIN users u ON u.id = a.author_id
{where_sql}
"""
total = await self.connection.fetchval(count_sql, *params)
list_params = list(params)
list_params.extend([limit, offset])
list_sql = f"""
SELECT
a.id,
a.slug,
a.title,
a.description,
a.body,
a.cover,
a.views,
a.created_at,
a.updated_at,
a.is_top,
a.is_featured,
a.sort_weight,
u.username AS {AUTHOR_USERNAME_ALIAS}
FROM articles a
LEFT JOIN users u ON u.id = a.author_id
{where_sql}
ORDER BY a.is_top DESC, a.sort_weight DESC, a.updated_at DESC, a.created_at DESC
LIMIT ${len(params) + 1}
OFFSET ${len(params) + 2}
"""
rows = await self.connection.fetch(list_sql, *list_params)
articles = [
await self._get_article_from_db_record(
article_row=row,
slug=row[SLUG_ALIAS],
author_username=row[AUTHOR_USERNAME_ALIAS],
requested_user=None,
)
for row in rows
]
return articles, int(total or 0)
async def admin_update_article(
self,
*,
article: Article,
slug: Optional[str] = None,
title: Optional[str] = None,
body: Optional[str] = None,
description: Optional[str] = None,
cover: Optional[str] = None,
is_top: Optional[bool] = None,
is_featured: Optional[bool] = None,
sort_weight: Optional[int] = None,
cover_provided: bool = False,
) -> Article:
await self._ensure_article_flag_columns()
updated_article = article.copy(deep=True)
updated_article.slug = slug or updated_article.slug
updated_article.title = title or article.title
updated_article.body = body or article.body
updated_article.description = description or article.description
if is_top is not None:
updated_article.is_top = is_top
if is_featured is not None:
updated_article.is_featured = is_featured
if sort_weight is not None:
updated_article.sort_weight = sort_weight
old_cover = article.cover
if cover_provided:
updated_article.cover = cover or None
async with self.connection.transaction():
updated_row = await self.connection.fetchrow(
"""
UPDATE articles
SET slug = COALESCE($2, slug),
title = COALESCE($3, title),
body = COALESCE($4, body),
description = COALESCE($5, description),
cover = $6,
is_top = COALESCE($7, is_top),
is_featured = COALESCE($8, is_featured),
sort_weight = COALESCE($9, sort_weight)
WHERE id = $1
RETURNING updated_at
""",
article.id_,
updated_article.slug,
updated_article.title,
updated_article.body,
updated_article.description,
updated_article.cover,
updated_article.is_top,
updated_article.is_featured,
updated_article.sort_weight,
)
updated_article.updated_at = updated_row["updated_at"]
if cover_provided and old_cover and old_cover != updated_article.cover:
self._try_delete_cover_file(old_cover)
return updated_article
async def admin_delete_article(self, *, article: Article) -> None:
await self._ensure_article_flag_columns()
async with self.connection.transaction():
await self.connection.execute(
"DELETE FROM articles WHERE id = $1",
article.id_,
)
if article.cover:
self._try_delete_cover_file(article.cover)