# netgescon-master/scripts/Script fatti per prova e per ora sospesi/step2_import_verticale_partial.py
#
# 88 lines
# 3.0 KiB
# Python

import os

import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Table, MetaData, inspect, text
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.engine import URL
# Connection settings. Each value may be overridden through an environment
# variable of the same name; the literals are the original development
# defaults, kept so existing invocations behave exactly as before.
# NOTE(review): a real password is committed here as the fallback value --
# rotate it and supply DB_PWD from the environment instead.
DB_USER = os.environ.get("DB_USER", "laravel_user")
DB_PWD = os.environ.get("DB_PWD", "P4ssw0rd.96!")
DB_HOST = os.environ.get("DB_HOST", "127.0.0.1")
DB_DB = os.environ.get("DB_DB", "laravel_db")

# Directory that holds the pickled DataFrames to import.
OUT_PATH = "estratti_serializzati"

# Build the URL from its components instead of interpolating into a string:
# URL.create() escapes the user name and password itself, so characters such
# as "@", ":" or "/" in the credentials cannot corrupt the connection URL.
engine = create_engine(
    URL.create(
        drivername="mysql+pymysql",
        username=DB_USER,
        password=DB_PWD,
        host=DB_HOST,
        database=DB_DB,
        query={"charset": "utf8mb4"},
    )
)
metadata = MetaData()
def ensure_vertical_table():
    """Make sure the ``csv_verticale`` table exists.

    When the table is missing it is created (InnoDB, DYNAMIC row format)
    with the columns ``id``, ``tabella_orig``, ``riga_csv``, ``colonna`` and
    ``valore``. When it is already present nothing is changed; the function
    only prints the table's ``SHOW CREATE TABLE`` definition.
    """
    table_name = "csv_verticale"

    if not inspect(engine).has_table(table_name):
        vertical = Table(
            table_name,
            metadata,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("tabella_orig", String(128)),
            Column("riga_csv", Integer),
            Column("colonna", String(128)),
            Column("valore", LONGTEXT),
            mysql_engine="InnoDB",
            mysql_row_format="DYNAMIC",
        )
        vertical.create(engine)
        print("[OK] Tabella verticale creata.")
        return

    print("[INFO] Tabella verticale esistente.")
    with engine.connect() as connection:
        ddl_row = connection.execute(text("SHOW CREATE TABLE csv_verticale")).fetchone()
        # The second field of the result row holds the CREATE TABLE statement.
        print(ddl_row[1])
def _vertical_records(tablename, df):
    """Flatten *df* into one dict per cell, in row-major order."""
    columns = list(df.columns)
    is_scalar = pd.api.types.is_scalar
    records = []
    # itertuples() reads the frame column by column, so every cell keeps the
    # dtype of its own column. iterrows() would first coerce each row to one
    # common dtype and turn the integer 1 into "1.0" as soon as the frame
    # also has a float column. Pairing values with column labels by position
    # also keeps frames with duplicate column names working.
    for idx, *values in df.itertuples(index=True, name=None):
        for col, value in zip(columns, values):
            # Only scalars can be "missing"; pd.isna() on a list/array cell
            # returns an array, which cannot be used as a condition.
            missing = is_scalar(value) and pd.isna(value)
            records.append({
                "tabella_orig": tablename,
                "riga_csv": idx,
                "colonna": col,
                "valore": None if missing else str(value),
            })
    return records


def import_vertical(tablename, df):
    """Append every cell of *df* to the ``csv_verticale`` table.

    Each cell becomes one row holding the source table name, the DataFrame
    index label of its row, the column label and the cell value as text
    (``None`` for missing values).

    Args:
        tablename: Name stored in the ``tabella_orig`` column.
        df: DataFrame to flatten; a frame without cells is a no-op.

    Raises:
        Exception: whatever ``DataFrame.to_sql`` raises is re-raised after a
            diagnostic message has been printed.
    """
    records = _vertical_records(tablename, df)
    if not records:
        return
    try:
        pd.DataFrame(records).to_sql(
            "csv_verticale",
            engine,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=1000,
        )
        print(f"[OK] Importate {len(records)} celle da {tablename}")
    except Exception as e:
        print(f"Errore importazione tabella {tablename}: {e}")
        # Print the records that are suspiciously large, as a hint about
        # what may have made the insert fail (values above 1 MB are
        # reported, even though LONGTEXT itself allows up to 4 GB).
        for rec in records:
            if rec["valore"] is not None and len(rec["valore"]) > 1000000:
                print("--- Record troppo lungo ---")
                print(rec)
        raise
def is_root_file(fname):
    """Return True when *fname* contains no double underscore (a root file)."""
    return fname.find("__") < 0
def is_condominio_0021(fname):
    """Return True when the part of *fname* before the first "__" is "0021"."""
    head, separator, _rest = fname.partition("__")
    # An empty separator means the name has no "__" at all.
    return separator == "__" and head == "0021"
def main():
    """Rebuild ``csv_verticale`` and load the root and condominio-0021 pickles.

    The table is dropped and re-created, then every ``.pkl`` file found in
    ``OUT_PATH`` that is either a root file (except ``all_tables.pkl``) or a
    condominio 0021 file is read and imported, root files first.
    """
    # Original author's note: do this ONLY ON THE FIRST RUN, then remove it!
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS csv_verticale"))
        conn.commit()
    ensure_vertical_table()

    # os.listdir() returns entries in arbitrary order; sort them so that the
    # import order (and therefore the generated ids) is reproducible.
    files = sorted(os.listdir(OUT_PATH))
    files_root = [f for f in files if f.endswith(".pkl") and is_root_file(f) and f != "all_tables.pkl"]
    files_0021 = [f for f in files if f.endswith(".pkl") and is_condominio_0021(f)]
    print(f"File root: {files_root}")
    print(f"File condominio 0021: {files_0021}")

    # Import the root files first, then the 0021 ones.
    for fname in files_root + files_0021:
        # Strip only the trailing extension; str.replace() would also remove
        # a ".pkl" that happens to appear in the middle of the name.
        tablename = os.path.splitext(fname)[0]
        # NOTE(review): unpickling can execute arbitrary code -- only point
        # OUT_PATH at files produced by a trusted extraction step.
        df = pd.read_pickle(os.path.join(OUT_PATH, fname))
        import_vertical(tablename, df)
# Run the import only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()