Initial commit

This commit is contained in:
2025-11-30 12:19:31 +00:00
commit 349c21ec8a
29 changed files with 7251 additions and 0 deletions
View File
+18
View File
@@ -0,0 +1,18 @@
from fastapi import APIRouter
from auth import auth_backend, fastapi_users
from schemas import UserRead, UserCreate
router = APIRouter()
router.include_router(
fastapi_users.get_auth_router(auth_backend),
prefix="/jwt",
tags=["auth"],
)
router.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
tags=["auth"],
)
# You can include more routers from fastapi-users like reset password, verify, etc.
# router.include_router(fastapi_users.get_reset_password_router(), tags=["auth"])
# router.include_router(fastapi_users.get_verify_router(UserRead), tags=["auth"])
+167
View File
@@ -0,0 +1,167 @@
import uuid
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Response
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc
from database import get_async_session
from models import User, URLList
from schemas import URLListCreate, URLListRead, URLListUpdate
from auth import current_active_user
router = APIRouter(
prefix="/url-lists",
tags=["url-lists"],
dependencies=[Depends(current_active_user)], # Requires authenticated user
)
@router.post("/", response_model=URLListRead, status_code=status.HTTP_201_CREATED)
async def create_url_list(
url_list_in: URLListCreate,
current_user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session)
):
# Check if slug already exists
result = await session.execute(
select(URLList).filter(URLList.unique_slug == url_list_in.unique_slug)
)
existing_list = result.scalar_one_or_none()
if existing_list:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A URL list with this unique slug already exists. Please choose a different one."
)
db_url_list = URLList(**url_list_in.dict(), owner_id=current_user.id)
session.add(db_url_list)
await session.commit()
await session.refresh(db_url_list)
return db_url_list
@router.get("/", response_model=List[URLListRead])
async def get_my_url_lists(
current_user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session)
):
result = await session.execute(
select(URLList).filter(URLList.owner_id == current_user.id)
)
return result.scalars().all()
@router.get("/{list_id}", response_model=URLListRead)
async def get_url_list_by_id(
list_id: uuid.UUID,
current_user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session)
):
result = await session.execute(
select(URLList).filter(URLList.id == list_id, URLList.owner_id == current_user.id)
)
url_list = result.scalar_one_or_none()
if not url_list:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="URL list not found")
return url_list
@router.put("/{list_id}", response_model=URLListRead)
async def update_url_list(
list_id: uuid.UUID,
url_list_update: URLListUpdate,
current_user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session)
):
result = await session.execute(
select(URLList).filter(URLList.id == list_id, URLList.owner_id == current_user.id)
)
db_url_list = result.scalar_one_or_none()
if not db_url_list:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="URL list not found")
# Check if new slug already exists and is different from current one
if url_list_update.unique_slug and url_list_update.unique_slug != db_url_list.unique_slug:
existing_slug_list_query = await session.execute(
select(URLList).filter(URLList.unique_slug == url_list_update.unique_slug)
)
if existing_slug_list_query.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A URL list with this unique slug already exists. Please choose a different one."
)
for field, value in url_list_update.dict(exclude_unset=True).items():
setattr(db_url_list, field, value)
await session.commit()
await session.refresh(db_url_list)
return db_url_list
@router.delete("/{list_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_url_list(
list_id: uuid.UUID,
current_user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session)
):
result = await session.execute(
select(URLList).filter(URLList.id == list_id, URLList.owner_id == current_user.id)
)
db_url_list = result.scalar_one_or_none()
if not db_url_list:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="URL list not found")
await session.delete(db_url_list)
await session.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
# Public endpoints (no authentication required)
public_router = APIRouter(
prefix="/public-lists",
tags=["public-lists"],
)
@public_router.get("/", response_model=List[URLListRead])
async def get_public_url_lists(
session: AsyncSession = Depends(get_async_session)
):
"""Get all public URL lists, ordered by updated_at descending (newest first)"""
result = await session.execute(
select(URLList)
.filter(URLList.is_public == True)
.order_by(desc(URLList.updated_at))
)
return result.scalars().all()
@public_router.get("/search", response_model=List[URLListRead])
async def search_public_lists_by_domain(
domain: str,
session: AsyncSession = Depends(get_async_session)
):
"""Search public URL lists by domain. Returns lists containing the specified domain."""
if not domain or len(domain.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Domain parameter is required"
)
# Get all public lists
result = await session.execute(
select(URLList)
.filter(URLList.is_public == True)
.order_by(desc(URLList.updated_at))
)
all_public_lists = result.scalars().all()
# Filter locally - check if domain exists in the list's URLs
domain_normalized = domain.strip().lower().replace('https://', '').replace('http://', '')
matching_lists = []
for url_list in all_public_lists:
# Parse URLs from the list
urls = url_list.urls.split('\n')
for url in urls:
url_normalized = url.strip().lower().replace('https://', '').replace('http://', '')
# Check if domain matches (exact or subdomain match)
if domain_normalized in url_normalized or url_normalized in domain_normalized:
matching_lists.append(url_list)
break
return matching_lists
+13
View File
@@ -0,0 +1,13 @@
from fastapi import APIRouter, Depends
from auth import fastapi_users, current_superuser
from schemas import UserRead, UserUpdate
router = APIRouter(
prefix="/users",
tags=["users"],
dependencies=[Depends(current_superuser)], # Only superusers can access these routes
)
router.include_router(
fastapi_users.get_users_router(UserRead, UserUpdate),
)