"""FastAPI webhook for the Myfinance LINE bookkeeping bot.

Exposes:
  * POST /webhook - receives LINE webhook calls and dispatches them to the
    handlers registered on ``handler``.
  * GET /fetch    - starts the invoice crawler (``invoice_fetcher.main``) in a
    background thread.

The database layer is currently disabled: the SQLAlchemy setup and queries are
kept as commented-out reference code, and the save/delete/query helpers return
simulated results.
"""
import os
import re
import asyncio
import threading

import nest_asyncio

# Applied before the crawler modules below are imported; the original import
# order is preserved on purpose.
nest_asyncio.apply()

import captcha_state
from invoice_fetcher import main

from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from linebot.v3 import WebhookHandler
from linebot.v3.messaging import (
    Configuration,
    ApiClient,
    MessagingApi,
    ReplyMessageRequest,
    TextMessage,
)
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
from linebot.v3.exceptions import InvalidSignatureError
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime

load_dotenv()

app = FastAPI()

# LINE configuration
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))

# Database configuration (disabled; kept for reference)
# DATABASE_URL = os.getenv("DATABASE_URL")
# engine = create_engine(DATABASE_URL)
# SessionLocal = sessionmaker(bind=engine)
# Base = declarative_base()

# Table model (disabled; kept for reference)
# class Transaction(Base):
#     __tablename__ = "transactions"
#     id = Column(Integer, primary_key=True, index=True)
#     user_id = Column(String)
#     category = Column(String)
#     amount = Column(Float)
#     note = Column(String, nullable=True)
#     created_at = Column(DateTime, default=datetime.now)

# Base.metadata.create_all(bind=engine)

# "<category> <amount> [note]", e.g. "早餐 80" or "午餐 120 便當".
_RECORD_PATTERN = re.compile(r"(\S+)\s+(\d+(?:\.\d+)?)(?:\s+(.+))?")
# A bare five-digit message is acknowledged as a verification code.
_CAPTCHA_PATTERN = re.compile(r"^\d{5}$")


# Webhook endpoint
@app.post("/webhook")
async def webhook(request: Request):
    """Verify and dispatch a LINE webhook call.

    Reads the ``X-Line-Signature`` header (empty string when absent) and the
    raw request body, then hands both to ``handler.handle``.

    Returns:
        The string ``"OK"`` once the handler has run.

    Raises:
        HTTPException: 400 when the handler raises ``InvalidSignatureError``.
    """
    signature = request.headers.get("X-Line-Signature", "")
    body = await request.body()
    try:
        handler.handle(body.decode("utf-8"), signature)
    except InvalidSignatureError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    return "OK"


def run_fetch_in_thread():
    """Run ``invoice_fetcher.main()`` to completion on a fresh event loop.

    Intended as a ``threading.Thread`` target: the loop is created for the
    calling thread and always closed afterwards, even if ``main()`` raises.
    """
    # Start a brand-new event loop to run Playwright.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()


@app.get("/fetch")
async def fetch_invoices():
    """Start the invoice crawler in a background thread.

    Returns:
        ``{"status": "started"}`` immediately; the crawl result is not awaited.
    """
    print("🚀 開始抓取發票...")
    thread = threading.Thread(target=run_fetch_in_thread)
    thread.start()
    return {"status": "started"}  # respond right away, do not wait for the crawler


def _reply_text(reply_token: str, message: str) -> None:
    """Send ``message`` as a single text reply for ``reply_token``."""
    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message(
            ReplyMessageRequest(
                reply_token=reply_token,
                messages=[TextMessage(text=message)],
            )
        )


@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    """Handle an incoming text message.

    When the message is all digits and ``captcha_state.captcha_event`` exists
    and is not yet set, the message is stored in
    ``captcha_state.captcha_answer`` and the event is set. Any other message
    is passed to ``parse_and_save``. The resulting text is sent back as the
    reply.
    """
    user_id = event.source.user_id
    # Named ``message_text`` so the module-level ``sqlalchemy.text`` import is
    # not shadowed.
    message_text = event.message.text.strip()

    if (
        message_text.isdigit()
        and captcha_state.captcha_event
        and not captcha_state.captcha_event.is_set()
    ):
        captcha_state.captcha_answer = message_text
        captcha_state.captcha_event.set()  # notify the crawler
        reply = "✅ 驗證碼已送出!"
    else:
        reply = parse_and_save(user_id, message_text)

    _reply_text(event.reply_token, reply)


@handler.add(FollowEvent)
def handle_follow(event):
    """Reply to a follow event with the welcome / usage message."""
    _reply_text(
        event.reply_token,
        (
            "👋 歡迎使用 Myfinance 記帳 Bot!\n\n"
            "📝 記帳格式:\n"
            "類別 金額\n"
            "類別 金額 備註\n\n"
            "範例:\n"
            "早餐 80\n"
            "午餐 120 便當\n"
            "交通 50 捷運\n\n"
            "📊 查詢指令:\n"
            "查今天 → 今日明細\n"
            "查本月 → 本月統計\n\n"
            "開始記帳吧!💪"
        ),
    )


