| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- """
- 从一张表查询,插入另一张表(两表字段一致)。
- 用法:改下面 SOURCE_TABLE / TARGET_TABLE,在项目根执行:
- python ceshi/copy_table_same_columns.py
- 逻辑:读 information_schema 取源表列顺序,与目标表取交集后
- INSERT INTO 目标 SELECT 相同列 FROM 源(一条 SQL,由 MySQL 搬运数据)。
- """
- from commons.conn_mysql import MySQLPoolOnline
- # 修改为你要复制的表名(同一库 drug_retrieve)
- SOURCE_TABLE = "retrieve_ybm_shop_info_middle"
- TARGET_TABLE = "retrieve_scrape_shop_info"
- # True:重复键忽略(需表上有唯一索引);False:普通 INSERT,冲突则报错
- USE_INSERT_IGNORE = True
- # 若共同列中含 platform,用该数字覆盖源表;设为 None 则仍从源表读 platform
- FIXED_PLATFORM_VALUE = 9
- def _columns(conn, table: str) -> list:
- rows = conn.select_data(
- """
- SELECT COLUMN_NAME AS COLUMN_NAME
- FROM information_schema.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = %s
- ORDER BY ORDINAL_POSITION
- """,
- (table,),
- )
- if not rows:
- return []
- return [r["COLUMN_NAME"] for r in rows]
- def main():
- conn = MySQLPoolOnline()
- src_cols = _columns(conn, SOURCE_TABLE)
- tgt_cols = _columns(conn, TARGET_TABLE)
- if not src_cols:
- print(f"源表不存在或无字段: {SOURCE_TABLE}")
- return
- if not tgt_cols:
- print(f"目标表不存在或无字段: {TARGET_TABLE}")
- return
- tgt_set = set(tgt_cols)
- common = [c for c in src_cols if c in tgt_set and c != "id"]
- if not common:
- print("源表与目标表无共同字段")
- return
- only_src = [c for c in src_cols if c not in tgt_set]
- only_tgt = [c for c in tgt_cols if c not in set(src_cols)]
- if only_src:
- print(f"提示:源表多出的列(目标表无,已跳过): {only_src}")
- if only_tgt:
- print(f"提示:目标表多出的列(本次不写入,走默认值): {only_tgt}")
- cols_sql = ", ".join(f"`{c}`" for c in common)
- select_parts = []
- for c in common:
- if c == "platform" and FIXED_PLATFORM_VALUE is not None:
- select_parts.append(str(int(FIXED_PLATFORM_VALUE)))
- else:
- select_parts.append(f"`{c}`")
- select_sql = ", ".join(select_parts)
- kw = "INSERT IGNORE" if USE_INSERT_IGNORE else "INSERT"
- sql = f"{kw} INTO `{TARGET_TABLE}` ({cols_sql}) SELECT {select_sql} FROM `{SOURCE_TABLE}`"
- n = conn.execute(sql)
- print(f"执行完成: {sql.strip()[:120]}...")
- print(f"受影响行数: {n}")
- if __name__ == "__main__":
- main()
|