# netgescon-master/scripts/Script fatti per prova e per ora sospesi/sample_archivio_to_json.py
# (file-listing residue: 142 lines, 5.3 KiB, Python)
#!/usr/bin/env python3
"""
Estrazione struttura e sample dati archivi condominiali (CSV+MDB) in JSON.
Versione: 2025-06-04
Autore: Pikappa2 & Copilot
- Scansiona ricorsivamente la cartella archiviata (lettura path da agent_config.json).
- Per ogni file CSV e MDB:
- Estrae header e prima riga di ogni tabella.
- Salva tutto in un file sample_archivio.json
- Logga colonne/tabelle sospette (es: che iniziano con Q) in log_tabelle_sospette.json
- Log generale in sample_archivio.log
"""
import os
import csv
import json
import subprocess
import datetime
# Resolve every path relative to this script's own location so the tool
# behaves the same regardless of the current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")
LOG_DIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOG_DIR, exist_ok=True)  # make sure the log directory exists before any logging
# Log file paths
LOGFILE = os.path.join(LOG_DIR, "sample_archivio.log")  # JSON-lines event log (log_event)
LOG_SOSP = os.path.join(LOG_DIR, "log_tabelle_sospette.json")  # JSON array of suspect fields (log_sospetto)
OUTFILE = os.path.join(SCRIPT_DIR, "sample_archivio.json")  # final structure/sample output (main)
def log_event(msg, **kwargs):
    """Append one structured log record to LOGFILE and echo it to stdout.

    Parameters:
        msg: short event name, stored under the "event" key.
        **kwargs: additional key/value pairs merged into the record.
    """
    row = {"event": msg, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    row.update(kwargs)
    print("[LOG]", row)
    # encoding="utf-8" is explicit: ensure_ascii=False may emit non-ASCII
    # characters, which would crash on platforms whose default encoding
    # cannot represent them.
    with open(LOGFILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")
def log_sospetto(tabella, campo, motivo):
    """Record a suspicious table/field pair in the shared JSON log (LOG_SOSP).

    The log file holds a single JSON array; each call re-reads it, appends
    one record and rewrites the whole file. A missing, unreadable or corrupt
    log is silently reset to an empty list so the scan can continue.

    Parameters:
        tabella: logical table identifier (e.g. "file.mdb::Table").
        campo: the suspect column name.
        motivo: human-readable reason the field was flagged.
    """
    rec = {
        "tabella": tabella,
        "campo": campo,
        "motivo": motivo,
        "timestamp": datetime.datetime.now().isoformat(),
    }
    try:
        if os.path.exists(LOG_SOSP):
            # explicit utf-8: the file is written with ensure_ascii=False below
            with open(LOG_SOSP, "r", encoding="utf-8") as f:
                data = json.load(f)
        else:
            data = []
    except Exception:
        # Corrupt or unreadable log: best-effort reset rather than aborting.
        data = []
    if not isinstance(data, list):
        # A valid-JSON-but-not-a-list file would make .append() crash below.
        data = []
    data.append(rec)
    with open(LOG_SOSP, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
# Read configuration: OutputDirectory points at the archive tree to scan
# (falls back to ../estratti next to this script).
with open(CONFIG_PATH) as f:
    config = json.load(f)
ROOTDIR = config.get("OutputDirectory", os.path.join(SCRIPT_DIR, "../estratti"))
def scan_csv(path, table_hint=None):
    """Return (header, first_row) for the CSV file at *path*.

    header is the list of column names ([] for a completely empty file) and
    first_row is the first data row as a dict ({} when there is none).
    Columns whose name starts with "Q" are reported via log_sospetto.

    Parameters:
        path: CSV file to inspect (read as utf-8, decode errors ignored).
        table_hint: logical table name for the suspect log; defaults to path.

    Returns:
        (list[str], dict) — column names and first data row.
    """
    with open(path, encoding="utf-8", errors="ignore") as f:
        reader = csv.DictReader(f)
        try:
            first = next(reader)
        except StopIteration:
            first = {}
        # fieldnames is None for an empty file; list(None) would raise TypeError.
        header = list(reader.fieldnames or [])
    # Flag suspicious columns (names beginning with Q, case-insensitive).
    for campo in header:
        if campo.upper().startswith("Q"):
            log_sospetto(table_hint or path, campo, "Campo sospetto: inizia per Q")
    return header, first
def scan_mdb(path):
    """Return {table_name: (header, sample_row)} for the Access DB at *path*.

    Relies on the mdb-tools CLI (mdb-tables / mdb-export). Returns {} when
    the tools are missing or the file cannot be listed; individual tables
    that fail to export are skipped. Columns whose name starts with "Q" are
    reported via log_sospetto.
    """
    sample = {}
    try:
        tables = subprocess.check_output(["mdb-tables", "-1", path]).decode().split()
    except Exception:
        # mdb-tools not installed, or unreadable file: nothing to sample.
        return {}
    for tab in tables:
        try:
            # Export WITH the header row and take the first output line as
            # the column list. The previous code passed -H (= --no-header),
            # which suppresses the header, so it parsed the first DATA row
            # as column names — and split the entire multi-line output on
            # commas across newlines.
            export = subprocess.check_output(["mdb-export", path, tab])
            export_lines = export.decode(errors="ignore").splitlines()
            if not export_lines:
                continue
            header = [h.strip() for h in export_lines[0].split(",")]
            # Flag suspicious columns (names beginning with Q).
            for campo in header:
                if campo.upper().startswith("Q"):
                    log_sospetto(f"{os.path.basename(path)}::{tab}", campo, "Campo sospetto: inizia per Q")
            # Extract the first data row (header suppressed this time).
            proc = subprocess.Popen(
                ["mdb-export", "--no-header", "-d", ",", path, tab],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            )
            out, err = proc.communicate()
            if proc.returncode != 0:
                continue
            lines = out.decode(errors="ignore").splitlines()
            if not lines:
                sample_row = {}
            else:
                reader = csv.DictReader(lines, fieldnames=header)
                try:
                    sample_row = next(reader)
                except StopIteration:
                    sample_row = {}
            sample[tab] = (header, sample_row)
        except Exception:
            # Best-effort per table: skip anything mdb-export chokes on.
            continue
    return sample
def main():
    """Walk ROOTDIR, sample every CSV/MDB file found and write OUTFILE.

    Each CSV contributes one entry; each MDB contributes one entry per
    table. Every entry records source path, type, table name, field list
    and a sample row.
    """
    samples = []
    for dirpath, _dirs, filenames in os.walk(ROOTDIR):
        for nome in filenames:
            percorso = os.path.join(dirpath, nome)
            rel = os.path.relpath(percorso, ROOTDIR)
            lowered = nome.lower()
            if lowered.endswith(".csv"):
                table = os.path.splitext(os.path.basename(nome))[0]
                header, row = scan_csv(percorso, table_hint=table)
                samples.append({
                    "source": rel,
                    "type": "csv",
                    "table": table,
                    "fields": header,
                    "sample_row": row,
                })
                log_event("csv_scanned", file=rel, fields=len(header))
            elif lowered.endswith(".mdb"):
                for tab, (header, row) in scan_mdb(percorso).items():
                    samples.append({
                        "source": rel,
                        "type": "mdb",
                        "table": tab,
                        "fields": header,
                        "sample_row": row,
                    })
                    log_event("mdb_table_scanned", file=rel, table=tab, fields=len(header))
    with open(OUTFILE, "w", encoding="utf-8") as f:
        json.dump(samples, f, indent=2, ensure_ascii=False)
    log_event("sample_json_created", file=OUTFILE, tables=len(samples))
    print(f"Creato {OUTFILE} con {len(samples)} tabelle/estratti")


if __name__ == "__main__":
    main()