| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
import os
import sys
from pathlib import Path

import pandas as pd
import pymysql
def get_conn():
    """Open a new pymysql connection to the import target database.

    Fill in your own settings. Every connection setting can be overridden
    with an environment variable, so credentials do not have to be edited
    into (or committed with) the source:

    - ``DB_HOST``     (default ``"127.0.0.1"``)
    - ``DB_PORT``     (default ``3306``; must be an integer)
    - ``DB_USER``     (default ``"root"``)
    - ``DB_PASSWORD`` (default: the ``"your_password"`` placeholder)
    - ``DB_NAME``     (default: the ``"your_database"`` placeholder)

    The connection uses the ``utf8mb4`` charset and has autocommit disabled,
    so the caller is responsible for ``commit()`` / ``rollback()`` and
    ``close()``.

    Raises:
        ValueError: if ``DB_PORT`` is set to a value that is not an integer.
        Any error raised by ``pymysql.connect`` is propagated unchanged.
    """
    return pymysql.connect(
        host=os.environ.get("DB_HOST", "127.0.0.1"),
        port=int(os.environ.get("DB_PORT", "3306")),
        user=os.environ.get("DB_USER", "root"),
        password=os.environ.get("DB_PASSWORD", "your_password"),
        database=os.environ.get("DB_NAME", "your_database"),
        charset="utf8mb4",
        # The importer commits or rolls back the whole load itself.
        autocommit=False,
    )
def read_source_file(file_path: Path, sheet_name=0) -> pd.DataFrame:
    """Load the import source file into a DataFrame with ``dtype=object``.

    The reader is chosen from the file suffix, compared case-insensitively:

    * ``.csv`` is read with ``pd.read_csv``, decoded as ``utf-8-sig`` so a
      leading UTF-8 BOM is tolerated. ``sheet_name`` is ignored.
    * ``.xlsx`` / ``.xls`` is read with ``pd.read_excel`` using ``sheet_name``.
      A single sheet name or index is expected here. Per the pandas docs, a
      list or ``None`` makes ``read_excel`` return a dict instead.

    ``dtype=object`` keeps pandas from coercing columns to numeric dtypes.
    For CSV input the cells stay as the raw strings.

    Raises:
        ValueError: for any other suffix.
    """
    ext = file_path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(file_path, dtype=object, encoding="utf-8-sig")
    if ext not in {".xlsx", ".xls"}:
        raise ValueError(f"不支持的文件类型: {ext}")
    return pd.read_excel(file_path, sheet_name=sheet_name, dtype=object)
def get_table_columns(conn, table_name: str):
    """Return the column names of ``table_name`` in the connection's current schema.

    Names are read from ``information_schema.COLUMNS`` in table-definition
    order (``ORDINAL_POSITION``). An unknown table yields an empty list.
    ``table_name`` is passed as a bound query parameter, never interpolated.
    Each fetched row is indexed with ``[0]``, which assumes the default tuple
    cursor.
    """
    query = """
    SELECT COLUMN_NAME
    FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = DATABASE()
    AND TABLE_NAME = %s
    ORDER BY ORDINAL_POSITION
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (table_name,))
        return [record[0] for record in cursor.fetchall()]
def clean_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise headers and missing values of a loaded sheet for DB insertion.

    - Header labels are converted to ``str`` and stripped of surrounding
      whitespace. A ``None`` label becomes ``""``.
    - Columns whose stripped label is empty are dropped.
    - Every missing value (NaN / None / NaT) becomes ``None``, which the DB
      driver writes as SQL ``NULL``.

    The result is always object dtype. Without that cast, ``where(..., None)``
    on a float or datetime column coerces ``None`` straight back to NaN/NaT.
    A new DataFrame is returned and the input is left unmodified.
    """
    df = df.rename(columns=lambda label: "" if label is None else str(label).strip())
    # Select with a positional boolean mask instead of a label list. With
    # duplicated headers (e.g. "name" and "name " both strip to "name"), a
    # label list picks each duplicate once per occurrence and multiplies the
    # columns.
    df = df.loc[:, [bool(label) for label in df.columns]]
    return df.astype(object).where(pd.notna(df), None)
- def import_to_jd_provider_tmp(file_path: str, table_name="jd_provider_tmp", sheet_name=0, batch_size=500):
- file = Path(file_path)
- if not file.exists():
- raise FileNotFoundError(f"文件不存在: {file}")
- df = clean_df(read_source_file(file, sheet_name=sheet_name))
- if df.empty:
- print("源文件无数据,跳过导入。")
- return
- conn = get_conn()
- try:
- table_columns = get_table_columns(conn, table_name)
- if not table_columns:
- raise RuntimeError(f"未找到表或无字段: {table_name}")
- # 仅导入“表头和数据库字段同名”的列
- import_columns = [c for c in df.columns if c in table_columns]
- if not import_columns:
- raise RuntimeError("文件表头与数据库字段无交集,请检查表头是否与表字段同名。")
- missing_columns = [c for c in df.columns if c not in table_columns]
- if missing_columns:
- print(f"以下表头不在数据库表中,已自动忽略: {missing_columns}")
- insert_sql = (
- f"INSERT INTO `{table_name}` ({', '.join([f'`{c}`' for c in import_columns])}) "
- f"VALUES ({', '.join(['%s'] * len(import_columns))})"
- )
- values = [tuple(row[c] for c in import_columns) for _, row in df.iterrows()]
- total = len(values)
- inserted = 0
- with conn.cursor() as cur:
- for i in range(0, total, batch_size):
- batch = values[i:i + batch_size]
- cur.executemany(insert_sql, batch)
- inserted += len(batch)
- print(f"已导入: {inserted}/{total}")
- conn.commit()
- print(f"导入完成,表 `{table_name}` 共导入 {inserted} 条。")
- except Exception:
- conn.rollback()
- raise
- finally:
- conn.close()
if __name__ == "__main__":
    # Usage: python <this script> [source_file]
    # The optional command-line argument selects the file to import. Without
    # it, the script falls back to 1111.xlsx in the current working directory.
    source_file = Path(sys.argv[1]) if len(sys.argv) > 1 else Path.cwd() / "1111.xlsx"
    import_to_jd_provider_tmp(
        file_path=str(source_file),
        table_name="jd_provider_tmp",
        sheet_name=0,
        batch_size=500,
    )
|