# Import CSV "estratti" files into per-file MySQL tables, creating each table
# on the fly from a JSON schema and upserting rows keyed on (import_hash,
# id_condominio).
import os
|
|
import json
|
|
import hashlib
|
|
from sqlalchemy import create_engine, Column, Integer, String, Table, MetaData, inspect, text
|
|
from sqlalchemy.dialects.mysql import LONGTEXT
|
|
from sqlalchemy.orm import sessionmaker
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
# --- Database connection settings -------------------------------------------
# NOTE(review): credentials are hard-coded in source; consider loading them
# from environment variables or a config file outside version control.
DB_USER = "laravel_user"
DB_PWD = "P4ssw0rd.96!"
DB_HOST = "127.0.0.1"
DB_DB = "laravel_db"

# Root folder scanned for CSV files, and the JSON file mapping each CSV to
# its expected list of columns.
BASE_PATH = "../estratti"
SCHEMA_JSON = "schema_estratti.json"

# ONLY these columns are "short" — they get VARCHAR(32) instead of LONGTEXT
# in guess_sqla_type().
KEY_FIELDS = ["id_amministratore", "id_condominio", "import_hash"]
|
|
|
|
def guess_sqla_type(colname):
    """Return the SQLAlchemy column type for a CSV-derived column name.

    Key/metadata columns get short VARCHARs; every other column is LONGTEXT
    because CSV cell contents have no known length bound.

    :param colname: column name as it will appear in the table
    :return: a SQLAlchemy type (instance or class) for Column()
    """
    col = colname.lower()
    if col == "import_hash":
        # Fix: match the String(64) that ensure_table() declares for
        # import_hash; the old String(32) was inconsistent with it.
        return String(64)
    if col in KEY_FIELDS:
        return String(32)
    if col == "data_import":
        # ISO-8601 timestamp string, e.g. "2024-01-01T12:00:00.000000"
        return String(32)
    return LONGTEXT
|
|
|
|
def table_name_from_path(path, base_path=BASE_PATH):
    """Derive a MySQL-safe table name from a CSV file path.

    Path separators become "__", the ".csv" suffix is stripped, names are
    lowercased, prefixed so they never start with a digit, and truncated
    with a hash suffix to stay under MySQL's 64-char identifier limit.

    :param path: CSV file path
    :param base_path: root the path is made relative to
    :return: table name string
    """
    rel_path = os.path.relpath(path, base_path)
    rel_path = rel_path.replace(os.sep, "__")
    # Fix: strip only a trailing ".csv"; the old blanket replace(".csv", "")
    # also mangled ".csv" occurring in the middle of a directory/file name.
    if rel_path.endswith(".csv"):
        rel_path = rel_path[:-4]
    # Prefix so the identifier never starts with a digit (invalid unquoted
    # in MySQL); non-numeric roots are grouped under t_global__.
    # Guard against an empty rel_path before indexing [0].
    if rel_path and rel_path[0].isdigit():
        rel_path = "t_" + rel_path
    else:
        rel_path = "t_global__" + rel_path
    rel_path = rel_path.lower()
    # Keep headroom under MySQL's 64-char identifier cap; disambiguate the
    # truncated prefix with a short hash of the full name.
    if len(rel_path) > 55:
        short = rel_path[:40]
        h = hashlib.md5(rel_path.encode("utf-8")).hexdigest()[:8]
        rel_path = f"{short}__{h}"
    return rel_path
|
|
|
|
def file_hash(path, blocksize=65536):
    """Return the MD5 hex digest of the file at *path*.

    The file is read in *blocksize*-byte chunks so arbitrarily large files
    are hashed with constant memory.
    """
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # iter() with a b"" sentinel stops cleanly at EOF.
        for chunk in iter(lambda: handle.read(blocksize), b""):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
def get_condominio_from_path(path):
    """Return the first all-digit path component (the condominio id).

    Returns None when no path component is purely numeric.
    """
    segments = os.path.normpath(path).split(os.sep)
    return next((seg for seg in segments if seg.isdigit()), None)
