#!/usr/bin/env python3
"""
Estrazione struttura e sample dati archivi condominiali (CSV+MDB) in JSON.

Versione: 2025-06-04
Autore: Pikappa2 & Copilot

- Scansiona ricorsivamente la cartella archiviata (lettura path da agent_config.json).
- Per ogni file CSV e MDB:
  - Estrae header e prima riga di ogni tabella.
  - Salva tutto in un file sample_archivio.json
- Logga colonne/tabelle sospette (es: che iniziano con Q) in log_tabelle_sospette.json
- Log generale in sample_archivio.log
"""
|
|
|
|
# Standard library only: filesystem walking, CSV/JSON handling, and
# invocation of the external mdb-tools binaries via subprocess.
import os
import csv
import json
import subprocess
import datetime

# Resolve every path relative to this script's own directory so the tool
# behaves the same regardless of the current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")
LOG_DIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOG_DIR, exist_ok=True)

# Log file paths
LOGFILE = os.path.join(LOG_DIR, "sample_archivio.log")  # JSON-lines event log
LOG_SOSP = os.path.join(LOG_DIR, "log_tabelle_sospette.json")  # suspicious fields, one JSON array
OUTFILE = os.path.join(SCRIPT_DIR, "sample_archivio.json")  # final sampled-schema output
|
|
|
|
def log_event(msg, **kwargs):
    """Append one structured event record to the shared JSON-lines log.

    Parameters:
        msg: short event name (e.g. "csv_scanned").
        **kwargs: extra key/value context merged into the record.

    The record is echoed to stdout and appended to LOGFILE as a single
    JSON line with a human-readable timestamp.
    """
    row = {"event": msg, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    row.update(kwargs)
    print("[LOG]", row)
    # Explicit utf-8: the record is dumped with ensure_ascii=False, so a
    # non-UTF-8 locale default would raise UnicodeEncodeError on accented text.
    with open(LOGFILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")
|
|
|
|
def log_sospetto(tabella, campo, motivo):
    """Record one suspicious table/field in the shared JSON log file.

    Parameters:
        tabella: table identifier (or file path) the field belongs to.
        campo: suspicious column name.
        motivo: human-readable reason the field was flagged.

    LOG_SOSP holds a single JSON array; it is re-read and rewritten on
    every call. Best-effort: unreadable or corrupt content is discarded
    rather than crashing the scan.
    """
    rec = {
        "tabella": tabella,
        "campo": campo,
        "motivo": motivo,
        "timestamp": datetime.datetime.now().isoformat(),
    }
    try:
        if os.path.exists(LOG_SOSP):
            with open(LOG_SOSP, "r", encoding="utf-8") as f:
                data = json.load(f)
        else:
            data = []
    except Exception:
        # Corrupt/unreadable log file: start a fresh list (deliberate best-effort).
        data = []
    if not isinstance(data, list):
        # A hand-edited file could hold a non-list document; .append would fail.
        data = []
    data.append(rec)
    with open(LOG_SOSP, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
# Read the agent configuration; ROOTDIR is the directory tree scanned for
# CSV/MDB archives (falls back to ../estratti next to this script).
with open(CONFIG_PATH, encoding="utf-8") as f:
    config = json.load(f)
ROOTDIR = config.get("OutputDirectory", os.path.join(SCRIPT_DIR, "../estratti"))
|
|
|
|
def scan_csv(path, table_hint=None):
    """Extract the header and first data row from a CSV file.

    Parameters:
        path: filesystem path of the CSV file.
        table_hint: logical table name used when logging suspicious
            fields; falls back to *path* when omitted.

    Returns:
        (header, first_row): header is the list of column names (empty
        for a completely empty file); first_row maps column -> value for
        the first data row ({} when the file has no data rows).
    """
    # newline="" is the csv-module convention; errors="ignore" tolerates
    # legacy non-UTF-8 archive exports.
    with open(path, encoding="utf-8", errors="ignore", newline="") as f:
        reader = csv.DictReader(f)
        try:
            first = next(reader)
        except StopIteration:
            first = {}
        # fieldnames is None for an entirely empty file; normalize to []
        # instead of letting list(None) raise TypeError.
        header = list(reader.fieldnames or [])
    # Flag suspicious columns (project convention: names starting with Q).
    for campo in header:
        if campo.upper().startswith("Q"):
            log_sospetto(table_hint or path, campo, "Campo sospetto: inizia per Q")
    return header, first
|
|
|
|
def scan_mdb(path):
    """Return {table_name: (header, sample_row)} for an Access .mdb file.

    Relies on the external mdb-tools utilities (mdb-tables / mdb-export).
    Returns {} when the tools are unavailable or the file is unreadable;
    individual tables that fail to export are skipped silently.
    """
    sample = {}
    try:
        # 'mdb-tables -1' prints exactly one table name per line, so split
        # on newlines: a whitespace split would break names containing spaces.
        raw = subprocess.check_output(["mdb-tables", "-1", path])
        tables = [t for t in raw.decode(errors="ignore").splitlines() if t]
    except Exception:
        return {}
    for tab in tables:
        try:
            # Export WITH the header row: in mdb-tools, -H/--no-header
            # suppresses the header, so the old "-H" call actually returned
            # the first data row as the column names.
            out = subprocess.check_output(["mdb-export", "-d", ",", path, tab])
            lines = out.decode(errors="ignore").splitlines()
            if not lines:
                continue
            reader = csv.DictReader(lines)
            header = [h.strip() for h in (reader.fieldnames or [])]
            # Log suspicious columns (project convention: names starting with Q).
            for campo in header:
                if campo.upper().startswith("Q"):
                    log_sospetto(f"{os.path.basename(path)}::{tab}", campo, "Campo sospetto: inizia per Q")
            # First data row (if any) as a sample.
            sample_row = next(reader, {})
            sample[tab] = (header, sample_row)
        except Exception:
            continue
    return sample
|
|
|
|
def main():
    """Walk ROOTDIR, sample every CSV/MDB archive, write OUTFILE as JSON.

    Each sampled table becomes one record: source (path relative to
    ROOTDIR), type ("csv"/"mdb"), table name, field list and a single
    sample row.
    """
    samples = []
    for root, _dirs, files in os.walk(ROOTDIR):
        for fname in files:
            fullpath = os.path.join(root, fname)
            relpath = os.path.relpath(fullpath, ROOTDIR)
            lower = fname.lower()
            # Table name for CSVs: the bare file name without extension.
            table = os.path.splitext(os.path.basename(fname))[0]
            if lower.endswith(".csv"):
                header, row = scan_csv(fullpath, table_hint=table)
                samples.append({
                    "source": relpath,
                    "type": "csv",
                    "table": table,
                    "fields": header,
                    "sample_row": row,
                })
                log_event("csv_scanned", file=relpath, fields=len(header))
            elif lower.endswith(".mdb"):
                # One record per table contained in the Access database.
                for tab, (header, row) in scan_mdb(fullpath).items():
                    samples.append({
                        "source": relpath,
                        "type": "mdb",
                        "table": tab,
                        "fields": header,
                        "sample_row": row,
                    })
                    log_event("mdb_table_scanned", file=relpath, table=tab, fields=len(header))
    with open(OUTFILE, "w", encoding="utf-8") as f:
        json.dump(samples, f, indent=2, ensure_ascii=False)
    log_event("sample_json_created", file=OUTFILE, tables=len(samples))
    print(f"Creato {OUTFILE} con {len(samples)} tabelle/estratti")


if __name__ == "__main__":
    main()