""" app.py — CarritoIA Flask app con autenticacion de sesion. Arrancar en desarrollo: python app.py En produccion usa gunicorn detras de nginx: gunicorn -w 2 -b 127.0.0.1:5000 app:app """ import os import io import re import json import subprocess import sys from pathlib import Path from functools import wraps from datetime import datetime, timedelta from flask import ( Flask, render_template, request, redirect, url_for, session, jsonify, abort, send_from_directory ) from werkzeug.security import generate_password_hash, check_password_hash # ----------------------------------------------------------------------- # Configuracion # ----------------------------------------------------------------------- BASE_DIR = Path(__file__).parent USERS_FILE = BASE_DIR / "users.json" def get_tickets_dir(usuario: str) -> Path: """Directorio de tickets del usuario. Se crea si no existe.""" d = BASE_DIR / "tickets" / usuario d.mkdir(parents=True, exist_ok=True) return d def get_datos_json(usuario: str) -> Path: """Ruta al datos.json del usuario.""" return BASE_DIR / "datos" / usuario / "datos.json" ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/jpg', 'image/png', 'image/webp'} MAX_IMAGE_BYTES = 15 * 1024 * 1024 # 15 MB app = Flask(__name__, template_folder="templates", static_folder="static") app.secret_key = os.environ.get("SECRET_KEY", "cambia-esto-en-produccion-!!!!") app.permanent_session_lifetime = timedelta(days=30) # ----------------------------------------------------------------------- # OCR — lector lazy (se inicializa la primera vez que se usa) # ----------------------------------------------------------------------- _ocr_reader = None def get_ocr_reader(): global _ocr_reader if _ocr_reader is None: import easyocr _ocr_reader = easyocr.Reader(['es', 'en'], gpu=False, verbose=False) return _ocr_reader # Palabras que indican que la línea NO es un producto _OCR_EXCLUDE = [ "TARJETA", "IVA", "CUOTA", "TOTAL", "DEVOLUCIONES", "N.C", "AUT", "VERIFICADO", "VISA", "IMPORTE", "MASTERCARD", "MERCADONA", "SUMA", "EFECTIVO", "CAMBIO", "TICKET", "FACTURA", ] def parsear_texto_ticket(texto): """Extrae productos de texto OCR con el mismo formato que los tickets Mercadona.""" productos = [] lines = texto.splitlines() i = 0 while i < len(lines): linea = lines[i].strip() if not linea or any(kw in linea.upper() for kw in _OCR_EXCLUDE): i += 1 continue # Producto por peso (2 líneas): "1BROCOLI" + "1,048 kg 2,60 €/kg 2,72" if i + 1 < len(lines): next_line = lines[i + 1].strip() weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line) if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", linea): name_m = re.match(r"^(\d+)(.+)$", linea) if name_m: try: cantidad = int(name_m.group(1)) precio_total = round(float(weight_m.group(1).replace(",", ".")), 2) productos.append({ "cantidad": cantidad, "producto": name_m.group(2).strip().upper(), "precio_unitario": round(precio_total / cantidad, 2), "precio_total": precio_total, }) i += 2 continue except ValueError: pass # "2ARROZ SOS 1,88 3,76" m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})\s+(\d+[,\.]\d{2})$", linea) if m: try: productos.append({ "cantidad": int(m.group(1)), "producto": m.group(2).strip().upper(), "precio_unitario": round(float(m.group(3).replace(',', '.')), 2), "precio_total": round(float(m.group(4).replace(',', '.')), 2), }) i += 1 continue except ValueError: pass # "1CROISSANT RELL CACAO 1,90" m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})$", linea) if m: try: cantidad = int(m.group(1)) precio = round(float(m.group(3).replace(',', '.')), 2) productos.append({ "cantidad": cantidad, "producto": m.group(2).strip().upper(), "precio_unitario": round(precio / cantidad, 2), "precio_total": precio, }) i += 1 continue except ValueError: pass i += 1 return productos # ----------------------------------------------------------------------- # Gestion de usuarios (users.json — NO subir al repo) # ----------------------------------------------------------------------- def cargar_usuarios(): if not USERS_FILE.exists() or USERS_FILE.is_dir(): return {} try: with open(USERS_FILE, encoding="utf-8") as f: contenido = f.read().strip() return json.loads(contenido) if contenido else {} except (json.JSONDecodeError, OSError): return {} def guardar_usuarios(users): # Si por error Docker creó un directorio en lugar del archivo, lo eliminamos if USERS_FILE.exists() and USERS_FILE.is_dir(): import shutil shutil.rmtree(USERS_FILE) with open(USERS_FILE, "w", encoding="utf-8") as f: json.dump(users, f, indent=2, ensure_ascii=False) def inicializar_admin(): """Crea el usuario admin la primera vez si users.json no existe o está vacío.""" if USERS_FILE.exists() and USERS_FILE.stat().st_size > 0: return pwd = os.environ.get("ADMIN_PASSWORD", "cambia-esta-password") users = { "admin": { "password_hash": generate_password_hash(pwd), "nombre": "Admin", "admin": True } } guardar_usuarios(users) print(f"[init] Usuario admin creado. Password: {pwd}") print(f"[init] Cambialo en users.json o con ADMIN_PASSWORD en el entorno.") # ----------------------------------------------------------------------- # Decoradores de autenticacion # ----------------------------------------------------------------------- def login_required(f): @wraps(f) def decorated(*args, **kwargs): if not session.get("usuario"): return redirect(url_for("login", next=request.path)) return f(*args, **kwargs) return decorated def admin_required(f): @wraps(f) def decorated(*args, **kwargs): if not session.get("usuario"): return redirect(url_for("login", next=request.path)) users = cargar_usuarios() if not users.get(session["usuario"], {}).get("admin"): abort(403) return f(*args, **kwargs) return decorated # ----------------------------------------------------------------------- # Rutas de autenticacion # ----------------------------------------------------------------------- @app.route("/login", methods=["GET", "POST"]) def login(): error = None if request.method == "POST": usuario = request.form.get("usuario", "").strip().lower() password = request.form.get("password", "") users = cargar_usuarios() user = users.get(usuario) if user and check_password_hash(user["password_hash"], password): session.permanent = True session["usuario"] = usuario session["nombre"] = user.get("nombre", usuario) next_url = request.form.get("next") or url_for("index") return redirect(next_url) error = "Usuario o contrasena incorrectos" return render_template("login.html", error=error, next=request.args.get("next", "")) @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) # ----------------------------------------------------------------------- # Perfil de usuario # ----------------------------------------------------------------------- @app.route("/perfil", methods=["GET", "POST"]) @login_required def perfil(): usuario = session["usuario"] users = cargar_usuarios() user = users[usuario] error = None ok = None if request.method == "POST": accion = request.form.get("accion", "") if accion == "nombre": nombre = request.form.get("nombre", "").strip() if not nombre: error = "El nombre no puede estar vacio" else: user["nombre"] = nombre session["nombre"] = nombre guardar_usuarios(users) ok = "Nombre actualizado" elif accion == "password": pwd_actual = request.form.get("pwd_actual", "") pwd_nueva = request.form.get("pwd_nueva", "") pwd_nueva2 = request.form.get("pwd_nueva2", "") if not check_password_hash(user["password_hash"], pwd_actual): error = "La contrasena actual no es correcta" elif len(pwd_nueva) < 6: error = "La nueva contrasena debe tener al menos 6 caracteres" elif pwd_nueva != pwd_nueva2: error = "Las contrasenas no coinciden" else: user["password_hash"] = generate_password_hash(pwd_nueva) guardar_usuarios(users) ok = "Contrasena actualizada" elif accion == "email": user["email_config"] = { "imap_host": request.form.get("imap_host", "").strip(), "imap_port": request.form.get("imap_port", "993").strip(), "correo": request.form.get("correo", "").strip(), "password": request.form.get("email_pwd", "").strip(), "remitente": request.form.get("remitente", "noreply@mercadona.es").strip(), "ssl_verify": request.form.get("ssl_verify", "false"), } guardar_usuarios(users) ok = "Configuracion de email guardada" email_cfg = user.get("email_config", {}) return render_template("perfil.html", nombre=session.get("nombre", ""), usuario=usuario, email_cfg=email_cfg, error=error, ok=ok) # ----------------------------------------------------------------------- # Panel de administracion # ----------------------------------------------------------------------- @app.route("/admin") @admin_required def admin_panel(): users = cargar_usuarios() lista = [] for uname, udata in users.items(): tickets_dir = BASE_DIR / "tickets" / uname n_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file()) if tickets_dir.exists() else 0 lista.append({ "usuario": uname, "nombre": udata.get("nombre", uname), "admin": udata.get("admin", False), "n_tickets": n_tickets, }) lista.sort(key=lambda x: x["usuario"]) return render_template("admin.html", nombre=session.get("nombre", ""), usuarios=lista) @app.route("/admin/crear", methods=["POST"]) @admin_required def admin_crear_usuario(): usuario = request.form.get("usuario", "").strip().lower() nombre = request.form.get("nombre", "").strip() password = request.form.get("password", "").strip() es_admin = request.form.get("es_admin") == "on" users = cargar_usuarios() error = None if not usuario or not nombre or not password: error = "Todos los campos son obligatorios" elif len(usuario) < 3 or not usuario.isalnum(): error = "Usuario: min. 3 caracteres, solo letras y numeros" elif len(password) < 6: error = "Contrasena: min. 6 caracteres" elif usuario in users: error = "Ese nombre de usuario ya existe" if error: # Redirigir con el error como query param (sencillo) return redirect(url_for("admin_panel") + f"?error={error}") users[usuario] = { "password_hash": generate_password_hash(password), "nombre": nombre, } if es_admin: users[usuario]["admin"] = True guardar_usuarios(users) (BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True) (BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True) return redirect(url_for("admin_panel")) @app.route("/admin/eliminar", methods=["POST"]) @admin_required def admin_eliminar_usuario(): import shutil objetivo = request.form.get("usuario", "").strip().lower() if not objetivo or objetivo == session["usuario"]: return redirect(url_for("admin_panel")) # no se puede autoeliminar users = cargar_usuarios() users.pop(objetivo, None) guardar_usuarios(users) # Borrar datos del usuario for carpeta in ["tickets", "datos"]: d = BASE_DIR / carpeta / objetivo if d.exists(): shutil.rmtree(d) return redirect(url_for("admin_panel")) @app.route("/registro", methods=["GET", "POST"]) def registro(): error = None ok = None if request.method == "POST": usuario = request.form.get("usuario", "").strip().lower() nombre = request.form.get("nombre", "").strip() password = request.form.get("password", "") password2 = request.form.get("password2", "") # Validaciones if not usuario or not password or not nombre: error = "Todos los campos son obligatorios" elif len(usuario) < 3 or not usuario.isalnum(): error = "El usuario debe tener al menos 3 caracteres y solo letras/numeros" elif len(password) < 6: error = "La contrasena debe tener al menos 6 caracteres" elif password != password2: error = "Las contrasenas no coinciden" else: users = cargar_usuarios() if usuario in users: error = "Ese nombre de usuario ya esta en uso" else: users[usuario] = { "password_hash": generate_password_hash(password), "nombre": nombre } guardar_usuarios(users) # Crear carpeta de tickets para el nuevo usuario (BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True) (BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True) ok = "Cuenta creada correctamente. Ya puedes iniciar sesion." return render_template("registro.html", error=error, ok=ok) # ----------------------------------------------------------------------- # Pagina principal # ----------------------------------------------------------------------- @app.route("/") @login_required def index(): return render_template("index.html", nombre=session.get("nombre", "")) # ----------------------------------------------------------------------- # API: datos de predicciones # ----------------------------------------------------------------------- @app.route("/api/datos") @login_required def api_datos(): datos_json = get_datos_json(session["usuario"]) if not datos_json.exists(): return jsonify({"error": "datos.json no generado", "predicciones": []}), 404 with open(datos_json, encoding="utf-8") as f: datos = json.load(f) return jsonify(datos) # ----------------------------------------------------------------------- # API: subir ticket PDF al servidor # ----------------------------------------------------------------------- @app.route("/api/subir-pdf", methods=["POST"]) @login_required def api_subir_pdf(): if 'pdf' not in request.files: return jsonify({"ok": False, "mensaje": "No se recibio ningun archivo"}), 400 file = request.files['pdf'] if not file or file.filename == '': return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400 if not file.filename.lower().endswith('.pdf'): return jsonify({"ok": False, "mensaje": "Solo se aceptan archivos PDF"}), 400 pdf_bytes = file.read() if len(pdf_bytes) > 20 * 1024 * 1024: return jsonify({"ok": False, "mensaje": "El PDF es demasiado grande (max 20 MB)"}), 400 tdir = get_tickets_dir(session["usuario"]) # Nombre seguro: eliminar caracteres problemáticos nombre_seguro = re.sub(r'[^\w\-. ]', '_', file.filename) dest = tdir / nombre_seguro with open(dest, 'wb') as f: f.write(pdf_bytes) return jsonify({"ok": True, "mensaje": f"PDF guardado: {nombre_seguro}"}) # ----------------------------------------------------------------------- # API: forzar regeneracion del pipeline # ----------------------------------------------------------------------- @app.route("/api/regenerar", methods=["POST"]) @login_required def api_regenerar(): usuario = session["usuario"] try: r1 = subprocess.run( [sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario], cwd=str(BASE_DIR), capture_output=True, text=True, timeout=120 ) if r1.returncode != 0: return jsonify({"ok": False, "mensaje": "Error en pipeline", "detalle": r1.stderr or r1.stdout}), 500 r2 = subprocess.run( [sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario], cwd=str(BASE_DIR), capture_output=True, text=True, timeout=30 ) if r2.returncode != 0: return jsonify({"ok": False, "mensaje": "Error generando lista", "detalle": r2.stderr or r2.stdout}), 500 return jsonify({"ok": True, "mensaje": "Pipeline ejecutado correctamente"}) except subprocess.TimeoutExpired: return jsonify({"ok": False, "mensaje": "Timeout al ejecutar el pipeline"}), 500 # ----------------------------------------------------------------------- # API: importar tickets desde correo # ----------------------------------------------------------------------- @app.route("/api/importar-email", methods=["POST"]) @login_required def api_importar_email(): usuario = session["usuario"] try: r = subprocess.run( [sys.executable, str(BASE_DIR / "importar_tickets_email.py"), "--usuario", usuario], cwd=str(BASE_DIR), capture_output=True, text=True, timeout=60 ) output = r.stdout + r.stderr if r.returncode != 0: return jsonify({"ok": False, "mensaje": "Error al importar email", "detalle": output}), 500 # Extraer cuántos tickets se descargaron del output import re as _re m = _re.search(r"Total descargados: (\d+)", output) n = int(m.group(1)) if m else 0 return jsonify({"ok": True, "nuevos": n, "mensaje": f"{n} ticket(s) nuevos importados del correo"}) except subprocess.TimeoutExpired: return jsonify({"ok": False, "mensaje": "Timeout al conectar con el correo"}), 500 # ----------------------------------------------------------------------- # Archivos estaticos de tickets (solo autenticados) # ----------------------------------------------------------------------- @app.route("/tickets/") @login_required def ticket_file(filename): # Cada usuario solo accede a su propia carpeta de tickets return send_from_directory(str(get_tickets_dir(session["usuario"])), filename) # (rutas /admin, /admin/crear, /admin/eliminar definidas arriba) # ----------------------------------------------------------------------- # API: OCR de foto de ticket # ----------------------------------------------------------------------- @app.route("/api/ocr-ticket", methods=["POST"]) @login_required def api_ocr_ticket(): if 'imagen' not in request.files: return jsonify({"ok": False, "mensaje": "No se recibio ninguna imagen"}), 400 file = request.files['imagen'] if not file or file.filename == '': return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400 content_type = file.content_type or '' if content_type not in ALLOWED_IMAGE_TYPES: return jsonify({"ok": False, "mensaje": "Formato no soportado. Usa JPG, PNG o WEBP"}), 400 img_bytes = file.read() if len(img_bytes) > MAX_IMAGE_BYTES: return jsonify({"ok": False, "mensaje": "La imagen es demasiado grande (max 15 MB)"}), 400 try: from PIL import Image, ImageEnhance, ImageFilter # Preprocesar para mejorar OCR: escala de grises + contraste img = Image.open(io.BytesIO(img_bytes)).convert('RGB') img = img.convert('L') # escala de grises img = ImageEnhance.Contrast(img).enhance(2.0) # aumentar contraste img = img.filter(ImageFilter.SHARPEN) # nitidez buf = io.BytesIO() img.save(buf, format='PNG') img_procesada = buf.getvalue() reader = get_ocr_reader() lineas = reader.readtext(img_procesada, detail=0, paragraph=True) texto = '\n'.join(lineas) productos = parsear_texto_ticket(texto) if not productos: return jsonify({ "ok": False, "mensaje": "No se encontraron productos. Prueba con una foto mas nitida.", "texto": texto, }), 422 # Extraer fecha del ticket si aparece en el texto date_match = re.search(r"(\d{2}/\d{2}/\d{4})", texto) fecha_ticket = date_match.group(1) if date_match else datetime.now().strftime('%d/%m/%Y') # Guardar como JSON en la carpeta tickets/ ts = datetime.now().strftime('%Y%m%d_%H%M%S') tdir = get_tickets_dir(session['usuario']) ticket_path = tdir / f"ocr_{ts}.json" with open(ticket_path, 'w', encoding='utf-8') as f: json.dump({ "fecha": fecha_ticket, "fuente": "ocr", "archivo": file.filename, "productos": productos, }, f, ensure_ascii=False, indent=2) return jsonify({ "ok": True, "mensaje": f"{len(productos)} productos guardados", "productos": productos, "fecha": fecha_ticket, }) except Exception as e: return jsonify({"ok": False, "mensaje": f"Error al procesar la imagen: {str(e)}"}), 500 # ----------------------------------------------------------------------- # Estadisticas de consumo # ----------------------------------------------------------------------- @app.route("/estadisticas") @login_required def estadisticas(): return render_template("estadisticas.html", nombre=session.get("nombre", "")) @app.route("/api/estadisticas") @login_required def api_estadisticas(): import csv as _csv usuario = session["usuario"] datos_dir = BASE_DIR / "datos" / usuario tickets_dir = get_tickets_dir(usuario) # Contar archivos de tickets try: total_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file()) except Exception: total_tickets = 0 # Leer gasto_mensual.csv meses = [] gasto_csv = datos_dir / "gasto_mensual.csv" if gasto_csv.exists(): with open(gasto_csv, encoding="utf-8") as fh: for row in _csv.DictReader(fh): try: meses.append({"mes": str(row.get("mes", "")), "gasto": round(float(row.get("precio_total", 0)), 2)}) except ValueError: pass meses.sort(key=lambda x: x["mes"]) _ES = ["ene","feb","mar","abr","may","jun","jul","ago","sep","oct","nov","dic"] for m in meses: try: partes = m["mes"].split("-") m["label"] = _ES[int(partes[1]) - 1] + " " + partes[0][-2:] except Exception: m["label"] = m["mes"] gastos_v = [m["gasto"] for m in meses] gasto_medio = round(sum(gastos_v) / len(gastos_v), 2) if gastos_v else 0.0 gasto_anual = round(sum(gastos_v[-12:]), 2) if gastos_v else 0.0 hoy = datetime.now() mes_actual_key = hoy.strftime("%Y-%m") mes_ant_key = (hoy.replace(day=1) - timedelta(days=1)).strftime("%Y-%m") gasto_mes_actual = next((m["gasto"] for m in meses if m["mes"] == mes_actual_key), 0.0) gasto_mes_anterior = next((m["gasto"] for m in meses if m["mes"] == mes_ant_key), 0.0) # Leer resumen_productos.csv top_gasto = [] top_frecuencia = [] productos_unicos = 0 resumen_csv = datos_dir / "resumen_productos.csv" if resumen_csv.exists(): with open(resumen_csv, encoding="utf-8") as fh: rows = list(_csv.DictReader(fh)) productos_unicos = len(rows) top_gasto = [ {"producto": r["producto"], "gasto_total": round(float(r.get("gasto_total", 0)), 2), "veces": int(r.get("veces_comprado", 0))} for r in sorted(rows, key=lambda r: float(r.get("gasto_total", 0)), reverse=True)[:10] ] top_frecuencia = [ {"producto": r["producto"], "veces_comprado": int(r.get("veces_comprado", 0)), "gasto_total": round(float(r.get("gasto_total", 0)), 2)} for r in sorted(rows, key=lambda r: int(r.get("veces_comprado", 0)), reverse=True)[:10] ] return jsonify({ "gasto_medio_mensual": gasto_medio, "gasto_mes_actual": gasto_mes_actual, "gasto_mes_anterior": gasto_mes_anterior, "gasto_anual": gasto_anual, "total_tickets": total_tickets, "productos_unicos": productos_unicos, "meses": meses[-18:], "top_productos_gasto": top_gasto, "top_productos_frecuencia": top_frecuencia, }) # ----------------------------------------------------------------------- # Entry point # ----------------------------------------------------------------------- inicializar_admin() # Necesario tanto para gunicorn como para python app.py if __name__ == "__main__": debug = os.environ.get("FLASK_DEBUG", "false").lower() == "true" app.run(host="127.0.0.1", port=5000, debug=debug)