|
|
|
|
def get_amministratore_from_cond(cond):
    """Map a condominio id to its amministratore id.

    Currently a stub: every condominio belongs to amministratore "1".
    The *cond* argument is accepted for future lookup logic.
    """
    default_admin_id = "1"
    return default_admin_id
|
|
|
|
# Load the per-CSV column schema: maps a CSV path (or basename) to the list
# of expected column names.
with open(SCHEMA_JSON, 'r', encoding='utf-8') as f:
    schema = json.load(f)

# utf8mb4 so 4-byte UTF-8 characters in CSV cells survive the round trip.
conn_str = f"mysql+pymysql://{DB_USER}:{DB_PWD}@{DB_HOST}/{DB_DB}?charset=utf8mb4"
engine = create_engine(conn_str, echo=False, future=True)
# NOTE(review): `session` is created but never used in this file — the code
# below works with `engine` directly; candidate for removal.
Session = sessionmaker(bind=engine)
session = Session()
# Shared MetaData registry; ensure_table() registers new tables here.
metadata = MetaData()
|
|
|
|
def ensure_table(table_name, columns_schema):
    """Create *table_name* if it does not already exist in the database.

    The table always gets an autoincrement `id` plus the four standard
    metadata columns; each entry of *columns_schema* becomes an extra
    column typed via guess_sqla_type().

    :param table_name: target table name
    :param columns_schema: iterable of raw CSV header names
    """
    insp = inspect(engine)
    if insp.has_table(table_name):
        print(f"[INFO] Tabella esistente: {table_name}")
        return

    cols = [
        Column('id', Integer, primary_key=True, autoincrement=True),
        Column('id_amministratore', String(32)),
        Column('id_condominio', String(32)),
        Column('import_hash', String(64)),
        Column('data_import', String(32)),
    ]
    # Track names already used (lowercased). Besides skipping the reserved
    # columns above, this dedupes sanitized CSV headers: two headers such
    # as "a b" and "a-b" both collapse to "a_b", and duplicate Column
    # names would make Table() raise.
    seen = {'id', 'id_amministratore', 'id_condominio', 'import_hash', 'data_import'}
    for col in columns_schema:
        c = col.strip().replace(" ", "_").replace("-", "_")
        if c.lower() in seen:
            continue
        seen.add(c.lower())
        cols.append(Column(c, guess_sqla_type(c)))
    table = Table(
        table_name, metadata, *cols,
        mysql_engine='InnoDB',
        # DYNAMIC row format avoids InnoDB row-size limits with many
        # LONGTEXT columns.
        mysql_row_format='DYNAMIC'
    )
    table.create(engine)
    print(f"[OK] Tabella creata: {table_name}")
|
|
|
|
def upsert_rows(table_name, df, key_fields):
    """Upsert every row of *df* into *table_name*, one row at a time.

    For each row: try an UPDATE matching on *key_fields*; if no row was
    affected, INSERT instead. All runs inside a single transaction.

    :param table_name: target table name
    :param df: pandas DataFrame whose columns match the table's columns
    :param key_fields: column names that identify a logical row
    """
    # NOTE(review): column/table identifiers are interpolated into the SQL
    # text (values are bound parameters). Backtick-quoting below protects
    # against reserved-word column names, not against hostile headers —
    # sanitize upstream if the CSVs are untrusted.
    with engine.begin() as conn:
        for _, row in df.iterrows():
            value_cols = [col for col in df.columns if col not in key_fields]
            # Fix: quote identifiers so columns named after MySQL reserved
            # words (e.g. "order", "key") do not break the statement.
            where_clause = " AND ".join(f"`{k}`=:k_{k}" for k in key_fields)
            update_clause = ", ".join(f"`{col}`=:u_{col}" for col in value_cols)
            params = {f"k_{k}": row[k] for k in key_fields}
            params.update({f"u_{col}": row[col] for col in value_cols})

            stmt_update = text(f"UPDATE `{table_name}` SET {update_clause} WHERE {where_clause}")
            result = conn.execute(stmt_update, params)
            if result.rowcount == 0:
                insert_cols = ", ".join(f"`{col}`" for col in df.columns)
                insert_vals = ", ".join(f":{col}" for col in df.columns)
                stmt_insert = text(f"INSERT INTO `{table_name}` ({insert_cols}) VALUES ({insert_vals})")
                conn.execute(stmt_insert, {col: row[col] for col in df.columns})