def parse_and_save(user_id: str, text: str) -> str:
    """Interpret one user message and return the reply text.

    Checked in order:
      1. ``查今天`` / ``查本月`` / ``記帳說明`` - query and help commands.
      2. ``刪除 <target>`` - delegated to ``delete_transaction``.
      3. Exactly five digits - acknowledged as a verification code.
      4. ``<category> <amount> [note]`` - recorded via ``save_transaction``.
      5. Anything else - the format-error hint.

    Args:
        user_id: LINE user id of the sender.
        text: Message text (``handle_message`` passes it already stripped).

    Returns:
        The reply to send back to the user.
    """
    # Query commands
    if text == "查今天":
        return query_today(user_id)
    if text == "查本月":
        return query_month(user_id)
    if text == "記帳說明":
        return (
            "📝 記帳格式:\n"
            "類別 金額\n"
            "類別 金額 備註\n\n"
            "範例:\n"
            "早餐 80\n"
            "午餐 120 便當\n"
            "交通 50 捷運\n\n"
            "輸入後 Bot 會自動記錄 ✅"
        )

    # Delete command: "刪除 1" or "刪除 早餐"
    if text.startswith("刪除 "):
        target = text[3:].strip()
        return delete_transaction(user_id, target)

    if _CAPTCHA_PATTERN.match(text):
        return f"接收到驗證碼{text}"

    # Record command. This must come after the delete branch, because
    # "刪除 1" also has the "<word> <number>" shape.
    record = _RECORD_PATTERN.fullmatch(text)
    if record:
        category, amount_text, note = record.groups()
        amount = float(amount_text)
        save_transaction(user_id, category, amount, note)
        reply = f"✅ 已記錄:{category} ${amount:.0f}"
        if note:
            reply += f"({note})"
        return reply

    return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月"


def save_transaction(user_id, category, amount, note):
    """Record one transaction (simulated: only prints the values).

    The real insert is kept below as disabled reference code.
    """
    print(f"記錄交易(模擬):{user_id} {category} {amount} {note}")
    # db = SessionLocal()
    # try:
    #     db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
    #     db.commit()
    # finally:
    #     db.close()


def delete_transaction(user_id: str, target: str) -> str:
    """Delete one of today's transactions (simulated).

    Args:
        user_id: LINE user id of the sender.
        target: Text after the ``刪除 `` prefix; the disabled DB code treats a
            numeric value as a 1-based index into today's rows and anything
            else as a category name.

    Returns:
        The simulated confirmation, or the failure message if an exception is
        raised.
    """
    try:
        # Disabled DB implementation, kept for reference:
        # db = SessionLocal()
        # today = datetime.now().date()
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("DATE(created_at) = :today")
        # ).params(today=today).all()

        # if not rows:
        #     return "今天還沒有記錄 📭"

        # Delete by index
        # if target.isdigit():
        #     idx = int(target) - 1
        #     if idx < 0 or idx >= len(rows):
        #         return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
        #     row = rows[idx]
        #     db.delete(row)
        #     db.commit()
        #     return f"✅ 已刪除:{row.category} ${row.amount:.0f}"

        # Delete by category (removes the last matching row)
        # matched = [r for r in rows if r.category == target]
        # if not matched:
        #     return f"今天沒有「{target}」的記錄"
        # row = matched[-1]
        # db.delete(row)
        # db.commit()
        # return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
        return "已刪除(模擬)"
    except Exception as e:
        print("❌ 刪除失敗:", e)
        return "刪除失敗,請稍後再試"
    # finally:
    #     db.close()


def query_today(user_id):
    """Return today's transaction list for ``user_id`` (simulated data).

    Returns:
        A fixed sample listing, or the failure message if an exception is
        raised.
    """
    try:
        # Disabled DB implementation, kept for reference:
        # db = SessionLocal()
        # today = datetime.now().date()
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("DATE(created_at) = :today")
        # ).params(today=today).all()

        # if not rows:
        #     return "今天還沒有記錄 📭"

        # total = sum(r.amount for r in rows)
        # lines = [
        #     f"{i+1}. {r.category} ${r.amount:.0f}" + (f"({r.note})" if r.note else "")
        #     for i, r in enumerate(rows)
        # ]
        # return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        print("查詢今日記錄(模擬)")
        return "📋 今日記錄:\n1. 早餐 $80\n2. 午餐 $120 便當\n3. 交通 $50 捷運\n\n💰 合計:$250\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    # finally:
    #     db.close()


def query_month(user_id):
    """Return this month's per-category summary for ``user_id`` (simulated).

    Only the month number in the heading is live (taken from
    ``datetime.now()``); the amounts are fixed sample data.

    Returns:
        The summary text, or the failure message if an exception is raised.
    """
    try:
        now = datetime.now()
        # Disabled DB implementation, kept for reference:
        # db = SessionLocal()
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
        # ).params(year=now.year, month=now.month).all()

        # if not rows:
        #     return "本月還沒有記錄 📭"

        # total = sum(r.amount for r in rows)

        # Aggregate by category
        # summary = {}
        # for r in rows:
        #     summary[r.category] = summary.get(r.category, 0) + r.amount

        # lines = [f"{cat}:${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
        # return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
        print("查詢本月記錄(模擬)")
        return f"📊 本月統計({now.month}月):\n早餐:$800\n午餐:$1200\n交通:$500\n\n💰 總計:$2500"
    except Exception as e:
        print("❌ 查詢失敗:", e)
        return "查詢失敗,請稍後再試"
    # finally:
    #     db.close()