"""
app.py — Lista de la Compra Inteligente (smart shopping list)

Flask app with session-based authentication.

Run in development:
    python app.py

In production, run gunicorn behind nginx:
    gunicorn -w 2 -b 127.0.0.1:5000 app:app
"""

import io
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path

from flask import (
    Flask, render_template, request, redirect,
    url_for, session, jsonify, abort, send_from_directory
)
from werkzeug.security import generate_password_hash, check_password_hash

# -----------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------
# Filesystem layout: every data file lives next to this module.
BASE_DIR = Path(__file__).parent
DATOS_JSON = BASE_DIR / "datos.json"
USERS_FILE = BASE_DIR / "users.json"
TICKETS_DIR = BASE_DIR / "tickets"

# Upload constraints checked by the OCR endpoint.
ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/jpg', 'image/png', 'image/webp'}
MAX_IMAGE_BYTES = 15 * 1024 * 1024  # 15 MB

app = Flask(__name__, template_folder="templates", static_folder="static")

# Flask signs the session cookie with this key. The fallback below is
# published in the source code, so anyone can forge a session (including
# the "admin" one) when SECRET_KEY is not set. The fallback is kept so
# development keeps working, but it is reported loudly on stderr.
_DEFAULT_SECRET_KEY = "cambia-esto-en-produccion-!!!!"
app.secret_key = os.environ.get("SECRET_KEY", _DEFAULT_SECRET_KEY)
if app.secret_key == _DEFAULT_SECRET_KEY:
    print(
        "[init] AVISO: SECRET_KEY no definida; se usa la clave por defecto. "
        "Define SECRET_KEY en el entorno antes de desplegar en produccion.",
        file=sys.stderr,
    )

# Lifetime of sessions marked as permanent.
app.permanent_session_lifetime = timedelta(days=30)
# -----------------------------------------------------------------------
# OCR — lazy reader (initialised the first time it is used)
# -----------------------------------------------------------------------
# Module-level cache for the OCR reader; stays None until first use.
_ocr_reader = None


def get_ocr_reader():
    """Return the shared EasyOCR reader, creating it on the first call.

    ``easyocr`` is imported inside the function, so neither the import nor
    the reader construction happens until OCR is actually requested. The
    reader is built for Spanish and English with the GPU disabled.
    """
    global _ocr_reader
    if _ocr_reader is not None:
        return _ocr_reader

    import easyocr

    _ocr_reader = easyocr.Reader(['es', 'en'], gpu=False, verbose=False)
    return _ocr_reader
# Keywords that mark a line as NOT being a product (totals, payment data,
# store header, ...). They are compared against the upper-cased line.
_OCR_EXCLUDE = [
    "TARJETA", "IVA", "CUOTA", "TOTAL", "DEVOLUCIONES", "N.C", "AUT",
    "VERIFICADO", "VISA", "IMPORTE", "MASTERCARD", "MERCADONA", "SUMA",
    "EFECTIVO", "CAMBIO", "TICKET", "FACTURA",
]

# Keywords only count as whole words: a plain substring test would also
# discard real products such as "ACEITE OLIVA" (it contains "IVA").
_OCR_EXCLUDE_RE = re.compile(
    r"(?<!\w)(?:" + "|".join(re.escape(kw) for kw in _OCR_EXCLUDE) + r")(?!\w)"
)

# "2 ROLLO HOGAR DOBLE 2,35 4,70" -> quantity, name, unit price, line total
_LINEA_CON_UNITARIO_RE = re.compile(
    r"^(\d+)\s+(.+?)\s+(\d+[,\.]\d{2})\s+(\d+[,\.]\d{2})$"
)
# "1 CROISSANT RELL CACAO 1,90" -> quantity, name, line total
_LINEA_SOLO_TOTAL_RE = re.compile(r"^(\d+)\s+(.+?)\s+(\d+[,\.]\d{2})$")


