Files
linebot_finance/app/Linebot_handler/Expense.py
henry4682 d0a7f92774
All checks were successful
Oracle-Deploy / redeploy (push) Successful in 30s
feat: linebot
1. show error on line
2026-03-23 15:49:30 +08:00

261 lines
8.2 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from datetime import datetime, date
from sqlalchemy.orm import Session
from sqlalchemy import text
from DB.Models import User, LineUser, Category, CategoryRule, Expense
from DB.Session import SessionLocal
# Blank form sent to the user; they fill in the values after each label and
# send the message back. The trailing space after every ASCII colon is part of
# the template text.
EXPENSE_TEMPLATE = "\n".join(
    (
        "請填寫以下記帳資料後傳回:",
        "",
        "品項: ",
        "類別: ",
        "金額: ",
        "店家: ",
        "備註: ",
    )
)
# Top-level categories created for every new user, as (name, color, icon)
# tuples. Order matters only in that categories are inserted in this order.
DEFAULT_CATEGORIES = [
    ("餐飲", "#F97316", "🍽"),
    ("交通", "#3B82F6", "🚗"),
    ("購物", "#8B5CF6", "🛍"),
    ("娛樂", "#EC4899", "🎮"),
    ("醫療", "#10B981", "🏥"),
    ("其他", "#6B7280", "📦"),
]
# Keyword -> category seed data: each key is a name from DEFAULT_CATEGORIES and
# each value lists the keywords that should resolve to that category. Every
# list also contains the category's own name. "其他" has no keywords.
DEFAULT_RULES = {
    "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
    "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
    "購物": ["衣服", "3C", "日用品", "購物"],
    "娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
    "醫療": ["藥局", "診所", "醫院", "醫療"],
}
def get_or_create_user(db: Session, line_user_id: str) -> int:
    """Return the internal user id linked to a LINE user id.

    If no ``LineUser`` row exists for ``line_user_id``, a new ``User`` is
    created together with the default categories, their keyword rules and the
    ``LineUser`` link, and the session is committed.

    Args:
        db: Open SQLAlchemy session; committed only when a user is created.
        line_user_id: Identifier of the LINE account.

    Returns:
        The ``User.id`` associated with the LINE account.
    """
    existing_link = (
        db.query(LineUser)
        .filter(LineUser.line_user_id == line_user_id)
        .first()
    )
    if existing_link:
        return existing_link.user_id

    account = User(name="LINE用戶", email=f"{line_user_id}@line.user")
    db.add(account)
    db.flush()  # flush so account.id is populated before it is referenced

    # Create the default categories one by one; each flush makes the new
    # category id available for the rules below.
    category_id_by_name = {}
    for cat_name, cat_color, cat_icon in DEFAULT_CATEGORIES:
        category = Category(
            user_id=account.id,
            name=cat_name,
            color=cat_color,
            icon=cat_icon,
        )
        db.add(category)
        db.flush()
        category_id_by_name[cat_name] = category.id

    # Seed one keyword rule per (category, keyword) pair.
    db.add_all(
        CategoryRule(
            user_id=account.id,
            category_id=category_id_by_name[rule_category],
            keyword=keyword,
        )
        for rule_category, rule_keywords in DEFAULT_RULES.items()
        for keyword in rule_keywords
    )

    db.add(LineUser(line_user_id=line_user_id, user_id=account.id))
    db.commit()
    return account.id
