feat: linebot & invoice_fetcher

1. add manual captcha
2. remove transaction
This commit is contained in:
2026-03-09 15:35:49 +08:00
parent 1ea7feacf1
commit 1fa12fcfad
5 changed files with 317 additions and 153 deletions

4
app/captcha_state.py Normal file
View File

@@ -0,0 +1,4 @@
"""Shared state for handing a captcha answer from the LINE webhook to the crawler."""
import asyncio

# Captcha text typed by the user; written by the webhook handler before it
# sets captcha_event. None until a reply has been stored.
captcha_answer = None

# threading.Event created by the crawler when it starts waiting for a reply;
# None until the crawler first waits.
captcha_event = None

# Not used for signalling. Defined because the webhook handler's debug print
# reads captcha_state.captcha_future, which would otherwise raise
# AttributeError on every non-captcha message.
captcha_future = None

View File

@@ -1,14 +1,27 @@
import asyncio
import base64
import os
import time
import asyncio
import requests
import anthropic
import urllib3
import threading
import captcha_state
from PIL import Image, ImageOps
import io
import cloudinary
import cloudinary.uploader
from datetime import datetime, timedelta
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, text
from sqlalchemy.orm import declarative_base, sessionmaker
from linebot.v3.messaging import (
Configuration, ApiClient, MessagingApi,
PushMessageRequest, TextMessage, ImageMessage
)
from urllib.parse import parse_qs
urllib3.disable_warnings()
load_dotenv("../.env")
@@ -16,6 +29,13 @@ load_dotenv("../.env")
# Credentials for the e-invoice platform and the Anthropic API key, read from the environment.
EINVOICE_USER = os.getenv("EINVOICE_USER")
EINVOICE_PASS = os.getenv("EINVOICE_PASS")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
# LINE user ID read from LINE_USER_ID (os.getenv returns None when the variable is unset).
MY_USER_ID = os.getenv("LINE_USER_ID")
# Configure the Cloudinary SDK from environment variables.
cloudinary.config(
cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
api_key=os.getenv("CLOUDINARY_API_KEY"),
api_secret=os.getenv("CLOUDINARY_API_SECRET")
)
# 本地直接連 localhost
# DATABASE_URL = os.getenv("LOCAL_DATABASE_URL")
@@ -23,14 +43,14 @@ ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
# SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
class Transaction(Base):
__tablename__ = "transactions"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String)
category = Column(String)
amount = Column(Float)
note = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.now)
# class Transaction(Base):
# __tablename__ = "transactions"
# id = Column(Integer, primary_key=True, index=True)
# user_id = Column(String)
# category = Column(String)
# amount = Column(Float)
# note = Column(String, nullable=True)
# created_at = Column(DateTime, default=datetime.now)
def solve_captcha(img_b64: str) -> str:
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
@@ -57,81 +77,150 @@ def solve_captcha(img_b64: str) -> str:
)
return msg.content[0].text.strip()
async def solve_captcha_manual(img_b64: str):
    """Ask a human to solve the captcha through LINE and return the reply.

    The captcha image is pasted onto a white background, uploaded to
    Cloudinary, and pushed to the LINE user together with a prompt. The
    coroutine then waits until the webhook handler stores the reply in
    ``captcha_state.captcha_answer`` and sets ``captcha_state.captcha_event``.

    Args:
        img_b64: Base64-encoded captcha image.

    Returns:
        The value found in ``captcha_state.captcha_answer`` once the event
        has been set.

    Raises:
        TimeoutError: No reply arrived within 120 seconds.
    """
    # 1. Decode the image and paste it onto a white background, using the
    #    alpha channel as the paste mask.
    img_data = base64.b64decode(img_b64)
    img = Image.open(io.BytesIO(img_data)).convert("RGBA")
    background = Image.new("RGBA", img.size, (255, 255, 255, 255))
    background.paste(img, mask=img.split()[3])
    white_img = background.convert("RGB")

    # Re-encode the flattened image as base64 PNG.
    buf = io.BytesIO()
    white_img.save(buf, format="PNG")
    buf.seek(0)
    white_b64 = base64.b64encode(buf.read()).decode()

    # 2. Upload the image to Cloudinary to obtain an HTTPS URL for LINE.
    upload_res = cloudinary.uploader.upload(
        f"data:image/png;base64,{white_b64}",
        public_id=f"captcha_{int(time.time())}",
        overwrite=True,
        quality="auto:best",
        fetch_format="png",
    )
    image_url = upload_res["secure_url"]
    print(f"圖片 URL: {image_url}")

    # 3. Arm the event BEFORE pushing the message, so a reply that arrives
    #    immediately after the push is not dropped by the webhook handler.
    answer_event = threading.Event()
    captcha_state.captcha_answer = None
    captcha_state.captcha_event = answer_event
    try:
        # 4. Push the image and the prompt to LINE.
        configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
        with ApiClient(configuration) as api_client:
            line_bot_api = MessagingApi(api_client)
            line_bot_api.push_message(PushMessageRequest(
                to=os.getenv("LINE_USER_ID"),
                messages=[
                    ImageMessage(
                        original_content_url=image_url,
                        preview_image_url=image_url,
                    ),
                    TextMessage(text="請輸入驗證碼數字:"),
                ],
            ))

        # 5. Wait for the reply in a worker thread; calling Event.wait()
        #    directly here would block the event loop for up to 120 seconds.
        triggered = await asyncio.to_thread(answer_event.wait, 120)
        if not triggered:
            raise TimeoutError("⏰ 驗證碼等待超時")
        return captcha_state.captcha_answer
    finally:
        # Disarm the event so a later numeric chat message is not mistaken
        # for a captcha answer (e.g. after a timeout).
        if captcha_state.captcha_event is answer_event:
            captcha_state.captcha_event = None
