# importa_csv_auto.py
# netgescon-master/scripts/Script fatti per prova e per ora sospesi/
# (experimental import script, currently suspended)
import os
import json
import hashlib
from sqlalchemy import create_engine, Column, Integer, String, Table, MetaData, inspect, text
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm
import pandas as pd
from datetime import datetime
# --- Database connection settings ---
# NOTE(review): credentials are hard-coded; move to environment variables or a
# config file before any non-throwaway use.
DB_USER = "laravel_user"
DB_PWD = "P4ssw0rd.96!"
DB_HOST = "127.0.0.1"
DB_DB = "laravel_db"
# Root directory scanned recursively for CSV extracts.
BASE_PATH = "../estratti"
# JSON file mapping CSV names / relative paths to their expected column lists.
SCHEMA_JSON = "schema_estratti.json"
# Only these columns are "short" (stored as VARCHAR keys); every other CSV
# column is stored as LONGTEXT (see guess_sqla_type).
KEY_FIELDS = ["id_amministratore", "id_condominio", "import_hash"]
def guess_sqla_type(colname):
    """Map a column name to the SQLAlchemy type used for its DB column.

    Key/bookkeeping columns ("id_amministratore", "id_condominio",
    "import_hash", "data_import") get a short String(32); any other column
    is stored as LONGTEXT so arbitrary CSV content always fits.
    """
    lowered = colname.lower()
    if lowered in KEY_FIELDS or lowered == "data_import":
        return String(32)
    return LONGTEXT
def table_name_from_path(path, base_path=None):
    """Derive a MySQL table name from a CSV file path.

    The path relative to *base_path* has its trailing ".csv" extension
    stripped and its separators flattened to "__"; a "t_" prefix is applied
    when the name starts with a digit, "t_global__" otherwise. Names longer
    than 55 characters are truncated to a 40-char prefix plus an md5
    fragment so they stay unique within MySQL's 64-char identifier limit.

    Args:
        path: path of the CSV file.
        base_path: root the relative name is computed from; defaults to the
            module-level BASE_PATH (resolved lazily, not at def time).

    Returns:
        A lowercase table-name string.
    """
    if base_path is None:
        base_path = BASE_PATH
    rel_path = os.path.relpath(path, base_path)
    # Strip only a trailing ".csv" extension. The previous
    # replace(".csv", "") removed EVERY occurrence, mangling names such as
    # "a.csv.backup.csv". Case-insensitive to match main()'s file filter.
    root, ext = os.path.splitext(rel_path)
    if ext.lower() == ".csv":
        rel_path = root
    rel_path = rel_path.replace(os.sep, "__")
    if rel_path and rel_path[0].isdigit():
        rel_path = "t_" + rel_path
    else:
        rel_path = "t_global__" + rel_path
    rel_path = rel_path.lower()
    if len(rel_path) > 55:
        # Keep a readable prefix and append a short hash of the full name
        # so distinct long paths never collide.
        digest = hashlib.md5(rel_path.encode("utf-8")).hexdigest()[:8]
        rel_path = f"{rel_path[:40]}__{digest}"
    return rel_path
def file_hash(path, blocksize=65536):
    """Return the hex MD5 digest of the file at *path*, read in chunks.

    Reading block-by-block keeps memory flat even for very large files.
    """
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(blocksize), b""):
            digest.update(chunk)
    return digest.hexdigest()
def get_condominio_from_path(path):
    """Return the first all-digit component of *path* (the condominio id).

    Returns None when no purely numeric path component exists.
    """
    components = os.path.normpath(path).split(os.sep)
    return next((comp for comp in components if comp.isdigit()), None)
def get_amministratore_from_cond(cond):
    """Return the administrator id for condominio *cond*.

    Placeholder implementation: every condominio is mapped to
    administrator "1". *cond* is accepted for interface stability but
    currently unused.
    """
    return "1"
# --- One-time module setup (runs at import) ---

# Load the CSV-name -> column-list schema produced beforehand.
with open(SCHEMA_JSON, 'r', encoding='utf-8') as f:
    schema = json.load(f)

# NOTE(review): credentials embedded in the connection string (see constants
# above); prefer environment variables.
conn_str = f"mysql+pymysql://{DB_USER}:{DB_PWD}@{DB_HOST}/{DB_DB}?charset=utf8mb4"
engine = create_engine(conn_str, echo=False, future=True)  # future=True: 2.0-style API
Session = sessionmaker(bind=engine)
session = Session()  # NOTE(review): session appears unused below — verify before removing
metadata = MetaData()  # shared registry for tables created by ensure_table()
def ensure_table(table_name, columns_schema):
    """Create *table_name* in the database unless it already exists.

    The table gets a surrogate `id` primary key, four bookkeeping columns
    (administrator, condominio, import hash, import timestamp) and one
    column per entry of *columns_schema* (names sanitized, reserved names
    skipped), typed via guess_sqla_type().
    """
    if inspect(engine).has_table(table_name):
        print(f"[INFO] Tabella esistente: {table_name}")
        return

    reserved = {"id", "id_amministratore", "id_condominio", "import_hash", "data_import"}
    columns = [
        Column('id', Integer, primary_key=True, autoincrement=True),
        Column('id_amministratore', String(32)),
        Column('id_condominio', String(32)),
        Column('import_hash', String(64)),
        Column('data_import', String(32)),
    ]
    for raw_name in columns_schema:
        sanitized = raw_name.strip().replace(" ", "_").replace("-", "_")
        if sanitized.lower() in reserved:
            continue  # bookkeeping columns are added above
        columns.append(Column(sanitized, guess_sqla_type(sanitized)))

    Table(
        table_name, metadata, *columns,
        mysql_engine='InnoDB',
        mysql_row_format='DYNAMIC',
    ).create(engine)
    print(f"[OK] Tabella creata: {table_name}")
