import re from datetime import datetime, date from sqlalchemy.orm import Session from sqlalchemy import text from app.db.models import User, LineUser, Category, CategoryRule, Expense from app.db.session import SessionLocal EXPENSE_TEMPLATE = ( "請填寫以下記帳資料後傳回:\n\n" "品項: \n" "類別: \n" "金額: \n" "店家: \n" "備註: " ) DEFAULT_CATEGORIES = [ ("餐飲", "#F97316", "🍽"), ("交通", "#3B82F6", "🚗"), ("購物", "#8B5CF6", "🛍"), ("娛樂", "#EC4899", "🎮"), ("醫療", "#10B981", "🏥"), ("其他", "#6B7280", "📦"), ] DEFAULT_RULES = { "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"], "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"], "購物": ["衣服", "3C", "日用品", "購物"], "娛樂": ["電影", "遊戲", "旅遊", "娛樂"], "醫療": ["藥局", "診所", "醫院", "醫療"], } def get_or_create_user(db: Session, line_user_id: str) -> int: line_user = db.query(LineUser).filter( LineUser.line_user_id == line_user_id ).first() if line_user: return line_user.user_id new_user = User(name="LINE用戶", email=f"{line_user_id}@line.user") db.add(new_user) db.flush() parent_ids = {} for name, color, icon in DEFAULT_CATEGORIES: cat = Category(user_id=new_user.id, name=name, color=color, icon=icon) db.add(cat) db.flush() parent_ids[name] = cat.id for cat_name, keywords in DEFAULT_RULES.items(): for kw in keywords: db.add(CategoryRule( user_id=new_user.id, category_id=parent_ids[cat_name], keyword=kw )) db.add(LineUser(line_user_id=line_user_id, user_id=new_user.id)) db.commit() return new_user.id def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]: """回傳 (category_id, subcategory_id)""" # 查 rule rule = db.query(CategoryRule).filter( CategoryRule.user_id == user_id, CategoryRule.keyword == category_input ).first() if rule: return rule.category_id, None # 查是否是大類 parent = db.query(Category).filter( Category.user_id == user_id, Category.name == category_input, Category.parent_id == None ).first() if parent: return parent.id, None # 查是否是已存在小類 sub = db.query(Category).filter( Category.user_id == user_id, Category.name == category_input, Category.parent_id != None ).first() if sub: return sub.parent_id, sub.id # 找不到 → 新增 subcategory 到「其他」 other = db.query(Category).filter( Category.user_id == user_id, Category.name == "其他", Category.parent_id == None ).first() parent_id = other.id if other else None new_sub = Category(user_id=user_id, parent_id=parent_id, name=category_input, color="#6B7280") db.add(new_sub) db.flush() db.add(CategoryRule(user_id=user_id, category_id=new_sub.id, keyword=category_input)) return parent_id, new_sub.id def parse_multiline_expense(text: str) -> dict | None: fields = {} field_map = { "品項": "item_name", "類別": "category", "金額": "amount", "店家": "seller_name", "備註": "note", } for line in text.strip().splitlines(): for key, field in field_map.items(): if line.startswith(f"{key}:") or line.startswith(f"{key}:"): value = re.split(r"[::]", line, 1)[-1].strip() if value: fields[field] = value break if not all(k in fields for k in ["item_name", "category", "amount"]): return None try: fields["amount"] = float(re.sub(r"[^\d.]", "", fields["amount"])) except ValueError: return None return fields def save_expense(line_user_id: str, fields: dict) -> str: db = SessionLocal() try: user_id = get_or_create_user(db, line_user_id) category_id, subcategory_id = resolve_category(db, user_id, fields["category"]) db.add(Expense( user_id=user_id, category_id=subcategory_id or category_id, amount=fields["amount"], note=fields.get("note"), seller_name=fields.get("seller_name"), item_name=fields["item_name"], date=date.today(), )) db.commit() reply = ( f"✅ 已記錄!\n" f"品項:{fields['item_name']}\n" f"類別:{fields['category']}\n" f"金額:${fields['amount']:.0f}\n" ) if fields.get("seller_name"): reply += f"店家:{fields['seller_name']}\n" if fields.get("note"): reply += f"備註:{fields['note']}\n" return reply.strip() except Exception as e: db.rollback() print("❌ 儲存失敗:", e) return "儲存失敗,請稍後再試" finally: db.close() def delete_expense(line_user_id: str, target: str) -> str: db = SessionLocal() try: user_id = get_or_create_user(db, line_user_id) rows = db.query(Expense).filter( Expense.user_id == user_id, text("DATE(date) = :today") ).params(today=date.today()).all() if not rows: return "今天還沒有記錄 📭" if target.isdigit(): idx = int(target) - 1 if idx < 0 or idx >= len(rows): return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號" row = rows[idx] db.delete(row) db.commit() return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}" matched = [r for r in rows if r.item_name == target] if not matched: return f"今天沒有「{target}」的記錄" row = matched[-1] db.delete(row) db.commit() return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}" except Exception as e: db.rollback() print("❌ 刪除失敗:", e) return "刪除失敗,請稍後再試" finally: db.close() def query_today(line_user_id: str) -> str: db = SessionLocal() try: user_id = get_or_create_user(db, line_user_id) rows = db.query(Expense).filter( Expense.user_id == user_id, text("DATE(date) = :today") ).params(today=date.today()).all() if not rows: return "今天還沒有記錄 📭" total = sum(float(r.amount) for r in rows) lines = [ f"{i+1}. {r.item_name} ${float(r.amount):.0f}" + (f"({r.note})" if r.note else "") for i, r in enumerate(rows) ] return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1" except Exception as e: print("❌ 查詢失敗:", e) return "查詢失敗,請稍後再試" finally: db.close() def query_month(line_user_id: str) -> str: db = SessionLocal() try: user_id = get_or_create_user(db, line_user_id) now = datetime.now() rows = db.query(Expense).filter( Expense.user_id == user_id, text("EXTRACT(YEAR FROM date) = :year AND EXTRACT(MONTH FROM date) = :month") ).params(year=now.year, month=now.month).all() if not rows: return "本月還沒有記錄 📭" total = sum(float(r.amount) for r in rows) summary = {} for r in rows: key = r.item_name or "其他" summary[key] = summary.get(key, 0) + float(r.amount) lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])] return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}" except Exception as e: print("❌ 查詢失敗:", e) return "查詢失敗,請稍後再試" finally: db.close()