mirror of
https://github.com/henry4682/linebot_finance.git
synced 2026-05-16 04:41:52 +00:00
All checks were successful
Oracle-Deploy / redeploy (push) Successful in 29s
1. change query today
309 lines
10 KiB
Python
Executable File
309 lines
10 KiB
Python
Executable File
import re
|
||
from datetime import datetime, date
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import text
|
||
from DB.Models import User, LineUser, Category, CategoryRule, Expense
|
||
from DB.Session import SessionLocal
|
||
|
||
# Blank form sent to the user: a prompt line, an empty line, then one
# "<label>: " line per field for the user to fill in and send back.
# The last field line deliberately has no trailing newline.
EXPENSE_TEMPLATE = "請填寫以下記帳資料後傳回:\n\n" + "\n".join(
    [
        "品項: ",
        "類別: ",
        "金額: ",
        "店家: ",
        "備註: ",
    ]
)
|
||
|
||
# Top-level categories seeded for every newly created user.
# Each entry is (name, colour as a hex string, icon emoji).
DEFAULT_CATEGORIES = [
    ("餐飲", "#F97316", "🍽"),
    ("交通", "#3B82F6", "🚗"),
    ("購物", "#8B5CF6", "🛍"),
    ("娛樂", "#EC4899", "🎮"),
    ("醫療", "#10B981", "🏥"),
    ("其他", "#6B7280", "📦"),
]
|
||
|
||
# Keyword rules seeded for every newly created user: category name -> list
# of keywords that resolve to that category. Every key must also appear as
# a name in DEFAULT_CATEGORIES ("其他" intentionally has no keywords).
DEFAULT_RULES = {
    "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
    "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
    "購物": ["衣服", "3C", "日用品", "購物"],
    "娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
    "醫療": ["藥局", "診所", "醫院", "醫療"],
}
|
||
|
||
|
||
def get_or_create_user(db: Session, line_user_id: str) -> int:
    """Return the internal user id linked to ``line_user_id``.

    If a ``LineUser`` row already exists for this LINE id, its ``user_id``
    is returned and nothing is written. Otherwise a new ``User`` is created
    together with the default categories, the default keyword rules and the
    ``LineUser`` link, and the whole set is committed on ``db``.
    """
    existing_link = (
        db.query(LineUser)
        .filter(LineUser.line_user_id == line_user_id)
        .first()
    )
    if existing_link is not None:
        return existing_link.user_id

    user = User(name="LINE用戶", email=f"{line_user_id}@line.user")
    db.add(user)
    db.flush()  # flush so that user.id is populated before it is referenced

    # Seed the default top-level categories.
    categories = [
        Category(user_id=user.id, name=name, color=color, icon=icon)
        for name, color, icon in DEFAULT_CATEGORIES
    ]
    db.add_all(categories)
    db.flush()  # flush so that every category has its id
    category_id_by_name = {category.name: category.id for category in categories}

    # Seed the default keyword -> category rules.
    for category_name, keywords in DEFAULT_RULES.items():
        target_id = category_id_by_name[category_name]
        db.add_all(
            CategoryRule(user_id=user.id, category_id=target_id, keyword=keyword)
            for keyword in keywords
        )

    db.add(LineUser(line_user_id=line_user_id, user_id=user.id))
    db.commit()
    return user.id
|
||
|
||
|
||
def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]:
    """Resolve free-text category input to ``(category_id, subcategory_id)``.

    ``category_id`` is always a top-level category id; ``subcategory_id`` is
    the child category id, or ``None`` when the input maps to a top-level
    category.

    Lookup order for ``category_input`` (exact match, scoped to ``user_id``):
      1. a ``CategoryRule`` keyword;
      2. the name of a top-level category (``parent_id`` is NULL);
      3. the name of an existing subcategory (``parent_id`` is not NULL);
      4. otherwise a new category named ``category_input`` is created under
         the user's "其他" category, plus a rule so the next lookup hits
         step 1.

    Rows created in step 4 are flushed but not committed here.
    """
    # 1. Keyword rule.
    rule = db.query(CategoryRule).filter(
        CategoryRule.user_id == user_id,
        CategoryRule.keyword == category_input,
    ).first()
    if rule:
        # Rules created by step 4 point at a subcategory. Resolve its parent
        # so the returned tuple has the same shape as on the call that
        # created it, instead of reporting the subcategory as a top-level id.
        target = db.query(Category).filter(Category.id == rule.category_id).first()
        if target is not None and target.parent_id is not None:
            return target.parent_id, target.id
        return rule.category_id, None

    # 2. Top-level category with this name.
    parent = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == category_input,
        Category.parent_id.is_(None),
    ).first()
    if parent:
        return parent.id, None

    # 3. Existing subcategory with this name.
    sub = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == category_input,
        Category.parent_id.isnot(None),
    ).first()
    if sub:
        return sub.parent_id, sub.id

    # 4. Unknown input: create it as a subcategory of "其他".
    other = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == "其他",
        Category.parent_id.is_(None),
    ).first()
    parent_id = other.id if other else None

    new_sub = Category(user_id=user_id, parent_id=parent_id, name=category_input, color="#6B7280")
    db.add(new_sub)
    db.flush()  # flush so that new_sub.id is populated for the rule below

    db.add(CategoryRule(user_id=user_id, category_id=new_sub.id, keyword=category_input))

    if parent_id is None:
        # The user has no "其他" category, so the new row is itself
        # top-level; report it as the category rather than returning None
        # in the category slot.
        return new_sub.id, None
    return parent_id, new_sub.id
