carritoIA/importar_tickets_email.py

211 lines
7.1 KiB
Python

"""
importar_tickets_email.py
-------------------------
Descarga los tickets PDF de Mercadona desde el correo (IMAP)
y los guarda en tickets/{usuario}/ para ser procesados.
Uso:
python importar_tickets_email.py --usuario tatiana
Configuracion en config.ini (montado como volumen en Docker).
Soporta SSL sin verificacion de certificado (util con servidores locales).
"""
import argparse
import imaplib
import email
import email.utils
import os
import ssl
import configparser
import sys
import subprocess
from pathlib import Path
from datetime import datetime
from email.header import decode_header
# -----------------------------------------------------------------------
# Argumentos
# -----------------------------------------------------------------------
_parser = argparse.ArgumentParser()
_parser.add_argument("--usuario", default="default", help="Usuario al que asignar los tickets")
_args = _parser.parse_args()
# -----------------------------------------------------------------------
# Rutas
# -----------------------------------------------------------------------
BASE_DIR = Path(__file__).parent
CONFIG_FILE = BASE_DIR / "config.ini"
# -----------------------------------------------------------------------
# Configuracion
# -----------------------------------------------------------------------
DEFAULTS = {
"imap_host": "localhost",
"imap_port": "993",
"correo": "",
"password": "",
"remitente": "noreply@mercadona.es",
"solo_nuevos": "true",
"ssl_verify": "true",
"ejecutar_pipeline": "true",
}
def leer_config():
cfg = configparser.ConfigParser()
if CONFIG_FILE.exists():
cfg.read(CONFIG_FILE, encoding="utf-8")
if "email" not in cfg:
cfg["email"] = {}
for k, v in DEFAULTS.items():
cfg["email"].setdefault(k, v)
return cfg
# -----------------------------------------------------------------------
# Decodificacion de nombres de archivo
# -----------------------------------------------------------------------
def decodificar_nombre(nombre_raw):
if not nombre_raw:
return None
partes = decode_header(nombre_raw)
nombre = ""
for chunk, enc in partes:
if isinstance(chunk, bytes):
nombre += chunk.decode(enc or "utf-8", errors="replace")
else:
nombre += chunk
return nombre.strip()
# -----------------------------------------------------------------------
# Descarga de tickets
# -----------------------------------------------------------------------
def descargar_tickets(usuario: str) -> int:
cfg = leer_config()
sec = cfg["email"]
# Config por usuario desde users.json (tiene prioridad sobre config.ini)
users_file = BASE_DIR / "users.json"
user_email_cfg = {}
if users_file.exists():
try:
import json as _json
with open(users_file, encoding="utf-8") as f:
users = _json.load(f)
user_email_cfg = users.get(usuario, {}).get("email_config", {})
except Exception:
pass
def _get(key, default=""):
return user_email_cfg.get(key) or os.environ.get(
{"imap_host": "EMAIL_IMAP_HOST", "imap_port": "EMAIL_IMAP_PORT",
"correo": "EMAIL_CORREO", "password": "EMAIL_PASSWORD"}.get(key, ""),
""
) or sec.get(key, default)
host = _get("imap_host", "localhost").strip()
port = int(_get("imap_port", "993"))
correo = _get("correo").strip()
password = _get("password").strip()
remitente = _get("remitente", "noreply@mercadona.es")
solo_nuevos = _get("ssl_verify", "true") != "false" # reutilizamos lógica abajo
ssl_verify = _get("ssl_verify", "true").lower() == "true"
solo_nuevos = sec["solo_nuevos"].lower() == "true"
ejecutar_pipeline = sec["ejecutar_pipeline"].lower() == "true"
if not correo or not password:
print("ERROR: correo o password no configurados en config.ini / variables de entorno.")
sys.exit(1)
tickets_dir = BASE_DIR / "tickets" / usuario
tickets_dir.mkdir(parents=True, exist_ok=True)
print(f"[email] Conectando a {host}:{port} como {correo} (ssl_verify={ssl_verify})...")
try:
if ssl_verify:
conn = imaplib.IMAP4_SSL(host, port)
else:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
conn = imaplib.IMAP4_SSL(host, port, ssl_context=ctx)
conn.login(correo, password)
except imaplib.IMAP4.error as e:
print(f"[email] Error de autenticacion: {e}")
sys.exit(1)
except Exception as e:
print(f"[email] Error de conexion: {e}")
sys.exit(1)
conn.select("INBOX")
criterio = f'(FROM "{remitente}")'
if solo_nuevos:
criterio = f'(UNSEEN FROM "{remitente}")'
_, ids = conn.search(None, criterio)
ids = ids[0].split()
if not ids:
print("[email] No hay tickets nuevos de Mercadona.")
conn.logout()
return 0
print(f"[email] Encontrados {len(ids)} correo(s) de Mercadona.")
descargados = 0
for uid in ids:
_, data = conn.fetch(uid, "(RFC822)")
msg = email.message_from_bytes(data[0][1])
fecha_str = msg.get("Date", "")
try:
fecha_dt = email.utils.parsedate_to_datetime(fecha_str)
fecha_fmt = fecha_dt.strftime("%Y%m%d")
except Exception:
fecha_fmt = datetime.now().strftime("%Y%m%d")
adjuntos_guardados = 0
for part in msg.walk():
if part.get_content_type() == "application/pdf":
nombre_orig = decodificar_nombre(part.get_filename())
if not nombre_orig:
nombre_orig = f"ticket_{fecha_fmt}_{uid.decode()}.pdf"
destino = tickets_dir / nombre_orig
if destino.exists():
destino = tickets_dir / f"{fecha_fmt}_{nombre_orig}"
destino.write_bytes(part.get_payload(decode=True))
print(f"[email] Guardado: {destino.name}")
adjuntos_guardados += 1
descargados += 1
if adjuntos_guardados == 0:
print(f"[email] Correo {uid.decode()}: sin PDF adjunto, ignorado.")
conn.store(uid, "+FLAGS", "\\Seen")
conn.logout()
print(f"[email] Total descargados: {descargados} archivo(s) en {tickets_dir}")
if ejecutar_pipeline and descargados > 0:
print("[email] Ejecutando pipeline...")
subprocess.run(
[sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario],
cwd=str(BASE_DIR), check=False
)
subprocess.run(
[sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario],
cwd=str(BASE_DIR), check=False
)
print("[email] Pipeline completado.")
return descargados
# -----------------------------------------------------------------------
# Entry point
# -----------------------------------------------------------------------
if __name__ == "__main__":
n = descargar_tickets(_args.usuario)
sys.exit(0 if n >= 0 else 1)