Files
henry4682 6c4f0491aa
All checks were successful
Oracle-Deploy / redeploy (push) Successful in 30s
feat: linebot
1. change query today
2026-03-24 11:03:25 +08:00

310 lines
10 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from datetime import datetime, date
from sqlalchemy.orm import Session
from sqlalchemy import text
from DB.Models import User, LineUser, Category, CategoryRule, Expense
from DB.Session import SessionLocal
# Prompt text asking the user to fill in the expense form and send it back.
# The five labels (item, category, amount, seller, note) are the same labels
# that parse_multiline_expense() looks for in its ``field_map``.
EXPENSE_TEMPLATE = (
    "請填寫以下記帳資料後傳回:\n\n"
    "品項: \n"
    "類別: \n"
    "金額: \n"
    "店家: \n"
    "備註: "
)
# Top-level categories created for every newly registered user by
# get_or_create_user(). Each entry is a (name, color, icon) tuple; the color
# is written as a "#RRGGBB" string.
DEFAULT_CATEGORIES = [
    ("餐飲", "#F97316", "🍽"),
    ("交通", "#3B82F6", "🚗"),
    ("購物", "#8B5CF6", "🛍"),
    ("娛樂", "#EC4899", "🎮"),
    ("醫療", "#10B981", "🏥"),
    ("其他", "#6B7280", "📦"),
]
# Default keyword -> category rules seeded for every new user by
# get_or_create_user(). Keys are category names and must also appear in
# DEFAULT_CATEGORIES, because the seeding code looks up the category id by
# this name. The "其他" category intentionally has no entry here, so it gets
# no keyword rules.
DEFAULT_RULES = {
    "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
    "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
    "購物": ["衣服", "3C", "日用品", "購物"],
    "娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
    "醫療": ["藥局", "診所", "醫院", "醫療"],
}
def get_or_create_user(db: Session, line_user_id: str) -> int:
    """Return the internal user id linked to a LINE user id, registering on first use.

    When no ``LineUser`` row exists for ``line_user_id``, a new ``User`` is
    created together with the default categories, the default keyword rules
    and the ``LineUser`` link, and the session is committed.

    Args:
        db: Open SQLAlchemy session; it is committed when a user is created.
        line_user_id: The LINE-side user identifier.

    Returns:
        The ``User.id`` associated with ``line_user_id``.
    """
    existing_link = (
        db.query(LineUser)
        .filter(LineUser.line_user_id == line_user_id)
        .first()
    )
    if existing_link:
        return existing_link.user_id

    user = User(name="LINE用戶", email=f"{line_user_id}@line.user")
    db.add(user)
    db.flush()  # flush so that user.id is populated before it is referenced

    # Create the default top-level categories, remembering each generated id
    # by category name so the keyword rules below can point at them.
    category_id_by_name = {}
    for default_name, default_color, default_icon in DEFAULT_CATEGORIES:
        category = Category(
            user_id=user.id,
            name=default_name,
            color=default_color,
            icon=default_icon,
        )
        db.add(category)
        db.flush()
        category_id_by_name[default_name] = category.id

    for category_name, keyword_list in DEFAULT_RULES.items():
        target_category_id = category_id_by_name[category_name]
        for keyword in keyword_list:
            rule = CategoryRule(
                user_id=user.id,
                category_id=target_category_id,
                keyword=keyword,
            )
            db.add(rule)

    db.add(LineUser(line_user_id=line_user_id, user_id=user.id))
    db.commit()
    return user.id
