Files
linebot_finance/app/Linebot_handler/Expense.py
henry4682 4bf0262c5f
All checks were successful
Oracle-Deploy / redeploy (push) Successful in 31s
feat: linebot
1. change syntax error
2026-03-24 10:36:26 +08:00

278 lines
8.9 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from datetime import datetime, date
from sqlalchemy.orm import Session
from sqlalchemy import text
from DB.Models import User, LineUser, Category, CategoryRule, Expense
from DB.Session import SessionLocal
# Fill-in form sent to the user. The five labels are the same ones that
# parse_multiline_expense looks for when the filled-in form comes back.
EXPENSE_TEMPLATE = "\n".join(
    [
        "請填寫以下記帳資料後傳回:",
        "",
        "品項: ",
        "類別: ",
        "金額: ",
        "店家: ",
        "備註: ",
    ]
)
# Top-level categories created for every new user, as
# (name, hex colour, icon) tuples. Order is the creation order.
DEFAULT_CATEGORIES = [
    # name    colour     icon
    ("餐飲", "#F97316", "🍽"),
    ("交通", "#3B82F6", "🚗"),
    ("購物", "#8B5CF6", "🛍"),
    ("娛樂", "#EC4899", "🎮"),
    ("醫療", "#10B981", "🏥"),
    ("其他", "#6B7280", "📦"),  # fallback bucket used by resolve_category
]
# Keyword -> category seed rules, keyed by a DEFAULT_CATEGORIES name.
# Each keyword list also contains the category's own name, and "其他"
# deliberately has no keywords.
DEFAULT_RULES = {
    "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
    "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
    "購物": ["衣服", "3C", "日用品", "購物"],
    "娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
    "醫療": ["藥局", "診所", "醫院", "醫療"],
}
def get_or_create_user(db: Session, line_user_id: str) -> int:
    """Return the internal user id linked to a LINE user id, creating it if needed.

    When no ``LineUser`` row exists for ``line_user_id``, a new ``User`` is
    created together with the default categories, the default keyword rules
    and the ``LineUser`` link, and the session is committed.

    Args:
        db: Open SQLAlchemy session; it is committed only when a user is created.
        line_user_id: The LINE-side user identifier.

    Returns:
        The ``User.id`` associated with ``line_user_id``.
    """
    existing_link = (
        db.query(LineUser)
        .filter(LineUser.line_user_id == line_user_id)
        .first()
    )
    if existing_link:
        return existing_link.user_id

    account = User(name="LINE用戶", email=f"{line_user_id}@line.user")
    db.add(account)
    db.flush()  # populate account.id before it is referenced below

    # Create the default top-level categories, remembering each generated id.
    category_ids = {}
    for cat_name, cat_color, cat_icon in DEFAULT_CATEGORIES:
        category = Category(
            user_id=account.id, name=cat_name, color=cat_color, icon=cat_icon
        )
        db.add(category)
        db.flush()
        category_ids[cat_name] = category.id

    # Seed one keyword rule per default keyword.
    db.add_all(
        [
            CategoryRule(
                user_id=account.id,
                category_id=category_ids[cat_name],
                keyword=keyword,
            )
            for cat_name, keywords in DEFAULT_RULES.items()
            for keyword in keywords
        ]
    )

    db.add(LineUser(line_user_id=line_user_id, user_id=account.id))
    db.commit()
    return account.id
