Files
linebot_finance/app/old_main.py
henry yo f9aa2b7774 feat: linebot
change groq model
2026-03-16 01:34:44 +08:00

247 lines
8.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import asyncio
import threading
import nest_asyncio
nest_asyncio.apply()
import captcha_state
from invoice_fetcher import main
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from linebot.v3 import WebhookHandler
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage,
)
from linebot.v3.webhooks import MessageEvent, TextMessageContent, FollowEvent
from linebot.v3.exceptions import InvalidSignatureError
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
load_dotenv()  # python-dotenv: load settings from a .env file before the os.getenv() calls below
app = FastAPI()
# LINE settings: both credentials are read from environment variables.
# NOTE(review): os.getenv() returns None when a variable is unset; nothing here checks for that.
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
# DB settings -- currently disabled. Everything below is commented out, so no
# engine, session factory or table is created when this module is imported.
# DATABASE_URL = os.getenv("DATABASE_URL")
# engine = create_engine(DATABASE_URL)
# SessionLocal = sessionmaker(bind=engine)
# Base = declarative_base()
# Table definition (disabled)
# class Transaction(Base):
#     __tablename__ = "transactions"
#     id = Column(Integer, primary_key=True, index=True)
#     user_id = Column(String)
#     category = Column(String)
#     amount = Column(Float)
#     note = Column(String, nullable=True)
#     created_at = Column(DateTime, default=datetime.now)
# Base.metadata.create_all(bind=engine)
# Webhook endpoint
@app.post("/webhook")
async def webhook(request: Request):
    """Receive a webhook call and hand it to the registered LINE handlers.

    The raw request body is decoded to text and passed, together with the
    value of the ``X-Line-Signature`` header (empty string when the header
    is absent), to ``handler.handle``.

    Returns:
        The string ``"OK"`` once the handler has processed the payload.

    Raises:
        HTTPException: status 400 when the handler raises
            ``InvalidSignatureError``.
    """
    line_signature = request.headers.get("X-Line-Signature", "")
    payload = (await request.body()).decode()

    try:
        handler.handle(payload, line_signature)
    except InvalidSignatureError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    return "OK"
def run_fetch_in_thread():
    """Run the invoice fetcher's ``main()`` coroutine to completion.

    Creates a fresh asyncio event loop, installs it as the current loop,
    drives ``main()`` on it and always closes the loop afterwards, even
    when ``main()`` raises.
    """
    # A brand-new loop is opened here; per the original note it is the one
    # that runs Playwright.
    private_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(private_loop)
    try:
        fetch_job = main()
        private_loop.run_until_complete(fetch_job)
    finally:
        private_loop.close()
@app.get("/fetch")
async def fetch_invoices():
    """Start an invoice fetch in a background thread and answer right away.

    Returns:
        ``{"status": "started"}``. The thread is started but not joined,
        so the response does not wait for the crawler to finish.
    """
    print("🚀 開始抓取發票...")
    # Fire and forget: the crawler keeps running after this response is sent.
    threading.Thread(target=run_fetch_in_thread).start()
    return {"status": "started"}
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    """Reply to an incoming text message.

    When the stripped text consists only of digits and
    ``captcha_state.captcha_event`` exists and is not yet set, the text is
    stored as ``captcha_state.captcha_answer`` and the event is set so the
    waiting crawler can continue. Any other message is passed to
    ``parse_and_save``. The resulting text is sent back with the event's
    reply token.
    """
    sender_id = event.source.user_id
    incoming = event.message.text.strip()

    is_captcha_reply = (
        incoming.isdigit()
        and captcha_state.captcha_event
        and not captcha_state.captcha_event.is_set()
    )
    if not is_captcha_reply:
        reply_text = parse_and_save(sender_id, incoming)
    else:
        captcha_state.captcha_answer = incoming
        captcha_state.captcha_event.set()  # wake up the crawler
        reply_text = "✅ 驗證碼已送出!"

    reply_request = ReplyMessageRequest(
        reply_token=event.reply_token,
        messages=[TextMessage(text=reply_text)],
    )
    with ApiClient(configuration) as api_client:
        MessagingApi(api_client).reply_message(reply_request)