def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]:
    """Map the category text typed by the user to ``(category_id, subcategory_id)``.

    Lookup order:
      1. An exact keyword rule of this user -> ``(rule.category_id, None)``.
      2. A top-level category with that name -> ``(category.id, None)``.
      3. An existing sub-category with that name -> ``(parent_id, sub.id)``.
      4. Otherwise a new sub-category is created under the user's "其他"
         category, plus a keyword rule pointing at the new sub-category.

    In case 4 the new rows are added and flushed but NOT committed; the
    caller is expected to commit. If the user has no top-level "其他"
    category, the new category is created with ``parent_id=None`` and the
    first element of the returned tuple is ``None``.
    """

    def find_category(*extra_criteria):
        # First category of this user whose name equals the input and that
        # also satisfies the extra criteria.
        return (
            db.query(Category)
            .filter(
                Category.user_id == user_id,
                Category.name == category_input,
                *extra_criteria,
            )
            .first()
        )

    # 1. keyword rule
    matching_rule = (
        db.query(CategoryRule)
        .filter(
            CategoryRule.user_id == user_id,
            CategoryRule.keyword == category_input,
        )
        .first()
    )
    if matching_rule:
        return matching_rule.category_id, None

    # 2. top-level category (the `== None` comparison is a SQLAlchemy column
    #    expression, not a Python identity test)
    top_level = find_category(Category.parent_id == None)  # noqa: E711
    if top_level:
        return top_level.id, None

    # 3. existing sub-category
    existing_sub = find_category(Category.parent_id != None)  # noqa: E711
    if existing_sub:
        return existing_sub.parent_id, existing_sub.id

    # 4. unknown name -> create a sub-category under "其他"
    fallback_parent = (
        db.query(Category)
        .filter(
            Category.user_id == user_id,
            Category.name == "其他",
            Category.parent_id == None,  # noqa: E711
        )
        .first()
    )
    if fallback_parent:
        parent_id = fallback_parent.id
    else:
        parent_id = None

    created_sub = Category(
        user_id=user_id,
        parent_id=parent_id,
        name=category_input,
        color="#6B7280",
    )
    db.add(created_sub)
    db.flush()  # populate created_sub.id for the rule below

    db.add(
        CategoryRule(
            user_id=user_id,
            category_id=created_sub.id,
            keyword=category_input,
        )
    )
    return parent_id, created_sub.id
def parse_multiline_expense(text: str) -> dict | None:
fields = {}
field_map = {
"品項": "item_name",
"類別": "category",
"金額": "total_amount",
"店家": "seller_name",
"備註": "note",
}
for line in text.strip().splitlines():
for key, field in field_map.items():
if line.startswith(f"{key}:") or line.startswith(f"{key}"):
value = re.split(r"[:]", line, 1)[-1].strip()
if value:
fields[field] = value
break
if not all(k in fields for k in ["item_name", "category", "total_amount"]):
return None
try:
fields["total_amount"] = float(re.sub(r"[^\d.]", "", fields["total_amount"]))
except ValueError:
return None
return fields
def save_expense(line_user_id: str, fields: dict) -> str:
    """Store one expense for a LINE user and return the reply text.

    Args:
        line_user_id: LINE-side user identifier; resolved (and registered on
            first use) through get_or_create_user().
        fields: Parsed expense fields. ``item_name``, ``category`` and
            ``total_amount`` (numeric) are required; ``seller_name`` and
            ``note`` are optional.

    Returns:
        A confirmation message listing the stored fields, or a generic
        failure message if anything goes wrong. Exceptions are not
        propagated to the caller.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        category_id, subcategory_id = resolve_category(db, user_id, fields["category"])
        db.add(Expense(
            user_id=user_id,
            # Prefer the more specific sub-category when one was resolved.
            category_id=subcategory_id or category_id,
            amount=fields["total_amount"],
            note=fields.get("note"),
            seller_name=fields.get("seller_name"),
            item_name=fields["item_name"],
            date=date.today(),
        ))
        db.commit()

        reply = (
            f"✅ 已記錄!\n"
            f"品項:{fields['item_name']}\n"
            f"類別:{fields['category']}\n"
            f"金額:${fields['total_amount']:.0f}\n"
        )
        if fields.get("seller_name"):
            reply += f"店家:{fields['seller_name']}\n"
        if fields.get("note"):
            reply += f"備註:{fields['note']}\n"
        return reply.strip()
    except Exception:
        db.rollback()
        import traceback
        # Keep the full stack trace in the server log only. The exception
        # text can contain SQL and schema details, so it must not be echoed
        # back to the chat user; reply generically like the other handlers.
        print("❌ 儲存失敗:", traceback.format_exc())
        return "儲存失敗,請稍後再試"
    finally:
        db.close()
def delete_expense(line_user_id: str, target: str) -> str:
    """Delete one of today's expenses of a LINE user and return the reply text.

    Args:
        line_user_id: LINE-side user identifier.
        target: Either a 1-based position in today's expense rows (all
            digits) or an item name. When several rows share the item name,
            the last one in the query result is deleted.

    Returns:
        A confirmation message, a "not found" message, or a generic failure
        message. Exceptions are not propagated to the caller.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        rows = (
            db.query(Expense)
            .filter(
                Expense.user_id == user_id,
                text("DATE(date) = :today"),
            )
            .params(today=date.today())
            .all()
        )
        if not rows:
            return "今天還沒有記錄 📭"

        # NOTE(review): the position refers to this un-ordered, un-grouped
        # row list, while query_today() shows totals grouped by name and
        # sorted by amount, so the numbers shown there may not line up with
        # the positions used here. Confirm the intended numbering.
        if target.isdigit():
            idx = int(target) - 1
            if idx < 0 or idx >= len(rows):
                return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
            row = rows[idx]
        else:
            matched = [r for r in rows if r.item_name == target]
            if not matched:
                return f"今天沒有「{target}」的記錄"
            row = matched[-1]

        # Build the reply BEFORE deleting/committing, so that a failure while
        # formatting can never report "delete failed" for a row that is
        # already gone. The amount attribute is ``amount``, the name used
        # when Expense rows are created in save_expense().
        reply = f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}"
        db.delete(row)
        db.commit()
        return reply
    except Exception as e:
        db.rollback()
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    finally:
        db.close()