async def login_and_get_token(max_retry: int = 3) -> str | None:
    """Log in to the e-invoice platform and return the session token.

    Each attempt opens the login page in Chromium to read ``login_challenge``
    from the URL, fetches a captcha, has it solved through LINE via
    ``solve_captcha_manual``, posts the credentials, follows the returned
    redirect and reads ``token`` from sessionStorage or localStorage.

    Args:
        max_retry: Maximum number of login attempts.

    Returns:
        The token string, or ``None`` when every attempt failed.
    """
    for attempt in range(max_retry):
        print(f"登入嘗試第 {attempt + 1} 次...")
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=False)
                try:
                    # Load the login page to obtain login_challenge.
                    page = await browser.new_page()
                    await page.goto("https://www.einvoice.nat.gov.tw/accounts/login/mw")
                    await page.wait_for_timeout(8000)
                    url = page.url
                    print(f"目前 URL: {url}")

                    fragment = url.split("?")[-1] if "?" in url else ""
                    params = parse_qs(fragment)
                    login_challenge = params.get("login_challenge", [None])[0]
                    print(f"login_challenge: {login_challenge}")

                    # Fetch the captcha image and its token.
                    res = requests.get(
                        "https://service-mc.einvoice.nat.gov.tw/act/login/api/act002i/captcha",
                        verify=False,
                        timeout=30,  # never hang forever on a stalled connection
                    )
                    captcha_data = res.json()
                    captcha_token = captcha_data["token"]

                    # Have the captcha solved by a human through the LINE bot.
                    captcha_text = await solve_captcha_manual(captcha_data["image"])
                    print(f"驗證碼: {captcha_text}")

                    # Submit the login request.
                    res = requests.post(
                        "https://service-mc.einvoice.nat.gov.tw/act/login/api/client/doLogin",
                        json={
                            "loginType": "U",
                            "userType": "MW",
                            "loginChallenge": login_challenge,
                            "captchaToken": captcha_token,
                            "captcha": captcha_text,
                            "customId": EINVOICE_USER,
                            "password": EINVOICE_PASS,
                        },
                        verify=False,
                        timeout=30,
                    )
                    data = res.json()
                    redirect_url = data.get("redirectTo")
                    print(f"redirectTo: {redirect_url}")

                    if not redirect_url:
                        # A rejected login (e.g. mistyped captcha) consumes one
                        # attempt instead of aborting the whole retry loop.
                        print(f"登入失敗: {data}")
                        continue

                    # Follow the redirect so the site can store the token.
                    await page.goto(redirect_url)
                    await page.wait_for_load_state("domcontentloaded")
                    await page.wait_for_timeout(8000)  # give the page time to settle

                    url = page.url
                    print(f"redirect 後 URL: {url}")

                    # Dump the keys of both localStorage and sessionStorage.
                    local_keys = await page.evaluate("Object.keys(localStorage)")
                    session_keys = await page.evaluate("Object.keys(sessionStorage)")
                    print("localStorage keys:", local_keys)
                    print("sessionStorage keys:", session_keys)
                    await page.wait_for_timeout(3000)
                    for key in session_keys:
                        # Pass the key as an argument rather than splicing it
                        # into the script, so quotes in a key cannot break it.
                        val = await page.evaluate(
                            "key => sessionStorage.getItem(key)", key
                        )
                        print(f" session {key}: {val[:80] if val else None}")

                    token = await page.evaluate(
                        "sessionStorage.getItem('token') || localStorage.getItem('token')"
                    )
                    print(f"token: {token[:30] if token else 'None'}")
                finally:
                    # Close the browser on every path, including exceptions.
                    await browser.close()

            if token:
                return token
            print(f"⚠️ 第 {attempt + 1} 次登入失敗,重試...")
        except Exception as e:
            # Best-effort retry: any failure in one attempt moves on to the next.
            print(f"❌ 第 {attempt + 1} 次發生錯誤: {e}")
            continue

    print("❌ 登入失敗超過最大重試次數")
    return None