|
||
|
||
|
||
def parse_multiline_expense(text: str) -> dict | None:
|
||
fields = {}
|
||
field_map = {
|
||
"品項": "item_name",
|
||
"類別": "category",
|
||
"金額": "total_amount",
|
||
"店家": "seller_name",
|
||
"備註": "note",
|
||
}
|
||
for line in text.strip().splitlines():
|
||
for key, field in field_map.items():
|
||
if line.startswith(f"{key}:") or line.startswith(f"{key}:"):
|
||
value = re.split(r"[::]", line, 1)[-1].strip()
|
||
if value:
|
||
fields[field] = value
|
||
break
|
||
|
||
if not all(k in fields for k in ["item_name", "category", "total_amount"]):
|
||
return None
|
||
|
||
try:
|
||
fields["total_amount"] = float(re.sub(r"[^\d.]", "", fields["total_amount"]))
|
||
except ValueError:
|
||
return None
|
||
|
||
return fields
|
||
|
||
|
||
def save_expense(line_user_id: str, fields: dict) -> str:
    """Persist one expense for a LINE user and return the reply text.

    ``fields`` is the dict produced by ``parse_multiline_expense``: it must
    contain ``item_name``, ``category`` and ``total_amount`` (a number) and
    may contain ``seller_name`` and ``note``. The expense is dated with
    ``date.today()`` and is attached to the subcategory when one was
    resolved, otherwise to the top-level category.

    Returns a confirmation message on success. On any failure the
    transaction is rolled back, the traceback is printed for the operator,
    and a generic failure message is returned. Exception text is never sent
    to the chat user, because it can contain SQL and parameter values.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        category_id, subcategory_id = resolve_category(db, user_id, fields["category"])

        db.add(Expense(
            user_id=user_id,
            category_id=subcategory_id or category_id,
            amount=fields["total_amount"],
            note=fields.get("note"),
            seller_name=fields.get("seller_name"),
            item_name=fields["item_name"],
            date=date.today(),
        ))
        db.commit()

        reply_lines = [
            "✅ 已記錄!",
            f"品項:{fields['item_name']}",
            f"類別:{fields['category']}",
            f"金額:${fields['total_amount']:.0f}",
        ]
        # Optional fields are echoed only when the user supplied them.
        if fields.get("seller_name"):
            reply_lines.append(f"店家:{fields['seller_name']}")
        if fields.get("note"):
            reply_lines.append(f"備註:{fields['note']}")
        return "\n".join(reply_lines).strip()
    except Exception:
        db.rollback()
        import traceback

        # Keep the full stack trace in the server log for debugging.
        print("❌ 儲存失敗:", traceback.format_exc())
        return "儲存失敗,請稍後再試"
    finally:
        db.close()
|
||
|
||
|
||
def delete_expense(line_user_id: str, target: str) -> str:
    """Delete one of today's expenses for a LINE user and return the reply text.

    ``target`` selects the row among the user's expenses dated today:
      * a decimal number is a 1-based index into those rows, in the order
        the database returns them (no explicit ORDER BY is applied);
      * any other string is matched exactly against ``item_name``, and the
        last matching row is deleted.

    Returns a confirmation naming the deleted item, or a message explaining
    why nothing was deleted. On any exception the transaction is rolled
    back and a generic failure message is returned.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        rows = db.query(Expense).filter(
            Expense.user_id == user_id,
            text("DATE(date) = :today"),
        ).params(today=date.today()).all()

        if not rows:
            return "今天還沒有記錄 📭"

        # isdecimal() (unlike isdigit()) only accepts characters that int()
        # can parse, so inputs such as "²" fall through to the name lookup
        # instead of raising.
        if target.isdecimal():
            idx = int(target) - 1
            if idx < 0 or idx >= len(rows):
                return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
            row = rows[idx]
        else:
            matched = [r for r in rows if r.item_name == target]
            if not matched:
                return f"今天沒有「{target}」的記錄"
            row = matched[-1]

        # Read the display values BEFORE deleting: once the delete is
        # committed, any failure while building the reply would report
        # "delete failed" for a row that is in fact gone. The amount column
        # is ``amount``, matching how Expense rows are created and queried
        # elsewhere in this module.
        item_name = row.item_name
        amount = float(row.amount)

        db.delete(row)
        db.commit()
        return f"✅ 已刪除:{item_name} ${amount:.0f}"
    except Exception as e:
        db.rollback()
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    finally:
        db.close()
