""" importar_tickets_email.py ------------------------- Descarga los tickets PDF de Mercadona desde el correo (IMAP) y los guarda en tickets/{usuario}/ para ser procesados. Uso: python importar_tickets_email.py --usuario tatiana Configuracion en config.ini (montado como volumen en Docker). Soporta SSL sin verificacion de certificado (util con servidores locales). """ import argparse import imaplib import email import email.utils import os import ssl import configparser import sys import subprocess from pathlib import Path from datetime import datetime from email.header import decode_header # ----------------------------------------------------------------------- # Argumentos # ----------------------------------------------------------------------- _parser = argparse.ArgumentParser() _parser.add_argument("--usuario", default="default", help="Usuario al que asignar los tickets") _args = _parser.parse_args() # ----------------------------------------------------------------------- # Rutas # ----------------------------------------------------------------------- BASE_DIR = Path(__file__).parent CONFIG_FILE = BASE_DIR / "config.ini" # ----------------------------------------------------------------------- # Configuracion # ----------------------------------------------------------------------- DEFAULTS = { "imap_host": "localhost", "imap_port": "993", "correo": "", "password": "", "remitente": "noreply@mercadona.es", "solo_nuevos": "true", "ssl_verify": "true", "ejecutar_pipeline": "true", } def leer_config(): cfg = configparser.ConfigParser() if CONFIG_FILE.exists(): cfg.read(CONFIG_FILE, encoding="utf-8") if "email" not in cfg: cfg["email"] = {} for k, v in DEFAULTS.items(): cfg["email"].setdefault(k, v) return cfg # ----------------------------------------------------------------------- # Decodificacion de nombres de archivo # ----------------------------------------------------------------------- def decodificar_nombre(nombre_raw): if not nombre_raw: return None partes = decode_header(nombre_raw) nombre = "" for chunk, enc in partes: if isinstance(chunk, bytes): nombre += chunk.decode(enc or "utf-8", errors="replace") else: nombre += chunk return nombre.strip() # ----------------------------------------------------------------------- # Descarga de tickets # ----------------------------------------------------------------------- def descargar_tickets(usuario: str) -> int: cfg = leer_config() sec = cfg["email"] # Config por usuario desde users.json (tiene prioridad sobre config.ini) users_file = BASE_DIR / "users.json" user_email_cfg = {} if users_file.exists(): try: import json as _json with open(users_file, encoding="utf-8") as f: users = _json.load(f) user_email_cfg = users.get(usuario, {}).get("email_config", {}) except Exception: pass def _get(key, default=""): return user_email_cfg.get(key) or os.environ.get( {"imap_host": "EMAIL_IMAP_HOST", "imap_port": "EMAIL_IMAP_PORT", "correo": "EMAIL_CORREO", "password": "EMAIL_PASSWORD"}.get(key, ""), "" ) or sec.get(key, default) host = _get("imap_host", "localhost").strip() port = int(_get("imap_port", "993")) correo = _get("correo").strip() password = _get("password").strip() remitente = _get("remitente", "noreply@mercadona.es") solo_nuevos = _get("ssl_verify", "true") != "false" # reutilizamos lógica abajo ssl_verify = _get("ssl_verify", "true").lower() == "true" solo_nuevos = sec["solo_nuevos"].lower() == "true" ejecutar_pipeline = sec["ejecutar_pipeline"].lower() == "true" if not correo or not password: print("ERROR: correo o password no configurados en config.ini / variables de entorno.") sys.exit(1) tickets_dir = BASE_DIR / "tickets" / usuario tickets_dir.mkdir(parents=True, exist_ok=True) print(f"[email] Conectando a {host}:{port} como {correo} (ssl_verify={ssl_verify})...") try: if ssl_verify: conn = imaplib.IMAP4_SSL(host, port) else: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE conn = imaplib.IMAP4_SSL(host, port, ssl_context=ctx) conn.login(correo, password) except imaplib.IMAP4.error as e: print(f"[email] Error de autenticacion: {e}") sys.exit(1) except Exception as e: print(f"[email] Error de conexion: {e}") sys.exit(1) conn.select("INBOX") criterio = f'(FROM "{remitente}")' if solo_nuevos: criterio = f'(UNSEEN FROM "{remitente}")' _, ids = conn.search(None, criterio) ids = ids[0].split() if not ids: print("[email] No hay tickets nuevos de Mercadona.") conn.logout() return 0 print(f"[email] Encontrados {len(ids)} correo(s) de Mercadona.") descargados = 0 for uid in ids: _, data = conn.fetch(uid, "(RFC822)") msg = email.message_from_bytes(data[0][1]) fecha_str = msg.get("Date", "") try: fecha_dt = email.utils.parsedate_to_datetime(fecha_str) fecha_fmt = fecha_dt.strftime("%Y%m%d") except Exception: fecha_fmt = datetime.now().strftime("%Y%m%d") adjuntos_guardados = 0 for part in msg.walk(): if part.get_content_type() == "application/pdf": nombre_orig = decodificar_nombre(part.get_filename()) if not nombre_orig: nombre_orig = f"ticket_{fecha_fmt}_{uid.decode()}.pdf" destino = tickets_dir / nombre_orig if destino.exists(): destino = tickets_dir / f"{fecha_fmt}_{nombre_orig}" destino.write_bytes(part.get_payload(decode=True)) print(f"[email] Guardado: {destino.name}") adjuntos_guardados += 1 descargados += 1 if adjuntos_guardados == 0: print(f"[email] Correo {uid.decode()}: sin PDF adjunto, ignorado.") conn.store(uid, "+FLAGS", "\\Seen") conn.logout() print(f"[email] Total descargados: {descargados} archivo(s) en {tickets_dir}") if ejecutar_pipeline and descargados > 0: print("[email] Ejecutando pipeline...") subprocess.run( [sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario], cwd=str(BASE_DIR), check=False ) subprocess.run( [sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario], cwd=str(BASE_DIR), check=False ) print("[email] Pipeline completado.") return descargados # ----------------------------------------------------------------------- # Entry point # ----------------------------------------------------------------------- if __name__ == "__main__": n = descargar_tickets(_args.usuario) sys.exit(0 if n >= 0 else 1)