mirror of
https://github.com/sunrin-ana/2025-SSF.git
synced 2026-03-09 18:40:02 +00:00
179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
import os
|
|
import aiofiles
|
|
from PIL import Image, ImageFilter, ImageEnhance
|
|
from datetime import datetime
|
|
from starlette.datastructures import UploadFile
|
|
|
|
|
|
class ImageProcessor:
|
|
def __init__(self):
|
|
self.filter_dir = "uploads/filtered"
|
|
os.makedirs(self.filter_dir, exist_ok=True)
|
|
|
|
async def apply_filter(self, image_path: str, filter_type: str) -> str:
|
|
"""Apply filter to image and return the path to filtered image"""
|
|
|
|
# Remove leading slash if exists for local file access
|
|
local_path = image_path.lstrip("/")
|
|
|
|
if not os.path.exists(local_path):
|
|
raise ValueError(f"Image file not found: {local_path}")
|
|
|
|
try:
|
|
with Image.open(local_path) as img:
|
|
# Convert to RGB if necessary
|
|
if img.mode != "RGB":
|
|
img = img.convert("RGB")
|
|
|
|
filtered_img = self._apply_filter_effect(img, filter_type)
|
|
|
|
# Generate filtered image filename
|
|
base_name = os.path.basename(image_path)
|
|
name, ext = os.path.splitext(base_name)
|
|
filtered_filename = f"{name}_{filter_type}{ext}"
|
|
filtered_path = os.path.join(self.filter_dir, filtered_filename)
|
|
|
|
# Save filtered image
|
|
filtered_img.save(filtered_path, quality=85, optimize=True)
|
|
|
|
return f"/uploads/filtered/{filtered_filename}"
|
|
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to apply filter: {str(e)}")
|
|
|
|
def _apply_filter_effect(self, img: Image.Image, filter_type: str) -> Image.Image:
|
|
"""Apply specific filter effect to image"""
|
|
if filter_type == "none":
|
|
return img
|
|
elif filter_type == "vintage":
|
|
return self._apply_vintage_filter(img)
|
|
elif filter_type == "black_white":
|
|
return self._apply_black_white_filter(img)
|
|
elif filter_type == "sepia":
|
|
return self._apply_sepia_filter(img)
|
|
elif filter_type == "blur":
|
|
return self._apply_blur_filter(img)
|
|
elif filter_type == "sharpen":
|
|
return self._apply_sharpen_filter(img)
|
|
elif filter_type == "bright":
|
|
return self._apply_brightness_filter(img)
|
|
elif filter_type == "contrast":
|
|
return self._apply_contrast_filter(img)
|
|
else:
|
|
raise ValueError(f"Unknown filter type: {filter_type}")
|
|
|
|
def _apply_vintage_filter(self, img: Image.Image) -> Image.Image:
|
|
# Reduce saturation
|
|
enhancer = ImageEnhance.Color(img)
|
|
img = enhancer.enhance(0.7)
|
|
|
|
# Add slight warm tint
|
|
r, g, b = img.split()
|
|
r = ImageEnhance.Brightness(r).enhance(1.1)
|
|
g = ImageEnhance.Brightness(g).enhance(1.05)
|
|
b = ImageEnhance.Brightness(b).enhance(0.9)
|
|
|
|
return Image.merge("RGB", (r, g, b))
|
|
|
|
def _apply_black_white_filter(self, img: Image.Image) -> Image.Image:
|
|
return img.convert("L").convert("RGB")
|
|
|
|
def _apply_sepia_filter(self, img: Image.Image) -> Image.Image:
|
|
pixels = img.load()
|
|
width, height = img.size
|
|
|
|
for py in range(height):
|
|
for px in range(width):
|
|
r, g, b = pixels[px, py]
|
|
|
|
tr = int(0.393 * r + 0.769 * g + 0.189 * b)
|
|
tg = int(0.349 * r + 0.686 * g + 0.168 * b)
|
|
tb = int(0.272 * r + 0.534 * g + 0.131 * b)
|
|
|
|
pixels[px, py] = (min(255, tr), min(255, tg), min(255, tb))
|
|
|
|
return img
|
|
|
|
def _apply_blur_filter(self, img: Image.Image) -> Image.Image:
|
|
return img.filter(ImageFilter.GaussianBlur(radius=2))
|
|
|
|
def _apply_sharpen_filter(self, img: Image.Image) -> Image.Image:
|
|
return img.filter(ImageFilter.SHARPEN)
|
|
|
|
def _apply_brightness_filter(self, img: Image.Image) -> Image.Image:
|
|
enhancer = ImageEnhance.Brightness(img)
|
|
return enhancer.enhance(1.3)
|
|
|
|
def _apply_contrast_filter(self, img: Image.Image) -> Image.Image:
|
|
enhancer = ImageEnhance.Contrast(img)
|
|
return enhancer.enhance(1.2)
|
|
|
|
async def resize_image(
|
|
self, image_path: str, max_width: int = 800, max_height: int = 800
|
|
) -> str:
|
|
"""Resize image while maintaining aspect ratio"""
|
|
|
|
local_path = image_path.lstrip("/")
|
|
|
|
if not os.path.exists(local_path):
|
|
raise ValueError(f"Image file not found: {local_path}")
|
|
|
|
try:
|
|
with Image.open(local_path) as img:
|
|
# Calculate new size maintaining aspect ratio
|
|
img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
|
|
|
|
# Generate resized image filename
|
|
base_name = os.path.basename(image_path)
|
|
name, ext = os.path.splitext(base_name)
|
|
resized_filename = f"{name}_resized{ext}"
|
|
resized_path = os.path.join(
|
|
os.path.dirname(local_path), resized_filename
|
|
)
|
|
|
|
# Save resized image
|
|
img.save(resized_path, quality=85, optimize=True)
|
|
|
|
return f"/{resized_path}"
|
|
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to resize image: {str(e)}")
|
|
|
|
async def validate_image_file(self, filename: str, file_size: int):
|
|
"""Validate image file type and size"""
|
|
|
|
allowed_extensions = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}
|
|
|
|
# Check file extension
|
|
_, ext = os.path.splitext(filename.lower())
|
|
if ext not in allowed_extensions:
|
|
raise ValueError("Unsupported file type")
|
|
|
|
if file_size > 20 * 1024 * 1024: # 20MB limit
|
|
raise ValueError("File size must be less than 20MB")
|
|
|
|
def get_safe_filename(self, filename: str) -> str:
|
|
"""Generate safe filename by removing dangerous characters"""
|
|
|
|
import re
|
|
|
|
# Keep only alphanumeric characters, dots, and hyphens
|
|
safe_name = re.sub(r"[^a-zA-Z0-9._-]", "_", filename)
|
|
|
|
# Add timestamp to avoid collisions
|
|
name, ext = os.path.splitext(safe_name)
|
|
timestamp = int(datetime.now().timestamp())
|
|
|
|
return f"{name}_{timestamp}{ext}"
|
|
|
|
async def write_file_and_get_image_path(
|
|
self, file: UploadFile, upload_dir: str
|
|
) -> str:
|
|
filename = self.get_safe_filename(file.filename)
|
|
file_path = os.path.join(upload_dir, filename)
|
|
|
|
async with aiofiles.open(file_path, "wb") as f:
|
|
content = await file.read()
|
|
await f.write(content)
|
|
|
|
return file_path
|