""" importar_tickets_email.py ------------------------- Descarga los tickets PDF de Mercadona desde el correo (IMAP) y los guarda en la carpeta tickets/ para ser procesados. Uso: python importar_tickets_email.py Configuracion en config.ini (se crea automaticamente la primera vez). Para Gmail necesitas una "contrasena de aplicacion": Cuenta Google -> Seguridad -> Verificacion en dos pasos -> Contrasenas de aplicacion -> Otra -> copiar los 16 caracteres. """ import imaplib import email import os import configparser import sys import subprocess from pathlib import Path from datetime import datetime from email.header import decode_header # ----------------------------------------------------------------------- # Rutas # ----------------------------------------------------------------------- BASE_DIR = Path(__file__).parent TICKETS_DIR = BASE_DIR / "tickets" CONFIG_FILE = BASE_DIR / "config.ini" TICKETS_DIR.mkdir(exist_ok=True) # ----------------------------------------------------------------------- # Configuracion # ----------------------------------------------------------------------- DEFAULTS = { "imap_host": "imap.gmail.com", "imap_port": "993", "correo": "", "password": "", "remitente": "noreply@mercadona.es", "solo_nuevos": "true", "ejecutar_pipeline": "false", } def leer_config(): cfg = configparser.ConfigParser() if CONFIG_FILE.exists(): cfg.read(CONFIG_FILE, encoding="utf-8") if "email" not in cfg: cfg["email"] = {} for k, v in DEFAULTS.items(): cfg["email"].setdefault(k, v) return cfg def guardar_config(cfg): with open(CONFIG_FILE, "w", encoding="utf-8") as f: cfg.write(f) def pedir_config(): """Solicita los datos de acceso si no estan configurados.""" cfg = leer_config() sec = cfg["email"] if not sec["correo"] or not sec["password"]: print("=== Configuracion inicial ===") print("Necesito los datos de acceso al correo.") print("Para Gmail usa una contrasena de aplicacion (no tu contrasena normal).") print("https://myaccount.google.com/apppasswords\n") correo = input("Correo electronico: ").strip() pwd = input("Contrasena (o contrasena de aplicacion): ").strip() host = input(f"Servidor IMAP [{sec['imap_host']}]: ").strip() or sec["imap_host"] sec["correo"] = correo sec["password"] = pwd sec["imap_host"] = host guardar_config(cfg) print(f"\nConfiguracion guardada en {CONFIG_FILE}\n") return cfg # ----------------------------------------------------------------------- # Decodificacion de nombres de archivo # ----------------------------------------------------------------------- def decodificar_nombre(nombre_raw): if not nombre_raw: return None partes = decode_header(nombre_raw) nombre = "" for chunk, enc in partes: if isinstance(chunk, bytes): nombre += chunk.decode(enc or "utf-8", errors="replace") else: nombre += chunk return nombre.strip() # ----------------------------------------------------------------------- # Descarga de tickets # ----------------------------------------------------------------------- def descargar_tickets(): cfg = pedir_config() sec = cfg["email"] host = sec["imap_host"] port = int(sec["imap_port"]) correo = sec["correo"] password = sec["password"] remitente = sec["remitente"] solo_nuevos = sec["solo_nuevos"].lower() == "true" ejecutar_pipeline = sec["ejecutar_pipeline"].lower() == "true" print(f"Conectando a {host}:{port} como {correo}...") try: conn = imaplib.IMAP4_SSL(host, port) conn.login(correo, password) except imaplib.IMAP4.error as e: print(f"Error de autenticacion: {e}") print("Comprueba el correo y la contrasena en config.ini") sys.exit(1) conn.select("INBOX") criterio = f'(FROM "{remitente}")' if solo_nuevos: criterio = f'(UNSEEN FROM "{remitente}")' _, ids = conn.search(None, criterio) ids = ids[0].split() if not ids: print("No hay tickets nuevos de Mercadona.") conn.logout() return 0 print(f"Encontrados {len(ids)} correo(s) de Mercadona.") descargados = 0 for uid in ids: _, data = conn.fetch(uid, "(RFC822)") msg = email.message_from_bytes(data[0][1]) # Fecha del correo para nombre de archivo fecha_str = msg.get("Date", "") try: fecha_dt = email.utils.parsedate_to_datetime(fecha_str) fecha_fmt = fecha_dt.strftime("%Y%m%d") except Exception: fecha_fmt = datetime.now().strftime("%Y%m%d") adjuntos_guardados = 0 for part in msg.walk(): if part.get_content_type() == "application/pdf": nombre_orig = decodificar_nombre(part.get_filename()) if not nombre_orig: nombre_orig = f"ticket_{fecha_fmt}_{uid.decode()}.pdf" # Evitar colisiones destino = TICKETS_DIR / nombre_orig if destino.exists(): destino = TICKETS_DIR / f"{fecha_fmt}_{nombre_orig}" destino.write_bytes(part.get_payload(decode=True)) print(f" Guardado: {destino.name}") adjuntos_guardados += 1 descargados += 1 if adjuntos_guardados == 0: print(f" Correo {uid.decode()}: sin PDF adjunto, ignorado.") # Marcar como leido conn.store(uid, "+FLAGS", "\\Seen") conn.logout() print(f"\nTotal descargados: {descargados} archivo(s) en {TICKETS_DIR}") # Ejecutar pipeline si esta configurado if ejecutar_pipeline and descargados > 0: print("\nEjecutando pipeline de procesado...") subprocess.run([sys.executable, str(BASE_DIR / "autocompra7.py")], check=False) subprocess.run([sys.executable, str(BASE_DIR / "generar_lista.py")], check=False) print("Pipeline completado.") return descargados # ----------------------------------------------------------------------- # Entry point # ----------------------------------------------------------------------- if __name__ == "__main__": descargados = descargar_tickets() if descargados == 0: sys.exit(0)