Files
linebot_finance/app/main.py
henry yo c8ab185c3c feat: linebot
1. add DB CRUD
2. change expense format
2026-03-10 00:44:14 +08:00

434 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import asyncio
import threading
import nest_asyncio
nest_asyncio.apply()
import captcha_state
from invoice_fetcher import main as fetch_main
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from linebot.v3 import WebhookHandler
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage,
)
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
from linebot.v3.exceptions import InvalidSignatureError
from sqlalchemy import create_engine, Column, Integer, String, Numeric, Date, DateTime, BigInteger, text
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime, date
# Call load_dotenv() before any os.getenv() lookup below
# (python-dotenv; presumably reads a local .env file into the environment).
load_dotenv()
# ASGI application object; the route decorators in this module attach to it.
app = FastAPI()
# LINE settings
# Both values come from the environment; os.getenv returns None when a
# variable is missing, and that None is passed straight to the SDK objects.
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
# DB settings
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
# Session factory bound to the engine; call SessionLocal() to open a session.
SessionLocal = sessionmaker(bind=engine)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Models
class User(Base):
    """ORM mapping for the ``users`` table (one row per application account)."""

    __tablename__ = "users"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Display name of the account.
    name = Column(String)
    # Unique per row.
    email = Column(String, unique=True)
class LineUser(Base):
    """ORM mapping for ``line_users``: links a LINE user id to a ``users`` row id."""

    __tablename__ = "line_users"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Identifier supplied by LINE for the sender; unique per row.
    line_user_id = Column(String, unique=True)
    # Id of the matching ``users`` row (plain column, no ForeignKey declared).
    user_id = Column(BigInteger)
    # Filled with the local time at insert.
    created_at = Column(DateTime, default=datetime.now)
class Category(Base):
    """ORM mapping for ``categories``.

    A row with ``parent_id`` set to NULL is a top-level category; a row with
    a ``parent_id`` is a subcategory of that parent.
    """

    __tablename__ = "categories"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Owner of the category (id of a ``users`` row).
    user_id = Column(BigInteger)
    # Id of the parent category, or NULL for a top-level category.
    parent_id = Column(BigInteger, nullable=True)
    # Category label as typed/shown to the user.
    name = Column(String)
    # Hex colour string; defaults to "#3B82F6".
    color = Column(String, default="#3B82F6")
    # Optional icon (the defaults created in this module use emoji).
    icon = Column(String, nullable=True)
class CategoryRule(Base):
    """ORM mapping for ``category_rules``: a keyword that maps to a category."""

    __tablename__ = "category_rules"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Owner of the rule (id of a ``users`` row).
    user_id = Column(BigInteger)
    # Category the keyword resolves to (id of a ``categories`` row).
    category_id = Column(BigInteger)
    # Exact text that triggers the rule.
    keyword = Column(String)
class Expense(Base):
    """ORM mapping for ``expenses``: one recorded spending entry."""

    __tablename__ = "expenses"

    # Primary key.
    id = Column(BigInteger, primary_key=True)
    # Owner of the entry (id of a ``users`` row).
    user_id = Column(BigInteger)
    # Id of a ``categories`` row, or NULL when uncategorised.
    category_id = Column(BigInteger, nullable=True)
    # Amount with two decimal places (up to 10 digits in total).
    amount = Column(Numeric(10, 2))
    # Optional free-text remark.
    note = Column(String, nullable=True)
    # Optional invoice number.
    invoice_number = Column(String, nullable=True)
    # Optional shop / seller name.
    seller_name = Column(String, nullable=True)
    # Optional item description.
    item_name = Column(String, nullable=True)
    # Day the expense belongs to.
    date = Column(Date)
    # Filled with the local time at insert.
    created_at = Column(DateTime, default=datetime.now)
    # Filled at insert and refreshed on every ORM UPDATE; with only
    # ``default=`` the value would never change after the row is created.
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# Expense entry template: sent to the user, who fills in the values after
# each label and sends the text back.
EXPENSE_TEMPLATE = "\n".join(
    [
        "請填寫以下記帳資料後傳回:",
        "",
        "品項: ",
        "類別: ",
        "金額: ",
        "店家: ",
        "備註: ",
    ]
)
def get_or_create_user(db, line_user_id: str) -> int:
    """Return the internal user id for a LINE user, creating the account if needed.

    When no ``LineUser`` row exists for ``line_user_id``, this creates a
    ``User``, six default top-level categories, the default keyword rules for
    five of them, and the ``LineUser`` link, then commits.

    Args:
        db: An open SQLAlchemy session.
        line_user_id: The user id taken from the LINE event.

    Returns:
        The ``users.id`` associated with ``line_user_id``.
    """
    existing_link = (
        db.query(LineUser)
        .filter(LineUser.line_user_id == line_user_id)
        .first()
    )
    if existing_link:
        return existing_link.user_id

    account = User(name="LINE用戶", email=f"{line_user_id}@line.user")
    db.add(account)
    db.flush()  # flush so account.id is available below

    seed_categories = (
        ("餐飲", "#F97316", "🍽"),
        ("交通", "#3B82F6", "🚗"),
        ("購物", "#8B5CF6", "🛍"),
        ("娛樂", "#EC4899", "🎮"),
        ("醫療", "#10B981", "🏥"),
        ("其他", "#6B7280", "📦"),
    )
    seed_keywords = {
        "餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
        "交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
        "購物": ["衣服", "3C", "日用品", "購物"],
        "娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
        "醫療": ["藥局", "診所", "醫院", "醫療"],
    }

    # Each category is flushed individually so its generated id can be
    # recorded for the keyword rules that follow.
    category_id_by_name = {}
    for label, hex_color, emoji in seed_categories:
        category = Category(
            user_id=account.id, name=label, color=hex_color, icon=emoji
        )
        db.add(category)
        db.flush()
        category_id_by_name[label] = category.id

    db.add_all(
        CategoryRule(
            user_id=account.id,
            category_id=category_id_by_name[label],
            keyword=word,
        )
        for label, words in seed_keywords.items()
        for word in words
    )

    db.add(LineUser(line_user_id=line_user_id, user_id=account.id))
    db.commit()
    return account.id