|
|
|
|
def import_csv_to_table(csv_path, table_name, columns_schema):
    """Read one CSV file and upsert its rows into *table_name*.

    Adds the four standard metadata columns, aligns the DataFrame to the
    schema-derived column list, then delegates to upsert_rows() keyed on
    (import_hash, id_condominio). Errors are reported and swallowed so a
    bad file does not stop the whole import run.

    :param csv_path: path of the CSV file
    :param table_name: target table (must already exist)
    :param columns_schema: raw CSV header names from the schema JSON
    """
    condominio = get_condominio_from_path(csv_path) or ""
    amministratore = get_amministratore_from_cond(condominio)
    hash_file = file_hash(csv_path)

    print(f"[IMPORT/UPDATE] {csv_path} -> {table_name}")
    try:
        # dtype=str + keep_default_na=False: every cell stays a plain
        # string; empty cells become "" rather than NaN.
        df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    except Exception as e:
        print(f"[ERRORE Lettura CSV] {csv_path}: {e}")
        return

    # Fix: sanitize the DataFrame's own headers the same way the table
    # columns are sanitized. Without this, reindex() below silently drops
    # (and NULL-fills) every column whose CSV header contains spaces or
    # hyphens, because "a b" in the CSV never matches "a_b" in eff_cols.
    df.columns = [c.strip().replace(" ", "_").replace("-", "_") for c in df.columns]

    df['id_amministratore'] = amministratore
    df['id_condominio'] = condominio
    df['import_hash'] = hash_file
    df['data_import'] = datetime.now().isoformat()

    reserved = ["id", "id_amministratore", "id_condominio", "import_hash", "data_import"]
    eff_cols = ['id_amministratore', 'id_condominio', 'import_hash', 'data_import'] + [
        c.strip().replace(" ", "_").replace("-", "_")
        for c in columns_schema
        if c.lower() not in reserved
    ]
    # Align to the table schema: columns missing from the CSV become None
    # (NULL); columns not in the schema are dropped.
    df = df.reindex(columns=eff_cols, fill_value=None)

    key_fields = ['import_hash', 'id_condominio']
    try:
        upsert_rows(table_name, df, key_fields)
    except Exception as e:
        print(f"[ERRORE Upsert dati] {csv_path}: {e}")
        return

    print(f"[OK] Importati/aggiornati {len(df)} record in {table_name}")
|
|
|
|
def main():
    """Walk BASE_PATH, find every CSV, and import each into its own table.

    Files without an entry in the schema JSON (looked up first by relative
    path, then by basename) are skipped with a warning.
    """
    files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(BASE_PATH)
        for name in names
        if name.lower().endswith(".csv")
    ]

    print(f"Trovati {len(files)} file CSV.")
    for path in tqdm(files, desc="Importazione CSV"):
        # Schema lookup: prefer the "/"-normalized relative path, fall back
        # to the bare file name.
        rel = os.path.relpath(path, BASE_PATH).replace(os.sep, "/")
        key = rel if rel in schema else os.path.basename(path)
        columns_schema = schema.get(key) or schema.get(os.path.basename(path))
        if not columns_schema:
            print(f"[WARN] Schema mancante per {key}, salto.")
            continue
        table_name = table_name_from_path(path)
        print(f"[DEBUG] Tabella generata per {path}: {table_name}")
        ensure_table(table_name, columns_schema)
        import_csv_to_table(path, table_name, columns_schema)

    print("[FINE] Tutte le tabelle sono state create e popolate.")


if __name__ == "__main__":
    main()