async def fetch_invoices(token: str, days: int = 7) -> list:
print(f"🔍 開始抓發票token: {token[:20]}")
end_date = datetime.now()
start_date = end_date - timedelta(days=days)

View File

@@ -1,7 +1,14 @@
import os
import re
import asyncio
import threading
import nest_asyncio
nest_asyncio.apply()
import captcha_state
from invoice_fetcher import main
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from linebot.v3 import WebhookHandler
from linebot.v3.messaging import (
Configuration,
@@ -20,28 +27,30 @@ from datetime import datetime
# Load variables from a .env file before the os.getenv calls below.
load_dotenv()
# FastAPI application; the "static" directory is mounted at /static.
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
# LINE settings: access token for the Messaging API client, channel secret for the webhook handler.
configuration = Configuration(access_token=os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
# DB 設定
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
# DATABASE_URL = os.getenv("DATABASE_URL")
# engine = create_engine(DATABASE_URL)
# SessionLocal = sessionmaker(bind=engine)
# Base = declarative_base()
# 資料表
class Transaction(Base):
__tablename__ = "transactions"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String)
category = Column(String)
amount = Column(Float)
note = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.now)
# class Transaction(Base):
# __tablename__ = "transactions"
# id = Column(Integer, primary_key=True, index=True)
# user_id = Column(String)
# category = Column(String)
# amount = Column(Float)
# note = Column(String, nullable=True)
# created_at = Column(DateTime, default=datetime.now)
Base.metadata.create_all(bind=engine)
# Base.metadata.create_all(bind=engine)
# Webhook endpoint
@app.post("/webhook")
@@ -54,11 +63,33 @@ async def webhook(request: Request):
raise HTTPException(status_code=400, detail="Invalid signature")
return "OK"
def run_fetch_in_thread():
    """Run the invoice fetcher's ``main()`` coroutine on a fresh event loop.

    Intended as a thread target: a new loop is created, installed as the
    current loop for the calling thread, driven until ``main()`` finishes,
    and closed afterwards even if ``main()`` raises.
    """
    worker_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(worker_loop)
    try:
        crawl = main()
        worker_loop.run_until_complete(crawl)
    finally:
        worker_loop.close()
