664 lines
26 KiB
Python
664 lines
26 KiB
Python
"""
|
|
app.py — CarritoIA
|
|
Flask app con autenticacion de sesion.
|
|
|
|
Arrancar en desarrollo:
|
|
python app.py
|
|
|
|
En produccion usa gunicorn detras de nginx:
|
|
gunicorn -w 2 -b 127.0.0.1:5000 app:app
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import re
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from functools import wraps
|
|
from datetime import datetime, timedelta
|
|
|
|
from flask import (
|
|
Flask, render_template, request, redirect,
|
|
url_for, session, jsonify, abort, send_from_directory
|
|
)
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Configuracion
|
|
# -----------------------------------------------------------------------
|
|
BASE_DIR = Path(__file__).parent
|
|
USERS_FILE = BASE_DIR / "users.json"
|
|
|
|
def get_tickets_dir(usuario: str) -> Path:
|
|
"""Directorio de tickets del usuario. Se crea si no existe."""
|
|
d = BASE_DIR / "tickets" / usuario
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
return d
|
|
|
|
def get_datos_json(usuario: str) -> Path:
|
|
"""Ruta al datos.json del usuario."""
|
|
return BASE_DIR / "datos" / usuario / "datos.json"
|
|
|
|
ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/jpg', 'image/png', 'image/webp'}
|
|
MAX_IMAGE_BYTES = 15 * 1024 * 1024 # 15 MB
|
|
|
|
app = Flask(__name__, template_folder="templates", static_folder="static")
|
|
|
|
app.secret_key = os.environ.get("SECRET_KEY", "cambia-esto-en-produccion-!!!!")
|
|
app.permanent_session_lifetime = timedelta(days=30)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# OCR — lector lazy (se inicializa la primera vez que se usa)
|
|
# -----------------------------------------------------------------------
|
|
_ocr_reader = None
|
|
|
|
def get_ocr_reader():
|
|
global _ocr_reader
|
|
if _ocr_reader is None:
|
|
import easyocr
|
|
_ocr_reader = easyocr.Reader(['es', 'en'], gpu=False, verbose=False)
|
|
return _ocr_reader
|
|
|
|
# Palabras que indican que la línea NO es un producto
|
|
_OCR_EXCLUDE = [
|
|
"TARJETA", "IVA", "CUOTA", "TOTAL", "DEVOLUCIONES", "N.C", "AUT",
|
|
"VERIFICADO", "VISA", "IMPORTE", "MASTERCARD", "MERCADONA", "SUMA",
|
|
"EFECTIVO", "CAMBIO", "TICKET", "FACTURA",
|
|
]
|
|
|
|
def parsear_texto_ticket(texto):
|
|
"""Extrae productos de texto OCR con el mismo formato que los tickets Mercadona."""
|
|
productos = []
|
|
lines = texto.splitlines()
|
|
i = 0
|
|
while i < len(lines):
|
|
linea = lines[i].strip()
|
|
if not linea or any(kw in linea.upper() for kw in _OCR_EXCLUDE):
|
|
i += 1
|
|
continue
|
|
|
|
# Producto por peso (2 líneas): "1BROCOLI" + "1,048 kg 2,60 €/kg 2,72"
|
|
if i + 1 < len(lines):
|
|
next_line = lines[i + 1].strip()
|
|
weight_m = re.search(r"kg.*?(\d+[,.]\d{2})\s*$", next_line)
|
|
if weight_m and re.match(r"^\d+[A-ZÁÉÍÓÚÑa-záéíóúñ+]", linea):
|
|
name_m = re.match(r"^(\d+)(.+)$", linea)
|
|
if name_m:
|
|
try:
|
|
cantidad = int(name_m.group(1))
|
|
precio_total = round(float(weight_m.group(1).replace(",", ".")), 2)
|
|
productos.append({
|
|
"cantidad": cantidad,
|
|
"producto": name_m.group(2).strip().upper(),
|
|
"precio_unitario": round(precio_total / cantidad, 2),
|
|
"precio_total": precio_total,
|
|
})
|
|
i += 2
|
|
continue
|
|
except ValueError:
|
|
pass
|
|
|
|
# "2ARROZ SOS 1,88 3,76"
|
|
m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})\s+(\d+[,\.]\d{2})$", linea)
|
|
if m:
|
|
try:
|
|
productos.append({
|
|
"cantidad": int(m.group(1)),
|
|
"producto": m.group(2).strip().upper(),
|
|
"precio_unitario": round(float(m.group(3).replace(',', '.')), 2),
|
|
"precio_total": round(float(m.group(4).replace(',', '.')), 2),
|
|
})
|
|
i += 1
|
|
continue
|
|
except ValueError:
|
|
pass
|
|
|
|
# "1CROISSANT RELL CACAO 1,90"
|
|
m = re.match(r"^(\d+)(.+?)\s+(\d+[,\.]\d{2})$", linea)
|
|
if m:
|
|
try:
|
|
cantidad = int(m.group(1))
|
|
precio = round(float(m.group(3).replace(',', '.')), 2)
|
|
productos.append({
|
|
"cantidad": cantidad,
|
|
"producto": m.group(2).strip().upper(),
|
|
"precio_unitario": round(precio / cantidad, 2),
|
|
"precio_total": precio,
|
|
})
|
|
i += 1
|
|
continue
|
|
except ValueError:
|
|
pass
|
|
|
|
i += 1
|
|
return productos
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Gestion de usuarios (users.json — NO subir al repo)
|
|
# -----------------------------------------------------------------------
|
|
def cargar_usuarios():
|
|
if not USERS_FILE.exists() or USERS_FILE.is_dir():
|
|
return {}
|
|
try:
|
|
with open(USERS_FILE, encoding="utf-8") as f:
|
|
contenido = f.read().strip()
|
|
return json.loads(contenido) if contenido else {}
|
|
except (json.JSONDecodeError, OSError):
|
|
return {}
|
|
|
|
def guardar_usuarios(users):
|
|
# Si por error Docker creó un directorio en lugar del archivo, lo eliminamos
|
|
if USERS_FILE.exists() and USERS_FILE.is_dir():
|
|
import shutil
|
|
shutil.rmtree(USERS_FILE)
|
|
with open(USERS_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(users, f, indent=2, ensure_ascii=False)
|
|
|
|
def inicializar_admin():
|
|
"""Crea el usuario admin la primera vez si users.json no existe o está vacío."""
|
|
if USERS_FILE.exists() and USERS_FILE.stat().st_size > 0:
|
|
return
|
|
pwd = os.environ.get("ADMIN_PASSWORD", "cambia-esta-password")
|
|
users = {
|
|
"admin": {
|
|
"password_hash": generate_password_hash(pwd),
|
|
"nombre": "Admin",
|
|
"admin": True
|
|
}
|
|
}
|
|
guardar_usuarios(users)
|
|
print(f"[init] Usuario admin creado. Password: {pwd}")
|
|
print(f"[init] Cambialo en users.json o con ADMIN_PASSWORD en el entorno.")
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Decoradores de autenticacion
|
|
# -----------------------------------------------------------------------
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
if not session.get("usuario"):
|
|
return redirect(url_for("login", next=request.path))
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
if not session.get("usuario"):
|
|
return redirect(url_for("login", next=request.path))
|
|
users = cargar_usuarios()
|
|
if not users.get(session["usuario"], {}).get("admin"):
|
|
abort(403)
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Rutas de autenticacion
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
error = None
|
|
if request.method == "POST":
|
|
usuario = request.form.get("usuario", "").strip().lower()
|
|
password = request.form.get("password", "")
|
|
users = cargar_usuarios()
|
|
user = users.get(usuario)
|
|
if user and check_password_hash(user["password_hash"], password):
|
|
session.permanent = True
|
|
session["usuario"] = usuario
|
|
session["nombre"] = user.get("nombre", usuario)
|
|
next_url = request.form.get("next") or url_for("index")
|
|
return redirect(next_url)
|
|
error = "Usuario o contrasena incorrectos"
|
|
return render_template("login.html", error=error,
|
|
next=request.args.get("next", ""))
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
session.clear()
|
|
return redirect(url_for("login"))
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Perfil de usuario
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/perfil", methods=["GET", "POST"])
|
|
@login_required
|
|
def perfil():
|
|
usuario = session["usuario"]
|
|
users = cargar_usuarios()
|
|
user = users[usuario]
|
|
error = None
|
|
ok = None
|
|
|
|
if request.method == "POST":
|
|
accion = request.form.get("accion", "")
|
|
|
|
if accion == "nombre":
|
|
nombre = request.form.get("nombre", "").strip()
|
|
if not nombre:
|
|
error = "El nombre no puede estar vacio"
|
|
else:
|
|
user["nombre"] = nombre
|
|
session["nombre"] = nombre
|
|
guardar_usuarios(users)
|
|
ok = "Nombre actualizado"
|
|
|
|
elif accion == "password":
|
|
pwd_actual = request.form.get("pwd_actual", "")
|
|
pwd_nueva = request.form.get("pwd_nueva", "")
|
|
pwd_nueva2 = request.form.get("pwd_nueva2", "")
|
|
if not check_password_hash(user["password_hash"], pwd_actual):
|
|
error = "La contrasena actual no es correcta"
|
|
elif len(pwd_nueva) < 6:
|
|
error = "La nueva contrasena debe tener al menos 6 caracteres"
|
|
elif pwd_nueva != pwd_nueva2:
|
|
error = "Las contrasenas no coinciden"
|
|
else:
|
|
user["password_hash"] = generate_password_hash(pwd_nueva)
|
|
guardar_usuarios(users)
|
|
ok = "Contrasena actualizada"
|
|
|
|
elif accion == "email":
|
|
user["email_config"] = {
|
|
"imap_host": request.form.get("imap_host", "").strip(),
|
|
"imap_port": request.form.get("imap_port", "993").strip(),
|
|
"correo": request.form.get("correo", "").strip(),
|
|
"password": request.form.get("email_pwd", "").strip(),
|
|
"remitente": request.form.get("remitente", "noreply@mercadona.es").strip(),
|
|
"ssl_verify": request.form.get("ssl_verify", "false"),
|
|
}
|
|
guardar_usuarios(users)
|
|
ok = "Configuracion de email guardada"
|
|
|
|
email_cfg = user.get("email_config", {})
|
|
return render_template("perfil.html",
|
|
nombre=session.get("nombre", ""),
|
|
usuario=usuario,
|
|
email_cfg=email_cfg,
|
|
error=error, ok=ok)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Panel de administracion
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/admin")
|
|
@admin_required
|
|
def admin_panel():
|
|
users = cargar_usuarios()
|
|
lista = []
|
|
for uname, udata in users.items():
|
|
tickets_dir = BASE_DIR / "tickets" / uname
|
|
n_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file()) if tickets_dir.exists() else 0
|
|
lista.append({
|
|
"usuario": uname,
|
|
"nombre": udata.get("nombre", uname),
|
|
"admin": udata.get("admin", False),
|
|
"n_tickets": n_tickets,
|
|
})
|
|
lista.sort(key=lambda x: x["usuario"])
|
|
return render_template("admin.html", nombre=session.get("nombre", ""), usuarios=lista)
|
|
|
|
@app.route("/admin/crear", methods=["POST"])
|
|
@admin_required
|
|
def admin_crear_usuario():
|
|
usuario = request.form.get("usuario", "").strip().lower()
|
|
nombre = request.form.get("nombre", "").strip()
|
|
password = request.form.get("password", "").strip()
|
|
es_admin = request.form.get("es_admin") == "on"
|
|
users = cargar_usuarios()
|
|
error = None
|
|
|
|
if not usuario or not nombre or not password:
|
|
error = "Todos los campos son obligatorios"
|
|
elif len(usuario) < 3 or not usuario.isalnum():
|
|
error = "Usuario: min. 3 caracteres, solo letras y numeros"
|
|
elif len(password) < 6:
|
|
error = "Contrasena: min. 6 caracteres"
|
|
elif usuario in users:
|
|
error = "Ese nombre de usuario ya existe"
|
|
|
|
if error:
|
|
# Redirigir con el error como query param (sencillo)
|
|
return redirect(url_for("admin_panel") + f"?error={error}")
|
|
|
|
users[usuario] = {
|
|
"password_hash": generate_password_hash(password),
|
|
"nombre": nombre,
|
|
}
|
|
if es_admin:
|
|
users[usuario]["admin"] = True
|
|
guardar_usuarios(users)
|
|
(BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True)
|
|
(BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True)
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
@app.route("/admin/eliminar", methods=["POST"])
|
|
@admin_required
|
|
def admin_eliminar_usuario():
|
|
import shutil
|
|
objetivo = request.form.get("usuario", "").strip().lower()
|
|
if not objetivo or objetivo == session["usuario"]:
|
|
return redirect(url_for("admin_panel")) # no se puede autoeliminar
|
|
users = cargar_usuarios()
|
|
users.pop(objetivo, None)
|
|
guardar_usuarios(users)
|
|
# Borrar datos del usuario
|
|
for carpeta in ["tickets", "datos"]:
|
|
d = BASE_DIR / carpeta / objetivo
|
|
if d.exists():
|
|
shutil.rmtree(d)
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
@app.route("/registro", methods=["GET", "POST"])
|
|
def registro():
|
|
error = None
|
|
ok = None
|
|
if request.method == "POST":
|
|
usuario = request.form.get("usuario", "").strip().lower()
|
|
nombre = request.form.get("nombre", "").strip()
|
|
password = request.form.get("password", "")
|
|
password2 = request.form.get("password2", "")
|
|
# Validaciones
|
|
if not usuario or not password or not nombre:
|
|
error = "Todos los campos son obligatorios"
|
|
elif len(usuario) < 3 or not usuario.isalnum():
|
|
error = "El usuario debe tener al menos 3 caracteres y solo letras/numeros"
|
|
elif len(password) < 6:
|
|
error = "La contrasena debe tener al menos 6 caracteres"
|
|
elif password != password2:
|
|
error = "Las contrasenas no coinciden"
|
|
else:
|
|
users = cargar_usuarios()
|
|
if usuario in users:
|
|
error = "Ese nombre de usuario ya esta en uso"
|
|
else:
|
|
users[usuario] = {
|
|
"password_hash": generate_password_hash(password),
|
|
"nombre": nombre
|
|
}
|
|
guardar_usuarios(users)
|
|
# Crear carpeta de tickets para el nuevo usuario
|
|
(BASE_DIR / "tickets" / usuario).mkdir(parents=True, exist_ok=True)
|
|
(BASE_DIR / "datos" / usuario).mkdir(parents=True, exist_ok=True)
|
|
ok = "Cuenta creada correctamente. Ya puedes iniciar sesion."
|
|
return render_template("registro.html", error=error, ok=ok)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Pagina principal
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/")
|
|
@login_required
|
|
def index():
|
|
return render_template("index.html", nombre=session.get("nombre", ""))
|
|
|
|
# -----------------------------------------------------------------------
|
|
# API: datos de predicciones
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/api/datos")
|
|
@login_required
|
|
def api_datos():
|
|
datos_json = get_datos_json(session["usuario"])
|
|
if not datos_json.exists():
|
|
return jsonify({"error": "datos.json no generado", "predicciones": []}), 404
|
|
with open(datos_json, encoding="utf-8") as f:
|
|
datos = json.load(f)
|
|
return jsonify(datos)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# API: subir ticket PDF al servidor
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/api/subir-pdf", methods=["POST"])
|
|
@login_required
|
|
def api_subir_pdf():
|
|
if 'pdf' not in request.files:
|
|
return jsonify({"ok": False, "mensaje": "No se recibio ningun archivo"}), 400
|
|
file = request.files['pdf']
|
|
if not file or file.filename == '':
|
|
return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400
|
|
if not file.filename.lower().endswith('.pdf'):
|
|
return jsonify({"ok": False, "mensaje": "Solo se aceptan archivos PDF"}), 400
|
|
|
|
pdf_bytes = file.read()
|
|
if len(pdf_bytes) > 20 * 1024 * 1024:
|
|
return jsonify({"ok": False, "mensaje": "El PDF es demasiado grande (max 20 MB)"}), 400
|
|
|
|
tdir = get_tickets_dir(session["usuario"])
|
|
# Nombre seguro: eliminar caracteres problemáticos
|
|
nombre_seguro = re.sub(r'[^\w\-. ]', '_', file.filename)
|
|
dest = tdir / nombre_seguro
|
|
with open(dest, 'wb') as f:
|
|
f.write(pdf_bytes)
|
|
|
|
return jsonify({"ok": True, "mensaje": f"PDF guardado: {nombre_seguro}"})
|
|
|
|
# -----------------------------------------------------------------------
|
|
# API: forzar regeneracion del pipeline
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/api/regenerar", methods=["POST"])
|
|
@login_required
|
|
def api_regenerar():
|
|
usuario = session["usuario"]
|
|
try:
|
|
r1 = subprocess.run(
|
|
[sys.executable, str(BASE_DIR / "autocompra7.py"), "--usuario", usuario],
|
|
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=120
|
|
)
|
|
if r1.returncode != 0:
|
|
return jsonify({"ok": False, "mensaje": "Error en pipeline",
|
|
"detalle": r1.stderr or r1.stdout}), 500
|
|
|
|
r2 = subprocess.run(
|
|
[sys.executable, str(BASE_DIR / "generar_lista.py"), "--usuario", usuario],
|
|
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=30
|
|
)
|
|
if r2.returncode != 0:
|
|
return jsonify({"ok": False, "mensaje": "Error generando lista",
|
|
"detalle": r2.stderr or r2.stdout}), 500
|
|
|
|
return jsonify({"ok": True, "mensaje": "Pipeline ejecutado correctamente"})
|
|
except subprocess.TimeoutExpired:
|
|
return jsonify({"ok": False, "mensaje": "Timeout al ejecutar el pipeline"}), 500
|
|
|
|
# -----------------------------------------------------------------------
|
|
# API: importar tickets desde correo
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/api/importar-email", methods=["POST"])
|
|
@login_required
|
|
def api_importar_email():
|
|
usuario = session["usuario"]
|
|
try:
|
|
r = subprocess.run(
|
|
[sys.executable, str(BASE_DIR / "importar_tickets_email.py"), "--usuario", usuario],
|
|
cwd=str(BASE_DIR), capture_output=True, text=True, timeout=60
|
|
)
|
|
output = r.stdout + r.stderr
|
|
if r.returncode != 0:
|
|
return jsonify({"ok": False, "mensaje": "Error al importar email",
|
|
"detalle": output}), 500
|
|
# Extraer cuántos tickets se descargaron del output
|
|
import re as _re
|
|
m = _re.search(r"Total descargados: (\d+)", output)
|
|
n = int(m.group(1)) if m else 0
|
|
return jsonify({"ok": True, "nuevos": n,
|
|
"mensaje": f"{n} ticket(s) nuevos importados del correo"})
|
|
except subprocess.TimeoutExpired:
|
|
return jsonify({"ok": False, "mensaje": "Timeout al conectar con el correo"}), 500
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Archivos estaticos de tickets (solo autenticados)
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/tickets/<path:filename>")
|
|
@login_required
|
|
def ticket_file(filename):
|
|
# Cada usuario solo accede a su propia carpeta de tickets
|
|
return send_from_directory(str(get_tickets_dir(session["usuario"])), filename)
|
|
|
|
# (rutas /admin, /admin/crear, /admin/eliminar definidas arriba)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# API: OCR de foto de ticket
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/api/ocr-ticket", methods=["POST"])
|
|
@login_required
|
|
def api_ocr_ticket():
|
|
if 'imagen' not in request.files:
|
|
return jsonify({"ok": False, "mensaje": "No se recibio ninguna imagen"}), 400
|
|
|
|
file = request.files['imagen']
|
|
if not file or file.filename == '':
|
|
return jsonify({"ok": False, "mensaje": "Archivo vacio"}), 400
|
|
|
|
content_type = file.content_type or ''
|
|
if content_type not in ALLOWED_IMAGE_TYPES:
|
|
return jsonify({"ok": False, "mensaje": "Formato no soportado. Usa JPG, PNG o WEBP"}), 400
|
|
|
|
img_bytes = file.read()
|
|
if len(img_bytes) > MAX_IMAGE_BYTES:
|
|
return jsonify({"ok": False, "mensaje": "La imagen es demasiado grande (max 15 MB)"}), 400
|
|
|
|
try:
|
|
from PIL import Image, ImageEnhance, ImageFilter
|
|
|
|
# Preprocesar para mejorar OCR: escala de grises + contraste
|
|
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
|
|
img = img.convert('L') # escala de grises
|
|
img = ImageEnhance.Contrast(img).enhance(2.0) # aumentar contraste
|
|
img = img.filter(ImageFilter.SHARPEN) # nitidez
|
|
|
|
buf = io.BytesIO()
|
|
img.save(buf, format='PNG')
|
|
img_procesada = buf.getvalue()
|
|
|
|
reader = get_ocr_reader()
|
|
lineas = reader.readtext(img_procesada, detail=0, paragraph=True)
|
|
texto = '\n'.join(lineas)
|
|
productos = parsear_texto_ticket(texto)
|
|
|
|
if not productos:
|
|
return jsonify({
|
|
"ok": False,
|
|
"mensaje": "No se encontraron productos. Prueba con una foto mas nitida.",
|
|
"texto": texto,
|
|
}), 422
|
|
|
|
# Extraer fecha del ticket si aparece en el texto
|
|
date_match = re.search(r"(\d{2}/\d{2}/\d{4})", texto)
|
|
fecha_ticket = date_match.group(1) if date_match else datetime.now().strftime('%d/%m/%Y')
|
|
|
|
# Guardar como JSON en la carpeta tickets/
|
|
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
tdir = get_tickets_dir(session['usuario'])
|
|
ticket_path = tdir / f"ocr_{ts}.json"
|
|
with open(ticket_path, 'w', encoding='utf-8') as f:
|
|
json.dump({
|
|
"fecha": fecha_ticket,
|
|
"fuente": "ocr",
|
|
"archivo": file.filename,
|
|
"productos": productos,
|
|
}, f, ensure_ascii=False, indent=2)
|
|
|
|
return jsonify({
|
|
"ok": True,
|
|
"mensaje": f"{len(productos)} productos guardados",
|
|
"productos": productos,
|
|
"fecha": fecha_ticket,
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({"ok": False, "mensaje": f"Error al procesar la imagen: {str(e)}"}), 500
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Estadisticas de consumo
|
|
# -----------------------------------------------------------------------
|
|
@app.route("/estadisticas")
|
|
@login_required
|
|
def estadisticas():
|
|
return render_template("estadisticas.html", nombre=session.get("nombre", ""))
|
|
|
|
@app.route("/api/estadisticas")
|
|
@login_required
|
|
def api_estadisticas():
|
|
import csv as _csv
|
|
usuario = session["usuario"]
|
|
datos_dir = BASE_DIR / "datos" / usuario
|
|
tickets_dir = get_tickets_dir(usuario)
|
|
|
|
# Contar archivos de tickets
|
|
try:
|
|
total_tickets = sum(1 for f in tickets_dir.iterdir() if f.is_file())
|
|
except Exception:
|
|
total_tickets = 0
|
|
|
|
# Leer gasto_mensual.csv
|
|
meses = []
|
|
gasto_csv = datos_dir / "gasto_mensual.csv"
|
|
if gasto_csv.exists():
|
|
with open(gasto_csv, encoding="utf-8") as fh:
|
|
for row in _csv.DictReader(fh):
|
|
try:
|
|
meses.append({"mes": str(row.get("mes", "")),
|
|
"gasto": round(float(row.get("precio_total", 0)), 2)})
|
|
except ValueError:
|
|
pass
|
|
meses.sort(key=lambda x: x["mes"])
|
|
_ES = ["ene","feb","mar","abr","may","jun","jul","ago","sep","oct","nov","dic"]
|
|
for m in meses:
|
|
try:
|
|
partes = m["mes"].split("-")
|
|
m["label"] = _ES[int(partes[1]) - 1] + " " + partes[0][-2:]
|
|
except Exception:
|
|
m["label"] = m["mes"]
|
|
|
|
gastos_v = [m["gasto"] for m in meses]
|
|
gasto_medio = round(sum(gastos_v) / len(gastos_v), 2) if gastos_v else 0.0
|
|
gasto_anual = round(sum(gastos_v[-12:]), 2) if gastos_v else 0.0
|
|
|
|
hoy = datetime.now()
|
|
mes_actual_key = hoy.strftime("%Y-%m")
|
|
mes_ant_key = (hoy.replace(day=1) - timedelta(days=1)).strftime("%Y-%m")
|
|
gasto_mes_actual = next((m["gasto"] for m in meses if m["mes"] == mes_actual_key), 0.0)
|
|
gasto_mes_anterior = next((m["gasto"] for m in meses if m["mes"] == mes_ant_key), 0.0)
|
|
|
|
# Leer resumen_productos.csv
|
|
top_gasto = []
|
|
top_frecuencia = []
|
|
productos_unicos = 0
|
|
resumen_csv = datos_dir / "resumen_productos.csv"
|
|
if resumen_csv.exists():
|
|
with open(resumen_csv, encoding="utf-8") as fh:
|
|
rows = list(_csv.DictReader(fh))
|
|
productos_unicos = len(rows)
|
|
top_gasto = [
|
|
{"producto": r["producto"],
|
|
"gasto_total": round(float(r.get("gasto_total", 0)), 2),
|
|
"veces": int(r.get("veces_comprado", 0))}
|
|
for r in sorted(rows, key=lambda r: float(r.get("gasto_total", 0)), reverse=True)[:10]
|
|
]
|
|
top_frecuencia = [
|
|
{"producto": r["producto"],
|
|
"veces_comprado": int(r.get("veces_comprado", 0)),
|
|
"gasto_total": round(float(r.get("gasto_total", 0)), 2)}
|
|
for r in sorted(rows, key=lambda r: int(r.get("veces_comprado", 0)), reverse=True)[:10]
|
|
]
|
|
|
|
return jsonify({
|
|
"gasto_medio_mensual": gasto_medio,
|
|
"gasto_mes_actual": gasto_mes_actual,
|
|
"gasto_mes_anterior": gasto_mes_anterior,
|
|
"gasto_anual": gasto_anual,
|
|
"total_tickets": total_tickets,
|
|
"productos_unicos": productos_unicos,
|
|
"meses": meses[-18:],
|
|
"top_productos_gasto": top_gasto,
|
|
"top_productos_frecuencia": top_frecuencia,
|
|
})
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Entry point
|
|
# -----------------------------------------------------------------------
|
|
inicializar_admin() # Necesario tanto para gunicorn como para python app.py
|
|
|
|
if __name__ == "__main__":
|
|
debug = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
|
|
app.run(host="127.0.0.1", port=5000, debug=debug) |