mirror of
https://github.com/henry4682/linebot_finance.git
synced 2026-05-16 04:41:52 +00:00
208 lines
7.0 KiB
Python
208 lines
7.0 KiB
Python
import os
|
||
import re
|
||
from dotenv import load_dotenv
|
||
from fastapi import FastAPI, Request, HTTPException
|
||
from linebot.v3 import WebhookHandler
|
||
from linebot.v3.messaging import (
|
||
Configuration,
|
||
ApiClient,
|
||
MessagingApi,
|
||
ReplyMessageRequest,
|
||
TextMessage,
|
||
)
|
||
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
|
||
from linebot.v3.exceptions import InvalidSignatureError
|
||
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text
|
||
from sqlalchemy.orm import declarative_base, sessionmaker
|
||
from datetime import datetime
|
||
|
||
|
||
load_dotenv()
|
||
|
||
app = FastAPI()
|
||
|
||
# LINE 設定
|
||
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
|
||
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
|
||
|
||
# DB 設定
|
||
DATABASE_URL = os.getenv("DATABASE_URL")
|
||
engine = create_engine(DATABASE_URL)
|
||
SessionLocal = sessionmaker(bind=engine)
|
||
Base = declarative_base()
|
||
|
||
# 資料表
|
||
class Transaction(Base):
|
||
__tablename__ = "transactions"
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
user_id = Column(String)
|
||
category = Column(String)
|
||
amount = Column(Float)
|
||
note = Column(String, nullable=True)
|
||
created_at = Column(DateTime, default=datetime.now)
|
||
|
||
Base.metadata.create_all(bind=engine)
|
||
|
||
# Webhook endpoint
|
||
@app.post("/webhook")
|
||
async def webhook(request: Request):
|
||
signature = request.headers.get("X-Line-Signature", "")
|
||
body = await request.body()
|
||
try:
|
||
handler.handle(body.decode(), signature)
|
||
except InvalidSignatureError:
|
||
raise HTTPException(status_code=400, detail="Invalid signature")
|
||
return "OK"
|
||
|
||
@handler.add(MessageEvent, message=TextMessageContent)
|
||
def handle_message(event):
|
||
user_id = event.source.user_id
|
||
text = event.message.text.strip()
|
||
reply = parse_and_save(user_id, text)
|
||
|
||
with ApiClient(configuration) as api_client:
|
||
line_bot_api = MessagingApi(api_client)
|
||
line_bot_api.reply_message(
|
||
ReplyMessageRequest(
|
||
reply_token=event.reply_token,
|
||
messages=[TextMessage(text=reply)]
|
||
)
|
||
)
|
||
|
||
@handler.add(FollowEvent)
|
||
def handle_follow(event):
|
||
with ApiClient(configuration) as api_client:
|
||
line_bot_api = MessagingApi(api_client)
|
||
line_bot_api.reply_message(
|
||
ReplyMessageRequest(
|
||
reply_token=event.reply_token,
|
||
messages=[TextMessage(text=(
|
||
"👋 歡迎使用 Myfinance 記帳 Bot!\n\n"
|
||
"📝 記帳格式:\n"
|
||
"類別 金額\n"
|
||
"類別 金額 備註\n\n"
|
||
"範例:\n"
|
||
"早餐 80\n"
|
||
"午餐 120 便當\n"
|
||
"交通 50 捷運\n\n"
|
||
"📊 查詢指令:\n"
|
||
"查今天 → 今日明細\n"
|
||
"查本月 → 本月統計\n\n"
|
||
"開始記帳吧!💪"
|
||
))]
|
||
)
|
||
)
|
||
|
||
def parse_and_save(user_id: str, text: str) -> str:
|
||
# 查詢指令
|
||
if text == "查今天":
|
||
return query_today(user_id)
|
||
if text == "查本月":
|
||
return query_month(user_id)
|
||
if text == "記帳說明":
|
||
return (
|
||
"📝 記帳格式:\n"
|
||
"類別 金額\n"
|
||
"類別 金額 備註\n\n"
|
||
"範例:\n"
|
||
"早餐 80\n"
|
||
"午餐 120 便當\n"
|
||
"交通 50 捷運\n\n"
|
||
"輸入後 Bot 會自動記錄 ✅"
|
||
)
|
||
# 刪除指令:刪除 1 或 刪除 早餐
|
||
if text.startswith("刪除 "):
|
||
target = text[3:].strip()
|
||
return delete_transaction(user_id, target)
|
||
|
||
# 記帳格式:「早餐 80」或「早餐 80 備註」
|
||
match = re.match(r"^(\S+)\s+(\d+(?:\.\d+)?)(?:\s+(.+))?$", text)
|
||
if match:
|
||
category = match.group(1)
|
||
amount = float(match.group(2))
|
||
note = match.group(3)
|
||
save_transaction(user_id, category, amount, note)
|
||
return f"✅ 已記錄:{category} ${amount:.0f}" + (f"({note})" if note else "")
|
||
|
||
return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月"
|
||
|
||
def save_transaction(user_id, category, amount, note):
|
||
db = SessionLocal()
|
||
try:
|
||
db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
|
||
db.commit()
|
||
finally:
|
||
db.close()
|
||
|
||
def delete_transaction(user_id: str, target: str) -> str:
|
||
db = SessionLocal()
|
||
try:
|
||
today = datetime.now().date()
|
||
rows = db.query(Transaction).filter(
|
||
Transaction.user_id == user_id,
|
||
text("DATE(created_at) = :today")
|
||
).params(today=today).all()
|
||
|
||
if not rows:
|
||
return "今天還沒有記錄 📭"
|
||
|
||
# 用編號刪除
|
||
if target.isdigit():
|
||
idx = int(target) - 1
|
||
if idx < 0 or idx >= len(rows):
|
||
return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
|
||
row = rows[idx]
|
||
db.delete(row)
|
||
db.commit()
|
||
return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
|
||
|
||
# 用類別刪除(刪最後一筆)
|
||
matched = [r for r in rows if r.category == target]
|
||
if not matched:
|
||
return f"今天沒有「{target}」的記錄"
|
||
row = matched[-1]
|
||
db.delete(row)
|
||
db.commit()
|
||
return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
|
||
|
||
finally:
|
||
db.close()
|
||
|
||
def query_today(user_id):
|
||
db = SessionLocal()
|
||
try:
|
||
today = datetime.now().date()
|
||
rows = db.query(Transaction).filter(
|
||
Transaction.user_id == user_id,
|
||
text("DATE(created_at) = :today")
|
||
).params(today=today).all()
|
||
if not rows:
|
||
return "今天還沒有記錄 📭"
|
||
total = sum(r.amount for r in rows)
|
||
lines = [
|
||
f"{i+1}. {r.category} ${r.amount:.0f}" + (f"({r.note})" if r.note else "")
|
||
for i, r in enumerate(rows)
|
||
]
|
||
return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
|
||
finally:
|
||
db.close()
|
||
|
||
def query_month(user_id):
|
||
db = SessionLocal()
|
||
try:
|
||
now = datetime.now()
|
||
rows = db.query(Transaction).filter(
|
||
Transaction.user_id == user_id,
|
||
text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
|
||
).params(year=now.year, month=now.month).all()
|
||
if not rows:
|
||
return "本月還沒有記錄 📭"
|
||
total = sum(r.amount for r in rows)
|
||
# 依類別統計
|
||
summary = {}
|
||
for r in rows:
|
||
summary[r.category] = summary.get(r.category, 0) + r.amount
|
||
lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
|
||
return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
|
||
finally:
|
||
db.close() |