import re import pandas as pd import numpy as np import os import json import argparse from datetime import datetime, timedelta from PyPDF2 import PdfReader # --- Argumentos -------------------------------------------------------- parser = argparse.ArgumentParser() parser.add_argument('--usuario', default='default', help='Nombre del usuario') args = parser.parse_args() # Carpeta con los tickets del usuario y directorio de salida ticket_folder = os.path.join("tickets", args.usuario) output_dir = os.path.join("datos", args.usuario) os.makedirs(ticket_folder, exist_ok=True) os.makedirs(output_dir, exist_ok=True) # Palabras clave que indican líneas que no hay que procesar exclude_keywords = [ "TARJETA BANCARIA", "IVA BASE IMPONIBLE", "CUOTA", "TOTAL", "SE ADMITEN DEVOLUCIONES CON TICKET", "N.C", "AUT", "AID", "Verificado por dispositivo", "Visa Credit", "IMPORTE", "TARJ. BANCARIA", "BASES IMPONIBLES", "BASE IMPONIBLE", "BASE CUOTA", "SUMA", "EFECTIVO", "CAMBIO", "MASTERCARD", "MERCADONA", "TICKET", ] # Nombre de producto que indica línea de IVA (p.ej. "%", "% 125,30") def _es_producto_invalido(nombre: str) -> bool: n = nombre.strip() return ( not n or n.startswith("%") or n == "%" or re.match(r'^%', n) or len(n) < 2 ) def extract_data_from_pdf(file_path): reader = PdfReader(file_path) text = "" for page in reader.pages: text += page.extract_text() + "\n" # Buscar la fecha date_match = re.search(r"(\d{2}/\d{2}/\d{4})", text) fecha = datetime.strptime(date_match.group(1), "%d/%m/%Y") if date_match else None products = [] lines = text.splitlines() i = 0 while i < len(lines): line = lines[i].strip() if not line or any(kw in line for kw in exclude_keywords): i += 1 continue # Producto por peso (2 líneas): # "1BROCOLI" # "1,048 kg 2,60 €/kg 2,72" if i + 1 < len(lines): next_line = lines[i + 1].strip() weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line) if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", line): name_m = re.match(r"^(\d+)(.+)$", line) if name_m: cantidad = int(name_m.group(1)) producto = name_m.group(2).strip().upper() if not _es_producto_invalido(producto): precio_total = float(weight_m.group(1).replace(",", ".")) products.append((fecha, cantidad, producto, round(precio_total / cantidad, 2), precio_total)) i += 2 continue # Formato: "2ARROZ SOS 1,88 3,76" → cantidad / nombre / p.unit / total m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})\s+(\d+,\d{2})$", line) if m: try: cantidad = int(m.group(1)) producto = m.group(2).strip().upper() precio_unitario = float(m.group(3).replace(",", ".")) precio_total = float(m.group(4).replace(",", ".")) if not _es_producto_invalido(producto): products.append((fecha, cantidad, producto, precio_unitario, precio_total)) i += 1 continue except ValueError: pass # Formato: "1CALABAZA TROZOS 2,50" → cantidad / nombre / total m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})$", line) if m: try: cantidad = int(m.group(1)) producto = m.group(2).strip().upper() precio_total = float(m.group(3).replace(",", ".")) if not _es_producto_invalido(producto): products.append((fecha, cantidad, producto, round(precio_total / cantidad, 2), precio_total)) i += 1 continue except ValueError: pass i += 1 return products # Recolectar todos los datos de los tickets datos = [] for file in os.listdir(ticket_folder): path = os.path.join(ticket_folder, file) if file.endswith(".pdf"): datos.extend(extract_data_from_pdf(path)) elif file.endswith(".json"): # Tickets procesados con OCR desde foto de móvil try: with open(path, encoding="utf-8") as f: ticket = json.load(f) fecha_str = ticket.get("fecha", "") try: fecha = datetime.strptime(fecha_str, "%d/%m/%Y") except ValueError: continue for p in ticket.get("productos", []): datos.append(( fecha, p.get("cantidad", 1), str(p.get("producto", "")).upper(), float(p.get("precio_unitario", 0.0)), float(p.get("precio_total", 0.0)), )) except Exception as e: print(f"[warn] No se pudo leer {file}: {e}") # Crear DataFrame columnas = ["fecha", "cantidad", "producto", "precio_unitario", "precio_total"] df = pd.DataFrame(datos, columns=columnas) df.dropna(subset=["fecha"], inplace=True) if df.empty: print(f"[info] No hay tickets para el usuario '{args.usuario}'. Creando archivos vacios.") empty = pd.DataFrame(columns=["producto", "diferencia_dias", "proxima_compra", "es_estacional", "meses_temporada", "fecha_estimada_proxima_compra"]) empty.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False) import sys; sys.exit(0) # Guardar detalle completo df.to_csv(os.path.join(output_dir, "detalle_productos.csv"), index=False) # Agrupar por producto resumen = df.groupby("producto").agg( veces_comprado=("fecha", "count"), total_unidades=("cantidad", "sum"), gasto_total=("precio_total", "sum"), primera_vez=("fecha", "min"), ultima_vez=("fecha", "max") ).sort_values("gasto_total", ascending=False) resumen.to_csv(os.path.join(output_dir, "resumen_productos.csv")) # Gasto mensual df["mes"] = df["fecha"].dt.to_period("M") gasto_mensual = df.groupby("mes")["precio_total"].sum() gasto_mensual.to_csv(os.path.join(output_dir, "gasto_mensual.csv")) # --------------------------------------------------------------------------- # Cálculo de frecuencia con estacionalidad real por meses del año # --------------------------------------------------------------------------- # Algoritmo: # 1. Detectar en qué meses del año se compra el producto. # 2. Si se compra en ≤ 5 meses distintos → producto estacional. # 3. Frecuencia = media de gaps entre compras consecutivas dentro de temporada (≤ 90 días). # 4. Si hoy está en temporada → proyectar normalmente desde la última compra. # 5. Si hoy está FUERA de temporada → proyectar al inicio del próximo período activo. hoy = datetime.now() def _meses_activos(meses_compra): """Devuelve el conjunto de meses activos con ±1 mes de tolerancia, circular.""" activos = set() for m in meses_compra: activos.add(m) activos.add((m % 12) + 1) # mes siguiente activos.add(((m - 2) % 12) + 1) # mes anterior return activos def _proximo_mes_activo(mes_hoy, meses_activos): """Devuelve cuántos meses hay que avanzar para entrar en temporada.""" for i in range(1, 13): if ((mes_hoy - 1 + i) % 12) + 1 in meses_activos: return i return 0 def calcular_frecuencia_estacional(grupo): fechas = grupo['fecha'].sort_values().dropna() if len(fechas) < 1: return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT, 'es_estacional': False, 'meses_temporada': ''}) meses_compra = set(int(m) for m in fechas.dt.month.unique()) es_estacional = len(meses_compra) <= 5 # Frecuencia: solo gaps dentro de temporada (cortos) gaps = fechas.diff().dt.days.dropna() gaps_validos = gaps[(gaps > 0) & (gaps <= 90)] if len(gaps_validos) >= 1: freq = float(gaps_validos.mean()) elif len(gaps[gaps > 0]) >= 1: freq = float(gaps[gaps > 0].mean()) else: return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT, 'es_estacional': es_estacional, 'meses_temporada': ','.join(str(m) for m in sorted(meses_compra))}) ultima = fechas.max() proxima = ultima + timedelta(days=freq) if es_estacional: activos = _meses_activos(meses_compra) if hoy.month not in activos: # Fuera de temporada: buscar el próximo mes activo avance = _proximo_mes_activo(hoy.month, activos) mes_inicio = ((hoy.month - 1 + avance) % 12) + 1 anio = hoy.year if mes_inicio > hoy.month else hoy.year + 1 dia = min(int(ultima.day), 28) try: proxima = datetime(anio, mes_inicio, dia) except ValueError: proxima = datetime(anio, mes_inicio, 1) # Si estamos en temporada, la proyección normal (ultima + freq) ya es correcta return pd.Series({ 'diferencia_dias': freq, 'proxima_compra': pd.Timestamp(proxima), 'es_estacional': es_estacional, 'meses_temporada': ','.join(str(m) for m in sorted(meses_compra)), }) resumen_estacional = ( df.groupby("producto", group_keys=False) .apply(calcular_frecuencia_estacional) .reset_index() ) # Calcular próxima compra y filtrar para esta semana (pipeline original) proxima_semana = datetime.now() + pd.Timedelta(days=7) df = df.merge(resumen_estacional, on="producto", how="left") compra_estimacion = df[df["proxima_compra"] <= proxima_semana] # Guardar lista estimada compra_estimacion.to_csv(os.path.join(output_dir, "compra_estimacion.csv"), index=False) # Generar HTML visual html = compra_estimacion[["producto", "cantidad", "precio_unitario", "precio_total", "proxima_compra"]].sort_values("proxima_compra").to_html(index=False) with open(os.path.join(output_dir, "lista_compra_estimada.html"), "w") as file: file.write(html) print("\n✅ Todo listo. Archivos generados:") print("- detalle_productos.csv") print("- resumen_productos.csv") print("- gasto_mensual.csv") print("- compra_estimacion.csv") print("- lista_compra_estimada.html") # Generar lista_compra_estimado.csv (usado por generar_lista.py) lista_estimado = resumen_estacional.dropna(subset=["diferencia_dias"]).copy() lista_estimado["producto"] = lista_estimado["producto"].str.title() lista_estimado["fecha_estimada_proxima_compra"] = lista_estimado["proxima_compra"].dt.strftime("%d/%m/%Y") lista_estimado = lista_estimado.sort_values("diferencia_dias", ascending=True) lista_estimado.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False) print("- lista_compra_estimado.csv")