def upsert_rows(table_name, df, key_fields):
    """Upsert every row of *df* into *table_name* keyed on *key_fields*.

    For each row an explicit existence check decides between UPDATE and
    INSERT. The previous implementation relied on UPDATE's rowcount, but
    pymysql reports the number of rows *changed* (not matched), so updating
    a row to identical values returned 0 and produced a duplicate INSERT.

    All identifiers are backtick-quoted (the old code quoted only the table
    name); values are always bound as parameters, never interpolated.

    Runs inside a single transaction (engine.begin); raises on DB errors.
    """
    non_key_cols = [c for c in df.columns if c not in key_fields]
    with engine.begin() as conn:
        for _, row in df.iterrows():
            where_clause = " AND ".join(f"`{k}`=:k_{k}" for k in key_fields)
            key_params = {f"k_{k}": row[k] for k in key_fields}
            exists = conn.execute(
                text(f"SELECT COUNT(*) FROM `{table_name}` WHERE {where_clause}"),
                key_params,
            ).scalar()
            if exists:
                if not non_key_cols:
                    continue  # nothing to update; old code built "SET  WHERE" (malformed SQL)
                update_clause = ", ".join(f"`{c}`=:u_{c}" for c in non_key_cols)
                params = dict(key_params)
                params.update({f"u_{c}": row[c] for c in non_key_cols})
                conn.execute(
                    text(f"UPDATE `{table_name}` SET {update_clause} WHERE {where_clause}"),
                    params,
                )
            else:
                insert_cols = ", ".join(f"`{c}`" for c in df.columns)
                insert_vals = ", ".join(f":{c}" for c in df.columns)
                conn.execute(
                    text(f"INSERT INTO `{table_name}` ({insert_cols}) VALUES ({insert_vals})"),
                    {c: row[c] for c in df.columns},
                )
def import_csv_to_table(csv_path, table_name, columns_schema):
    """Read one CSV, add bookkeeping columns and upsert it into *table_name*.

    Adds id_amministratore / id_condominio (derived from the path),
    import_hash (MD5 of the file) and data_import (current timestamp),
    aligns the frame to the schema's column list, then calls upsert_rows()
    keyed on (import_hash, id_condominio). Errors are logged and swallowed
    so a bad file does not abort the whole batch.
    """
    condominio = get_condominio_from_path(csv_path) or ""
    amministratore = get_amministratore_from_cond(condominio)
    hash_file = file_hash(csv_path)
    print(f"[IMPORT/UPDATE] {csv_path} -> {table_name}")
    try:
        df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    except Exception as e:
        print(f"[ERRORE Lettura CSV] {csv_path}: {e}")
        return
    # FIX: sanitize the CSV header names exactly like the table columns were
    # sanitized in ensure_table(). Previously raw headers containing spaces
    # or hyphens never matched eff_cols, so reindex silently dropped their data.
    df.columns = [c.strip().replace(" ", "_").replace("-", "_") for c in df.columns]
    df['id_amministratore'] = amministratore
    df['id_condominio'] = condominio
    df['import_hash'] = hash_file
    df['data_import'] = datetime.now().isoformat()
    reserved = {"id", "id_amministratore", "id_condominio", "import_hash", "data_import"}
    eff_cols = ['id_amministratore', 'id_condominio', 'import_hash', 'data_import'] + [
        c.strip().replace(" ", "_").replace("-", "_")
        for c in columns_schema
        if c.lower() not in reserved
    ]
    # FIX: fill_value="" instead of None — missing columns otherwise become
    # NaN floats, which pymysql cannot bind into the text columns.
    df = df.reindex(columns=eff_cols, fill_value="")
    key_fields = ['import_hash', 'id_condominio']
    try:
        upsert_rows(table_name, df, key_fields)
    except Exception as e:
        print(f"[ERRORE Upsert dati] {csv_path}: {e}")
        return
    print(f"[OK] Importati/aggiornati {len(df)} record in {table_name}")
def main():
    """Walk BASE_PATH for CSV files and import each one into its own table.

    For every CSV: look up its column schema (by relative path first, then
    by bare filename), derive a table name, create the table if missing and
    upsert the file's rows. Files without a schema entry are skipped with a
    warning.
    """
    files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(BASE_PATH)
        for name in names
        if name.lower().endswith(".csv")
    ]
    print(f"Trovati {len(files)} file CSV.")
    for path in tqdm(files, desc="Importazione CSV"):
        rel = os.path.relpath(path, BASE_PATH).replace(os.sep, "/")
        key = rel if rel in schema else os.path.basename(path)
        columns_schema = schema.get(key) or schema.get(os.path.basename(path))
        if not columns_schema:
            print(f"[WARN] Schema mancante per {key}, salto.")
            continue
        table_name = table_name_from_path(path)
        print(f"[DEBUG] Tabella generata per {path}: {table_name}")
        ensure_table(table_name, columns_schema)
        import_csv_to_table(path, table_name, columns_schema)
    print("[FINE] Tutte le tabelle sono state create e popolate.")


if __name__ == "__main__":
    main()