import re import pandas as pd import numpy as np import os import json import argparse from datetime import datetime, timedelta from PyPDF2 import PdfReader # --- Argumentos -------------------------------------------------------- parser = argparse.ArgumentParser() parser.add_argument('--usuario', default='default', help='Nombre del usuario') args = parser.parse_args() # Carpeta con los tickets del usuario y directorio de salida ticket_folder = os.path.join("tickets", args.usuario) output_dir = os.path.join("datos", args.usuario) os.makedirs(ticket_folder, exist_ok=True) os.makedirs(output_dir, exist_ok=True) # Palabras clave que indican líneas que no hay que procesar exclude_keywords = [ "TARJETA BANCARIA", "IVA BASE IMPONIBLE", "CUOTA", "TOTAL", "SE ADMITEN DEVOLUCIONES CON TICKET", "N.C", "AUT", "AID", "Verificado por dispositivo", "Visa Credit", "IMPORTE", "TARJ. BANCARIA" ] def extract_data_from_pdf(file_path): reader = PdfReader(file_path) text = "" for page in reader.pages: text += page.extract_text() + "\n" # Buscar la fecha date_match = re.search(r"(\d{2}/\d{2}/\d{4})", text) fecha = datetime.strptime(date_match.group(1), "%d/%m/%Y") if date_match else None products = [] lines = text.splitlines() i = 0 while i < len(lines): line = lines[i].strip() if not line or any(kw in line for kw in exclude_keywords): i += 1 continue # Producto por peso (2 líneas): # "1BROCOLI" # "1,048 kg 2,60 €/kg 2,72" if i + 1 < len(lines): next_line = lines[i + 1].strip() weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line) if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", line): name_m = re.match(r"^(\d+)(.+)$", line) if name_m: cantidad = int(name_m.group(1)) producto = name_m.group(2).strip().upper() precio_total = float(weight_m.group(1).replace(",", ".")) products.append((fecha, cantidad, producto, round(precio_total / cantidad, 2), precio_total)) i += 2 continue # Formato: "2ARROZ SOS 1,88 3,76" → cantidad / nombre / p.unit / total m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})\s+(\d+,\d{2})$", line) if m: try: cantidad = int(m.group(1)) producto = m.group(2).strip().upper() precio_unitario = float(m.group(3).replace(",", ".")) precio_total = float(m.group(4).replace(",", ".")) products.append((fecha, cantidad, producto, precio_unitario, precio_total)) i += 1 continue except ValueError: pass # Formato: "1CALABAZA TROZOS 2,50" → cantidad / nombre / total m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})$", line) if m: try: cantidad = int(m.group(1)) producto = m.group(2).strip().upper() precio_total = float(m.group(3).replace(",", ".")) products.append((fecha, cantidad, producto, round(precio_total / cantidad, 2), precio_total)) i += 1 continue except ValueError: pass i += 1 return products # Recolectar todos los datos de los tickets datos = [] for file in os.listdir(ticket_folder): path = os.path.join(ticket_folder, file) if file.endswith(".pdf"): datos.extend(extract_data_from_pdf(path)) elif file.endswith(".json"): # Tickets procesados con OCR desde foto de móvil try: with open(path, encoding="utf-8") as f: ticket = json.load(f) fecha_str = ticket.get("fecha", "") try: fecha = datetime.strptime(fecha_str, "%d/%m/%Y") except ValueError: continue for p in ticket.get("productos", []): datos.append(( fecha, p.get("cantidad", 1), str(p.get("producto", "")).upper(), float(p.get("precio_unitario", 0.0)), float(p.get("precio_total", 0.0)), )) except Exception as e: print(f"[warn] No se pudo leer {file}: {e}") # Crear DataFrame columnas = ["fecha", "cantidad", "producto", "precio_unitario", "precio_total"] df = pd.DataFrame(datos, columns=columnas) df.dropna(subset=["fecha"], inplace=True) if df.empty: print(f"[info] No hay tickets para el usuario '{args.usuario}'. Creando archivos vacios.") empty = pd.DataFrame(columns=["producto", "diferencia_dias", "proxima_compra", "es_estacional", "meses_temporada", "fecha_estimada_proxima_compra"]) empty.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False) import sys; sys.exit(0) # Guardar detalle completo df.to_csv(os.path.join(output_dir, "detalle_productos.csv"), index=False) # Agrupar por producto resumen = df.groupby("producto").agg( veces_comprado=("fecha", "count"), total_unidades=("cantidad", "sum"), gasto_total=("precio_total", "sum"), primera_vez=("fecha", "min"), ultima_vez=("fecha", "max") ).sort_values("gasto_total", ascending=False) resumen.to_csv(os.path.join(output_dir, "resumen_productos.csv")) # Gasto mensual df["mes"] = df["fecha"].dt.to_period("M") gasto_mensual = df.groupby("mes")["precio_total"].sum() gasto_mensual.to_csv(os.path.join(output_dir, "gasto_mensual.csv")) # --------------------------------------------------------------------------- # Cálculo de frecuencia con estacionalidad real por meses del año # --------------------------------------------------------------------------- # Algoritmo: # 1. Detectar en qué meses del año se compra el producto. # 2. Si se compra en ≤ 5 meses distintos → producto estacional. # 3. Frecuencia = media de gaps entre compras consecutivas dentro de temporada (≤ 90 días). # 4. Si hoy está en temporada → proyectar normalmente desde la última compra. # 5. Si hoy está FUERA de temporada → proyectar al inicio del próximo período activo. hoy = datetime.now() def _meses_activos(meses_compra): """Devuelve el conjunto de meses activos con ±1 mes de tolerancia, circular.""" activos = set() for m in meses_compra: activos.add(m) activos.add((m % 12) + 1) # mes siguiente activos.add(((m - 2) % 12) + 1) # mes anterior return activos def _proximo_mes_activo(mes_hoy, meses_activos): """Devuelve cuántos meses hay que avanzar para entrar en temporada.""" for i in range(1, 13): if ((mes_hoy - 1 + i) % 12) + 1 in meses_activos: return i return 0 def calcular_frecuencia_estacional(grupo): fechas = grupo['fecha'].sort_values().dropna() if len(fechas) < 1: return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT, 'es_estacional': False, 'meses_temporada': ''}) meses_compra = set(int(m) for m in fechas.dt.month.unique()) es_estacional = len(meses_compra) <= 5 # Frecuencia: solo gaps dentro de temporada (cortos) gaps = fechas.diff().dt.days.dropna() gaps_validos = gaps[(gaps > 0) & (gaps <= 90)] if len(gaps_validos) >= 1: freq = float(gaps_validos.mean()) elif len(gaps[gaps > 0]) >= 1: freq = float(gaps[gaps > 0].mean()) else: return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT, 'es_estacional': es_estacional, 'meses_temporada': ','.join(str(m) for m in sorted(meses_compra))}) ultima = fechas.max() proxima = ultima + timedelta(days=freq) if es_estacional: activos = _meses_activos(meses_compra) if hoy.month not in activos: # Fuera de temporada: buscar el próximo mes activo avance = _proximo_mes_activo(hoy.month, activos) mes_inicio = ((hoy.month - 1 + avance) % 12) + 1 anio = hoy.year if mes_inicio > hoy.month else hoy.year + 1 dia = min(int(ultima.day), 28) try: proxima = datetime(anio, mes_inicio, dia) except ValueError: proxima = datetime(anio, mes_inicio, 1) # Si estamos en temporada, la proyección normal (ultima + freq) ya es correcta return pd.Series({ 'diferencia_dias': freq, 'proxima_compra': pd.Timestamp(proxima), 'es_estacional': es_estacional, 'meses_temporada': ','.join(str(m) for m in sorted(meses_compra)), }) resumen_estacional = ( df.groupby("producto", group_keys=False) .apply(calcular_frecuencia_estacional) .reset_index() ) # Calcular próxima compra y filtrar para esta semana (pipeline original) proxima_semana = datetime.now() + pd.Timedelta(days=7) df = df.merge(resumen_estacional, on="producto", how="left") compra_estimacion = df[df["proxima_compra"] <= proxima_semana] # Guardar lista estimada compra_estimacion.to_csv(os.path.join(output_dir, "compra_estimacion.csv"), index=False) # Generar HTML visual html = compra_estimacion[["producto", "cantidad", "precio_unitario", "precio_total", "proxima_compra"]].sort_values("proxima_compra").to_html(index=False) with open(os.path.join(output_dir, "lista_compra_estimada.html"), "w") as file: file.write(html) print("\n✅ Todo listo. Archivos generados:") print("- detalle_productos.csv") print("- resumen_productos.csv") print("- gasto_mensual.csv") print("- compra_estimacion.csv") print("- lista_compra_estimada.html") # Generar lista_compra_estimado.csv (usado por generar_lista.py) lista_estimado = resumen_estacional.dropna(subset=["diferencia_dias"]).copy() lista_estimado["producto"] = lista_estimado["producto"].str.title() lista_estimado["fecha_estimada_proxima_compra"] = lista_estimado["proxima_compra"].dt.strftime("%d/%m/%Y") lista_estimado = lista_estimado.sort_values("diferencia_dias", ascending=True) lista_estimado.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False) print("- lista_compra_estimado.csv")