def resolve_category(
    db, user_id: int, category_input: str
) -> tuple[int | None, int | None]:
    """Map the category text typed by the user to ``(category_id, subcategory_id)``.

    Lookup order:

    1. a ``CategoryRule`` whose keyword equals ``category_input``;
    2. a top-level ``Category`` with that name;
    3. an existing subcategory with that name;
    4. otherwise a new subcategory is created under the user's "其他"
       category, together with a keyword rule pointing at it.

    New rows from step 4 are flushed/added but not committed; the caller
    owns the transaction.

    Args:
        db: An open SQLAlchemy session.
        user_id: Owner of the categories and rules.
        category_input: Text entered in the category field.

    Returns:
        ``(category_id, subcategory_id)``. ``subcategory_id`` is ``None``
        when the input resolves to a top-level category. ``category_id`` is
        ``None`` only when a new subcategory had to be created and the user
        has no top-level "其他" category.
    """
    # 1. Keyword rule.
    rule = db.query(CategoryRule).filter(
        CategoryRule.user_id == user_id,
        CategoryRule.keyword == category_input,
    ).first()
    if rule:
        # Rules created by step 4 point at a subcategory. Report those as
        # (parent, sub) so a repeated keyword yields the same tuple shape as
        # the call that created it.
        target = db.query(Category).filter(
            Category.id == rule.category_id
        ).first()
        if target is not None and target.parent_id is not None:
            return target.parent_id, target.id
        return rule.category_id, None

    # 2. Top-level category name.
    parent = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == category_input,
        Category.parent_id.is_(None),
    ).first()
    if parent:
        return parent.id, None

    # 3. Existing subcategory name.
    sub = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == category_input,
        Category.parent_id.is_not(None),
    ).first()
    if sub:
        return sub.parent_id, sub.id

    # 4. Unknown: create a subcategory under "其他" and remember the keyword.
    other = db.query(Category).filter(
        Category.user_id == user_id,
        Category.name == "其他",
        Category.parent_id.is_(None),
    ).first()
    parent_id = other.id if other else None
    new_sub = Category(
        user_id=user_id,
        parent_id=parent_id,
        name=category_input,
        color="#6B7280",
    )
    db.add(new_sub)
    db.flush()  # flush so new_sub.id is available for the rule
    db.add(CategoryRule(
        user_id=user_id,
        category_id=new_sub.id,
        keyword=category_input,
    ))
    return parent_id, new_sub.id
