from pathlib import Path

import pandas as pd

from commons.conn_mysql import MySQLPoolOnline


def get_conn():
    """Database connection configuration (fill in your own settings).

    Returns:
        A new ``MySQLPoolOnline`` instance.
    """
    return MySQLPoolOnline()


def read_source_file(file_path: Path, sheet_name=0) -> pd.DataFrame:
    """Load an Excel or CSV file into a DataFrame of ``object`` dtype.

    Args:
        file_path: Path of the file; the (case-insensitive) suffix selects
            the reader.
        sheet_name: Sheet passed to ``pd.read_excel``; ignored for CSV.

    Returns:
        The file contents, read with ``dtype=object``.

    Raises:
        ValueError: If the suffix is not ``.xlsx``, ``.xls`` or ``.csv``.
    """
    suffix = file_path.suffix.lower()
    if suffix in {".xlsx", ".xls"}:
        return pd.read_excel(file_path, sheet_name=sheet_name, dtype=object)
    if suffix == ".csv":
        # utf-8-sig strips a leading BOM if the file has one.
        return pd.read_csv(file_path, dtype=object, encoding="utf-8-sig")
    raise ValueError(f"不支持的文件类型: {suffix}")


def _row_column_name(row):
    """Extract COLUMN_NAME from a result row that is dict-like or a sequence."""
    if hasattr(row, "get"):
        return row.get("COLUMN_NAME")
    return row[0]


def get_table_columns(conn, table_name: str) -> list:
    """Return the column names of ``table_name`` in the current database.

    Works with objects exposing ``select_data(sql, params)`` (such as
    ``MySQLPoolOnline``) as well as DB-API style connections exposing
    ``cursor()``. Rows may be dict-like or tuples in either case.

    Args:
        conn: Connection or pool object.
        table_name: Table to look up in ``information_schema.COLUMNS``.

    Returns:
        Column names ordered by ``ORDINAL_POSITION``; empty if none are found.
    """
    sql = """
        SELECT COLUMN_NAME
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
    """
    # Support both MySQLPoolOnline (select_data) and raw pymysql-style
    # connections (cursor).
    if hasattr(conn, "select_data"):
        rows = conn.select_data(sql, (table_name,))
    else:
        with conn.cursor() as cur:
            cur.execute(sql, (table_name,))
            rows = cur.fetchall()
    # A helper may return None instead of an empty sequence when nothing
    # matches; treat that as "no columns".
    return [_row_column_name(r) for r in (rows or [])]


def clean_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize headers and replace missing values with ``None``.

    Column names are converted to stripped strings, columns whose name ends
    up empty are dropped, and every NaN/NaT cell becomes ``None`` so it is
    written to the database as NULL.

    Args:
        df: Raw frame as read from the source file.

    Returns:
        A cleaned frame with ``object`` dtype columns.
    """
    # Strip whitespace around column names; a None name becomes "".
    df = df.rename(columns=lambda x: str(x).strip() if x is not None else "")
    # Drop empty-named columns; a boolean mask stays correct even when
    # several columns share the same label.
    df = df.loc[:, [bool(c) for c in df.columns]]
    # Cast to object first: on numeric columns pandas would coerce the None
    # replacement back to NaN.
    df = df.astype(object)
    return df.where(pd.notna(df), None)


def _quote_identifier(name) -> str:
    """Backtick-quote a MySQL identifier, doubling any embedded backtick."""
    return "`" + str(name).replace("`", "``") + "`"


def _insert_batches(execute_many, insert_sql: str, values: list, batch_size: int) -> int:
    """Send ``values`` through ``execute_many`` in slices of ``batch_size``.

    Prints cumulative progress after each batch and returns the number of
    rows handed to ``execute_many``.
    """
    total = len(values)
    inserted = 0
    for start in range(0, total, batch_size):
        batch = values[start:start + batch_size]
        execute_many(insert_sql, batch)
        inserted += len(batch)
        print(f"已导入: {inserted}/{total}")
    return inserted


def import_to_jd_provider_tmp(file_path: str, table_name="jd_provider_tmp", sheet_name=0, batch_size=500):
    """Import an Excel/CSV file into a MySQL table, matching columns by name.

    Only file headers that exactly match a column of ``table_name`` are
    imported; other headers are reported and ignored.

    Args:
        file_path: Path of the source ``.xlsx``/``.xls``/``.csv`` file.
        table_name: Destination table in the current database.
        sheet_name: Sheet to read when the source is an Excel file.
        batch_size: Number of rows per insert batch; must be >= 1.

    Raises:
        ValueError: If ``batch_size`` is less than 1 or the file type is
            unsupported.
        FileNotFoundError: If ``file_path`` does not exist.
        RuntimeError: If the table has no columns (or does not exist), or no
            file header matches a table column.
    """
    # Fail fast: a zero step crashes range() and a negative one would
    # silently import nothing while still reporting success.
    if batch_size < 1:
        raise ValueError(f"batch_size 必须为正整数: {batch_size}")

    file = Path(file_path)
    if not file.exists():
        raise FileNotFoundError(f"文件不存在: {file}")

    df = clean_df(read_source_file(file, sheet_name=sheet_name))
    if df.empty:
        print("源文件无数据,跳过导入。")
        return

    conn = get_conn()
    is_pool_conn = hasattr(conn, "select_data") and hasattr(conn, "execute_many")
    try:
        table_columns = get_table_columns(conn, table_name)
        if not table_columns:
            raise RuntimeError(f"未找到表或无字段: {table_name}")

        # Only import columns whose header has the same name as a table column.
        known_columns = set(table_columns)
        import_columns = [c for c in df.columns if c in known_columns]
        if not import_columns:
            raise RuntimeError("文件表头与数据库字段无交集,请检查表头是否与表字段同名。")

        missing_columns = [c for c in df.columns if c not in known_columns]
        if missing_columns:
            print(f"以下表头不在数据库表中,已自动忽略: \n{missing_columns}")

        column_list = ", ".join(_quote_identifier(c) for c in import_columns)
        placeholders = ", ".join(["%s"] * len(import_columns))
        insert_sql = (
            f"INSERT INTO {_quote_identifier(table_name)} ({column_list}) "
            f"VALUES ({placeholders})"
        )

        # Plain tuples in import_columns order, one per row.
        values = list(df[import_columns].itertuples(index=False, name=None))

        if is_pool_conn:
            # NOTE(review): presumably the pool commits each execute_many call
            # itself, so a mid-run failure would leave earlier batches
            # inserted - confirm against MySQLPoolOnline.
            inserted = _insert_batches(conn.execute_many, insert_sql, values, batch_size)
        else:
            with conn.cursor() as cur:
                inserted = _insert_batches(cur.executemany, insert_sql, values, batch_size)
            conn.commit()

        print(f"导入完成,表 `{table_name}` 共导入 {inserted} 条。")
    except Exception:
        # Only a raw connection is rolled back here; the pool object is not.
        if not is_pool_conn:
            conn.rollback()
        raise
    finally:
        # The pool object is left open; a raw connection must be closed.
        if hasattr(conn, "close") and not is_pool_conn:
            conn.close()


if __name__ == "__main__":
    fixed_file = Path.cwd() / "11111.xlsx"
    import_to_jd_provider_tmp(
        file_path=str(fixed_file),
        table_name="jd_provider_tmp",
        sheet_name=0,
        batch_size=500,
    )