diff --git a/app/main.py b/app/main.py index 8bf341f..fd84bb1 100644 --- a/app/main.py +++ b/app/main.py @@ -5,10 +5,9 @@ import threading import nest_asyncio nest_asyncio.apply() import captcha_state -from invoice_fetcher import main +from invoice_fetcher import main as fetch_main from dotenv import load_dotenv from fastapi import FastAPI, Request, HTTPException -from fastapi.staticfiles import StaticFiles from linebot.v3 import WebhookHandler from linebot.v3.messaging import ( Configuration, @@ -19,10 +18,9 @@ from linebot.v3.messaging import ( ) from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent from linebot.v3.exceptions import InvalidSignatureError -from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text +from sqlalchemy import create_engine, Column, Integer, String, Numeric, Date, DateTime, BigInteger, text from sqlalchemy.orm import declarative_base, sessionmaker -from datetime import datetime - +from datetime import datetime, date load_dotenv() @@ -33,22 +31,198 @@ configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN" handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET")) # DB 設定 -# DATABASE_URL = os.getenv("DATABASE_URL") -# engine = create_engine(DATABASE_URL) -# SessionLocal = sessionmaker(bind=engine) -# Base = declarative_base() +DATABASE_URL = os.getenv("DATABASE_URL") +engine = create_engine(DATABASE_URL) +SessionLocal = sessionmaker(bind=engine) +Base = declarative_base() -# 資料表 -# class Transaction(Base): -# __tablename__ = "transactions" -# id = Column(Integer, primary_key=True, index=True) -# user_id = Column(String) -# category = Column(String) -# amount = Column(Float) -# note = Column(String, nullable=True) -# created_at = Column(DateTime, default=datetime.now) +# Models +class User(Base): + __tablename__ = "users" + id = Column(BigInteger, primary_key=True) + name = Column(String) + email = Column(String, unique=True) -# Base.metadata.create_all(bind=engine) +class LineUser(Base): + __tablename__ = "line_users" + id = Column(BigInteger, primary_key=True) + line_user_id = Column(String, unique=True) + user_id = Column(BigInteger) + created_at = Column(DateTime, default=datetime.now) + +class Category(Base): + __tablename__ = "categories" + id = Column(BigInteger, primary_key=True) + user_id = Column(BigInteger) + parent_id = Column(BigInteger, nullable=True) + name = Column(String) + color = Column(String, default='#3B82F6') + icon = Column(String, nullable=True) + +class CategoryRule(Base): + __tablename__ = "category_rules" + id = Column(BigInteger, primary_key=True) + user_id = Column(BigInteger) + category_id = Column(BigInteger) + keyword = Column(String) + +class Expense(Base): + __tablename__ = "expenses" + id = Column(BigInteger, primary_key=True) + user_id = Column(BigInteger) + category_id = Column(BigInteger, nullable=True) + amount = Column(Numeric(10, 2)) + note = Column(String, nullable=True) + invoice_number = Column(String, nullable=True) + seller_name = Column(String, nullable=True) + item_name = Column(String, nullable=True) + date = Column(Date) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now) + +# 記帳模板 +EXPENSE_TEMPLATE = ( + "請填寫以下記帳資料後傳回:\n\n" + "品項: \n" + "類別: \n" + "金額: \n" + "店家: \n" + "備註: " +) + +def get_or_create_user(db, line_user_id: str) -> int: + line_user = db.query(LineUser).filter( + LineUser.line_user_id == line_user_id + ).first() + if line_user: + return line_user.user_id + + new_user = User( + name="LINE用戶", + email=f"{line_user_id}@line.user" + ) + db.add(new_user) + db.flush() + + default_categories = [ + ("餐飲", "#F97316", "🍽"), + ("交通", "#3B82F6", "🚗"), + ("購物", "#8B5CF6", "🛍"), + ("娛樂", "#EC4899", "🎮"), + ("醫療", "#10B981", "🏥"), + ("其他", "#6B7280", "📦"), + ] + parent_ids = {} + for name, color, icon in default_categories: + cat = Category(user_id=new_user.id, name=name, color=color, icon=icon) + db.add(cat) + db.flush() + parent_ids[name] = cat.id + + default_rules = { + "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"], + "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"], + "購物": ["衣服", "3C", "日用品", "購物"], + "娛樂": ["電影", "遊戲", "旅遊", "娛樂"], + "醫療": ["藥局", "診所", "醫院", "醫療"], + } + for cat_name, keywords in default_rules.items(): + for kw in keywords: + db.add(CategoryRule( + user_id=new_user.id, + category_id=parent_ids[cat_name], + keyword=kw + )) + + db.add(LineUser(line_user_id=line_user_id, user_id=new_user.id)) + db.commit() + return new_user.id + +def resolve_category(db, user_id: int, category_input: str) -> tuple[int, int | None]: + """ + 回傳 (category_id, subcategory_id) + 先查 category_rules keyword + 找不到就新增 subcategory 到「其他」 + """ + # 查 rule + rule = db.query(CategoryRule).filter( + CategoryRule.user_id == user_id, + CategoryRule.keyword == category_input + ).first() + if rule: + return rule.category_id, None + + # 查是否是大類名稱 + parent = db.query(Category).filter( + Category.user_id == user_id, + Category.name == category_input, + Category.parent_id == None + ).first() + if parent: + return parent.id, None + + # 查是否是已存在的小類 + sub = db.query(Category).filter( + Category.user_id == user_id, + Category.name == category_input, + Category.parent_id != None + ).first() + if sub: + return sub.parent_id, sub.id + + # 找不到 → 新增 subcategory 到「其他」 + other = db.query(Category).filter( + Category.user_id == user_id, + Category.name == "其他", + Category.parent_id == None + ).first() + parent_id = other.id if other else None + + new_sub = Category( + user_id=user_id, + parent_id=parent_id, + name=category_input, + color="#6B7280", + ) + db.add(new_sub) + db.flush() + + db.add(CategoryRule( + user_id=user_id, + category_id=new_sub.id, + keyword=category_input + )) + + return parent_id, new_sub.id + +def parse_multiline_expense(text: str) -> dict | None: + """解析多行記帳格式,欄位不限順序""" + fields = {} + field_map = { + "品項": "item_name", + "類別": "category", + "金額": "amount", + "店家": "seller_name", + "備註": "note", + } + for line in text.strip().splitlines(): + for key, field in field_map.items(): + if line.startswith(f"{key}:") or line.startswith(f"{key}:"): + value = re.split(r"[::]", line, 1)[-1].strip() + if value: + fields[field] = value + break + + required = ["item_name", "category", "amount"] + if not all(k in fields for k in required): + return None + + try: + fields["amount"] = float(re.sub(r"[^\d.]", "", fields["amount"])) + except ValueError: + return None + + return fields # Webhook endpoint @app.post("/webhook") @@ -62,11 +236,10 @@ async def webhook(request: Request): return "OK" def run_fetch_in_thread(): - # 開一個全新的 event loop 跑 Playwright loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - loop.run_until_complete(main()) + loop.run_until_complete(fetch_main()) finally: loop.close() @@ -75,18 +248,19 @@ async def fetch_invoices(): print("🚀 開始抓取發票...") thread = threading.Thread(target=run_fetch_in_thread) thread.start() - return {"status": "started"} # 立刻回傳,不等爬蟲 + return {"status": "started"} @handler.add(MessageEvent, message=TextMessageContent) def handle_message(event): - user_id = event.source.user_id - text = event.message.text.strip() - if text.isdigit() and captcha_state.captcha_event and not captcha_state.captcha_event.is_set(): - captcha_state.captcha_answer = text - captcha_state.captcha_event.set() # 通知爬蟲 + line_user_id = event.source.user_id + msg = event.message.text.strip() + + if msg.isdigit() and len(msg) == 5 and captcha_state.captcha_event and not captcha_state.captcha_event.is_set(): + captcha_state.captcha_answer = msg + captcha_state.captcha_event.set() reply = "✅ 驗證碼已送出!" else: - reply = parse_and_save(user_id, text) + reply = handle_text(line_user_id, msg) with ApiClient(configuration) as api_client: line_bot_api = MessagingApi(api_client) @@ -106,142 +280,155 @@ def handle_follow(event): reply_token=event.reply_token, messages=[TextMessage(text=( "👋 歡迎使用 Myfinance 記帳 Bot!\n\n" - "📝 記帳格式:\n" - "類別 金額\n" - "類別 金額 備註\n\n" - "範例:\n" - "早餐 80\n" - "午餐 120 便當\n" - "交通 50 捷運\n\n" - "📊 查詢指令:\n" - "查今天 → 今日明細\n" - "查本月 → 本月統計\n\n" + "📝 點下方「記帳」開始記帳\n" + "📊 點「查今天」或「查本月」查詢\n\n" "開始記帳吧!💪" ))] ) ) -def parse_and_save(user_id: str, text: str) -> str: - # 查詢指令 +def handle_text(line_user_id: str, text: str) -> str: if text == "查今天": - return query_today(user_id) + return query_today(line_user_id) if text == "查本月": - return query_month(user_id) - if text == "記帳說明": - return ( - "📝 記帳格式:\n" - "類別 金額\n" - "類別 金額 備註\n\n" - "範例:\n" - "早餐 80\n" - "午餐 120 便當\n" - "交通 50 捷運\n\n" - "輸入後 Bot 會自動記錄 ✅" - ) - - # 刪除指令:刪除 1 或 刪除 早餐 + return query_month(line_user_id) + if text in ("記帳", "記帳說明"): + return EXPENSE_TEMPLATE if text.startswith("刪除 "): - target = text[3:].strip() - return delete_transaction(user_id, target) + return delete_expense(line_user_id, text[3:].strip()) + # 多行記帳格式 + if "\n" in text: + fields = parse_multiline_expense(text) + if fields: + return save_expense(line_user_id, fields) + return ( + "格式不正確 😅\n\n" + "必填欄位:品項、類別、金額\n" + "點下方「記帳」取得模板!" + ) - if re.match(r"^\d{5}$", text): - return f"接收到驗證碼{text}" + return ( + "點下方「記帳」取得記帳模板\n" + "或輸入「查今天」/「查本月」查詢" + ) - return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月" - -def save_transaction(user_id, category, amount, note): - print(f"記錄交易(模擬):{user_id} {category} {amount} {note}") - # db = SessionLocal() - # try: - # db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note)) - # db.commit() - # finally: - # db.close() - -def delete_transaction(user_id: str, target: str) -> str: - # db = SessionLocal() +def save_expense(line_user_id: str, fields: dict) -> str: + db = SessionLocal() try: - today = datetime.now().date() - # rows = db.query(Transaction).filter( - # Transaction.user_id == user_id, - # text("DATE(created_at) = :today") - # ).params(today=today).all() + user_id = get_or_create_user(db, line_user_id) + category_id, subcategory_id = resolve_category(db, user_id, fields["category"]) - # if not rows: - # return "今天還沒有記錄 📭" + db.add(Expense( + user_id=user_id, + category_id=subcategory_id or category_id, + amount=fields["amount"], + note=fields.get("note"), + seller_name=fields.get("seller_name"), + item_name=fields["item_name"], + date=date.today(), + )) + db.commit() - # 用編號刪除 - # if target.isdigit(): - # idx = int(target) - 1 - # if idx < 0 or idx >= len(rows): - # return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號" - # row = rows[idx] - # db.delete(row) - # db.commit() - # return f"✅ 已刪除:{row.category} ${row.amount:.0f}" - - # 用類別刪除(刪最後一筆) - # matched = [r for r in rows if r.category == target] - # if not matched: - # return f"今天沒有「{target}」的記錄" - # row = matched[-1] - # db.delete(row) - # db.commit() - # return f"✅ 已刪除:{row.category} ${row.amount:.0f}" - return f"已刪除(模擬)" - - # finally: - # db.close() + reply = ( + f"✅ 已記錄!\n" + f"品項:{fields['item_name']}\n" + f"類別:{fields['category']}\n" + f"金額:${fields['amount']:.0f}\n" + ) + if fields.get("seller_name"): + reply += f"店家:{fields['seller_name']}\n" + if fields.get("note"): + reply += f"備註:{fields['note']}\n" + return reply.strip() except Exception as e: + db.rollback() + print("❌ 儲存失敗:", e) + return "儲存失敗,請稍後再試" + finally: + db.close() + +def delete_expense(line_user_id: str, target: str) -> str: + db = SessionLocal() + try: + user_id = get_or_create_user(db, line_user_id) + rows = db.query(Expense).filter( + Expense.user_id == user_id, + text("DATE(date) = :today") + ).params(today=date.today()).all() + + if not rows: + return "今天還沒有記錄 📭" + + if target.isdigit(): + idx = int(target) - 1 + if idx < 0 or idx >= len(rows): + return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號" + row = rows[idx] + db.delete(row) + db.commit() + return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}" + + matched = [r for r in rows if r.item_name == target] + if not matched: + return f"今天沒有「{target}」的記錄" + row = matched[-1] + db.delete(row) + db.commit() + return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}" + except Exception as e: + db.rollback() print("❌ 刪除失敗:", e) return "刪除失敗,請稍後再試" + finally: + db.close() -def query_today(user_id): - # db = SessionLocal() +def query_today(line_user_id: str) -> str: + db = SessionLocal() try: - today = datetime.now().date() - # rows = db.query(Transaction).filter( - # Transaction.user_id == user_id, - # text("DATE(created_at) = :today") - # ).params(today=today).all() - # if not rows: - # return "今天還沒有記錄 📭" - # total = sum(r.amount for r in rows) - # lines = [ - # f"{i+1}. {r.category} ${r.amount:.0f}" + (f"({r.note})" if r.note else "") - # for i, r in enumerate(rows) - # ] - # return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1" - print(f"查詢今日記錄(模擬)") - return "📋 今日記錄:\n1. 早餐 $80\n2. 午餐 $120 便當\n3. 交通 $50 捷運\n\n💰 合計:$250\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1" - # finally: - # db.close() + user_id = get_or_create_user(db, line_user_id) + rows = db.query(Expense).filter( + Expense.user_id == user_id, + text("DATE(date) = :today") + ).params(today=date.today()).all() + + if not rows: + return "今天還沒有記錄 📭" + + total = sum(float(r.amount) for r in rows) + lines = [ + f"{i+1}. {r.item_name} ${float(r.amount):.0f}" + (f"({r.note})" if r.note else "") + for i, r in enumerate(rows) + ] + return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1" except Exception as e: print("❌ 查詢失敗:", e) return "查詢失敗,請稍後再試" + finally: + db.close() -def query_month(user_id): - # db = SessionLocal() +def query_month(line_user_id: str) -> str: + db = SessionLocal() try: + user_id = get_or_create_user(db, line_user_id) now = datetime.now() - # rows = db.query(Transaction).filter( - # Transaction.user_id == user_id, - # text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month") - # ).params(year=now.year, month=now.month).all() - # if not rows: - # return "本月還沒有記錄 📭" - # total = sum(r.amount for r in rows) - # 依類別統計 - # summary = {} - # for r in rows: - # summary[r.category] = summary.get(r.category, 0) + r.amount - # lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])] - # return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}" - print(f"查詢本月記錄(模擬)") - return f"📊 本月統計({now.month}月):\n早餐:$800\n午餐:$1200\n交通:$500\n\n💰 總計:$2500" - # finally: - # db.close() + rows = db.query(Expense).filter( + Expense.user_id == user_id, + text("EXTRACT(YEAR FROM date) = :year AND EXTRACT(MONTH FROM date) = :month") + ).params(year=now.year, month=now.month).all() + + if not rows: + return "本月還沒有記錄 📭" + + total = sum(float(r.amount) for r in rows) + summary = {} + for r in rows: + key = r.item_name or "其他" + summary[key] = summary.get(key, 0) + float(r.amount) + lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])] + return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}" except Exception as e: print("❌ 查詢失敗:", e) - return "查詢失敗,請稍後再試" \ No newline at end of file + return "查詢失敗,請稍後再試" + finally: + db.close() \ No newline at end of file