Source code for fivedreg.data.dataloader
import pickle
import numpy as np
from typing import Dict, Any
[docs]
class DataLoader:
    """Load a pickled dataset dictionary and build a structured summary of it.

    The dataset is expected to be a dictionary with the keys ``'X'``
    (feature matrix), ``'y'`` (target vector) and ``'metadata'``.
    """

    def __init__(self, data_path: str):
        """
        Initialize DataLoader with path to pickle file.

        Args:
            data_path: Path to the pickle file containing the dataset dictionary
        """
        self.data_path = data_path

    def load_data(self) -> Dict[str, Any]:
        """
        Load data from pickle file.

        Expected format:
            {
                'X': numpy array of shape (n_samples, n_features),
                'y': numpy array of shape (n_samples,),
                'metadata': dict with dataset information
            }

        The unpickled object is returned as-is; its structure is not
        validated here (``get_data_summary`` performs the key checks).

        Returns:
            Dictionary containing 'X', 'y', and 'metadata' keys
        """
        # SECURITY: unpickling can execute arbitrary code embedded in the
        # file. Only point ``data_path`` at files from a trusted source.
        with open(self.data_path, 'rb') as f:
            return pickle.load(f)

    @staticmethod
    def _numeric_stats(arr: Any) -> Dict[str, float]:
        """Return min/max/mean/std of ``arr`` for whichever reductions it offers."""
        stats: Dict[str, float] = {}
        # Reductions such as min()/max() raise on zero-size numpy arrays, so
        # an empty dataset simply gets no statistics instead of crashing.
        if getattr(arr, 'size', None) == 0:
            return stats
        if hasattr(arr, 'min') and hasattr(arr, 'max'):
            stats["min"] = float(arr.min())
            stats["max"] = float(arr.max())
        if hasattr(arr, 'mean'):
            stats["mean"] = float(arr.mean())
        if hasattr(arr, 'std'):
            stats["std"] = float(arr.std())
        return stats

    @staticmethod
    def _nan_count(arr: np.ndarray) -> int:
        """Count NaN entries in ``arr``; dtypes that cannot hold NaN yield 0."""
        # np.inexact covers every float/complex width (float16, float32,
        # float64, longdouble, complex64, ...), not just float32/float64.
        if np.issubdtype(arr.dtype, np.inexact):
            return int(np.isnan(arr).sum())
        return 0

    def get_data_summary(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Get a structured summary of the data dictionary.

        ``data['X']`` and ``data['y']`` must expose ``shape``, ``dtype``,
        slicing and ``tolist()`` (as numpy arrays do). The ``validation``
        section is only filled in when both are ``numpy.ndarray`` instances;
        otherwise it is left as an empty dict.

        Args:
            data: Dictionary with 'X', 'y', and 'metadata' keys

        Returns:
            Dictionary containing structured summary data with the sections
            'dataset_structure', 'feature_matrix', 'target_vector',
            'metadata' and 'validation'.

        Raises:
            ValueError: If ``data`` is not a dictionary or a required key
                is missing.
        """
        if not isinstance(data, dict):
            raise ValueError("Data must be a dictionary with 'X', 'y', and 'metadata' keys")

        # Check required keys
        required_keys = ['X', 'y', 'metadata']
        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            raise ValueError(f"Missing required keys: {missing_keys}")

        X = data['X']
        y = data['y']
        metadata = data['metadata']

        summary: Dict[str, Any] = {
            "dataset_structure": {
                "type": type(data).__name__,
                "keys": list(data.keys()),
            },
            "feature_matrix": {
                "shape": list(X.shape),
                "type": type(X).__name__,
                "dtype": str(X.dtype),
                # Slicing clamps to the available length, so short inputs
                # need no special case.
                "first_5_rows": X[:5].tolist(),
            },
            "target_vector": {
                "shape": list(y.shape),
                "type": type(y).__name__,
                "dtype": str(y.dtype),
                "first_10_values": y[:10].tolist(),
            },
            "metadata": metadata,
            "validation": {},
        }

        # Add optional statistics for X and y
        summary["feature_matrix"].update(self._numeric_stats(X))
        summary["target_vector"].update(self._numeric_stats(y))

        # Data validation
        if isinstance(X, np.ndarray) and isinstance(y, np.ndarray):
            validation: Dict[str, Any] = {}
            if X.shape[0] != y.shape[0]:
                validation["sample_size_mismatch"] = True
                validation["message"] = f"Mismatch in sample sizes - X has {X.shape[0]} samples, y has {y.shape[0]} samples"
            else:
                validation["sample_size_mismatch"] = False
                validation["sample_count"] = int(X.shape[0])

            # Check for NaN values
            validation["nan_count_x"] = self._nan_count(X)
            validation["nan_count_y"] = self._nan_count(y)
            summary["validation"] = validation

        return summary