def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]:
    """Map a user-typed category string to ``(category_id, subcategory_id)``.

    Lookup order:
      1. a keyword rule of this user equal to ``category_input``;
      2. a top-level category (no parent) with that name;
      3. an existing subcategory (has a parent) with that name;
      4. otherwise a new subcategory is created under the user's "其他"
         category, along with a keyword rule pointing at it.

    The session is flushed but not committed here. When step 4 runs and the
    user has no top-level "其他" category, the first element of the returned
    tuple is ``None`` and the new category is created without a parent.
    """

    def first_category(name, parent_criterion):
        # First category of this user with the given name and parent condition.
        return (
            db.query(Category)
            .filter(
                Category.user_id == user_id,
                Category.name == name,
                parent_criterion,
            )
            .first()
        )

    # 1. Keyword rule.
    matched_rule = (
        db.query(CategoryRule)
        .filter(
            CategoryRule.user_id == user_id,
            CategoryRule.keyword == category_input,
        )
        .first()
    )
    if matched_rule:
        return matched_rule.category_id, None

    # `== None` / `!= None` are intentional: SQLAlchemy overloads the
    # comparison operators to build the SQL NULL checks.
    # 2. Top-level category.
    top_level = first_category(category_input, Category.parent_id == None)  # noqa: E711
    if top_level:
        return top_level.id, None

    # 3. Existing subcategory.
    existing_sub = first_category(category_input, Category.parent_id != None)  # noqa: E711
    if existing_sub:
        return existing_sub.parent_id, existing_sub.id

    # 4. Unknown name: create a subcategory under "其他" and remember it as a rule.
    fallback_parent = first_category("其他", Category.parent_id == None)  # noqa: E711
    fallback_parent_id = fallback_parent.id if fallback_parent else None
    created_sub = Category(
        user_id=user_id,
        parent_id=fallback_parent_id,
        name=category_input,
        color="#6B7280",
    )
    db.add(created_sub)
    db.flush()  # populate created_sub.id for the rule below
    db.add(
        CategoryRule(
            user_id=user_id,
            category_id=created_sub.id,
            keyword=category_input,
        )
    )
    return fallback_parent_id, created_sub.id
def parse_multiline_expense(text: str) -> dict | None:
fields = {}
field_map = {
"品項": "item_name",
"類別": "category",
"金額": "amount",
"店家": "seller_name",
"備註": "note",
}
for line in text.strip().splitlines():
for key, field in field_map.items():
if line.startswith(f"{key}:") or line.startswith(f"{key}"):
value = re.split(r"[:]", line, 1)[-1].strip()
if value:
fields[field] = value
break
if not all(k in fields for k in ["item_name", "category", "amount"]):
return None
try:
fields["amount"] = float(re.sub(r"[^\d.]", "", fields["amount"]))
except ValueError:
return None
return fields
def save_expense(line_user_id: str, fields: dict) -> str:
    """Store one expense for a LINE user and return the chat reply text.

    Args:
        line_user_id: Identifier of the LINE account.
        fields: Parsed expense data; ``item_name``, ``category`` and ``amount``
            are read unconditionally, ``seller_name`` and ``note`` optionally.

    Returns:
        A confirmation message on success, or a failure message that includes
        the exception text when anything goes wrong.
    """
    session = SessionLocal()
    try:
        owner_id = get_or_create_user(session, line_user_id)
        parent_id, child_id = resolve_category(session, owner_id, fields["category"])

        record = Expense(
            user_id=owner_id,
            # Prefer the subcategory when one was resolved.
            category_id=child_id or parent_id,
            amount=fields["amount"],
            note=fields.get("note"),
            seller_name=fields.get("seller_name"),
            item_name=fields["item_name"],
            date=date.today(),
        )
        session.add(record)
        session.commit()

        reply_parts = [
            "✅ 已記錄!\n",
            f"品項:{fields['item_name']}\n",
            f"類別:{fields['category']}\n",
            f"金額:${fields['amount']:.0f}\n",
        ]
        seller = fields.get("seller_name")
        if seller:
            reply_parts.append(f"店家:{fields['seller_name']}\n")
        remark = fields.get("note")
        if remark:
            reply_parts.append(f"備註:{fields['note']}\n")
        return "".join(reply_parts).strip()
    except Exception as e:
        session.rollback()
        import traceback

        # Capture the full stack trace for the server-side log.
        stack_text = traceback.format_exc()
        print("❌ 儲存失敗:", stack_text)
        # Temporarily surfaces the raw error text in the LINE reply.
        # NOTE(review): this can expose internal details to the chat user.
        return f"儲存失敗!錯誤訊息:\n{e!s}"
    finally:
        session.close()