@handler.add(FollowEvent)
def handle_follow(event):
    """Send the welcome / usage message in reply to a follow event."""
    # Joined with "\n"; an empty entry produces a blank line in the message.
    welcome_lines = [
        "👋 歡迎使用 Myfinance 記帳 Bot",
        "",
        "📝 記帳格式:",
        "類別 金額",
        "類別 金額 備註",
        "",
        "範例:",
        "早餐 80",
        "午餐 120 便當",
        "交通 50 捷運",
        "",
        "📊 查詢指令:",
        "查今天 → 今日明細",
        "查本月 → 本月統計",
        "",
        "開始記帳吧!💪",
    ]
    welcome_request = ReplyMessageRequest(
        reply_token=event.reply_token,
        messages=[TextMessage(text="\n".join(welcome_lines))],
    )
    with ApiClient(configuration) as api_client:
        MessagingApi(api_client).reply_message(welcome_request)
def _parse_record(text: str):
    """Split "<category> <amount> [note]" into its parts, or return None if it does not fit.

    Returns:
        ``(category, amount_text, amount, note)`` where ``amount_text`` is
        the amount exactly as typed, ``amount`` is its positive float value
        and ``note`` is ``None`` when no note was given.
    """
    parts = text.split(maxsplit=2)
    if len(parts) < 2:
        return None
    category, amount_text = parts[0], parts[1]
    # Plain decimal numbers only: rejects signs, "inf"/"nan" and exponent
    # forms that float() on its own would accept.
    if not re.fullmatch(r"\d+(?:\.\d+)?", amount_text):
        return None
    amount = float(amount_text)
    if amount <= 0:
        return None
    note = parts[2] if len(parts) == 3 else None
    return category, amount_text, amount, note


def parse_and_save(user_id: str, text: str) -> str:
    """Interpret one chat message and return the reply text.

    Inputs are checked in this order:

    * ``"查今天"`` / ``"查本月"`` -- today's / this month's report.
    * ``"記帳說明"`` -- usage help.
    * ``"刪除 <target>"`` -- delete a record via ``delete_transaction``.
    * exactly five digits -- echoed back as a received captcha code.
    * ``"<category> <amount> [note]"`` -- stored via ``save_transaction``.

    Anything else gets the format-error hint.

    Args:
        user_id: Identifier of the sender, forwarded to the helpers.
        text: Message text; callers are expected to have stripped it.

    Returns:
        The text to send back to the user.
    """
    # Query commands
    if text == "查今天":
        return query_today(user_id)
    if text == "查本月":
        return query_month(user_id)
    if text == "記帳說明":
        return (
            "📝 記帳格式:\n"
            "類別 金額\n"
            "類別 金額 備註\n\n"
            "範例:\n"
            "早餐 80\n"
            "午餐 120 便當\n"
            "交通 50 捷運\n\n"
            "輸入後 Bot 會自動記錄 ✅"
        )

    # Delete command: "刪除 1" or "刪除 早餐"
    if text.startswith("刪除 "):
        target = text[3:].strip()
        return delete_transaction(user_id, target)

    # fullmatch: unlike "^...$" with re.match, a trailing newline is not accepted.
    if re.fullmatch(r"\d{5}", text):
        return f"接收到驗證碼{text}"

    # The record format advertised in the help and error texts. Previously
    # nothing parsed it, so every record message was answered with the
    # format error and save_transaction() was never called.
    record = _parse_record(text)
    if record is not None:
        category, amount_text, amount, note = record
        save_transaction(user_id, category, amount, note)
        note_suffix = f" {note}" if note else ""
        return f"✅ 已記錄:{category} ${amount_text}{note_suffix}"

    return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月"
