Files
linebot_finance/app/syncLineInvoiceData.py
henry4682 e055f7c7fb
All checks were successful
Oracle-Deploy / redeploy (push) Successful in 31s
feat: line_invoice_fetcher
2026-03-23 18:09:17 +08:00

143 lines
4.4 KiB
Python

import requests
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
import os
from dotenv import load_dotenv
load_dotenv()
# 1. 設定區 (把妳拿到的 Token 貼在這裡,或讀取 .env.token)
TOKEN = "r:407b1c9de10b67e8ad107b850d2edba0" # 妳剛才抓到的那串
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
DB_CONFIG = {
"dbname": "myfinance",
"user": POSTGRES_USER, # 💡 檢查 1Panel 裡的使用者名稱
"password": POSTGRES_PASSWORD, # 💡 檢查密碼
"host": POSTGRES_HOST, # 妳的 Oracle IP
"port": POSTGRES_PORT, # 妳的 Oracle Port
}
# 2. GraphQL 查詢 (妳之前逆向出來的精華)
GRAPHQL_URL = "https://invoice.line.me/graphql"
QUERY = """
query GetInvoices($periodCode: String!) {
invoices(where: {periodCode: {equalTo: $periodCode}}, first: 50) {
edges {
node {
amount
brandName
invoiceDate
sellerName
serial
lineItems { ... on Element { value } }
}
}
}
}
"""
def get_merchant_dict(cur):
"""從資料庫讀取商家對照表"""
cur.execute("SELECT pattern, display_name, default_category_id FROM merchant_mapping")
return cur.fetchall()
def sync():
# A. 抓取 LINE 資料
headers = {
"Content-Type": "application/json",
"x-parse-application-id": "line-invoice",
"x-parse-session-token": TOKEN,
"User-Agent": "Mozilla/5.0..."
}
# 假設抓 11502 (115年2月)
payload = {"variables": {"periodCode": "11504"}, "query": QUERY}
res = requests.post(GRAPHQL_URL, headers=headers, json=payload)
if res.status_code != 200:
print(f"❌ 抓取失敗: {res.status_code}")
return
invoices = res.json()['data']['invoices']['edges']
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()
merchant_rules = get_merchant_dict(cur)
for inv in invoices:
node = inv['node']
serial = node['serial']
seller = node.get('sellerName', '')
brand = node.get('brandName', '')
# 預設值
display_name = brand or seller or "電子發票"
category_id = 1 # 預設分類
# 自動翻譯邏輯:如果資料庫有定義規則,就覆蓋掉
for pattern, mapping_name, mapping_cat in merchant_rules:
if pattern in seller:
display_name = mapping_name
if mapping_cat:
category_id = mapping_cat
break
# 1. 檢查主表是否已存在
cur.execute("SELECT id FROM expenses WHERE invoice_number = %s", (serial,))
if cur.fetchone():
continue
# 2. 寫入主表並取得 ID
# 注意:我們直接用總額 node['amount'] 作為支出金額
sql_main = """
INSERT INTO expenses (user_id, category_id, amount, seller_name, item_name, invoice_number, date)
VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id
"""
cur.execute(sql_main, (
2,
category_id, # 這裡改用翻譯後的 category_id
node['amount'],
node['sellerName'],
display_name, # 這裡改用翻譯後的 display_name
serial,
node['invoiceDate']
))
new_expense_id = cur.fetchone()[0]
# 3. 處理子表 (lineItems)
if node.get('lineItems'):
item_data = []
for item in node['lineItems']:
val = item.get('value', {})
if not val: continue
# 準備批量插入的數據
item_data.append((
new_expense_id,
val.get('name', '未知品項'),
val.get('quantity', 1),
val.get('unitPrice', 0),
val.get('amount', 0),
val.get('category', 'others')
))
if item_data:
sql_items = """
INSERT INTO expense_items (expense_id, name, quantity, unit_price, total_price, category)
VALUES %s
"""
execute_values(cur, sql_items, item_data)
conn.commit()
cur.close()
conn.close()
print(f"✅ 深度同步完成!")
if __name__ == "__main__":
sync()