def delete_expense(line_user_id: str, target: str) -> str:
    """Delete one of today's expenses for a LINE user and return the reply.

    Args:
        line_user_id: Identifier of the LINE account.
        target: Either the 1-based position of the record in today's list, or
            an item name; with a name, the last matching record of today is
            deleted. Surrounding whitespace is ignored.

    Returns:
        A confirmation, a "not found" hint, or a generic failure message.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        # NOTE(review): no ORDER BY is applied, so the positions used here rely
        # on the database returning rows in the same order as the listing the
        # user saw — confirm and consider an explicit ordering.
        rows = db.query(Expense).filter(
            Expense.user_id == user_id,
            text("DATE(date) = :today")
        ).params(today=date.today()).all()
        if not rows:
            return "今天還沒有記錄 📭"

        target = target.strip()
        # isdecimal() rather than isdigit(): isdigit() is also true for
        # characters such as "²" that int() rejects.
        if target.isdecimal():
            idx = int(target) - 1
            if not 0 <= idx < len(rows):
                return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
            row = rows[idx]
        else:
            matched = [r for r in rows if r.item_name == target]
            if not matched:
                return f"今天沒有「{target}」的記錄"
            row = matched[-1]

        # Build the confirmation before deleting, so a formatting problem
        # cannot surface after the row is already gone and be reported as a
        # failed deletion.
        confirmation = f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}"
        db.delete(row)
        db.commit()
        return confirmation
    except Exception as e:
        db.rollback()
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    finally:
        db.close()
def query_today(line_user_id: str) -> str:
    """Return a numbered list of today's expenses for a LINE user.

    Args:
        line_user_id: Identifier of the LINE account.

    Returns:
        The formatted list with a total and a deletion hint, a "no records"
        message, or a generic failure message.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        rows = db.query(Expense).filter(
            Expense.user_id == user_id,
            text("DATE(date) = :today")
        ).params(today=date.today()).all()
        if not rows:
            return "今天還沒有記錄 📭"
        total = sum(float(r.amount) for r in rows)
        # The note is wrapped in parentheses: appended bare, a note such as
        # "2杯" would run into the amount and read as "$1202杯".
        lines = [
            f"{i+1}. {r.item_name} ${float(r.amount):.0f}" + (f"({r.note})" if r.note else "")
            for i, r in enumerate(rows)
        ]
        return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()
def query_month(line_user_id: str) -> str:
    """Return this month's spending for a LINE user, grouped by item name.

    Args:
        line_user_id: Identifier of the LINE account.

    Returns:
        One line per item name (largest amount first) followed by the grand
        total, a "no records" message, or a generic failure message.
    """
    session = SessionLocal()
    try:
        owner_id = get_or_create_user(session, line_user_id)
        today = datetime.now()
        month_filter = text(
            "EXTRACT(YEAR FROM date) = :year AND EXTRACT(MONTH FROM date) = :month"
        )
        expenses = (
            session.query(Expense)
            .filter(Expense.user_id == owner_id, month_filter)
            .params(year=today.year, month=today.month)
            .all()
        )
        if not expenses:
            return "本月還沒有記錄 📭"

        grand_total = sum(float(expense.amount) for expense in expenses)

        # Accumulate per item name; records without a name fall under "其他".
        per_item = {}
        for expense in expenses:
            label = expense.item_name or "其他"
            per_item[label] = per_item.get(label, 0) + float(expense.amount)

        ranked = sorted(per_item.items(), key=lambda pair: -pair[1])
        body = "\n".join(f"{label}${amount:.0f}" for label, amount in ranked)
        header = f"📊 本月統計({today.month}月):\n"
        footer = f"\n\n💰 總計:${grand_total:.0f}"
        return header + body + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        session.close()