def _a_precio(texto):
    """Convert an OCR price such as ``"2,35"`` or ``"2.35"`` to a float."""
    return round(float(texto.replace(',', '.')), 2)


def parsear_texto_ticket(texto):
    """Extract product lines from OCR text laid out like a Mercadona ticket.

    Two line layouts are recognised, both starting with the quantity:

    * ``"2 ROLLO HOGAR DOBLE 2,35 4,70"`` (unit price and line total)
    * ``"1 CROISSANT RELL CACAO 1,90"`` (line total only; the unit price
      is derived as total / quantity)

    Blank lines, lines containing one of the ``_OCR_EXCLUDE`` keywords as a
    whole word, lines with quantity 0 and lines matching neither layout are
    skipped.

    Args:
        texto: OCR output, one ticket line per text line.

    Returns:
        A list of dicts with the keys ``cantidad`` (int), ``producto``
        (upper-cased str), ``precio_unitario`` and ``precio_total`` (floats
        rounded to 2 decimals), in the order the lines appear.
    """
    productos = []
    for line in texto.splitlines():
        linea = line.strip()
        if not linea or _OCR_EXCLUDE_RE.search(linea.upper()):
            continue

        # The 4-field layout takes precedence; fall back to the 3-field one.
        con_unitario = _LINEA_CON_UNITARIO_RE.match(linea)
        m = con_unitario or _LINEA_SOLO_TOTAL_RE.match(linea)
        if not m:
            continue

        try:
            cantidad = int(m.group(1))
            if cantidad <= 0:
                # A zero quantity is OCR noise and would make the unit-price
                # division below fail.
                continue
            if con_unitario:
                precio_unitario = _a_precio(m.group(3))
                precio_total = _a_precio(m.group(4))
            else:
                precio_total = _a_precio(m.group(3))
                precio_unitario = round(precio_total / cantidad, 2)
        except ValueError:
            # Number the interpreter refuses to convert: drop the line.
            continue

        productos.append({
            "cantidad": cantidad,
            "producto": m.group(2).strip().upper(),
            "precio_unitario": precio_unitario,
            "precio_total": precio_total,
        })
    return productos
# -----------------------------------------------------------------------
# User management (users.json — do NOT commit it to the repo)
# -----------------------------------------------------------------------
def cargar_usuarios():
    """Load the user table from ``USERS_FILE``.

    Returns:
        The decoded JSON object (username -> user record). An empty dict is
        returned when the file does not exist, is empty or whitespace-only,
        does not contain valid JSON, or its top-level value is not an
        object.
    """
    # Open directly instead of testing exists() first: the file may vanish
    # between the check and the read.
    try:
        contenido = USERS_FILE.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return {}
    if not contenido:
        return {}
    try:
        users = json.loads(contenido)
    except json.JSONDecodeError:
        return {}
    # Callers use dict methods (.get, .items, item assignment) on the result.
    return users if isinstance(users, dict) else {}