|
||
|
||
|
||
def query_today(line_user_id: str) -> str:
    """Return a text summary of the user's expenses dated today.

    Expenses are grouped by display name: the ``display_name`` of a
    ``merchant_mapping`` row whose ``pattern`` occurs in the expense's
    ``seller_name``, falling back to the expense's ``item_name``. Amounts
    are summed per group and the groups are listed largest first, followed
    by the grand total.

    Returns the formatted listing, a "no records" message when nothing was
    recorded today, or a generic failure message if the query raises.

    NOTE(review): the footer tells the user to delete by number, but the
    numbers shown here index aggregated groups while ``delete_expense``
    indexes individual expense rows, so the two can disagree. Confirm the
    intended numbering.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        sql = text("""
            SELECT
                COALESCE(m.display_name, e.item_name) AS display_name,
                SUM(e.amount) as total_amount -- 💡 記得這裡是 amount
            FROM expenses e
            LEFT JOIN merchant_mapping m ON e.seller_name LIKE '%%' || m.pattern || '%%'
            WHERE e.user_id = :user_id
            AND e.date = CURRENT_DATE -- 💡 這是查今天的關鍵
            GROUP BY COALESCE(m.display_name, e.item_name)
            ORDER BY total_amount DESC
        """)
        rows = db.execute(sql, {"user_id": user_id}).fetchall()

        if not rows:
            return "📋 今日還沒有記錄喔!💨"

        # The SELECT returns exactly two columns, display_name and
        # total_amount; build the display entries from those only.
        entries = [(r.display_name, float(r.total_amount)) for r in rows]
        total = sum(amount for _, amount in entries)

        lines = [
            f"{position}. {name} ${amount:.0f}"
            for position, (name, amount) in enumerate(entries, start=1)
        ]

        header = "📋 今日記錄:\n"
        footer = f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        return header + "\n".join(lines) + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()
|
||
|
||
|
||
def query_month(line_user_id: str) -> str:
    """Return a text summary of the user's expenses for the current month.

    The month and year come from ``datetime.now()``. Expenses are grouped by
    display name (the ``display_name`` of a ``merchant_mapping`` row whose
    ``pattern`` occurs in ``seller_name``, else the expense's ``item_name``),
    summed per group, and listed largest first with each group's share of
    the monthly total.

    Returns the formatted statistics, a "no records" message when the month
    has no expenses, or a generic failure message if the query raises.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        now = datetime.now()

        sql = text("""
            SELECT
                COALESCE(m.display_name, e.item_name) AS display_name,
                SUM(e.amount) as total_amount
            FROM expenses e
            LEFT JOIN merchant_mapping m ON e.seller_name LIKE '%%' || m.pattern || '%%'
            WHERE e.user_id = :user_id
            AND EXTRACT(YEAR FROM e.date) = :year
            AND EXTRACT(MONTH FROM e.date) = :month
            GROUP BY COALESCE(m.display_name, e.item_name) -- 💡 這裡不能只寫 display_name,要寫完整邏輯
            ORDER BY total_amount DESC
        """)

        rows = db.execute(sql, {"user_id": user_id, "year": now.year, "month": now.month}).fetchall()

        if not rows:
            return "本月還沒有記錄 📭"

        entries = [(row.display_name, float(row.total_amount)) for row in rows]
        total = sum(amount for _, amount in entries)

        summary = []
        for name, amount in entries:
            # A month whose amounts sum to zero (e.g. only 0-amount entries)
            # must not fail with ZeroDivisionError; show 0.0% instead.
            percent = (amount / total) * 100 if total else 0.0
            summary.append(f"🔹 {name}:${amount:.0f} ({percent:.1f}%)")

        header = f"📊 {now.year}年{now.month}月 消費統計"
        divider = "--------------------------"
        footer = f"\n{divider}\n💰 本月總支出:${total:.0f}"
        return f"{header}\n{divider}\n" + "\n".join(summary) + footer
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()