def query_today(line_user_id: str) -> str:
    """Return a text summary of today's expenses for a LINE user.

    Expenses are summed per display name: the ``display_name`` of a
    ``merchant_mapping`` row whose pattern occurs in the expense's seller
    name, falling back to the expense's own item name. Entries are listed
    from largest to smallest total, followed by the grand total.

    Args:
        line_user_id: LINE-side user identifier.

    Returns:
        The formatted summary, a "no records" message, or a generic failure
        message. Exceptions are not propagated to the caller.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        sql = text("""
            SELECT
                COALESCE(m.display_name, e.item_name) AS display_name,
                SUM(e.amount) as total_amount -- 💡 記得這裡是 amount
            FROM expenses e
            LEFT JOIN merchant_mapping m ON e.seller_name LIKE '%%' || m.pattern || '%%'
            WHERE e.user_id = :user_id
            AND e.date = CURRENT_DATE -- 💡 這是查今天的關鍵
            GROUP BY COALESCE(m.display_name, e.item_name)
            ORDER BY total_amount DESC
        """)
        rows = db.execute(sql, {"user_id": user_id}).fetchall()
        if not rows:
            return "📋 今日還沒有記錄喔!💨"

        total = 0.0
        lines = []
        for position, row in enumerate(rows, start=1):
            # The result set only has the two selected columns
            # (display_name, total_amount); the item-name fallback is already
            # applied by COALESCE in the SQL, so no other column may be read.
            amt = float(row.total_amount or 0)  # SUM() is NULL if all amounts are NULL
            total += amt
            lines.append(f"{position}. {row.display_name} ${amt:.0f}")

        # NOTE(review): the numbers shown here index grouped totals, whereas
        # delete_expense() indexes individual rows; confirm that the
        # "刪除 編號" hint below still matches what the user sees.
        header = "📋 今日記錄:\n"
        footer = f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        return header + "\n".join(lines) + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()
def query_month(line_user_id: str) -> str:
    """Return a text summary of the current month's expenses for a LINE user.

    Expenses are summed per display name (merchant mapping display name,
    falling back to the item name) and listed from largest to smallest with
    each entry's share of the monthly total.

    Args:
        line_user_id: LINE-side user identifier.

    Returns:
        The formatted statistics, a "no records" message, or a generic
        failure message. Exceptions are not propagated to the caller.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        now = datetime.now()
        # GROUP BY repeats the full COALESCE expression rather than the
        # display_name alias on purpose.
        sql = text("""
            SELECT
                COALESCE(m.display_name, e.item_name) AS display_name,
                SUM(e.amount) as total_amount
            FROM expenses e
            LEFT JOIN merchant_mapping m ON e.seller_name LIKE '%%' || m.pattern || '%%'
            WHERE e.user_id = :user_id
            AND EXTRACT(YEAR FROM e.date) = :year
            AND EXTRACT(MONTH FROM e.date) = :month
            GROUP BY COALESCE(m.display_name, e.item_name)
            ORDER BY total_amount DESC
        """)
        rows = db.execute(
            sql, {"user_id": user_id, "year": now.year, "month": now.month}
        ).fetchall()
        if not rows:
            return "本月還沒有記錄 📭"

        # SUM() is NULL when every amount in a group is NULL; count it as 0.
        amounts = [float(row.total_amount or 0) for row in rows]
        total = sum(amounts)

        summary = []
        for row, amt in zip(rows, amounts):
            # A month whose entries sum to 0 must not crash the report.
            percent = (amt / total) * 100 if total else 0.0
            summary.append(f"🔹 {row.display_name}${amt:.0f} ({percent:.1f}%)")

        # Separate year and month so the header reads "2026年3月", not "20263月".
        header = f"📊 {now.year}年{now.month}月 消費統計"
        divider = "--------------------------"
        footer = f"\n{divider}\n💰 本月總支出:${total:.0f}"
        return f"{header}\n{divider}\n" + "\n".join(summary) + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()