Spaces:
Runtime error
Runtime error
import io
import re
from typing import List, Optional

import pandas as pd
import uvicorn
import yaml
from fastapi import FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# FastAPI application instance.
app = FastAPI()

# Load the column configuration (canonical names, synonyms, types, formats,
# allowed values). Encoding pinned so the YAML reads identically on every
# platform instead of depending on the locale's default codec.
with open("column_config.yaml", encoding="utf-8") as f:
    COLUMN_CONFIG = yaml.safe_load(f)
def detect_column_type(dtype):
    """Map a pandas dtype to a front-end column type label.

    Returns "datetime" for datetime-like dtypes, "number" for numeric
    dtypes, and "text" for everything else.
    """
    # Ordered checks: the first matching predicate decides the label.
    checks = (
        (pd.api.types.is_datetime64_any_dtype, "datetime"),
        (pd.api.types.is_numeric_dtype, "number"),
    )
    for predicate, label in checks:
        if predicate(dtype):
            return label
    return "text"
# Column-name normalization
def _sanitize(name: str) -> str:
    """Lower-case *name* and collapse runs of non-word characters to '_'."""
    return re.sub(r'[\W]+', '_', name.strip()).lower().strip('_')


def normalize_column_names(column_names: List[str]) -> List[str]:
    """Map raw column names onto the canonical names from COLUMN_CONFIG.

    Each raw name is sanitized and looked up against the canonical config
    names and their declared synonyms; names with no match keep their
    sanitized form. The synonym lookup table is built once per call — the
    original rebuilt every synonym list for every input column.
    """
    synonym_map = {}
    for config_col, config in COLUMN_CONFIG['columns'].items():
        for alias in [config_col] + config.get('synonyms', []):
            # setdefault keeps first-wins semantics, matching the original's
            # iteration order over the config.
            synonym_map.setdefault(_sanitize(alias), config_col)

    normalized = []
    for raw_col in column_names:
        key = _sanitize(raw_col)
        normalized.append(synonym_map.get(key, key))
    return normalized
# Data cleaning
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean *df* according to COLUMN_CONFIG and return the result.

    One pass per configured column (the original ran five near-duplicate
    loops over the same columns):
      * 'datetime'    -> parsed with the configured 'format' (invalid -> NaT),
                         then rendered as ISO-8601 'Z' strings for the API
      * 'numeric'     -> coerced to float (invalid -> NaN) and clipped to the
                         Tukey fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
      * 'categorical' -> values outside the 'allowed' list become missing
      * 'text'        -> stripped and lower-cased (string columns only)
    Duplicate rows are dropped and pd.NA is mapped to None at the end.

    Operates on a copy so the caller's frame is never mutated — the original
    mutated its argument in place, which also corrupted the "raw" frame that
    process_file intended to keep.
    """
    df = df.copy()
    df.columns = normalize_column_names(df.columns)

    for col in df.columns:
        cfg = COLUMN_CONFIG['columns'].get(col)
        if cfg is None:
            continue
        col_type = cfg.get('type', 'text')

        if col_type == 'datetime':
            # The original first parsed without 'format', so the configured
            # format was never actually applied; here it is honored.
            parsed = pd.to_datetime(df[col], errors='coerce', format=cfg.get('format'))
            # Keep the ISO string form. The original's last loop re-parsed
            # these strings back into Timestamps, which JSONResponse cannot
            # serialize.
            df[col] = parsed.dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        elif col_type == 'numeric':
            series = pd.to_numeric(df[col], errors='coerce').astype(float)
            # Tukey fences: clip outliers to [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            df[col] = series.clip(lower=q1 - 1.5 * iqr, upper=q3 + 1.5 * iqr)
        elif col_type == 'categorical':
            allowed = cfg.get('allowed', [])
            df[col] = df[col].where(df[col].isin(allowed))
        elif col_type == 'text':
            # Guard: the .str accessor only exists on object/string columns;
            # the original raised AttributeError when a 'text'-typed column
            # arrived holding numeric data.
            if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_string_dtype(df[col]):
                df[col] = df[col].str.strip().str.lower()

    # Duplicate-row removal (on the copy, never the caller's frame).
    df = df.drop_duplicates()
    return df.replace({pd.NA: None})
# Read an uploaded file and return both the raw and the cleaned frame.
def process_file(file: UploadFile, sheet_name: Optional[str] = None) -> "tuple[pd.DataFrame, pd.DataFrame]":
    """Read an uploaded CSV/XLSX file and return ``(raw_df, cleaned_df)``.

    Raises HTTPException(400) for unsupported extensions and
    HTTPException(500) for any other processing failure.
    """
    try:
        content = file.file.read()
        # lower() so 'DATA.CSV' / 'data.Xlsx' are accepted too
        extension = file.filename.split('.')[-1].lower()
        if extension == 'csv':
            df = pd.read_csv(io.BytesIO(content))
        elif extension == 'xlsx':
            # Default to the first sheet when none is requested.
            sheet = 0 if sheet_name is None else sheet_name
            df = pd.read_excel(io.BytesIO(content), sheet_name=sheet)
        else:
            raise HTTPException(400, "Formato de arquivo não suportado")
        # Clean a copy so the raw frame really is returned untouched —
        # clean_data mutated the original frame in place, so both returned
        # frames used to be the cleaned one.
        return df, clean_data(df.copy())
    except HTTPException:
        # Preserve the intended status code (e.g. 400) instead of letting the
        # generic handler below mask it as a 500.
        raise
    except Exception as e:
        raise HTTPException(500, f"Erro ao processar o arquivo: {str(e)}")
# Upload-and-process endpoint.
# NOTE(review): the original function carried no route decorator, so it was
# never registered with the app and the API had no reachable endpoint.
# "/process" is assumed — confirm the intended path with the front-end.
@app.post("/process")
async def process_file_endpoint(file: UploadFile = File(...), sheet_name: Optional[str] = Query(None)):
    """Process an uploaded CSV/XLSX file and return cleaned rows plus column metadata."""
    try:
        raw_df, df = process_file(file, sheet_name)
        columns = [{
            "name": col,
            "type": detect_column_type(df[col].dtype)
        } for col in df.columns]
        rows = []
        for idx, row in df.iterrows():
            cells = {}
            for col, val in row.items():
                # Make the cell value JSON-safe: Starlette's JSONResponse uses
                # json.dumps(..., allow_nan=False), so NaN/NaT must become
                # None and numpy/pandas scalars must become native types.
                if pd.isna(val):
                    safe = None
                else:
                    native = val.item() if hasattr(val, "item") else val
                    safe = native if isinstance(native, (str, int, float, bool)) else str(native)
                cells[col] = {
                    "value": safe,
                    "displayValue": str(val),
                    "columnId": col
                }
            rows.append({"id": str(idx), "cells": cells})
        return JSONResponse(
            content={
                "data": {
                    "columns": columns,
                    "rows": rows
                },
                "metadata": {
                    "totalRows": len(df),
                    "processedAt": pd.Timestamp.now().isoformat()
                }
            })
    except HTTPException:
        # Let 400s (and any other deliberate status) pass through unchanged.
        raise
    except Exception as e:
        raise HTTPException(500, f"Erro: {str(e)}")
# CORS configuration: fully open so a browser front-end on any origin can
# call the API.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — consider pinning allow_origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
if __name__ == "__main__":
    # Dev entry point: serve on all interfaces, port 7860, with auto-reload.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)