carritoIA/app.py

391 lines
15 KiB
Python

"""
app.py — Lista de la Compra Inteligente
Flask app con autenticacion de sesion.
Arrancar en desarrollo:
python app.py
En produccion usa gunicorn detras de nginx:
gunicorn -w 2 -b 127.0.0.1:5000 app:app
"""
import os
import io
import re
import json
import subprocess
import sys
from pathlib import Path
from functools import wraps
from datetime import datetime, timedelta
from flask import (
Flask, render_template, request, redirect,
url_for, session, jsonify, abort, send_from_directory
)
from werkzeug.security import generate_password_hash, check_password_hash
# -----------------------------------------------------------------------
# Configuracion
# -----------------------------------------------------------------------
BASE_DIR = Path(__file__).parent
USERS_FILE = BASE_DIR / "users.json"
def get_tickets_dir(usuario: str) -> Path:
"""Directorio de tickets del usuario. Se crea si no existe."""
d = BASE_DIR / "tickets" / usuario
d.mkdir(parents=True, exist_ok=True)
return d
def get_datos_json(usuario: str) -> Path:
"""Ruta al datos.json del usuario."""
return BASE_DIR / "datos" / usuario / "datos.json"
ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/jpg', 'image/png', 'image/webp'}
MAX_IMAGE_BYTES = 15 * 1024 * 1024 # 15 MB
app = Flask(__name__, template_folder="templates", static_folder="static")
app.secret_key = os.environ.get("SECRET_KEY", "cambia-esto-en-produccion-!!!!")
app.permanent_session_lifetime = timedelta(days=30)
# -----------------------------------------------------------------------
# OCR — lector lazy (se inicializa la primera vez que se usa)
# -----------------------------------------------------------------------
_ocr_reader = None
def get_ocr_reader():
global _ocr_reader
if _ocr_reader is None:
import easyocr
_ocr_reader = easyocr.Reader(['es', 'en'], gpu=False, verbose=False)
return _ocr_reader
# Palabras que indican que la línea NO es un producto
_OCR_EXCLUDE = [
"TARJETA", "IVA", "CUOTA", "TOTAL", "DEVOLUCIONES", "N.C", "AUT",
"VERIFICADO", "VISA", "IMPORTE", "MASTERCARD", "MERCADONA", "SUMA",
"EFECTIVO", "CAMBIO", "TICKET", "FACTURA",
]
def parsear_texto_ticket(texto):
"""Extrae productos de texto OCR con el mismo formato que los tickets Mercadona."""
productos = []
for line in texto.splitlines():
linea = line.strip()
if not linea:
continue
if any(kw in linea.upper() for kw in _OCR_EXCLUDE):
continue
# "2 ROLLO HOGAR DOBLE 2,35 4,70"
m = re.match(r"^(\d+)\s+(.+?)\s+(\d+[,\.]\d{2})\s+(\d+[,\.]\d{2})$", linea)
if m:
try:
productos.append({
"cantidad": int(m.group(1)),
"producto": m.group(2).strip().upper(),
"precio_unitario": round(float(m.group(3).replace(',', '.')), 2),
"precio_total": round(float(m.group(4).replace(',', '.')), 2),
})
continue
except ValueError:
pass
# "1 CROISSANT RELL CACAO 1,90"
m = re.match(r"^(\d+)\s+(.+?)\s+(\d+[,\.]\d{2})$", linea)
if m:
try:
cantidad = int(m.group(1))
precio = round(float(m.group(3).replace(',', '.')), 2)
productos.append({
"cantidad": cantidad,
"producto": m.group(2).strip().upper(),
"precio_unitario": round(precio / cantidad, 2),
"precio_total": precio,
})
except ValueError:
pass
return productos
# -----------------------------------------------------------------------
# Gestion de usuarios (users.json — NO subir al repo)
# -----------------------------------------------------------------------
def cargar_usuarios():
if not USERS_FILE.exists():
return {}
try:
with open(USERS_FILE, encoding="utf-8") as f:
contenido = f.read().strip()
return json.loads(contenido) if contenido else {}
except json.JSONDecodeError:
return {}
def guardar_usuarios(users):
with open(USERS_FILE, "w", encoding="utf-8") as f:
json.dump(users, f, indent=2, ensure_ascii=False)
def inicializar_admin():
"""Crea el usuario admin la primera vez si users.json no existe o está vacío."""
if USERS_FILE.exists() and USERS_FILE.stat().st_size > 0:
return
pwd = os.environ.get("ADMIN_PASSWORD", "cambia-esta-password")
users = {
"admin": {
"password_hash": generate_password_hash(pwd),
"nombre": "Admin"
}
}
guardar_usuarios(users)
print(f"[init] Usuario admin creado. Password: {pwd}")
print(f"[init] Cambialo en users.json o con ADMIN_PASSWORD en el entorno.")
# -----------------------------------------------------------------------
# Decorador de autenticacion
# -----------------------------------------------------------------------
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("usuario"):
return redirect(url_for("login", next=request.path))
return f(*args, **kwargs)
return decorated
# -----------------------------------------------------------------------
# Rutas de autenticacion
# -----------------------------------------------------------------------
@app.route("/login", methods=["GET", "POST"])
def login():
error = None
if request.method == "POST":
usuario = request.form.get("usuario", "").strip().lower()
password = request.form.get("password", "")
users = cargar_usuarios()
user = users.get(usuario)
if user and check_password_hash(user["password_hash"], password):
session.permanent = True
session["usuario"] = usuario
session["nombre"] = user.get("nombre", usuario)
next_url = request.form.get("next") or url_for("index")
return redirect(next_url)
error = "Usuario o contrasena incorrectos"
return render_template("login.html", error=error,
next=request.args.get("next", ""))
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
@app.route("/registro", methods=["GET", "POST"])
def registro():
error = None
ok = None
if request.method == "POST":
usuario = request.form.get("usuario", "").strip().lower()
nombre = request.form.get("nombre", "").strip()
password = request.form.get("password", "")
password2 = request.form.get("password2", "")
# Validaciones
if not usuario or not password or not nombre:
error = "Todos los campos son obligatorios"
elif len(usuario) < 3 or not usuario.isalnum():
error = "El usuario debe tener al menos 3 caracteres y solo letras/numeros"
elif len(password) < 6:
error = "La contrasena debe tener al menos 6 caracteres"
elif password != password2:
error = "Las contrasenas no coinciden"
else:
users = cargar_usuarios()
if usuario in users:
error = "Ese nombre de usuario ya esta en uso"
else:
users[usuario] = {
"password_hash": generate_password_hash(password),
"nombre": nombre
}
guardar_usuarios(users)
# Crear carpeta de tickets para el nuevo usuario
(BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True)
(BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True)
ok = "Cuenta creada correctamente. Ya puedes iniciar sesion."
return render_template("registro.html", error=error, ok=ok)
# -----------------------------------------------------------------------
# Pagina principal
# -----------------------------------------------------------------------
@app.route("/")
@login_required
def index():
return render_template("index.html", nombre=session.get("nombre", ""))
# -----------------------------------------------------------------------
# API: datos de predicciones
# -----------------------------------------------------------------------
@app.route("/api/datos")
@login_required
def api_datos():
datos_json = get_datos_json(session["usuario"])
if not datos_json.exists():
return jsonify({"error": "datos.json no generado", "predicciones": []}), 404
with open(datos_json, encoding="utf-8") as f:
datos = json.load(f)
return jsonify(datos)
# -----------------------------------------------------------------------
# API: forzar regeneracion del pipeline
# -----------------------------------------------------------------------
@app.route("/api/regenerar", methods=["POST"])
@login_required
def api_regenerar():
usuario = session["usuario"]
try:
r1 = subprocess.run(
[sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario],
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=120
)
if r1.returncode != 0:
return jsonify({"ok": False, "mensaje": "Error en pipeline",
"detalle": r1.stderr or r1.stdout}), 500
r2 = subprocess.run(
[sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario],
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=30
)
if r2.returncode != 0:
return jsonify({"ok": False, "mensaje": "Error generando lista",
"detalle": r2.stderr or r2.stdout}), 500
return jsonify({"ok": True, "mensaje": "Pipeline ejecutado correctamente"})
except subprocess.TimeoutExpired:
return jsonify({"ok": False, "mensaje": "Timeout al ejecutar el pipeline"}), 500
# -----------------------------------------------------------------------
# Archivos estaticos de tickets (solo autenticados)
# -----------------------------------------------------------------------
@app.route("/tickets/<path:filename>")
@login_required
def ticket_file(filename):
# Cada usuario solo accede a su propia carpeta de tickets
return send_from_directory(str(get_tickets_dir(session["usuario"])), filename)
# -----------------------------------------------------------------------
# Gestion de usuarios (solo admin)
# -----------------------------------------------------------------------
@app.route("/admin/usuarios")
@login_required
def admin_usuarios():
if session.get("usuario") != "admin":
abort(403)
users = cargar_usuarios()
lista = [{"usuario": k, "nombre": v.get("nombre", k)} for k, v in users.items()]
return render_template("admin_usuarios.html", usuarios=lista)
@app.route("/admin/usuarios/crear", methods=["POST"])
@login_required
def admin_crear_usuario():
if session.get("usuario") != "admin":
abort(403)
usuario = request.form.get("usuario", "").strip().lower()
nombre = request.form.get("nombre", "").strip()
password = request.form.get("password", "")
if not usuario or not password:
abort(400)
users = cargar_usuarios()
users[usuario] = {
"password_hash": generate_password_hash(password),
"nombre": nombre or usuario
}
guardar_usuarios(users)
return redirect(url_for("admin_usuarios"))
@app.route("/admin/usuarios/eliminar", methods=["POST"])
@login_required
def admin_eliminar_usuario():
if session.get("usuario") != "admin":
abort(403)
usuario = request.form.get("usuario", "").strip().lower()
if usuario == "admin":
abort(400)
users = cargar_usuarios()
users.pop(usuario, None)
guardar_usuarios(users)
return redirect(url_for("admin_usuarios"))
# -----------------------------------------------------------------------
# API: OCR de foto de ticket
# -----------------------------------------------------------------------
@app.route("/api/ocr-ticket", methods=["POST"])
@login_required
def api_ocr_ticket():
if 'imagen' not in request.files:
return jsonify({"ok": False, "mensaje": "No se recibio ninguna imagen"}), 400
file = request.files['imagen']
if not file or file.filename == '':
return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400
content_type = file.content_type or ''
if content_type not in ALLOWED_IMAGE_TYPES:
return jsonify({"ok": False, "mensaje": "Formato no soportado. Usa JPG, PNG o WEBP"}), 400
img_bytes = file.read()
if len(img_bytes) > MAX_IMAGE_BYTES:
return jsonify({"ok": False, "mensaje": "La imagen es demasiado grande (max 15 MB)"}), 400
try:
from PIL import Image, ImageEnhance, ImageFilter
# Preprocesar para mejorar OCR: escala de grises + contraste
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
img = img.convert('L') # escala de grises
img = ImageEnhance.Contrast(img).enhance(2.0) # aumentar contraste
img = img.filter(ImageFilter.SHARPEN) # nitidez
buf = io.BytesIO()
img.save(buf, format='PNG')
img_procesada = buf.getvalue()
reader = get_ocr_reader()
lineas = reader.readtext(img_procesada, detail=0, paragraph=True)
texto = '\n'.join(lineas)
productos = parsear_texto_ticket(texto)
if not productos:
return jsonify({
"ok": False,
"mensaje": "No se encontraron productos. Prueba con una foto mas nitida.",
"texto": texto,
}), 422
# Extraer fecha del ticket si aparece en el texto
date_match = re.search(r"(\d{2}/\d{2}/\d{4})", texto)
fecha_ticket = date_match.group(1) if date_match else datetime.now().strftime('%d/%m/%Y')
# Guardar como JSON en la carpeta tickets/
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
tdir = get_tickets_dir(session['usuario'])
ticket_path = tdir / f"ocr_{ts}.json"
with open(ticket_path, 'w', encoding='utf-8') as f:
json.dump({
"fecha": fecha_ticket,
"fuente": "ocr",
"archivo": file.filename,
"productos": productos,
}, f, ensure_ascii=False, indent=2)
return jsonify({
"ok": True,
"mensaje": f"{len(productos)} productos guardados",
"productos": productos,
"fecha": fecha_ticket,
})
except Exception as e:
return jsonify({"ok": False, "mensaje": f"Error al procesar la imagen: {str(e)}"}), 500
# -----------------------------------------------------------------------
# Entry point
# -----------------------------------------------------------------------
if __name__ == "__main__":
inicializar_admin()
debug = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
app.run(host="127.0.0.1", port=5000, debug=debug)