carritoIA/app.py

664 lines
26 KiB
Python

"""
app.py — CarritoIA
Flask app con autenticacion de sesion.
Arrancar en desarrollo:
python app.py
En produccion usa gunicorn detras de nginx:
gunicorn -w 2 -b 127.0.0.1:5000 app:app
"""
import os
import io
import re
import json
import subprocess
import sys
from pathlib import Path
from functools import wraps
from datetime import datetime, timedelta
from flask import (
Flask, render_template, request, redirect,
url_for, session, jsonify, abort, send_from_directory
)
from werkzeug.security import generate_password_hash, check_password_hash
# -----------------------------------------------------------------------
# Configuracion
# -----------------------------------------------------------------------
BASE_DIR = Path(__file__).parent
USERS_FILE = BASE_DIR / "users.json"
def get_tickets_dir(usuario: str) -> Path:
"""Directorio de tickets del usuario. Se crea si no existe."""
d = BASE_DIR / "tickets" / usuario
d.mkdir(parents=True, exist_ok=True)
return d
def get_datos_json(usuario: str) -> Path:
"""Ruta al datos.json del usuario."""
return BASE_DIR / "datos" / usuario / "datos.json"
ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/jpg', 'image/png', 'image/webp'}
MAX_IMAGE_BYTES = 15 * 1024 * 1024 # 15 MB
app = Flask(__name__, template_folder="templates", static_folder="static")
app.secret_key = os.environ.get("SECRET_KEY", "cambia-esto-en-produccion-!!!!")
app.permanent_session_lifetime = timedelta(days=30)
# -----------------------------------------------------------------------
# OCR — lector lazy (se inicializa la primera vez que se usa)
# -----------------------------------------------------------------------
_ocr_reader = None
def get_ocr_reader():
global _ocr_reader
if _ocr_reader is None:
import easyocr
_ocr_reader = easyocr.Reader(['es', 'en'], gpu=False, verbose=False)
return _ocr_reader
# Palabras que indican que la línea NO es un producto
_OCR_EXCLUDE = [
"TARJETA", "IVA", "CUOTA", "TOTAL", "DEVOLUCIONES", "N.C", "AUT",
"VERIFICADO", "VISA", "IMPORTE", "MASTERCARD", "MERCADONA", "SUMA",
"EFECTIVO", "CAMBIO", "TICKET", "FACTURA",
]
def parsear_texto_ticket(texto):
"""Extrae productos de texto OCR con el mismo formato que los tickets Mercadona."""
productos = []
lines = texto.splitlines()
i = 0
while i < len(lines):
linea = lines[i].strip()
if not linea or any(kw in linea.upper() for kw in _OCR_EXCLUDE):
i += 1
continue
# Producto por peso (2 líneas): "1BROCOLI" + "1,048 kg 2,60 €/kg 2,72"
if i + 1 < len(lines):
next_line = lines[i + 1].strip()
weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line)
if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", linea):
name_m = re.match(r"^(\d+)(.+)$", linea)
if name_m:
try:
cantidad = int(name_m.group(1))
precio_total = round(float(weight_m.group(1).replace(",", ".")), 2)
productos.append({
"cantidad": cantidad,
"producto": name_m.group(2).strip().upper(),
"precio_unitario": round(precio_total / cantidad, 2),
"precio_total": precio_total,
})
i += 2
continue
except ValueError:
pass
# "2ARROZ SOS 1,88 3,76"
m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})\s+(\d+[,\.]\d{2})$", linea)
if m:
try:
productos.append({
"cantidad": int(m.group(1)),
"producto": m.group(2).strip().upper(),
"precio_unitario": round(float(m.group(3).replace(',', '.')), 2),
"precio_total": round(float(m.group(4).replace(',', '.')), 2),
})
i += 1
continue
except ValueError:
pass
# "1CROISSANT RELL CACAO 1,90"
m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})$", linea)
if m:
try:
cantidad = int(m.group(1))
precio = round(float(m.group(3).replace(',', '.')), 2)
productos.append({
"cantidad": cantidad,
"producto": m.group(2).strip().upper(),
"precio_unitario": round(precio / cantidad, 2),
"precio_total": precio,
})
i += 1
continue
except ValueError:
pass
i += 1
return productos
# -----------------------------------------------------------------------
# Gestion de usuarios (users.json — NO subir al repo)
# -----------------------------------------------------------------------
def cargar_usuarios():
if not USERS_FILE.exists() or USERS_FILE.is_dir():
return {}
try:
with open(USERS_FILE, encoding="utf-8") as f:
contenido = f.read().strip()
return json.loads(contenido) if contenido else {}
except (json.JSONDecodeError, OSError):
return {}
def guardar_usuarios(users):
# Si por error Docker creó un directorio en lugar del archivo, lo eliminamos
if USERS_FILE.exists() and USERS_FILE.is_dir():
import shutil
shutil.rmtree(USERS_FILE)
with open(USERS_FILE, "w", encoding="utf-8") as f:
json.dump(users, f, indent=2, ensure_ascii=False)
def inicializar_admin():
"""Crea el usuario admin la primera vez si users.json no existe o está vacío."""
if USERS_FILE.exists() and USERS_FILE.stat().st_size > 0:
return
pwd = os.environ.get("ADMIN_PASSWORD", "cambia-esta-password")
users = {
"admin": {
"password_hash": generate_password_hash(pwd),
"nombre": "Admin",
"admin": True
}
}
guardar_usuarios(users)
print(f"[init] Usuario admin creado. Password: {pwd}")
print(f"[init] Cambialo en users.json o con ADMIN_PASSWORD en el entorno.")
# -----------------------------------------------------------------------
# Decoradores de autenticacion
# -----------------------------------------------------------------------
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("usuario"):
return redirect(url_for("login", next=request.path))
return f(*args, **kwargs)
return decorated
def admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("usuario"):
return redirect(url_for("login", next=request.path))
users = cargar_usuarios()
if not users.get(session["usuario"], {}).get("admin"):
abort(403)
return f(*args, **kwargs)
return decorated
# -----------------------------------------------------------------------
# Rutas de autenticacion
# -----------------------------------------------------------------------
@app.route("/login", methods=["GET", "POST"])
def login():
error = None
if request.method == "POST":
usuario = request.form.get("usuario", "").strip().lower()
password = request.form.get("password", "")
users = cargar_usuarios()
user = users.get(usuario)
if user and check_password_hash(user["password_hash"], password):
session.permanent = True
session["usuario"] = usuario
session["nombre"] = user.get("nombre", usuario)
next_url = request.form.get("next") or url_for("index")
return redirect(next_url)
error = "Usuario o contrasena incorrectos"
return render_template("login.html", error=error,
next=request.args.get("next", ""))
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
# -----------------------------------------------------------------------
# Perfil de usuario
# -----------------------------------------------------------------------
@app.route("/perfil", methods=["GET", "POST"])
@login_required
def perfil():
usuario = session["usuario"]
users = cargar_usuarios()
user = users[usuario]
error = None
ok = None
if request.method == "POST":
accion = request.form.get("accion", "")
if accion == "nombre":
nombre = request.form.get("nombre", "").strip()
if not nombre:
error = "El nombre no puede estar vacio"
else:
user["nombre"] = nombre
session["nombre"] = nombre
guardar_usuarios(users)
ok = "Nombre actualizado"
elif accion == "password":
pwd_actual = request.form.get("pwd_actual", "")
pwd_nueva = request.form.get("pwd_nueva", "")
pwd_nueva2 = request.form.get("pwd_nueva2", "")
if not check_password_hash(user["password_hash"], pwd_actual):
error = "La contrasena actual no es correcta"
elif len(pwd_nueva) < 6:
error = "La nueva contrasena debe tener al menos 6 caracteres"
elif pwd_nueva != pwd_nueva2:
error = "Las contrasenas no coinciden"
else:
user["password_hash"] = generate_password_hash(pwd_nueva)
guardar_usuarios(users)
ok = "Contrasena actualizada"
elif accion == "email":
user["email_config"] = {
"imap_host": request.form.get("imap_host", "").strip(),
"imap_port": request.form.get("imap_port", "993").strip(),
"correo": request.form.get("correo", "").strip(),
"password": request.form.get("email_pwd", "").strip(),
"remitente": request.form.get("remitente", "noreply@mercadona.es").strip(),
"ssl_verify": request.form.get("ssl_verify", "false"),
}
guardar_usuarios(users)
ok = "Configuracion de email guardada"
email_cfg = user.get("email_config", {})
return render_template("perfil.html",
nombre=session.get("nombre", ""),
usuario=usuario,
email_cfg=email_cfg,
error=error, ok=ok)
# -----------------------------------------------------------------------
# Panel de administracion
# -----------------------------------------------------------------------
@app.route("/admin")
@admin_required
def admin_panel():
users = cargar_usuarios()
lista = []
for uname, udata in users.items():
tickets_dir = BASE_DIR / "tickets" / uname
n_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file()) if tickets_dir.exists() else 0
lista.append({
"usuario": uname,
"nombre": udata.get("nombre", uname),
"admin": udata.get("admin", False),
"n_tickets": n_tickets,
})
lista.sort(key=lambda x: x["usuario"])
return render_template("admin.html", nombre=session.get("nombre", ""), usuarios=lista)
@app.route("/admin/crear", methods=["POST"])
@admin_required
def admin_crear_usuario():
usuario = request.form.get("usuario", "").strip().lower()
nombre = request.form.get("nombre", "").strip()
password = request.form.get("password", "").strip()
es_admin = request.form.get("es_admin") == "on"
users = cargar_usuarios()
error = None
if not usuario or not nombre or not password:
error = "Todos los campos son obligatorios"
elif len(usuario) < 3 or not usuario.isalnum():
error = "Usuario: min. 3 caracteres, solo letras y numeros"
elif len(password) < 6:
error = "Contrasena: min. 6 caracteres"
elif usuario in users:
error = "Ese nombre de usuario ya existe"
if error:
# Redirigir con el error como query param (sencillo)
return redirect(url_for("admin_panel") + f"?error={error}")
users[usuario] = {
"password_hash": generate_password_hash(password),
"nombre": nombre,
}
if es_admin:
users[usuario]["admin"] = True
guardar_usuarios(users)
(BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True)
(BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True)
return redirect(url_for("admin_panel"))
@app.route("/admin/eliminar", methods=["POST"])
@admin_required
def admin_eliminar_usuario():
import shutil
objetivo = request.form.get("usuario", "").strip().lower()
if not objetivo or objetivo == session["usuario"]:
return redirect(url_for("admin_panel")) # no se puede autoeliminar
users = cargar_usuarios()
users.pop(objetivo, None)
guardar_usuarios(users)
# Borrar datos del usuario
for carpeta in ["tickets", "datos"]:
d = BASE_DIR / carpeta / objetivo
if d.exists():
shutil.rmtree(d)
return redirect(url_for("admin_panel"))
@app.route("/registro", methods=["GET", "POST"])
def registro():
error = None
ok = None
if request.method == "POST":
usuario = request.form.get("usuario", "").strip().lower()
nombre = request.form.get("nombre", "").strip()
password = request.form.get("password", "")
password2 = request.form.get("password2", "")
# Validaciones
if not usuario or not password or not nombre:
error = "Todos los campos son obligatorios"
elif len(usuario) < 3 or not usuario.isalnum():
error = "El usuario debe tener al menos 3 caracteres y solo letras/numeros"
elif len(password) < 6:
error = "La contrasena debe tener al menos 6 caracteres"
elif password != password2:
error = "Las contrasenas no coinciden"
else:
users = cargar_usuarios()
if usuario in users:
error = "Ese nombre de usuario ya esta en uso"
else:
users[usuario] = {
"password_hash": generate_password_hash(password),
"nombre": nombre
}
guardar_usuarios(users)
# Crear carpeta de tickets para el nuevo usuario
(BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True)
(BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True)
ok = "Cuenta creada correctamente. Ya puedes iniciar sesion."
return render_template("registro.html", error=error, ok=ok)
# -----------------------------------------------------------------------
# Pagina principal
# -----------------------------------------------------------------------
@app.route("/")
@login_required
def index():
return render_template("index.html", nombre=session.get("nombre", ""))
# -----------------------------------------------------------------------
# API: datos de predicciones
# -----------------------------------------------------------------------
@app.route("/api/datos")
@login_required
def api_datos():
datos_json = get_datos_json(session["usuario"])
if not datos_json.exists():
return jsonify({"error": "datos.json no generado", "predicciones": []}), 404
with open(datos_json, encoding="utf-8") as f:
datos = json.load(f)
return jsonify(datos)
# -----------------------------------------------------------------------
# API: subir ticket PDF al servidor
# -----------------------------------------------------------------------
@app.route("/api/subir-pdf", methods=["POST"])
@login_required
def api_subir_pdf():
if 'pdf' not in request.files:
return jsonify({"ok": False, "mensaje": "No se recibio ningun archivo"}), 400
file = request.files['pdf']
if not file or file.filename == '':
return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400
if not file.filename.lower().endswith('.pdf'):
return jsonify({"ok": False, "mensaje": "Solo se aceptan archivos PDF"}), 400
pdf_bytes = file.read()
if len(pdf_bytes) > 20 * 1024 * 1024:
return jsonify({"ok": False, "mensaje": "El PDF es demasiado grande (max 20 MB)"}), 400
tdir = get_tickets_dir(session["usuario"])
# Nombre seguro: eliminar caracteres problemáticos
nombre_seguro = re.sub(r'[^\w\-. ]', '_', file.filename)
dest = tdir / nombre_seguro
with open(dest, 'wb') as f:
f.write(pdf_bytes)
return jsonify({"ok": True, "mensaje": f"PDF guardado: {nombre_seguro}"})
# -----------------------------------------------------------------------
# API: forzar regeneracion del pipeline
# -----------------------------------------------------------------------
@app.route("/api/regenerar", methods=["POST"])
@login_required
def api_regenerar():
usuario = session["usuario"]
try:
r1 = subprocess.run(
[sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario],
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=120
)
if r1.returncode != 0:
return jsonify({"ok": False, "mensaje": "Error en pipeline",
"detalle": r1.stderr or r1.stdout}), 500
r2 = subprocess.run(
[sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario],
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=30
)
if r2.returncode != 0:
return jsonify({"ok": False, "mensaje": "Error generando lista",
"detalle": r2.stderr or r2.stdout}), 500
return jsonify({"ok": True, "mensaje": "Pipeline ejecutado correctamente"})
except subprocess.TimeoutExpired:
return jsonify({"ok": False, "mensaje": "Timeout al ejecutar el pipeline"}), 500
# -----------------------------------------------------------------------
# API: importar tickets desde correo
# -----------------------------------------------------------------------
@app.route("/api/importar-email", methods=["POST"])
@login_required
def api_importar_email():
usuario = session["usuario"]
try:
r = subprocess.run(
[sys.executable, str(BASE_DIR / "importar_tickets_email.py"), "--usuario", usuario],
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=60
)
output = r.stdout + r.stderr
if r.returncode != 0:
return jsonify({"ok": False, "mensaje": "Error al importar email",
"detalle": output}), 500
# Extraer cuántos tickets se descargaron del output
import re as _re
m = _re.search(r"Total descargados: (\d+)", output)
n = int(m.group(1)) if m else 0
return jsonify({"ok": True, "nuevos": n,
"mensaje": f"{n} ticket(s) nuevos importados del correo"})
except subprocess.TimeoutExpired:
return jsonify({"ok": False, "mensaje": "Timeout al conectar con el correo"}), 500
# -----------------------------------------------------------------------
# Archivos estaticos de tickets (solo autenticados)
# -----------------------------------------------------------------------
@app.route("/tickets/<path:filename>")
@login_required
def ticket_file(filename):
# Cada usuario solo accede a su propia carpeta de tickets
return send_from_directory(str(get_tickets_dir(session["usuario"])), filename)
# (rutas /admin, /admin/crear, /admin/eliminar definidas arriba)
# -----------------------------------------------------------------------
# API: OCR de foto de ticket
# -----------------------------------------------------------------------
@app.route("/api/ocr-ticket", methods=["POST"])
@login_required
def api_ocr_ticket():
if 'imagen' not in request.files:
return jsonify({"ok": False, "mensaje": "No se recibio ninguna imagen"}), 400
file = request.files['imagen']
if not file or file.filename == '':
return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400
content_type = file.content_type or ''
if content_type not in ALLOWED_IMAGE_TYPES:
return jsonify({"ok": False, "mensaje": "Formato no soportado. Usa JPG, PNG o WEBP"}), 400
img_bytes = file.read()
if len(img_bytes) > MAX_IMAGE_BYTES:
return jsonify({"ok": False, "mensaje": "La imagen es demasiado grande (max 15 MB)"}), 400
try:
from PIL import Image, ImageEnhance, ImageFilter
# Preprocesar para mejorar OCR: escala de grises + contraste
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
img = img.convert('L') # escala de grises
img = ImageEnhance.Contrast(img).enhance(2.0) # aumentar contraste
img = img.filter(ImageFilter.SHARPEN) # nitidez
buf = io.BytesIO()
img.save(buf, format='PNG')
img_procesada = buf.getvalue()
reader = get_ocr_reader()
lineas = reader.readtext(img_procesada, detail=0, paragraph=True)
texto = '\n'.join(lineas)
productos = parsear_texto_ticket(texto)
if not productos:
return jsonify({
"ok": False,
"mensaje": "No se encontraron productos. Prueba con una foto mas nitida.",
"texto": texto,
}), 422
# Extraer fecha del ticket si aparece en el texto
date_match = re.search(r"(\d{2}/\d{2}/\d{4})", texto)
fecha_ticket = date_match.group(1) if date_match else datetime.now().strftime('%d/%m/%Y')
# Guardar como JSON en la carpeta tickets/
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
tdir = get_tickets_dir(session['usuario'])
ticket_path = tdir / f"ocr_{ts}.json"
with open(ticket_path, 'w', encoding='utf-8') as f:
json.dump({
"fecha": fecha_ticket,
"fuente": "ocr",
"archivo": file.filename,
"productos": productos,
}, f, ensure_ascii=False, indent=2)
return jsonify({
"ok": True,
"mensaje": f"{len(productos)} productos guardados",
"productos": productos,
"fecha": fecha_ticket,
})
except Exception as e:
return jsonify({"ok": False, "mensaje": f"Error al procesar la imagen: {str(e)}"}), 500
# -----------------------------------------------------------------------
# Estadisticas de consumo
# -----------------------------------------------------------------------
@app.route("/estadisticas")
@login_required
def estadisticas():
return render_template("estadisticas.html", nombre=session.get("nombre", ""))
@app.route("/api/estadisticas")
@login_required
def api_estadisticas():
import csv as _csv
usuario = session["usuario"]
datos_dir = BASE_DIR / "datos" / usuario
tickets_dir = get_tickets_dir(usuario)
# Contar archivos de tickets
try:
total_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file())
except Exception:
total_tickets = 0
# Leer gasto_mensual.csv
meses = []
gasto_csv = datos_dir / "gasto_mensual.csv"
if gasto_csv.exists():
with open(gasto_csv, encoding="utf-8") as fh:
for row in _csv.DictReader(fh):
try:
meses.append({"mes": str(row.get("mes", "")),
"gasto": round(float(row.get("precio_total", 0)), 2)})
except ValueError:
pass
meses.sort(key=lambda x: x["mes"])
_ES = ["ene","feb","mar","abr","may","jun","jul","ago","sep","oct","nov","dic"]
for m in meses:
try:
partes = m["mes"].split("-")
m["label"] = _ES[int(partes[1]) - 1] + " " + partes[0][-2:]
except Exception:
m["label"] = m["mes"]
gastos_v = [m["gasto"] for m in meses]
gasto_medio = round(sum(gastos_v) / len(gastos_v), 2) if gastos_v else 0.0
gasto_anual = round(sum(gastos_v[-12:]), 2) if gastos_v else 0.0
hoy = datetime.now()
mes_actual_key = hoy.strftime("%Y-%m")
mes_ant_key = (hoy.replace(day=1) - timedelta(days=1)).strftime("%Y-%m")
gasto_mes_actual = next((m["gasto"] for m in meses if m["mes"] == mes_actual_key), 0.0)
gasto_mes_anterior = next((m["gasto"] for m in meses if m["mes"] == mes_ant_key), 0.0)
# Leer resumen_productos.csv
top_gasto = []
top_frecuencia = []
productos_unicos = 0
resumen_csv = datos_dir / "resumen_productos.csv"
if resumen_csv.exists():
with open(resumen_csv, encoding="utf-8") as fh:
rows = list(_csv.DictReader(fh))
productos_unicos = len(rows)
top_gasto = [
{"producto": r["producto"],
"gasto_total": round(float(r.get("gasto_total", 0)), 2),
"veces": int(r.get("veces_comprado", 0))}
for r in sorted(rows, key=lambda r: float(r.get("gasto_total", 0)), reverse=True)[:10]
]
top_frecuencia = [
{"producto": r["producto"],
"veces_comprado": int(r.get("veces_comprado", 0)),
"gasto_total": round(float(r.get("gasto_total", 0)), 2)}
for r in sorted(rows, key=lambda r: int(r.get("veces_comprado", 0)), reverse=True)[:10]
]
return jsonify({
"gasto_medio_mensual": gasto_medio,
"gasto_mes_actual": gasto_mes_actual,
"gasto_mes_anterior": gasto_mes_anterior,
"gasto_anual": gasto_anual,
"total_tickets": total_tickets,
"productos_unicos": productos_unicos,
"meses": meses[-18:],
"top_productos_gasto": top_gasto,
"top_productos_frecuencia": top_frecuencia,
})
# -----------------------------------------------------------------------
# Entry point
# -----------------------------------------------------------------------
inicializar_admin() # Necesario tanto para gunicorn como para python app.py
if __name__ == "__main__":
debug = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
app.run(host="127.0.0.1", port=5000, debug=debug)