def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]:
    """Map free-text category input to ``(category_id, subcategory_id)``.

    Lookup order:
      1. a ``CategoryRule`` whose keyword equals the input;
      2. a top-level ``Category`` (``parent_id IS NULL``) with that name;
      3. an existing sub-category with that name;
      4. otherwise a new sub-category is created under "其他", together
         with a rule so that the same input resolves via step 1 next time.

    New rows are added and flushed but NOT committed; the caller owns the
    transaction.

    Args:
        db: Open SQLAlchemy session.
        user_id: Owner of the categories and rules.
        category_input: Category text typed by the user.

    Returns:
        ``(category_id, subcategory_id)``. ``subcategory_id`` is ``None``
        when the input resolved to a top-level category or a rule.
    """
    # 1. Keyword rule.
    rule = (
        db.query(CategoryRule)
        .filter(
            CategoryRule.user_id == user_id,
            CategoryRule.keyword == category_input,
        )
        .first()
    )
    if rule:
        return rule.category_id, None

    # 2. Top-level category with this exact name.
    parent = (
        db.query(Category)
        .filter(
            Category.user_id == user_id,
            Category.name == category_input,
            Category.parent_id.is_(None),
        )
        .first()
    )
    if parent:
        return parent.id, None

    # 3. Existing sub-category with this exact name.
    sub = (
        db.query(Category)
        .filter(
            Category.user_id == user_id,
            Category.name == category_input,
            Category.parent_id.is_not(None),
        )
        .first()
    )
    if sub:
        return sub.parent_id, sub.id

    # 4. Unknown input: file it as a new sub-category under "其他".
    other = (
        db.query(Category)
        .filter(
            Category.user_id == user_id,
            Category.name == "其他",
            Category.parent_id.is_(None),
        )
        .first()
    )
    parent_id = other.id if other else None
    new_category = Category(
        user_id=user_id,
        parent_id=parent_id,
        name=category_input,
        color="#6B7280",
    )
    db.add(new_category)
    db.flush()  # populate new_category.id for the rule below
    db.add(
        CategoryRule(
            user_id=user_id,
            category_id=new_category.id,
            keyword=category_input,
        )
    )

    if parent_id is None:
        # No "其他" parent exists, so the new row is itself top-level.
        # Report it as the category rather than returning a None category_id.
        return new_category.id, None
    return parent_id, new_category.id
def parse_multiline_expense(text: str) -> dict | None:
fields = {}
field_map = {
"品項": "item_name",
"類別": "category",
"金額": "total_amount",
"店家": "seller_name",
"備註": "note",
}
for line in text.strip().splitlines():
for key, field in field_map.items():
if line.startswith(f"{key}:") or line.startswith(f"{key}"):
value = re.split(r"[:]", line, 1)[-1].strip()
if value:
fields[field] = value
break
if not all(k in fields for k in ["item_name", "category", "total_amount"]):
return None
try:
fields["total_amount"] = float(re.sub(r"[^\d.]", "", fields["total_amount"]))
except ValueError:
return None
return fields
def save_expense(line_user_id: str, fields: dict) -> str:
    """Persist one expense for a LINE user and return the reply text.

    Args:
        line_user_id: The LINE-side user identifier.
        fields: Parsed form fields; ``item_name``, ``category`` and
            ``total_amount`` are read unconditionally, ``seller_name`` and
            ``note`` are optional.

    Returns:
        A confirmation message on success, or a failure message that
        includes the exception text.
    """
    import traceback

    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        category_id, subcategory_id = resolve_category(
            db, user_id, fields["category"]
        )
        expense = Expense(
            user_id=user_id,
            # Store the most specific category available.
            category_id=subcategory_id or category_id,
            amount=fields["total_amount"],
            note=fields.get("note"),
            seller_name=fields.get("seller_name"),
            item_name=fields["item_name"],
            date=date.today(),
        )
        db.add(expense)
        db.commit()

        reply_lines = [
            "✅ 已記錄!",
            f"品項:{fields['item_name']}",
            f"類別:{fields['category']}",
            f"金額:${fields['total_amount']:.0f}",
        ]
        seller_name = fields.get("seller_name")
        if seller_name:
            reply_lines.append(f"店家:{seller_name}")
        note = fields.get("note")
        if note:
            reply_lines.append(f"備註:{note}")
        return "\n".join(reply_lines).strip()
    except Exception as e:
        db.rollback()
        # Log the full stack trace for debugging.
        print("❌ 儲存失敗:", traceback.format_exc())
        # NOTE(review): the raw exception text is temporarily echoed back to
        # the LINE user for debugging; replace with a generic message later.
        return f"儲存失敗!錯誤訊息:\n{e!s}"
    finally:
        db.close()