def parse_multiline_expense(text: str) -> dict | None:
"""解析多行記帳格式,欄位不限順序"""
fields = {}
field_map = {
"品項": "item_name",
"類別": "category",
"金額": "amount",
"店家": "seller_name",
"備註": "note",
}
for line in text.strip().splitlines():
for key, field in field_map.items():
if line.startswith(f"{key}:") or line.startswith(f"{key}"):
value = re.split(r"[:]", line, 1)[-1].strip()
if value:
fields[field] = value
break
required = ["item_name", "category", "amount"]
if not all(k in fields for k in required):
return None
try:
fields["amount"] = float(re.sub(r"[^\d.]", "", fields["amount"]))
except ValueError:
return None
return fields
# Webhook endpoint
@app.post("/webhook")
async def webhook(request: Request):
    """Receive a LINE webhook call and dispatch it to the registered handlers.

    Reads the raw request body and the ``X-Line-Signature`` header (empty
    string when absent) and passes both to ``handler.handle``.

    Raises:
        HTTPException: 400 when the handler reports an invalid signature.
    """
    raw_body = await request.body()
    line_signature = request.headers.get("X-Line-Signature", "")
    payload = raw_body.decode()
    try:
        handler.handle(payload, line_signature)
    except InvalidSignatureError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    return "OK"
def run_fetch_in_thread():
    """Run ``fetch_main()`` to completion on a fresh event loop.

    A new loop is created and installed as the current loop, then closed
    once the coroutine finishes or raises.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        event_loop.run_until_complete(fetch_main())
    finally:
        event_loop.close()
@app.get("/fetch")
async def fetch_invoices():
    """Start the invoice fetch on a background thread and return immediately.

    Returns:
        ``{"status": "started"}``; the response does not wait for the fetch.
    """
    print("🚀 開始抓取發票...")
    worker = threading.Thread(target=run_fetch_in_thread)
    worker.start()
    return {"status": "started"}
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    """Handle an incoming text message and send a reply.

    A message made of exactly five digits is treated as a captcha answer when
    ``captcha_state.captcha_event`` exists and is not yet set: the answer is
    stored in ``captcha_state.captcha_answer`` and the event is set. Every
    other message is passed to ``handle_text``.
    """
    line_user_id = event.source.user_id
    msg = event.message.text.strip()

    # Only look at the captcha state when the text has the shape of a code.
    looks_like_code = msg.isdigit() and len(msg) == 5
    pending = captcha_state.captcha_event if looks_like_code else None

    if pending and not pending.is_set():
        captcha_state.captcha_answer = msg
        pending.set()
        reply = "✅ 驗證碼已送出!"
    else:
        reply = handle_text(line_user_id, msg)

    reply_request = ReplyMessageRequest(
        reply_token=event.reply_token,
        messages=[TextMessage(text=reply)],
    )
    with ApiClient(configuration) as api_client:
        MessagingApi(api_client).reply_message(reply_request)
@handler.add(FollowEvent)
def handle_follow(event):
    """Reply to a follow event with the welcome message."""
    welcome_text = (
        "👋 歡迎使用 Myfinance 記帳 Bot\n\n"
        "📝 點下方「記帳」開始記帳\n"
        "📊 點「查今天」或「查本月」查詢\n\n"
        "開始記帳吧!💪"
    )
    reply_request = ReplyMessageRequest(
        reply_token=event.reply_token,
        messages=[TextMessage(text=welcome_text)],
    )
    with ApiClient(configuration) as api_client:
        MessagingApi(api_client).reply_message(reply_request)
def handle_text(line_user_id: str, text: str) -> str:
if text == "查今天":
return query_today(line_user_id)
if text == "查本月":
return query_month(line_user_id)
if text in ("記帳", "記帳說明"):
return EXPENSE_TEMPLATE
if text.startswith("刪除 "):
return delete_expense(line_user_id, text[3:].strip())
# 多行記帳格式
if "\n" in text:
fields = parse_multiline_expense(text)
if fields:
return save_expense(line_user_id, fields)
return (
"格式不正確 😅\n\n"
"必填欄位:品項、類別、金額\n"
"點下方「記帳」取得模板!"
)
return (
"點下方「記帳」取得記帳模板\n"
"或輸入「查今天」/「查本月」查詢"
)
def save_expense(line_user_id: str, fields: dict) -> str:
    """Store one expense for the user and return a confirmation message.

    The expense is dated today. Its category is the resolved subcategory
    when there is one, otherwise the resolved top-level category.

    Args:
        line_user_id: The sender's LINE user id.
        fields: Parsed entry with ``item_name``, ``category`` and ``amount``
            (numeric), plus optional ``seller_name`` and ``note``.

    Returns:
        A summary of the saved entry, or a failure message when anything
        raises (the session is rolled back in that case).
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        category_id, subcategory_id = resolve_category(
            db, user_id, fields["category"]
        )
        record = Expense(
            user_id=user_id,
            category_id=subcategory_id or category_id,
            amount=fields["amount"],
            note=fields.get("note"),
            seller_name=fields.get("seller_name"),
            item_name=fields["item_name"],
            date=date.today(),
        )
        db.add(record)
        db.commit()

        summary = [
            "✅ 已記錄!",
            f"品項:{fields['item_name']}",
            f"類別:{fields['category']}",
            f"金額:${fields['amount']:.0f}",
        ]
        # Optional fields are listed only when they were provided.
        if fields.get("seller_name"):
            summary.append(f"店家:{fields['seller_name']}")
        if fields.get("note"):
            summary.append(f"備註:{fields['note']}")
        return "\n".join(summary).strip()
    except Exception as e:
        db.rollback()
        print("❌ 儲存失敗:", e)
        return "儲存失敗,請稍後再試"
    finally:
        db.close()
