import collections
import logging
import statistics
from typing import Iterable, Union
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from ..constants import RANDOM_SEED
from .types import AggregatedMetricResult, MetricResult
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _safelog(a: np.ndarray) -> np.ndarray:
"""Compute safe log that handles zeros by returning 0.
Args:
a: Input array
Returns:
Array with log values, with 0s where input was 0
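    Example:
        Illustrative doctest; zeros in the input map to 0 rather than -inf.
        >>> _safelog(np.array([0.0, 1.0]))
        array([0., 0.])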
"""
a = a.astype("float")
return np.log(a, out=np.zeros_like(a), where=(a != 0))
def nearest_neighbors_hnsw(
data: np.ndarray,
expansion_factor: int = 200,
max_links: int = 48,
n_neighbors: int = 100,
random_seed: int = RANDOM_SEED,
) -> tuple[np.ndarray, np.ndarray]:
"""Find nearest neighbors using HNSW algorithm.
Args:
data: Input data matrix of shape (n_samples, n_features)
expansion_factor: Size of dynamic candidate list for search
max_links: Number of bi-directional links created for every new element
        n_neighbors: Number of nearest neighbors to find
        random_seed: Random seed used when constructing the HNSW index
Returns:
Tuple containing:
- Indices array of shape (n_samples, n_neighbors)
- Distances array of shape (n_samples, n_neighbors)
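    Example:
        A minimal sketch (requires the optional ``hnswlib`` dependency); the
        data and parameters are illustrative only.
        >>> rng = np.random.default_rng(0)
        >>> data = rng.normal(size=(100, 8)).astype(np.float32)
        >>> indices, distances = nearest_neighbors_hnsw(data, n_neighbors=5)
        >>> indices.shape, distances.shape
        ((100, 5), (100, 5))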
"""
import hnswlib
if n_neighbors > data.shape[0]:
raise ValueError(
f"n_neighbors ({n_neighbors}) must be less than or equal to the number of samples: {data.shape[0]}"
)
sample_indices = np.arange(data.shape[0])
index = hnswlib.Index(space="l2", dim=data.shape[1])
index.init_index(
max_elements=data.shape[0],
ef_construction=expansion_factor,
M=max_links,
random_seed=random_seed,
)
index.add_items(data, sample_indices)
index.set_ef(expansion_factor)
neighbor_indices, distances = index.knn_query(data, k=n_neighbors)
return neighbor_indices, distances
def compute_entropy_per_cell(
X: np.ndarray,
labels: Union[pd.Categorical, pd.Series, np.ndarray],
n_neighbors: int = 200,
random_seed: int = RANDOM_SEED,
) -> float:
"""Compute entropy of batch labels in local neighborhoods.
    For each cell, finds nearest neighbors and computes the entropy of the
    batch label distribution in that neighborhood, then averages across cells.
Args:
X: Cell Embedding matrix of shape (n_cells, n_features)
labels: Series containing batch labels for each cell
n_neighbors: Number of nearest neighbors to consider
random_seed: Random seed for reproducibility
    Returns:
        Mean of the per-cell entropies of the batch label distribution,
        normalized by the log of the number of batches (a float in [0, 1])
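    Example:
        A minimal sketch (requires the optional ``hnswlib`` dependency); only
        the bounds of the score are checked, since the exact value depends on
        the random data.
        >>> rng = np.random.default_rng(0)
        >>> X = rng.normal(size=(60, 4))
        >>> batches = np.array(["a", "b", "c"] * 20)
        >>> score = compute_entropy_per_cell(X, batches, n_neighbors=30)
        >>> 0.0 <= float(score) <= 1.0
        True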
"""
    if n_neighbors > X.shape[0]:
        logger.warning(
            f"n_neighbors ({n_neighbors}) is greater than the number of samples "
            f"({X.shape[0]}). Setting n_neighbors to {X.shape[0]}."
        )
        n_neighbors = X.shape[0]
indices, _ = nearest_neighbors_hnsw(
X, n_neighbors=n_neighbors, random_seed=random_seed
)
labels = np.array(list(labels))
unique_batch_labels = np.unique(labels)
    # Batch label of each neighbor, shape (n_cells, n_neighbors)
    indices_batch = labels[indices]
    # Count occurrences of each batch label within every neighborhood
    label_counts_per_cell = np.vstack(
        [(indices_batch == label).sum(1) for label in unique_batch_labels]
    ).T
    # Convert counts to per-neighborhood probabilities
    label_counts_per_cell_normed = (
        label_counts_per_cell / label_counts_per_cell.sum(1)[:, None]
    )
    # Shannon entropy of the batch distribution in each neighborhood,
    # normalized by log(number of batches), then averaged over cells
    return (
        (-label_counts_per_cell_normed * _safelog(label_counts_per_cell_normed)).sum(1)
        / _safelog(np.array([len(unique_batch_labels)]))
    ).mean()
def jaccard_score(y_true: set[str], y_pred: set[str]) -> float:
"""Compute Jaccard similarity between true and predicted values.
Args:
y_true: True values
y_pred: Predicted values
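    Returns:
        Jaccard similarity in [0, 1]
    Raises:
        ZeroDivisionError: If both sets are empty
    Example:
        >>> jaccard_score({"a", "b", "c"}, {"b", "c", "d"})
        0.5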
"""
return len(y_true.intersection(y_pred)) / len(y_true.union(y_pred))
def mean_fold_metric(
    results_df: pd.DataFrame,
    metric: str = "accuracy",
    classifier: Union[str, None] = None,
) -> float:
"""Compute mean of a metric across folds.
Args:
results_df: DataFrame containing cross-validation results. Must have columns:
- "classifier": Name of the classifier (e.g., "lr", "knn")
And one of the following metric columns:
- "accuracy": For accuracy scores
- "f1": For F1 scores
- "precision": For precision scores
- "recall": For recall scores
metric: Name of metric column to average ("accuracy", "f1", etc.)
classifier: Optional classifier name to filter results
Returns:
Mean value of the metric across folds
Raises:
KeyError: If the specified metric column is not present in results_df
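    Example:
        A minimal sketch with a hypothetical results frame.
        >>> df = pd.DataFrame(
        ...     {"classifier": ["lr", "lr", "knn"], "accuracy": [0.9, 0.8, 0.7]}
        ... )
        >>> round(float(mean_fold_metric(df, metric="accuracy", classifier="lr")), 2)
        0.85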
"""
if classifier:
df = results_df[results_df["classifier"] == classifier]
else:
df = results_df
return df[metric].mean()
def single_metric(results_df: pd.DataFrame, metric: str, **kwargs):
"""Get a single metric value from filtered results.
Args:
results_df: DataFrame containing classification results
metric: Name of metric column to extract ("accuracy", "f1", etc.)
**kwargs: Filter parameters (e.g., classifier, train_species, test_species)
Returns:
Single metric value from the filtered results
Raises:
ValueError: If filtering results in 0 or >1 rows
KeyError: If the specified metric column is not present in results_df
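    Example:
        A minimal sketch with a hypothetical results frame.
        >>> df = pd.DataFrame({"classifier": ["lr", "knn"], "f1": [0.9, 0.8]})
        >>> float(single_metric(df, "f1", classifier="knn"))
        0.8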
"""
df = results_df.copy()
    for param, value in kwargs.items():
        # Keys that are not columns of results_df are silently ignored
        if param in df.columns:
            df = df[df[param] == value]
if len(df) == 0:
raise ValueError(f"No results found after filtering with {kwargs!r}")
elif len(df) > 1:
raise ValueError(
f"Multiple results found after filtering with {kwargs!r}. Expected exactly 1 row."
)
return df[metric].iloc[0]
def aggregate_results(results: Iterable[MetricResult]) -> list[AggregatedMetricResult]:
"""aggregate a collection of MetricResults by their type and parameters"""
grouped_results = collections.defaultdict(list)
for result in results:
grouped_results[result.aggregation_key].append(result)
aggregated = []
for results_to_agg in grouped_results.values():
values_raw = [result.value for result in results_to_agg]
value_mean = statistics.mean(values_raw)
try:
value_std_dev = statistics.stdev(values_raw, xbar=value_mean)
except statistics.StatisticsError:
# we only had one result so we can't compute it
value_std_dev = None
aggregated.append(
AggregatedMetricResult(
metric_type=results_to_agg[0].metric_type,
params=results_to_agg[0].params,
value=value_mean,
value_std_dev=value_std_dev,
values_raw=values_raw,
n_values=len(values_raw),
)
)
return aggregated
def _normalize_sequential_labels(labels: np.ndarray) -> np.ndarray:
"""
    Validate that labels are numeric or can be converted to numeric.
    String/character labels that cannot be converted raise a ValueError,
    since they have no inherent ordering.
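    Example:
        String digits are converted to floats:
        >>> _normalize_sequential_labels(np.array(["1", "2", "3"]))
        array([1., 2., 3.])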
"""
labels = np.asarray(labels)
# Check if labels are strings/characters
if labels.dtype.kind in ["U", "S", "O"]: # Unicode, byte string, or object
# Try to convert to numeric
try:
labels = labels.astype(float)
except (ValueError, TypeError):
raise ValueError(
"Labels must be numeric or convertible to numeric. "
"String/character labels are not supported as they don't have inherent ordering. "
f"Got labels with dtype: {labels.dtype}"
)
# Ensure numeric type
if not np.issubdtype(labels.dtype, np.number):
try:
labels = labels.astype(float)
except (ValueError, TypeError):
raise ValueError(
f"Cannot convert labels to numeric type. Got dtype: {labels.dtype}"
)
return labels
def sequential_alignment(
X: np.ndarray,
labels: np.ndarray,
k: int = 10,
normalize: bool = True,
adaptive_k: bool = False,
) -> float:
"""
Measure how sequentially close neighbors are in embedding space.
    Works with unsorted data; it does not assume X and labels are pre-sorted.
    Parameters
    ----------
    X : np.ndarray
        Embedding matrix of shape (n_samples, n_features) (can be unsorted)
    labels : np.ndarray
        Sequential labels of shape (n_samples,) (can be unsorted).
        Must be numeric or convertible to numeric; string labels raise a ValueError.
    k : int
        Number of neighbors to consider
    normalize : bool
        Whether to normalize the score to the [0, 1] range
    adaptive_k : bool
        Whether to use adaptive k based on local density
    Returns
    -------
    float
        Sequential alignment score (higher = better sequential consistency)
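    Examples
    --------
    A minimal sketch; only the bounds of the score are checked, since the
    normalized score depends on a sampled random baseline.
    >>> rng = np.random.default_rng(0)
    >>> labels = np.arange(50)
    >>> X = labels[:, None] + 0.01 * rng.normal(size=(50, 1))
    >>> score = sequential_alignment(X, labels, k=5)
    >>> 0.0 <= score <= 1.0
    True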
"""
X = np.asarray(X)
labels = _normalize_sequential_labels(labels)
if len(X) != len(labels):
raise ValueError("X and labels must have same length")
if len(X) < k + 1:
raise ValueError(f"Need at least {k + 1} samples for k={k}")
# Handle edge case: all labels the same
if len(np.unique(labels)) == 1:
return 1.0 if normalize else 0.0
if adaptive_k:
k_values = _compute_adaptive_k(X, k)
else:
k_values = np.array([k] * len(X))
# Find neighbors for each point
max_k = max(k_values)
nn = NearestNeighbors(n_neighbors=max_k + 1).fit(X)
distances, indices = nn.kneighbors(X)
# Calculate sequential distances for each point's neighborhood
sequential_distances = []
for i in range(len(X)):
k_i = k_values[i]
# Skip self (index 0)
neighbor_indices = indices[i, 1 : k_i + 1]
neighbor_labels = labels[neighbor_indices]
# Mean absolute sequential distance to k nearest neighbors
sequential_dist = np.mean(np.abs(labels[i] - neighbor_labels))
sequential_distances.append(sequential_dist)
mean_sequential_distance = np.mean(sequential_distances)
if not normalize:
        return float(mean_sequential_distance)
# Compare against expected random sequential distance
baseline = _compute_random_baseline(labels, k)
# Normalize: 1 = perfect sequential consistency, 0 = random
if baseline > 0:
normalized_score = 1 - (mean_sequential_distance / baseline)
normalized_score = float(np.clip(normalized_score, 0, 1))
else:
normalized_score = 1.0
return normalized_score
def _compute_adaptive_k(X: np.ndarray, base_k: int) -> np.ndarray:
"""Compute adaptive k values based on local density."""
# Choose a neighborhood size for density estimation
# We want a neighborhood larger than base_k (so density reflects a wider local area),
# avoid n_neighbors==1 (which returns self-distance==0) and ensure we don't exceed
# the available samples.
suggested = max(2, base_k * 3, len(X) // 4)
max_allowed = max(1, len(X) - 1)
density_k = int(min(30, suggested, max_allowed))
# Fall back to at least 2 neighbors if dataset is very small
density_k = max(2, density_k)
nn_density = NearestNeighbors(n_neighbors=density_k).fit(X)
distances, _ = nn_density.kneighbors(X)
    # Distance to the density_k-th neighbor; larger distance means lower density
    kth_distances = distances[:, -1]
    densities = 1 / (kth_distances + 1e-10)
min_density, max_density = np.percentile(densities, [10, 90])
normalized_densities = np.clip(
(densities - min_density) / (max_density - min_density + 1e-10), 0, 1
)
k_scale = 0.5 + 1.5 * (1 - normalized_densities)
k_values = np.round(base_k * k_scale).astype(int)
upper_bound = min(50, len(X) // 2)
lower_bound = 3
if upper_bound < lower_bound:
k_values = np.full_like(k_values, lower_bound)
else:
k_values = np.clip(k_values, lower_bound, upper_bound)
return k_values
def _compute_random_baseline(labels: np.ndarray, k: int) -> float:
"""Compute expected sequential distance for random neighbors."""
unique_labels = np.unique(labels)
if len(unique_labels) == 1:
return 0.0
n = len(labels)
# ensure k does not exceed available neighbors and is at least 1
k = max(1, min(k, n - 1))
n_samples = min(10000, n * 10)
random_diffs = []
for _ in range(n_samples):
# pick a random reference index
i = np.random.randint(0, n)
# sample k distinct neighbor indices excluding i
if k == n - 1:
neighbors = np.delete(np.arange(n), i)
else:
# sample from range [0, n-2] then map to [0, n-1] skipping i
choices = np.random.choice(n - 1, size=k, replace=False)
neighbors = choices + (choices >= i).astype(int)
# mean absolute difference between label[i] and its k random neighbors
random_diffs.append(np.mean(np.abs(labels[i] - labels[neighbors])))
return float(np.mean(random_diffs))