def save_transaction(user_id, category, amount, note):
    """Record one transaction (simulated: the values are only printed).

    Args:
        user_id: Identifier of the user the record belongs to.
        category: Category label of the record.
        amount: Amount of the record.
        note: Optional free-text note.
    """
    log_line = f"記錄交易(模擬):{user_id} {category} {amount} {note}"
    print(log_line)
    # Disabled persistence, kept for reference:
    # db = SessionLocal()
    # try:
    #     db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
    #     db.commit()
    # finally:
    #     db.close()
def delete_transaction(user_id: str, target: str) -> str:
    """Delete one of today's records (simulated: nothing is actually deleted).

    Args:
        user_id: Identifier of the requesting user.
        target: Record number or category name; only the disabled database
            code below uses it.

    Returns:
        The simulated success text, or the failure text if an exception is
        raised along the way.
    """
    # db = SessionLocal()
    try:
        today = datetime.now().date()  # only needed by the disabled query below
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("DATE(created_at) = :today")
        # ).params(today=today).all()
        # if not rows:
        #     return "今天還沒有記錄 📭"
        # Delete by number
        # if target.isdigit():
        #     idx = int(target) - 1
        #     if idx < 0 or idx >= len(rows):
        #         return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
        #     row = rows[idx]
        #     db.delete(row)
        #     db.commit()
        #     return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
        # Delete by category (removes the last matching record)
        # matched = [r for r in rows if r.category == target]
        # if not matched:
        #     return f"今天沒有「{target}」的記錄"
        # row = matched[-1]
        # db.delete(row)
        # db.commit()
        # return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
        outcome = "已刪除(模擬)"
        # finally:
        #     db.close()
    except Exception as err:
        print("❌ 刪除失敗:", err)
        outcome = "刪除失敗,請稍後再試"
    return outcome
def query_today(user_id):
    """Return today's record listing (simulated: a fixed sample report).

    Args:
        user_id: Identifier of the requesting user; only the disabled
            database code below uses it.

    Returns:
        The sample report text, or the failure text if an exception is
        raised along the way.
    """
    # db = SessionLocal()
    try:
        today = datetime.now().date()  # only needed by the disabled query below
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("DATE(created_at) = :today")
        # ).params(today=today).all()
        # if not rows:
        #     return "今天還沒有記錄 📭"
        # total = sum(r.amount for r in rows)
        # lines = [
        #     f"{i+1}. {r.category} ${r.amount:.0f}" + (f"{r.note}" if r.note else "")
        #     for i, r in enumerate(rows)
        # ]
        # return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如刪除 1"
        print("查詢今日記錄(模擬)")
        report = "📋 今日記錄:\n1. 早餐 $80\n2. 午餐 $120 便當\n3. 交通 $50 捷運\n\n💰 合計:$250\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
        # finally:
        #     db.close()
    except Exception as err:
        print("❌ 查詢失敗:", err)
        report = "查詢失敗,請稍後再試"
    return report
def query_month(user_id):
    """Return this month's per-category summary (simulated: fixed sample figures).

    Only the month number in the heading is live; it comes from the current
    local date.

    Args:
        user_id: Identifier of the requesting user; only the disabled
            database code below uses it.

    Returns:
        The sample summary text, or the failure text if an exception is
        raised along the way.
    """
    # db = SessionLocal()
    try:
        now = datetime.now()
        # rows = db.query(Transaction).filter(
        #     Transaction.user_id == user_id,
        #     text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
        # ).params(year=now.year, month=now.month).all()
        # if not rows:
        #     return "本月還沒有記錄 📭"
        # total = sum(r.amount for r in rows)
        # Totals per category
        # summary = {}
        # for r in rows:
        #     summary[r.category] = summary.get(r.category, 0) + r.amount
        # lines = [f"{cat}${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
        # return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
        print("查詢本月記錄(模擬)")
        report = f"📊 本月統計({now.month}月):\n早餐:$800\n午餐:$1200\n交通:$500\n\n💰 總計:$2500"
        # finally:
        #     db.close()
    except Exception as err:
        print("❌ 查詢失敗:", err)
        report = "查詢失敗,請稍後再試"
    return report