def delete_expense(line_user_id: str, target: str) -> str:
    """Delete one of today's expenses, selected by list number or item name.

    Args:
        line_user_id: The LINE-side user identifier.
        target: Either a 1-based position in today's records (decimal
            digits), or an item name. When several records share the name,
            the last one in the fetched list is deleted.

    Returns:
        A user-facing message describing the outcome.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        # NOTE(review): no ORDER BY is applied here or in query_today, so the
        # numbering shown to the user relies on the database returning rows
        # in the same order for both queries. Consider ordering by the
        # primary key in both places.
        rows = (
            db.query(Expense)
            .filter(Expense.user_id == user_id, text("DATE(date) = :today"))
            .params(today=date.today())
            .all()
        )
        if not rows:
            return "今天還沒有記錄 📭"

        target = target.strip()
        # isdecimal() (unlike isdigit()) only accepts characters that int()
        # can actually parse, so inputs such as "²" fall through to the
        # name lookup instead of raising.
        if target.isdecimal():
            idx = int(target) - 1
            if not 0 <= idx < len(rows):
                return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
            row = rows[idx]
        else:
            matched = [r for r in rows if r.item_name == target]
            if not matched:
                return f"今天沒有「{target}」的記錄"
            row = matched[-1]

        # Build the summary BEFORE deleting: attributes of a deleted
        # instance should not be read after the commit. The amount column is
        # `amount`, the name used when the row is created and in the
        # monthly SQL.
        summary = f"{row.item_name} ${float(row.amount):.0f}"
        db.delete(row)
        db.commit()
        return f"✅ 已刪除:{summary}"
    except Exception as e:
        db.rollback()
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    finally:
        db.close()
def query_today(line_user_id: str) -> str:
    """Return a numbered list of today's expenses with their total.

    Args:
        line_user_id: The LINE-side user identifier.

    Returns:
        The formatted list, a "no records" message, or a generic failure
        message if anything raises.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        rows = (
            db.query(Expense)
            .filter(Expense.user_id == user_id, text("DATE(date) = :today"))
            .params(today=date.today())
            .all()
        )
        if not rows:
            return "今天還沒有記錄 📭"

        # The amount column is `amount` (as used when rows are created and
        # in the per-line output below); the total previously read a
        # different attribute name.
        total = sum(float(r.amount) for r in rows)
        lines = [
            f"{i}. {r.item_name} ${float(r.amount):.0f}"
            # Parenthesise the note so it does not run into the amount.
            + (f"({r.note})" if r.note else "")
            for i, r in enumerate(rows, start=1)
        ]
        return (
            "📋 今日記錄:\n"
            + "\n".join(lines)
            + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        )
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()
def query_month(line_user_id: str) -> str:
    """Return this month's spending grouped by merchant display name or item.

    Expenses are grouped by ``merchant_mapping.display_name`` when the
    seller name contains a mapping pattern, otherwise by ``item_name``, and
    listed from the largest total to the smallest with each group's share
    of the monthly total.

    Args:
        line_user_id: The LINE-side user identifier.

    Returns:
        The formatted summary, a "no records" message, or a generic failure
        message if anything raises.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        now = datetime.now()
        # NOTE(review): an expense whose seller_name matches more than one
        # merchant_mapping pattern is joined (and therefore summed) once per
        # match — confirm patterns are mutually exclusive.
        sql = text("""
            SELECT
                COALESCE(m.display_name, e.item_name) AS display_name,
                SUM(e.amount) as total_amount
            FROM expenses e
            LEFT JOIN merchant_mapping m ON e.seller_name LIKE '%%' || m.pattern || '%%'
            WHERE e.user_id = :user_id
              AND EXTRACT(YEAR FROM e.date) = :year
              AND EXTRACT(MONTH FROM e.date) = :month
            GROUP BY COALESCE(m.display_name, e.item_name) -- 💡 這裡不能只寫 display_name要寫完整邏輯
            ORDER BY total_amount DESC
        """)
        rows = db.execute(
            sql, {"user_id": user_id, "year": now.year, "month": now.month}
        ).fetchall()
        if not rows:
            return "本月還沒有記錄 📭"

        # SUM() yields NULL when every amount in a group is NULL.
        amounts = [float(r.total_amount or 0) for r in rows]
        total = sum(amounts)

        summary = []
        for row, amt in zip(rows, amounts):
            # Guard against a zero monthly total (e.g. only 0-amount rows),
            # which previously raised ZeroDivisionError.
            percent = (amt / total) * 100 if total else 0.0
            summary.append(f"🔹 {row.display_name}:${amt:.0f} ({percent:.1f}%)")

        header = f"📊 {now.year}年{now.month}月 消費統計"
        divider = "--------------------------"
        footer = f"\n{divider}\n💰 本月總支出:${total:.0f}"
        return f"{header}\n{divider}\n" + "\n".join(summary) + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()