fix: linebot

correct the way import
This commit is contained in:
2026-03-10 10:42:15 +08:00
parent 991ebeb6fd
commit 7af5bd8caf
5 changed files with 7 additions and 7 deletions

View File

@@ -0,0 +1,4 @@
import threading
captcha_answer: str | None = None
captcha_event: threading.Event | None = None

View File

@@ -0,0 +1,259 @@
import re
from datetime import datetime, date
from sqlalchemy.orm import Session
from sqlalchemy import text
from DB.Models import User, LineUser, Category, CategoryRule, Expense
from DB.Session import SessionLocal
EXPENSE_TEMPLATE = (
"請填寫以下記帳資料後傳回:\n\n"
"品項: \n"
"類別: \n"
"金額: \n"
"店家: \n"
"備註: "
)
DEFAULT_CATEGORIES = [
("餐飲", "#F97316", "🍽"),
("交通", "#3B82F6", "🚗"),
("購物", "#8B5CF6", "🛍"),
("娛樂", "#EC4899", "🎮"),
("醫療", "#10B981", "🏥"),
("其他", "#6B7280", "📦"),
]
DEFAULT_RULES = {
"餐飲": ["早餐", "午餐", "晚餐", "飲料", "便當", "餐飲"],
"交通": ["捷運", "公車", "計程車", "油錢", "停車", "交通"],
"購物": ["衣服", "3C", "日用品", "購物"],
"娛樂": ["電影", "遊戲", "旅遊", "娛樂"],
"醫療": ["藥局", "診所", "醫院", "醫療"],
}
def get_or_create_user(db: Session, line_user_id: str) -> int:
line_user = db.query(LineUser).filter(
LineUser.line_user_id == line_user_id
).first()
if line_user:
return line_user.user_id
new_user = User(name="LINE用戶", email=f"{line_user_id}@line.user")
db.add(new_user)
db.flush()
parent_ids = {}
for name, color, icon in DEFAULT_CATEGORIES:
cat = Category(user_id=new_user.id, name=name, color=color, icon=icon)
db.add(cat)
db.flush()
parent_ids[name] = cat.id
for cat_name, keywords in DEFAULT_RULES.items():
for kw in keywords:
db.add(CategoryRule(
user_id=new_user.id,
category_id=parent_ids[cat_name],
keyword=kw
))
db.add(LineUser(line_user_id=line_user_id, user_id=new_user.id))
db.commit()
return new_user.id
def resolve_category(db: Session, user_id: int, category_input: str) -> tuple[int, int | None]:
"""回傳 (category_id, subcategory_id)"""
# 查 rule
rule = db.query(CategoryRule).filter(
CategoryRule.user_id == user_id,
CategoryRule.keyword == category_input
).first()
if rule:
return rule.category_id, None
# 查是否是大類
parent = db.query(Category).filter(
Category.user_id == user_id,
Category.name == category_input,
Category.parent_id == None
).first()
if parent:
return parent.id, None
# 查是否是已存在小類
sub = db.query(Category).filter(
Category.user_id == user_id,
Category.name == category_input,
Category.parent_id != None
).first()
if sub:
return sub.parent_id, sub.id
# 找不到 → 新增 subcategory 到「其他」
other = db.query(Category).filter(
Category.user_id == user_id,
Category.name == "其他",
Category.parent_id == None
).first()
parent_id = other.id if other else None
new_sub = Category(user_id=user_id, parent_id=parent_id, name=category_input, color="#6B7280")
db.add(new_sub)
db.flush()
db.add(CategoryRule(user_id=user_id, category_id=new_sub.id, keyword=category_input))
return parent_id, new_sub.id
def parse_multiline_expense(text: str) -> dict | None:
fields = {}
field_map = {
"品項": "item_name",
"類別": "category",
"金額": "amount",
"店家": "seller_name",
"備註": "note",
}
for line in text.strip().splitlines():
for key, field in field_map.items():
if line.startswith(f"{key}:") or line.startswith(f"{key}"):
value = re.split(r"[:]", line, 1)[-1].strip()
if value:
fields[field] = value
break
if not all(k in fields for k in ["item_name", "category", "amount"]):
return None
try:
fields["amount"] = float(re.sub(r"[^\d.]", "", fields["amount"]))
except ValueError:
return None
return fields
def save_expense(line_user_id: str, fields: dict) -> str:
db = SessionLocal()
try:
user_id = get_or_create_user(db, line_user_id)
category_id, subcategory_id = resolve_category(db, user_id, fields["category"])
db.add(Expense(
user_id=user_id,
category_id=subcategory_id or category_id,
amount=fields["amount"],
note=fields.get("note"),
seller_name=fields.get("seller_name"),
item_name=fields["item_name"],
date=date.today(),
))
db.commit()
reply = (
f"✅ 已記錄!\n"
f"品項:{fields['item_name']}\n"
f"類別:{fields['category']}\n"
f"金額:${fields['amount']:.0f}\n"
)
if fields.get("seller_name"):
reply += f"店家:{fields['seller_name']}\n"
if fields.get("note"):
reply += f"備註:{fields['note']}\n"
return reply.strip()
except Exception as e:
db.rollback()
print("❌ 儲存失敗:", e)
return "儲存失敗,請稍後再試"
finally:
db.close()
def delete_expense(line_user_id: str, target: str) -> str:
db = SessionLocal()
try:
user_id = get_or_create_user(db, line_user_id)
rows = db.query(Expense).filter(
Expense.user_id == user_id,
text("DATE(date) = :today")
).params(today=date.today()).all()
if not rows:
return "今天還沒有記錄 📭"
if target.isdigit():
idx = int(target) - 1
if idx < 0 or idx >= len(rows):
return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
row = rows[idx]
db.delete(row)
db.commit()
return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}"
matched = [r for r in rows if r.item_name == target]
if not matched:
return f"今天沒有「{target}」的記錄"
row = matched[-1]
db.delete(row)
db.commit()
return f"✅ 已刪除:{row.item_name} ${float(row.amount):.0f}"
except Exception as e:
db.rollback()
print("❌ 刪除失敗:", e)
return "刪除失敗,請稍後再試"
finally:
db.close()
def query_today(line_user_id: str) -> str:
db = SessionLocal()
try:
user_id = get_or_create_user(db, line_user_id)
rows = db.query(Expense).filter(
Expense.user_id == user_id,
text("DATE(date) = :today")
).params(today=date.today()).all()
if not rows:
return "今天還沒有記錄 📭"
total = sum(float(r.amount) for r in rows)
lines = [
f"{i+1}. {r.item_name} ${float(r.amount):.0f}" + (f"{r.note}" if r.note else "")
for i, r in enumerate(rows)
]
return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
except Exception as e:
print("❌ 查詢失敗:", e)
return "查詢失敗,請稍後再試"
finally:
db.close()
def query_month(line_user_id: str) -> str:
db = SessionLocal()
try:
user_id = get_or_create_user(db, line_user_id)
now = datetime.now()
rows = db.query(Expense).filter(
Expense.user_id == user_id,
text("EXTRACT(YEAR FROM date) = :year AND EXTRACT(MONTH FROM date) = :month")
).params(year=now.year, month=now.month).all()
if not rows:
return "本月還沒有記錄 📭"
total = sum(float(r.amount) for r in rows)
summary = {}
for r in rows:
key = r.item_name or "其他"
summary[key] = summary.get(key, 0) + float(r.amount)
lines = [f"{cat}${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
except Exception as e:
print("❌ 查詢失敗:", e)
return "查詢失敗,請稍後再試"
finally:
db.close()

View File

@@ -0,0 +1,50 @@
from line import captcha_state
from Linebot_handler.Expense import (
EXPENSE_TEMPLATE,
save_expense,
delete_expense,
query_today,
query_month,
parse_multiline_expense,
)
def handle_text(line_user_id: str, text: str) -> str:
if text == "查今天":
return query_today(line_user_id)
if text == "查本月":
return query_month(line_user_id)
if text in ("記帳", "記帳說明"):
return EXPENSE_TEMPLATE
if text.startswith("刪除 "):
return delete_expense(line_user_id, text[3:].strip())
# 多行記帳
if "\n" in text:
fields = parse_multiline_expense(text)
if fields:
return save_expense(line_user_id, fields)
return (
"格式不正確 😅\n\n"
"必填欄位:品項、類別、金額\n"
"點下方「記帳」取得模板!"
)
return (
"點下方「記帳」取得記帳模板\n"
"或輸入「查今天」/「查本月」查詢"
)
def handle_captcha(text: str) -> bool:
"""回傳 True 表示是驗證碼輸入"""
if (
text.isdigit()
and len(text) == 5
and captcha_state.captcha_event
and not captcha_state.captcha_event.is_set()
):
captcha_state.captcha_answer = text
captcha_state.captcha_event.set()
return True
return False

View File

@@ -0,0 +1,59 @@
from fastapi import APIRouter, Request, HTTPException
from linebot.v3 import WebhookHandler
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage,
)
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
from linebot.v3.exceptions import InvalidSignatureError
from config import LINE_CHANNEL_ACCESS_TOKEN, LINE_CHANNEL_SECRET
from Linebot_handler.Handlers import handle_text, handle_captcha
router = APIRouter()
configuration = Configuration(access_token=LINE_CHANNEL_ACCESS_TOKEN)
handler = WebhookHandler(LINE_CHANNEL_SECRET)
@router.post("/webhook")
async def webhook(request: Request):
signature = request.headers.get("X-Line-Signature", "")
body = await request.body()
try:
handler.handle(body.decode(), signature)
except InvalidSignatureError:
raise HTTPException(status_code=400, detail="Invalid signature")
return "OK"
def _reply(reply_token: str, text: str):
with ApiClient(configuration) as api_client:
MessagingApi(api_client).reply_message(
ReplyMessageRequest(
reply_token=reply_token,
messages=[TextMessage(text=text)]
)
)
@handler.add(MessageEvent, message=TextMessageContent)
def on_message(event):
line_user_id = event.source.user_id
msg = event.message.text.strip()
if handle_captcha(msg):
_reply(event.reply_token, "✅ 驗證碼已送出!")
else:
_reply(event.reply_token, handle_text(line_user_id, msg))
@handler.add(FollowEvent)
def on_follow(event):
_reply(event.reply_token, (
"👋 歡迎使用 Myfinance 記帳 Bot\n\n"
"📝 點下方「記帳」開始記帳\n"
"📊 點「查今天」或「查本月」查詢\n\n"
"開始記帳吧!💪"
))