@app.get("/fetch")
async def fetch_invoices():
    """Start the invoice crawl on a background thread and respond immediately."""
    print("🚀 開始抓取發票...")
    # Fire-and-forget: the response is sent without waiting for the crawler.
    threading.Thread(target=run_fetch_in_thread).start()
    return dict(status="started")
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
user_id = event.source.user_id
text = event.message.text.strip()
reply = parse_and_save(user_id, text)
if text.isdigit() and captcha_state.captcha_event and not captcha_state.captcha_event.is_set():
captcha_state.captcha_answer = text
captcha_state.captcha_event.set() # 通知爬蟲
reply = "✅ 驗證碼已送出!"
else:
print(f"captcha_future 狀態: {captcha_state.captcha_future}") # 加這行
reply = parse_and_save(user_id, text)
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
@@ -110,99 +141,110 @@ def parse_and_save(user_id: str, text: str) -> str:
"交通 50 捷運\n\n"
"輸入後 Bot 會自動記錄 ✅"
)
# 刪除指令:刪除 1 或 刪除 早餐
if text.startswith("刪除 "):
target = text[3:].strip()
return delete_transaction(user_id, target)
# 記帳格式:「早餐 80」或「早餐 80 備註」
match = re.match(r"^(\S+)\s+(\d+(?:\.\d+)?)(?:\s+(.+))?$", text)
if match:
category = match.group(1)
amount = float(match.group(2))
note = match.group(3)
save_transaction(user_id, category, amount, note)
return f"✅ 已記錄:{category} ${amount:.0f}" + (f"{note}" if note else "")
if re.match(r"^\d{5}$", text):
return f"接收到驗證碼{text}"
return "格式錯誤 😅\n記帳請輸入:類別 金額\n例如:早餐 80\n\n查詢請輸入:查今天 / 查本月"
def save_transaction(user_id, category, amount, note):
db = SessionLocal()
try:
db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
db.commit()
finally:
db.close()
print(f"記錄交易(模擬):{user_id} {category} {amount} {note}")
# db = SessionLocal()
# try:
# db.add(Transaction(user_id=user_id, category=category, amount=amount, note=note))
# db.commit()
# finally:
# db.close()
def delete_transaction(user_id: str, target: str) -> str:
db = SessionLocal()
# db = SessionLocal()
try:
today = datetime.now().date()
rows = db.query(Transaction).filter(
Transaction.user_id == user_id,
text("DATE(created_at) = :today")
).params(today=today).all()
# rows = db.query(Transaction).filter(
# Transaction.user_id == user_id,
# text("DATE(created_at) = :today")
# ).params(today=today).all()
if not rows:
return "今天還沒有記錄 📭"
# if not rows:
# return "今天還沒有記錄 📭"
# 用編號刪除
if target.isdigit():
idx = int(target) - 1
if idx < 0 or idx >= len(rows):
return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
row = rows[idx]
db.delete(row)
db.commit()
return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
# if target.isdigit():
# idx = int(target) - 1
# if idx < 0 or idx >= len(rows):
# return f"沒有第 {target} 筆記錄,請先輸入「查今天」確認編號"
# row = rows[idx]
# db.delete(row)
# db.commit()
# return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
# 用類別刪除(刪最後一筆)
matched = [r for r in rows if r.category == target]
if not matched:
return f"今天沒有「{target}」的記錄"
row = matched[-1]
db.delete(row)
db.commit()
return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
# matched = [r for r in rows if r.category == target]
# if not matched:
# return f"今天沒有「{target}」的記錄"
# row = matched[-1]
# db.delete(row)
# db.commit()
# return f"✅ 已刪除:{row.category} ${row.amount:.0f}"
return f"已刪除(模擬)"
finally:
db.close()
# finally:
# db.close()
except Exception as e:
print("❌ 刪除失敗:", e)
return "刪除失敗,請稍後再試"
def query_today(user_id):
db = SessionLocal()
# db = SessionLocal()
try:
today = datetime.now().date()
rows = db.query(Transaction).filter(
Transaction.user_id == user_id,
text("DATE(created_at) = :today")
).params(today=today).all()
if not rows:
return "今天還沒有記錄 📭"
total = sum(r.amount for r in rows)
lines = [
f"{i+1}. {r.category} ${r.amount:.0f}" + (f"{r.note}" if r.note else "")
for i, r in enumerate(rows)
]
return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
finally:
db.close()
# rows = db.query(Transaction).filter(
# Transaction.user_id == user_id,
# text("DATE(created_at) = :today")
# ).params(today=today).all()
# if not rows:
# return "今天還沒有記錄 📭"
# total = sum(r.amount for r in rows)
# lines = [
# f"{i+1}. {r.category} ${r.amount:.0f}" + (f"{r.note}" if r.note else "")
# for i, r in enumerate(rows)
# ]
# return "📋 今日記錄:\n" + "\n".join(lines) + f"\n\n💰 合計:${total:.0f}\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
print(f"查詢今日記錄(模擬)")
return "📋 今日記錄:\n1. 早餐 $80\n2. 午餐 $120 便當\n3. 交通 $50 捷運\n\n💰 合計:$250\n\n🗑 刪除請輸入:刪除 編號\n例如:刪除 1"
# finally:
# db.close()
except Exception as e:
print("❌ 查詢失敗:", e)
return "查詢失敗,請稍後再試"
def query_month(user_id):
db = SessionLocal()
# db = SessionLocal()
try:
now = datetime.now()
rows = db.query(Transaction).filter(
Transaction.user_id == user_id,
text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
).params(year=now.year, month=now.month).all()
if not rows:
return "本月還沒有記錄 📭"
total = sum(r.amount for r in rows)
# rows = db.query(Transaction).filter(
# Transaction.user_id == user_id,
# text("EXTRACT(YEAR FROM created_at) = :year AND EXTRACT(MONTH FROM created_at) = :month")
# ).params(year=now.year, month=now.month).all()
# if not rows:
# return "本月還沒有記錄 📭"
# total = sum(r.amount for r in rows)
# 依類別統計
summary = {}
for r in rows:
summary[r.category] = summary.get(r.category, 0) + r.amount
lines = [f"{cat}${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
finally:
db.close()
# summary = {}
# for r in rows:
# summary[r.category] = summary.get(r.category, 0) + r.amount
# lines = [f"{cat}${amt:.0f}" for cat, amt in sorted(summary.items(), key=lambda x: -x[1])]
# return f"📊 本月統計({now.month}月):\n" + "\n".join(lines) + f"\n\n💰 總計:${total:.0f}"
print(f"查詢本月記錄(模擬)")
return f"📊 本月統計({now.month}月):\n早餐:$800\n午餐:$1200\n交通:$500\n\n💰 總計:$2500"
# finally:
# db.close()
except Exception as e:
print("❌ 查詢失敗:", e)
return "查詢失敗,請稍後再試"

View File

@@ -6,8 +6,10 @@ readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"anthropic>=0.84.0",
"cloudinary>=1.44.1",
"fastapi>=0.135.1",
"line-bot-sdk>=3.22.0",
"nest-asyncio>=1.6.0",
"numpy>=2.4.2",
"pillow>=12.1.1",
"playwright>=1.58.0",

27
app/uv.lock generated
View File

@@ -155,8 +155,10 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "anthropic" },
{ name = "cloudinary" },
{ name = "fastapi" },
{ name = "line-bot-sdk" },
{ name = "nest-asyncio" },
{ name = "numpy" },
{ name = "pillow" },
{ name = "playwright" },
@@ -170,8 +172,10 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "anthropic", specifier = ">=0.84.0" },
{ name = "cloudinary", specifier = ">=1.44.1" },
{ name = "fastapi", specifier = ">=0.135.1" },
{ name = "line-bot-sdk", specifier = ">=3.22.0" },
{ name = "nest-asyncio", specifier = ">=1.6.0" },
{ name = "numpy", specifier = ">=2.4.2" },
{ name = "pillow", specifier = ">=12.1.1" },
{ name = "playwright", specifier = ">=1.58.0" },
@@ -253,6 +257,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" },
]
[[package]]
name = "cloudinary"
version = "1.44.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "six" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/32/35/938a4cc3b5ac386184a8ea50e357cdbb4239c2744fc8c652c461674447e6/cloudinary-1.44.1.tar.gz", hash = "sha256:62d4374b79d5476de2a86cb6a1da709a5429e02aef474bfc5d99f3e38a1a62ff", size = 188225, upload-time = "2025-06-17T16:31:33.279Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/86/f0/518d151d3dfc009940947fe9b26cdf9f6e2fb9e4a29c12fe5b5ebe8aad65/cloudinary-1.44.1-py3-none-any.whl", hash = "sha256:b4785031179a5ec7010f46665e5c8fad2cae022c18405546f01d257e02f78b1c", size = 147808, upload-time = "2025-06-17T16:31:32.188Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
@@ -621,6 +639,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/08/7036c080d7117f28a4af526d794aab6a84463126db031b007717c1a6676e/multidict-6.7.1-py3-none-any.whl", hash = "sha256:55d97cc6dae627efa6a6e548885712d4864b81110ac76fa4e534c03819fa4a56", size = 12319, upload-time = "2026-01-26T02:46:44.004Z" },
]
[[package]]
name = "nest-asyncio"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/83/f8/51569ac65d696c8ecbee95938f89d4abf00f47d58d48f6fbabfe8f0baefe/nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe", size = 7418, upload-time = "2024-01-21T14:25:19.227Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/c4/c2971a3ba4c6103a3d10c4b0f24f461ddc027f0f09763220cf35ca1401b3/nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c", size = 5195, upload-time = "2024-01-21T14:25:17.223Z" },
]
[[package]]
name = "numpy"
version = "2.4.2"