def delete_expense(line_user_id: str, target: str) -> str:
    """Delete one of today's expenses, chosen by list number or by item name.

    Args:
        line_user_id: The sender's LINE user id.
        target: Either a 1-based position in today's records (ordered by
            id), or an item name. When several records share the name, the
            last one in that order is deleted.

    Returns:
        A confirmation message, a "not found" message, or a failure message
        when anything raises (the session is rolled back in that case).
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        # Explicit ordering: without ORDER BY the database may return rows in
        # any order, so a position typed by the user could hit another row.
        rows = (
            db.query(Expense)
            .filter(
                Expense.user_id == user_id,
                text("DATE(date) = :today"),
            )
            .params(today=date.today())
            .order_by(Expense.id)
            .all()
        )
        if not rows:
            return "今天還沒有記錄 📭"

        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # parse, so inputs such as "²" fall through to the name lookup.
        if target.isdecimal():
            idx = int(target) - 1
            if not 0 <= idx < len(rows):
                return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
            row = rows[idx]
        else:
            matched = [r for r in rows if r.item_name == target]
            if not matched:
                return f"今天沒有「{target}」的記錄"
            row = matched[-1]

        # Build the confirmation text before the row is deleted.
        deleted_label = f"{row.item_name} ${float(row.amount):.0f}"
        db.delete(row)
        db.commit()
        return f"✅ 已刪除:{deleted_label}"
    except Exception as e:
        db.rollback()
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    finally:
        db.close()
def query_today(line_user_id: str) -> str:
    """Return a numbered list of the user's expenses for today, with a total.

    Records are ordered by id so the numbers shown stay the same between
    calls as long as the records do not change.

    Args:
        line_user_id: The sender's LINE user id.

    Returns:
        The formatted list, a "no records" message, or a failure message
        when anything raises.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        rows = (
            db.query(Expense)
            .filter(
                Expense.user_id == user_id,
                text("DATE(date) = :today"),
            )
            .params(today=date.today())
            .order_by(Expense.id)
            .all()
        )
        if not rows:
            return "今天還沒有記錄 📭"

        total = sum(float(r.amount) for r in rows)
        lines = []
        for number, r in enumerate(rows, start=1):
            entry = f"{number}. {r.item_name} ${float(r.amount):.0f}"
            if r.note:
                # Parenthesised so the note does not run into the amount.
                entry += f"({r.note})"
            lines.append(entry)
        return (
            "📋 今日記錄:\n"
            + "\n".join(lines)
            + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        )
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()
def query_month(line_user_id: str) -> str:
    """Return this month's spending per item name, largest first, with a total.

    Amounts are grouped by ``item_name`` (records without one are grouped
    under "其他").

    Args:
        line_user_id: The sender's LINE user id.

    Returns:
        The formatted summary, a "no records" message, or a failure message
        when anything raises.
    """
    db = SessionLocal()
    try:
        user_id = get_or_create_user(db, line_user_id)
        now = datetime.now()
        rows = (
            db.query(Expense)
            .filter(
                Expense.user_id == user_id,
                text(
                    "EXTRACT(YEAR FROM date) = :year "
                    "AND EXTRACT(MONTH FROM date) = :month"
                ),
            )
            .params(year=now.year, month=now.month)
            .order_by(Expense.id)
            .all()
        )
        if not rows:
            return "本月還沒有記錄 📭"

        total = sum(float(r.amount) for r in rows)
        summary = {}
        for r in rows:
            key = r.item_name or "其他"
            summary[key] = summary.get(key, 0) + float(r.amount)

        ranked = sorted(summary.items(), key=lambda pair: -pair[1])
        # A colon separates the name from the amount so the two do not run
        # together.
        lines = [f"{name}:${amt:.0f}" for name, amt in ranked]
        return (
            f"📊 本月統計({now.month}月):\n"
            + "\n".join(lines)
            + f"\n\n💰 總計:${total:.0f}"
        )
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    finally:
        db.close()