Skip to navigation
Migrate from MSSQL to MySQL
13.05.26
``` """ MSSQL → MySQL Data Migration Script Usage: 1. pip install sqlalchemy pyodbc pymysql 2. Fill in connection details below 3. python migrate.py """ from sqlalchemy import create_engine, inspect, MetaData, text from sqlalchemy.orm import sessionmaker import math # ─── CONFIGURATION — Replace these ───────────────────────────────── MSSQL_CONN = "mssql+pyodbc://user:pass@HOST:PORT/DB?driver=ODBC+Driver+17+for+SQL+Server" MYSQL_CONN = "mysql+pymysql://user:pass@HOST:PORT/DB" CHUNK_SIZE = 1000 # rows per batch # ─────────────────────────────────────────────────────────────────── def migrate(): src = create_engine(MSSQL_CONN) dst = create_engine(MYSQL_CONN) src_conn = src.connect() dst_conn = dst.connect() dst_session = sessionmaker(bind=dst_conn)() inspector = inspect(src) tables = inspector.get_table_names() for table in tables: print(f"Migrating {table}...") # Count rows count = src_conn.execute(text(f"SELECT COUNT(*) FROM [{table}]")).scalar() if count == 0: print(f" → empty, skipped") continue # Truncate target dst_conn.execute(text(f"TRUNCATE TABLE `{table}`")) # Migrate in chunks chunks = math.ceil(count / CHUNK_SIZE) for i in range(chunks): offset = i * CHUNK_SIZE rows = src_conn.execute( text(f"SELECT * FROM [{table}] ORDER BY 1 OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY"), {"off": offset, "lim": CHUNK_SIZE} ).fetchall() if not rows: continue # Build INSERT columns = rows[0]._fields cols = ", ".join([f"`{c}`" for c in columns]) placeholders = ", ".join([f":{c}" for c in columns]) stmt = text(f"INSERT INTO `{table}` ({cols}) VALUES ({placeholders})") batch = [dict(zip(columns, row)) for row in rows] dst_session.execute(stmt, batch) dst_session.commit() print(f" → {count} rows migrated") src_conn.close() dst_conn.close() print("Done.") if __name__ == "__main__": migrate() ```
Reply
Anonymous
## migrate schema from mssql to mysql ``` """ MSSQL → MySQL Schema Migration Script Usage: 1. pip install sqlalchemy pyodbc pymysql 2. Fill in connection details below 3. python migrate_schema.py """ from sqlalchemy import create_engine, inspect, MetaData, text import re MSSQL_CONN = "mssql+pyodbc://user:pass@HOST:PORT/DB?driver=ODBC+Driver+17+for+SQL+Server" MYSQL_CONN = "mysql+pymysql://user:pass@HOST:PORT/DB" # ─── MSSQL → MySQL type mapping ──────────────────────────────────── TYPE_MAP = { "int": "INT", "bigint": "BIGINT", "smallint": "SMALLINT", "tinyint": "TINYINT", "bit": "TINYINT(1)", "decimal": "DECIMAL", "numeric": "DECIMAL", "money": "DECIMAL(19,4)", "smallmoney": "DECIMAL(10,4)", "float": "DOUBLE", "real": "FLOAT", "date": "DATE", "datetime": "DATETIME", "datetime2": "DATETIME", "smalldatetime": "DATETIME", "time": "TIME", "char": "CHAR", "nchar": "CHAR", "varchar": "VARCHAR", "nvarchar": "VARCHAR", "text": "TEXT", "ntext": "LONGTEXT", "varchar(max)": "LONGTEXT", "nvarchar(max)": "LONGTEXT", "text(max)": "LONGTEXT", "uniqueidentifier": "CHAR(36)", "binary": "BINARY", "varbinary": "VARBINARY", "image": "LONGBLOB", "timestamp": "BINARY(8)", } def map_type(col): raw = col["type"].lower() for key, mysql_type in TYPE_MAP.items(): if raw.startswith(key): # Preserve precision like VARCHAR(255), DECIMAL(10,2) rest = raw[len(key):] if rest and rest[0] == "(": return mysql_type + rest return mysql_type # Fallback with logging print(f" ⚠ unknown type '{raw}', using TEXT") return "TEXT" def default_expr(mssql_default): """Convert MSSQL default expressions to MySQL.""" val = mssql_default.strip() val_upper = val.upper() if val_upper == "(GETDATE())": return "CURRENT_TIMESTAMP" if val_upper == "(NEWID())": return "(UUID())" if val_upper == "(NEWSEQUENTIALID())": return "(UUID())" if val_upper == "(GETUTCDATE())": return "(UTC_TIMESTAMP)" if val_upper == "(CURRENT_TIMESTAMP)": return "CURRENT_TIMESTAMP" if val_upper == "(NULL)": return "NULL" if val_upper == "(0)": 
return "0" if val_upper == "('')": return "''" # Return as-is, strip parens for literals if val.startswith("(") and val.endswith(")"): inner = val[1:-1].strip() if inner.startswith("'") or inner.startswith("-") or inner.isdigit(): return inner return val def clean_name(name): """Remove schema prefixes like dbo.""" return name.split(".")[-1] def migrate_schema(): src = create_engine(MSSQL_CONN) dst = create_engine(MYSQL_CONN) inspector = inspect(src) tables = inspector.get_table_names() dst_conn = dst.connect() dst_conn.execute(text("SET FOREIGN_KEY_CHECKS = 0")) for table in tables: print(f"\nCreating {table}...") cols = inspector.get_columns(table) pk = inspector.get_pk_constraint(table) or {} pk_cols = pk.get("constrained_columns", []) fks = inspector.get_foreign_keys(table) indexes = inspector.get_indexes(table) identity_cols = [] lines = [] for col in cols: name = col["name"] mysql_type = map_type(col) parts = [f" `{name}` {mysql_type}"] # Identity → AUTO_INCREMENT if col.get("autoincrement"): # Check if col['type'] has autoincrement attribute pass # Check for IDENTITY via column default pattern nullable = col.get("nullable", True) default = col.get("default") # Detect IDENTITY: default contains "IDENTITY" if default and "IDENTITY" in str(default).upper(): parts.append("NOT NULL AUTO_INCREMENT") identity_cols.append(name) else: if not nullable: parts.append("NOT NULL") else: parts.append("NULL") if default is not None: expr = str(default) converted = default_expr(expr) if converted: parts.append(f"DEFAULT {converted}") lines.append(" ".join(parts)) # Primary key if pk_cols: pk_quoted = ", ".join(f"`{c}`" for c in pk_cols) lines.append(f" PRIMARY KEY ({pk_quoted})") # Foreign keys for fk in fks: fk_cols = ", ".join(f"`{c}`" for c in fk["constrained_columns"]) ref_table = clean_name(fk["referred_table"]) ref_cols = ", ".join(f"`{c}`" for c in fk["referred_columns"]) lines.append( f" FOREIGN KEY ({fk_cols}) REFERENCES `{ref_table}` ({ref_cols})" ) # Indexes 
uniq_indexes = [] norm_indexes = [] for idx in indexes: name = idx.get("name", "") # Skip PK indexes (already defined) if name and f"PK__{table}" in name.upper(): continue idx_cols = ", ".join(f"`{c}`" for c in idx["column_names"]) if idx.get("unique"): uniq_indexes.append(f" UNIQUE KEY `{name}` ({idx_cols})") else: norm_indexes.append(f" UNIQUE KEY `{name}` ({idx_cols})" if "unique" in str(idx).lower() else f" INDEX `{name}` ({idx_cols})" ) lines.extend(uniq_indexes) lines.extend(norm_indexes) ddl = f"CREATE TABLE IF NOT EXISTS `{table}` (\n" + ",\n".join(lines) + "\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;" try: dst_conn.execute(text(ddl)) print(f" ✓ {table} created") except Exception as e: print(f" ✗ {table} failed: {e}") print(f" DDL: {ddl[:200]}...") dst_conn.execute(text("SET FOREIGN_KEY_CHECKS = 1")) dst_conn.close() print("\nDone.") if __name__ == "__main__": migrate_schema() ```
13.05.26
Reply
Anonymous
Information Epoch 1778972770
Don't force yourself or others to reimplement functionality.
Home
Notebook
Contact us