carritoIA/autocompra7.py

260 lines
10 KiB
Python

import re
import pandas as pd
import numpy as np
import os
import json
import argparse
from datetime import datetime, timedelta
from PyPDF2 import PdfReader
# --- Argumentos --------------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument('--usuario', default='default', help='Nombre del usuario')
args = parser.parse_args()
# Carpeta con los tickets del usuario y directorio de salida
ticket_folder = os.path.join("tickets", args.usuario)
output_dir = os.path.join("datos", args.usuario)
os.makedirs(ticket_folder, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
# Palabras clave que indican líneas que no hay que procesar
exclude_keywords = [
"TARJETA BANCARIA", "IVA BASE IMPONIBLE", "CUOTA", "TOTAL",
"SE ADMITEN DEVOLUCIONES CON TICKET", "N.C", "AUT", "AID",
"Verificado por dispositivo", "Visa Credit", "IMPORTE", "TARJ. BANCARIA"
]
def extract_data_from_pdf(file_path):
reader = PdfReader(file_path)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
# Buscar la fecha
date_match = re.search(r"(\d{2}/\d{2}/\d{4})", text)
fecha = datetime.strptime(date_match.group(1), "%d/%m/%Y") if date_match else None
products = []
lines = text.splitlines()
i = 0
while i < len(lines):
line = lines[i].strip()
if not line or any(kw in line for kw in exclude_keywords):
i += 1
continue
# Producto por peso (2 líneas):
# "1BROCOLI"
# "1,048 kg 2,60 €/kg 2,72"
if i + 1 < len(lines):
next_line = lines[i + 1].strip()
weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line)
if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", line):
name_m = re.match(r"^(\d+)(.+)$", line)
if name_m:
cantidad = int(name_m.group(1))
producto = name_m.group(2).strip().upper()
precio_total = float(weight_m.group(1).replace(",", "."))
products.append((fecha, cantidad, producto,
round(precio_total / cantidad, 2), precio_total))
i += 2
continue
# Formato: "2ARROZ SOS 1,88 3,76" → cantidad / nombre / p.unit / total
m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})\s+(\d+,\d{2})$", line)
if m:
try:
cantidad = int(m.group(1))
producto = m.group(2).strip().upper()
precio_unitario = float(m.group(3).replace(",", "."))
precio_total = float(m.group(4).replace(",", "."))
products.append((fecha, cantidad, producto, precio_unitario, precio_total))
i += 1
continue
except ValueError:
pass
# Formato: "1CALABAZA TROZOS 2,50" → cantidad / nombre / total
m = re.match(r"^(\d+)(.+?)\s+(\d+,\d{2})$", line)
if m:
try:
cantidad = int(m.group(1))
producto = m.group(2).strip().upper()
precio_total = float(m.group(3).replace(",", "."))
products.append((fecha, cantidad, producto,
round(precio_total / cantidad, 2), precio_total))
i += 1
continue
except ValueError:
pass
i += 1
return products
# Recolectar todos los datos de los tickets
datos = []
for file in os.listdir(ticket_folder):
path = os.path.join(ticket_folder, file)
if file.endswith(".pdf"):
datos.extend(extract_data_from_pdf(path))
elif file.endswith(".json"):
# Tickets procesados con OCR desde foto de móvil
try:
with open(path, encoding="utf-8") as f:
ticket = json.load(f)
fecha_str = ticket.get("fecha", "")
try:
fecha = datetime.strptime(fecha_str, "%d/%m/%Y")
except ValueError:
continue
for p in ticket.get("productos", []):
datos.append((
fecha,
p.get("cantidad", 1),
str(p.get("producto", "")).upper(),
float(p.get("precio_unitario", 0.0)),
float(p.get("precio_total", 0.0)),
))
except Exception as e:
print(f"[warn] No se pudo leer {file}: {e}")
# Crear DataFrame
columnas = ["fecha", "cantidad", "producto", "precio_unitario", "precio_total"]
df = pd.DataFrame(datos, columns=columnas)
df.dropna(subset=["fecha"], inplace=True)
if df.empty:
print(f"[info] No hay tickets para el usuario '{args.usuario}'. Creando archivos vacios.")
empty = pd.DataFrame(columns=["producto", "diferencia_dias", "proxima_compra",
"es_estacional", "meses_temporada",
"fecha_estimada_proxima_compra"])
empty.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False)
import sys; sys.exit(0)
# Guardar detalle completo
df.to_csv(os.path.join(output_dir, "detalle_productos.csv"), index=False)
# Agrupar por producto
resumen = df.groupby("producto").agg(
veces_comprado=("fecha", "count"),
total_unidades=("cantidad", "sum"),
gasto_total=("precio_total", "sum"),
primera_vez=("fecha", "min"),
ultima_vez=("fecha", "max")
).sort_values("gasto_total", ascending=False)
resumen.to_csv(os.path.join(output_dir, "resumen_productos.csv"))
# Gasto mensual
df["mes"] = df["fecha"].dt.to_period("M")
gasto_mensual = df.groupby("mes")["precio_total"].sum()
gasto_mensual.to_csv(os.path.join(output_dir, "gasto_mensual.csv"))
# ---------------------------------------------------------------------------
# Cálculo de frecuencia con estacionalidad real por meses del año
# ---------------------------------------------------------------------------
# Algoritmo:
# 1. Detectar en qué meses del año se compra el producto.
# 2. Si se compra en ≤ 5 meses distintos → producto estacional.
# 3. Frecuencia = media de gaps entre compras consecutivas dentro de temporada (≤ 90 días).
# 4. Si hoy está en temporada → proyectar normalmente desde la última compra.
# 5. Si hoy está FUERA de temporada → proyectar al inicio del próximo período activo.
hoy = datetime.now()
def _meses_activos(meses_compra):
"""Devuelve el conjunto de meses activos con ±1 mes de tolerancia, circular."""
activos = set()
for m in meses_compra:
activos.add(m)
activos.add((m % 12) + 1) # mes siguiente
activos.add(((m - 2) % 12) + 1) # mes anterior
return activos
def _proximo_mes_activo(mes_hoy, meses_activos):
"""Devuelve cuántos meses hay que avanzar para entrar en temporada."""
for i in range(1, 13):
if ((mes_hoy - 1 + i) % 12) + 1 in meses_activos:
return i
return 0
def calcular_frecuencia_estacional(grupo):
fechas = grupo['fecha'].sort_values().dropna()
if len(fechas) < 1:
return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT,
'es_estacional': False, 'meses_temporada': ''})
meses_compra = set(int(m) for m in fechas.dt.month.unique())
es_estacional = len(meses_compra) <= 5
# Frecuencia: solo gaps dentro de temporada (cortos)
gaps = fechas.diff().dt.days.dropna()
gaps_validos = gaps[(gaps > 0) & (gaps <= 90)]
if len(gaps_validos) >= 1:
freq = float(gaps_validos.mean())
elif len(gaps[gaps > 0]) >= 1:
freq = float(gaps[gaps > 0].mean())
else:
return pd.Series({'diferencia_dias': float('nan'), 'proxima_compra': pd.NaT,
'es_estacional': es_estacional,
'meses_temporada': ','.join(str(m) for m in sorted(meses_compra))})
ultima = fechas.max()
proxima = ultima + timedelta(days=freq)
if es_estacional:
activos = _meses_activos(meses_compra)
if hoy.month not in activos:
# Fuera de temporada: buscar el próximo mes activo
avance = _proximo_mes_activo(hoy.month, activos)
mes_inicio = ((hoy.month - 1 + avance) % 12) + 1
anio = hoy.year if mes_inicio > hoy.month else hoy.year + 1
dia = min(int(ultima.day), 28)
try:
proxima = datetime(anio, mes_inicio, dia)
except ValueError:
proxima = datetime(anio, mes_inicio, 1)
# Si estamos en temporada, la proyección normal (ultima + freq) ya es correcta
return pd.Series({
'diferencia_dias': freq,
'proxima_compra': pd.Timestamp(proxima),
'es_estacional': es_estacional,
'meses_temporada': ','.join(str(m) for m in sorted(meses_compra)),
})
resumen_estacional = (
df.groupby("producto", group_keys=False)
.apply(calcular_frecuencia_estacional)
.reset_index()
)
# Calcular próxima compra y filtrar para esta semana (pipeline original)
proxima_semana = datetime.now() + pd.Timedelta(days=7)
df = df.merge(resumen_estacional, on="producto", how="left")
compra_estimacion = df[df["proxima_compra"] <= proxima_semana]
# Guardar lista estimada
compra_estimacion.to_csv(os.path.join(output_dir, "compra_estimacion.csv"), index=False)
# Generar HTML visual
html = compra_estimacion[["producto", "cantidad", "precio_unitario", "precio_total", "proxima_compra"]].sort_values("proxima_compra").to_html(index=False)
with open(os.path.join(output_dir, "lista_compra_estimada.html"), "w") as file:
file.write(html)
print("\n✅ Todo listo. Archivos generados:")
print("- detalle_productos.csv")
print("- resumen_productos.csv")
print("- gasto_mensual.csv")
print("- compra_estimacion.csv")
print("- lista_compra_estimada.html")
# Generar lista_compra_estimado.csv (usado por generar_lista.py)
lista_estimado = resumen_estacional.dropna(subset=["diferencia_dias"]).copy()
lista_estimado["producto"] = lista_estimado["producto"].str.title()
lista_estimado["fecha_estimada_proxima_compra"] = lista_estimado["proxima_compra"].dt.strftime("%d/%m/%Y")
lista_estimado = lista_estimado.sort_values("diferencia_dias", ascending=True)
lista_estimado.to_csv(os.path.join(output_dir, "lista_compra_estimado.csv"), index=False)
print("- lista_compra_estimado.csv")