mirror of
https://github.com/henry4682/linebot_finance.git
synced 2026-05-16 04:41:52 +00:00
248 lines
8.5 KiB
Python
248 lines
8.5 KiB
Python
import os
|
||
import re
|
||
import asyncio
|
||
import threading
|
||
import nest_asyncio
|
||
nest_asyncio.apply()
|
||
import captcha_state
|
||
from invoice_fetcher import main
|
||
from dotenv import load_dotenv
|
||
from fastapi import FastAPI, Request, HTTPException
|
||
from fastapi.staticfiles import StaticFiles
|
||
from linebot.v3 import WebhookHandler
|
||
from linebot.v3.messaging import (
|
||
Configuration,
|
||
ApiClient,
|
||
MessagingApi,
|
||
ReplyMessageRequest,
|
||
TextMessage,
|
||
)
|
||
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
|
||
from linebot.v3.exceptions import InvalidSignatureError
|
||
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text
|
||
from sqlalchemy.orm import declarative_base, sessionmaker
|
||
from datetime import datetime
|
||
|
||
|
||
load_dotenv()
|
||
|
||
app = FastAPI()
|
||
|
||
# LINE 設定
|
||
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
|
||
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
|
||
|
||
# DB 設定
|
||
# DATABASE_URL = os.getenv("DATABASE_URL")
|
||
# engine = create_engine(DATABASE_URL)
|
||
# SessionLocal = sessionmaker(bind=engine)
|
||
# Base = declarative_base()
|
||
|
||
# 資料表
|
||
# class Transaction(Base):
|
||
# __tablename__ = "transactions"
|
||
# id = Column(Integer, primary_key=True, index=True)
|
||
# user_id = Column(String)
|
||
# category = Column(String)
|
||
# amount = Column(Float)
|
||
# note = Column(String, nullable=True)
|
||
# created_at = Column(DateTime, default=datetime.now)
|
||
|
||
# Base.metadata.create_all(bind=engine)
|
||
|
||
# Webhook endpoint
|
||
@app.post("/webhook")
|
||
async def webhook(request: Request):
|
||
signature = request.headers.get("X-Line-Signature", "")
|
||
body = await request.body()
|
||
try:
|
||
handler.handle(body.decode(), signature)
|
||
except InvalidSignatureError:
|
||
raise HTTPException(status_code=400, detail="Invalid signature")
|
||
return "OK"
|
||
|
||
def run_fetch_in_thread():
|
||
# 開一個全新的 event loop 跑 Playwright
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
try:
|
||
loop.run_until_complete(main())
|
||
finally:
|
||
loop.close()
|
||
|
||
@app.get("/fetch")
|
||
async def fetch_invoices():
|
||
print("🚀 開始抓取發票...")
|
||
thread = threading.Thread(target=run_fetch_in_thread)
|
||
thread.start()
|
||
return {"status": "started"} # 立刻回傳,不等爬蟲
|
||
|
||
@handler.add(MessageEvent, message=TextMessageContent)
|
||
def handle_message(event):
|
||
user_id = event.source.user_id
|
||
text = event.message.text.strip()
|
||
if text.isdigit() and captcha_state.captcha_event and not captcha_state.captcha_event.is_set():
|
||
captcha_state.captcha_answer = text
|
||
captcha_state.captcha_event.set() # 通知爬蟲
|
||
reply = "✅ 驗證碼已送出!"
|
||
else:
|
||
reply = parse_and_save(user_id, text)
|
||
|
||
with ApiClient(configuration) as api_client:
|
||
line_bot_api = MessagingApi(api_client)
|
||
line_bot_api.reply_message(
|
||
ReplyMessageRequest(
|
||
reply_token=event.reply_token,
|
||
messages=[TextMessage(text=reply)]
|
||
)
|
||
)
|
||
|
||
@handler.add(FollowEvent)
|
||
def handle_follow(event):
|
||
with ApiClient(configuration) as api_client:
|
||
line_bot_api = MessagingApi(api_client)
|
||
line_bot_api.reply_message(
|
||
ReplyMessageRequest(
|
||
reply_token=event.reply_token,
|
||
messages=[TextMessage(text=(
|
||
"👋 歡迎使用 Myfinance 記帳 Bot!\n\n"
|
||
"📝 記帳格式:\n"
|
||
"類別 金額\n"
|
||
"類別 金額 備註\n\n"
|
||
"範例:\n"
|
||
"早餐 80\n"
|
||
"午餐 120 便當\n"
|
||
"交通 50 捷運\n\n"
|
||
"📊 查詢指令:\n"
|
||
"查今天 → 今日明細\n"
|
||
"查本月 → 本月統計\n\n"
|
||
"開始記帳吧!💪"
|
||
))]
|
||
)
|
||
)
|
||
|
||
def parse_and_save(user_id: str, text: str) -> str:
|
||
# 查詢指令
|
||
if text == "查今天":
|
||
return query_today(user_id)
|
||
if text == "查本月":
|
||
return query_month(user_id)
|
||
if text == "記帳說明":
|
||
return (
|
||
"📝 記帳格式:\n"
|
||
"類別 金額\n"
|
||
"類別 金額 備註\n\n"
|
||
"範例:\n"
|
||
"早餐 80\n"
|
||
"午餐 120 便當\n"
|
||
"交通 50 捷運\n\n"
|
||
"輸入後 Bot 會自動記錄 ✅"
|
||
)
|
||
|
||
# 刪除指令:刪除 1 或 刪除 早餐
|
||
if text.startswith("刪除 "):
|
||
target = text[3:].strip()
|
||
return delete_transaction(user_id, target)
|
||
|
||
|
||
if re.match(r"^\d{5}$", text):
|
||
return f"接收到驗證碼{text}"
|
||
|
||
return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月"
|
||
|
||
def save_transaction(user_id, category, amount, note):
|
||
print(f"記錄交易(模擬):{user_id} {category} {amount} {note}")
|
||
# db = SessionLocal()
|
||
# try:
|
||
# db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
|
||
# db.commit()
|
||
# finally:
|
||
# db.close()
|
||
|
||
def delete_transaction(user_id: str, target: str) -> str:
|
||
# db = SessionLocal()
|
||
try:
|
||
today = datetime.now().date()
|
||
# rows = db.query(Transaction).filter(
|
||
# Transaction.user_id == user_id,
|
||
# text("DATE(created_at) = :today")
|
||
# ).params(today=today).all()
|
||
|
||
# if not rows:
|
||
# return "今天還沒有記錄 📭"
|
||
|
||
# 用編號刪除
|
||
# if target.isdigit():
|
||
# idx = int(target) - 1
|
||
# if idx < 0 or idx >= len(rows):
|
||
# return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
|
||
# row = rows[idx]
|
||
# db.delete(row)
|
||
# db.commit()
|
||
# return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
|
||
|
||
# 用類別刪除(刪最後一筆)
|
||
# matched = [r for r in rows if r.category == target]
|
||
# if not matched:
|
||
# return f"今天沒有「{target}」的記錄"
|
||
# row = matched[-1]
|
||
# db.delete(row)
|
||
# db.commit()
|
||
# return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
|
||
return f"已刪除(模擬)"
|
||
|
||
# finally:
|
||
# db.close()
|
||
except Exception as e:
|
||
print("❌ 刪除失敗:", e)
|
||
return "刪除失敗,請稍後再試"
|
||
|
||
def query_today(user_id):
|
||
# db = SessionLocal()
|
||
try:
|
||
today = datetime.now().date()
|
||
# rows = db.query(Transaction).filter(
|
||
# Transaction.user_id == user_id,
|
||
# text("DATE(created_at) = :today")
|
||
# ).params(today=today).all()
|
||
# if not rows:
|
||
# return "今天還沒有記錄 📭"
|
||
# total = sum(r.amount for r in rows)
|
||
# lines = [
|
||
# f"{i+1}. {r.category} ${r.amount:.0f}" + (f"({r.note})" if r.note else "")
|
||
# for i, r in enumerate(rows)
|
||
# ]
|
||
# return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
|
||
print(f"查詢今日記錄(模擬)")
|
||
return "📋 今日記錄:\n1. 早餐 $80\n2. 午餐 $120 便當\n3. 交通 $50 捷運\n\n💰 合計:$250\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
|
||
# finally:
|
||
# db.close()
|
||
except Exception as e:
|
||
print("❌ 查詢失敗:", e)
|
||
return "查詢失敗,請稍後再試"
|
||
|
||
def query_month(user_id):
|
||
# db = SessionLocal()
|
||
try:
|
||
now = datetime.now()
|
||
# rows = db.query(Transaction).filter(
|
||
# Transaction.user_id == user_id,
|
||
# text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
|
||
# ).params(year=now.year, month=now.month).all()
|
||
# if not rows:
|
||
# return "本月還沒有記錄 📭"
|
||
# total = sum(r.amount for r in rows)
|
||
# 依類別統計
|
||
# summary = {}
|
||
# for r in rows:
|
||
# summary[r.category] = summary.get(r.category, 0) + r.amount
|
||
# lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
|
||
# return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
|
||
print(f"查詢本月記錄(模擬)")
|
||
return f"📊 本月統計({now.month}月):\n早餐:$800\n午餐:$1200\n交通:$500\n\n💰 總計:$2500"
|
||
# finally:
|
||
# db.close()
|
||
except Exception as e:
|
||
print("❌ 查詢失敗:", e)
|
||
return "查詢失敗,請稍後再試"
|
||
# comment |