def guardar_usuarios(users):
    """Write the user table to ``USERS_FILE`` as pretty-printed UTF-8 JSON.

    The data is written to a temporary file in the same directory and then
    renamed over ``USERS_FILE``, so a crash or a concurrent reader never
    sees a half-written file.

    Args:
        users: JSON-serialisable mapping of username -> user record.

    Raises:
        OSError: if the temporary file cannot be created, written or
            renamed. ``USERS_FILE`` is left untouched in that case.
    """
    # mkstemp creates the file readable/writable by the owner only, which
    # also suits a file that stores password hashes.
    fd, tmp_path = tempfile.mkstemp(
        dir=str(USERS_FILE.parent),
        prefix=USERS_FILE.name + ".",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(users, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, USERS_FILE)
    except BaseException:
        # Do not leave the temporary file behind when anything fails.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
def inicializar_admin():
    """Create the ``admin`` user the first time the app starts.

    Nothing happens when ``USERS_FILE`` already exists and is not empty.
    Otherwise a table holding only ``admin`` is written. The password comes
    from the ``ADMIN_PASSWORD`` environment variable; when that is unset or
    empty, a well-known default is used and printed so the operator can log
    in and change it.
    """
    if USERS_FILE.exists() and USERS_FILE.stat().st_size > 0:
        return

    # `or` (not a .get default) so an empty ADMIN_PASSWORD cannot create an
    # admin account with an empty password.
    pwd_entorno = os.environ.get("ADMIN_PASSWORD")
    pwd = pwd_entorno or "cambia-esta-password"

    users = {
        "admin": {
            "password_hash": generate_password_hash(pwd),
            "nombre": "Admin"
        }
    }
    guardar_usuarios(users)

    if pwd_entorno:
        # Never echo a real secret: stdout usually ends up in log files.
        print("[init] Usuario admin creado con la password de ADMIN_PASSWORD.")
    else:
        print(f"[init] Usuario admin creado. Password: {pwd}")
        print("[init] Cambialo en users.json o con ADMIN_PASSWORD en el entorno.")
# -----------------------------------------------------------------------
# Authentication decorator
# -----------------------------------------------------------------------
def login_required(f):
    """Decorator that only lets logged-in sessions reach the view *f*.

    When the session has no truthy ``"usuario"`` entry, the request is
    redirected to the ``login`` endpoint with the current path passed as
    ``next``; otherwise *f* is called with the original arguments.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if session.get("usuario"):
            return f(*args, **kwargs)
        return redirect(url_for("login", next=request.path))

    return wrapper
# -----------------------------------------------------------------------
# Authentication routes
# -----------------------------------------------------------------------
def _es_destino_seguro(destino):
    """Return True when *destino* is a plain path on this same site.

    Rejects anything that a browser could treat as another host: values not
    starting with a single ``/``, and values containing a backslash or a
    control character (browsers normalise ``/\\host`` and ``/<tab>/host``
    to ``//host``).
    """
    if not destino.startswith("/") or destino.startswith("//"):
        return False
    return not any(c == "\\" or ord(c) < 32 for c in destino)


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) and authenticate the user (POST).

    On success the session is reset, marked permanent and filled with
    ``usuario`` and ``nombre``; the browser is then redirected to the
    ``next`` target when it is a local path, or to ``index`` otherwise.
    On failure the form is rendered again with an error message.
    """
    error = None
    # The target may arrive in the posted form or in the query string; keep
    # it so it survives a failed attempt.
    next_url = request.form.get("next") or request.args.get("next", "")

    if request.method == "POST":
        usuario = request.form.get("usuario", "").strip().lower()
        password = request.form.get("password", "")
        user = cargar_usuarios().get(usuario)
        # .get(): a record without a hash must fail the login, not raise.
        if user and check_password_hash(user.get("password_hash", ""), password):
            # Start from a clean session so nothing from a previous
            # (anonymous or other-user) session carries over.
            session.clear()
            session.permanent = True
            session["usuario"] = usuario
            session["nombre"] = user.get("nombre", usuario)
            # `next` is user-controlled: never redirect off-site.
            if not _es_destino_seguro(next_url):
                next_url = url_for("index")
            return redirect(next_url)
        error = "Usuario o contrasena incorrectos"

    return render_template("login.html", error=error, next=next_url)
@app.route("/logout")
def logout():
    """Drop everything stored in the session and go back to the login page."""
    session.clear()
    destino = url_for("login")
    return redirect(destino)
# Usernames that cannot be self-registered. The admin views grant access by
# comparing the session username with "admin", so letting a visitor claim
# that name (possible whenever it is missing from users.json) would hand
# out admin rights.
_USUARIOS_RESERVADOS = {"admin"}


def _validar_registro(usuario, nombre, password, password2):
    """Return the error message for a sign-up form, or None when it is valid."""
    if not usuario or not password or not nombre:
        return "Todos los campos son obligatorios"
    if len(usuario) < 3 or not usuario.isalnum():
        return "El usuario debe tener al menos 3 caracteres y solo letras/numeros"
    if len(password) < 6:
        return "La contrasena debe tener al menos 6 caracteres"
    if password != password2:
        return "Las contrasenas no coinciden"
    return None


@app.route("/registro", methods=["GET", "POST"])
def registro():
    """Show the sign-up form (GET) and create a new account (POST).

    The username is trimmed and lower-cased before validation. A valid,
    unused and non-reserved username is stored with a hash of the password.
    The template receives ``error`` or ``ok`` with the outcome message.
    """
    error = None
    ok = None

    if request.method == "POST":
        usuario = request.form.get("usuario", "").strip().lower()
        nombre = request.form.get("nombre", "").strip()
        password = request.form.get("password", "")
        password2 = request.form.get("password2", "")

        error = _validar_registro(usuario, nombre, password, password2)
        if error is None:
            users = cargar_usuarios()
            if usuario in users or usuario in _USUARIOS_RESERVADOS:
                # Same message for reserved names, so the form does not
                # reveal which names are special.
                error = "Ese nombre de usuario ya esta en uso"
            else:
                users[usuario] = {
                    "password_hash": generate_password_hash(password),
                    "nombre": nombre
                }
                guardar_usuarios(users)
                ok = "Cuenta creada correctamente. Ya puedes iniciar sesion."

    return render_template("registro.html", error=error, ok=ok)
# -----------------------------------------------------------------------
# Main page
# -----------------------------------------------------------------------
@app.route("/")
@login_required
def index():
    """Render the main page, passing the display name stored in the session."""
    nombre = session.get("nombre", "")
    return render_template("index.html", nombre=nombre)
# -----------------------------------------------------------------------
# API: prediction data
# -----------------------------------------------------------------------
@app.route("/api/datos")
@login_required
def api_datos():
    """Return the contents of ``datos.json`` as a JSON response.

    Responses:
        200: the decoded contents of the file.
        404: the file does not exist yet.
        500: the file exists but does not contain valid JSON.
    Both error bodies carry an ``error`` message and an empty
    ``predicciones`` list.
    """
    # Open directly instead of testing exists() first: the file may be
    # removed or rewritten between the check and the read.
    try:
        with open(DATOS_JSON, encoding="utf-8") as f:
            datos = json.load(f)
    except FileNotFoundError:
        return jsonify({"error": "datos.json no generado", "predicciones": []}), 404
    except json.JSONDecodeError:
        # Answer with JSON like the rest of the API, not an HTML error page.
        return jsonify({"error": "datos.json no es JSON valido", "predicciones": []}), 500
    return jsonify(datos)
# -----------------------------------------------------------------------
# API: force a pipeline re-run
# -----------------------------------------------------------------------
@app.route("/api/regenerar", methods=["POST"])
@login_required
def api_regenerar():
    """Run the two pipeline scripts in order and report the outcome as JSON.

    ``autocompra7.py`` (120 s limit) runs first, then ``generar_lista.py``
    (30 s limit), both with the current interpreter and ``BASE_DIR`` as the
    working directory. A non-zero exit status or a timeout stops the run
    and yields a 500 response with ``ok`` set to False.
    """
    # (script name, timeout in seconds), in execution order.
    pasos = (("autocompra7.py", 120), ("generar_lista.py", 30))
    try:
        for script, limite in pasos:
            subprocess.run(
                [sys.executable, str(BASE_DIR / script)],
                cwd=str(BASE_DIR), check=True, capture_output=True, timeout=limite
            )
    except subprocess.CalledProcessError as e:
        return jsonify({"ok": False, "mensaje": str(e)}), 500
    except subprocess.TimeoutExpired:
        return jsonify({"ok": False, "mensaje": "Timeout al ejecutar el pipeline"}), 500
    return jsonify({"ok": True, "mensaje": "Pipeline ejecutado correctamente"})
# -----------------------------------------------------------------------
# Static ticket files (authenticated users only)
# -----------------------------------------------------------------------
@app.route("/tickets/<path:filename>")
@login_required
def ticket_file(filename):
    """Serve *filename* from the tickets directory to a logged-in user."""
    directorio = str(TICKETS_DIR)
    return send_from_directory(directorio, filename)
# -----------------------------------------------------------------------
# User management (admin only)
# -----------------------------------------------------------------------
@app.route("/admin/usuarios")
@login_required
def admin_usuarios():
    """Render the user list; any session other than ``admin`` gets a 403."""
    if session.get("usuario") != "admin":
        abort(403)

    # Only the username and display name go to the template.
    lista = []
    for clave, datos in cargar_usuarios().items():
        lista.append({"usuario": clave, "nombre": datos.get("nombre", clave)})
    return render_template("admin_usuarios.html", usuarios=lista)
@app.route("/admin/usuarios/crear", methods=["POST"])
@login_required
def admin_crear_usuario():
    """Create a user from the posted form, or replace an existing one.

    Only the ``admin`` session may call this (403 otherwise). A missing
    username or password yields a 400. An existing entry with the same
    username is overwritten. The display name defaults to the username.
    """
    if session.get("usuario") != "admin":
        abort(403)

    formulario = request.form
    usuario = formulario.get("usuario", "").strip().lower()
    nombre = formulario.get("nombre", "").strip()
    password = formulario.get("password", "")
    if not (usuario and password):
        abort(400)

    users = cargar_usuarios()
    users[usuario] = dict(
        password_hash=generate_password_hash(password),
        nombre=nombre or usuario,
    )
    guardar_usuarios(users)
    return redirect(url_for("admin_usuarios"))
@app.route("/admin/usuarios/eliminar", methods=["POST"])
@login_required
def admin_eliminar_usuario():
    """Delete the user named in the posted form.

    Only the ``admin`` session may call this (403 otherwise), and the
    ``admin`` account itself cannot be deleted (400). Deleting an unknown
    user is a no-op; the table is saved either way.
    """
    es_admin = session.get("usuario") == "admin"
    if not es_admin:
        abort(403)

    objetivo = request.form.get("usuario", "").strip().lower()
    if objetivo == "admin":
        abort(400)

    users = cargar_usuarios()
    if objetivo in users:
        del users[objetivo]
    guardar_usuarios(users)
    return redirect(url_for("admin_usuarios"))
# -----------------------------------------------------------------------
# API: OCR of a ticket photo
# -----------------------------------------------------------------------
def _preprocesar_imagen(img_bytes):
    """Return the image re-encoded as PNG bytes after OCR-oriented clean-up.

    The image is converted to grayscale, its contrast is doubled and a
    sharpen filter is applied. Errors raised by Pillow while opening or
    decoding the data propagate to the caller.
    """
    from PIL import Image, ImageEnhance, ImageFilter

    img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
    img = img.convert('L')                          # grayscale
    img = ImageEnhance.Contrast(img).enhance(2.0)   # boost contrast
    img = img.filter(ImageFilter.SHARPEN)           # sharpen

    buf = io.BytesIO()
    img.save(buf, format='PNG')
    return buf.getvalue()


def _guardar_ticket_ocr(fecha_ticket, nombre_archivo, productos):
    """Store the parsed ticket as ``ocr_<timestamp>.json`` inside ``TICKETS_DIR``."""
    # NOTE(review): the timestamp has one-second resolution, so two uploads
    # saved within the same second write to the same file name.
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    ticket_path = TICKETS_DIR / f"ocr_{ts}.json"
    TICKETS_DIR.mkdir(exist_ok=True)
    with open(ticket_path, 'w', encoding='utf-8') as f:
        json.dump({
            "fecha": fecha_ticket,
            "fuente": "ocr",
            "archivo": nombre_archivo,
            "productos": productos,
        }, f, ensure_ascii=False, indent=2)


@app.route("/api/ocr-ticket", methods=["POST"])
@login_required
def api_ocr_ticket():
    """Run OCR on an uploaded ticket photo and store the products found.

    Expects a multipart upload in the ``imagen`` field whose content type
    is one of ``ALLOWED_IMAGE_TYPES`` and whose size is at most
    ``MAX_IMAGE_BYTES``.

    Responses (all JSON, always with ``ok`` and ``mensaje``):
        200: products found and saved; also ``productos`` and ``fecha``.
        400: missing/empty upload, unsupported type, file too large or
             image data that cannot be decoded.
        422: OCR ran but no product line was recognised; also ``texto``
             with the raw OCR text.
        500: any other failure while processing.
    """
    if 'imagen' not in request.files:
        return jsonify({"ok": False, "mensaje": "No se recibio ninguna imagen"}), 400

    file = request.files['imagen']
    if not file or file.filename == '':
        return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400

    content_type = file.content_type or ''
    if content_type not in ALLOWED_IMAGE_TYPES:
        return jsonify({"ok": False, "mensaje": "Formato no soportado. Usa JPG, PNG o WEBP"}), 400

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading all of it into memory.
    img_bytes = file.read(MAX_IMAGE_BYTES + 1)
    if len(img_bytes) > MAX_IMAGE_BYTES:
        return jsonify({"ok": False, "mensaje": "La imagen es demasiado grande (max 15 MB)"}), 400

    try:
        try:
            img_procesada = _preprocesar_imagen(img_bytes)
        except (OSError, ValueError):
            # The upload is not a decodable image: the client's fault, not
            # a server error.
            return jsonify({
                "ok": False,
                "mensaje": "No se pudo leer la imagen. Comprueba que el archivo no este danado.",
            }), 400

        reader = get_ocr_reader()
        # NOTE(review): paragraph=True asks EasyOCR to group text boxes;
        # the parser below presumably still gets one ticket line per item —
        # confirm with real tickets.
        lineas = reader.readtext(img_procesada, detail=0, paragraph=True)
        texto = '\n'.join(lineas)
        productos = parsear_texto_ticket(texto)

        if not productos:
            return jsonify({
                "ok": False,
                "mensaje": "No se encontraron productos. Prueba con una foto mas nitida.",
                "texto": texto,
            }), 422

        # Use the ticket's own date when the text has one; otherwise today.
        date_match = re.search(r"(\d{2}/\d{2}/\d{4})", texto)
        fecha_ticket = date_match.group(1) if date_match else datetime.now().strftime('%d/%m/%Y')

        _guardar_ticket_ocr(fecha_ticket, file.filename, productos)

        return jsonify({
            "ok": True,
            "mensaje": f"{len(productos)} productos guardados",
            "productos": productos,
            "fecha": fecha_ticket,
        })

    except Exception as e:
        # Top-level boundary of the endpoint: always answer with JSON.
        return jsonify({"ok": False, "mensaje": f"Error al procesar la imagen: {str(e)}"}), 500
# -----------------------------------------------------------------------
# Entry point
# -----------------------------------------------------------------------
if __name__ == "__main__":
    # Development entry point: make sure the admin user exists, then start
    # Flask's built-in server on localhost. Debug mode is enabled only when
    # FLASK_DEBUG is "true" (case-insensitive).
    # NOTE(review): this branch does not run when a WSGI server imports the
    # module as `app:app`, so inicializar_admin() is not called from here
    # in that setup.
    inicializar_admin()
    app.run(
        host="127.0.0.1",
        port=5000,
        debug=os.environ.get("FLASK_DEBUG", "false").lower() == "true",
    )