statistical tests

This commit is contained in:
2026-02-02 21:47:37 +01:00
parent 29df6a4bd9
commit f2c659c266
9 changed files with 1679 additions and 47 deletions

437
utils.py
View File

@@ -714,7 +714,7 @@ def normalize_global_values(df: pl.DataFrame, target_cols: list[str]) -> pl.Data
class QualtricsSurvey(QualtricsPlotsMixin):
"""Class to handle JPMorgan Chase survey data."""
"""Class to handle Qualtrics survey data."""
def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
if isinstance(data_path, str):
@@ -1072,6 +1072,441 @@ class QualtricsSurvey(QualtricsPlotsMixin):
return self._get_subset(q, QIDs, rename_cols=True), None
def transform_character_trait_frequency(
    self,
    char_df: pl.LazyFrame | pl.DataFrame,
    character_column: str,
) -> tuple[pl.DataFrame, dict | None]:
    """Count how often each trait was chosen for a single character.

    Original use-case: "I need a bar plot that shows the frequency of the times
    each trait is chosen per brand character."

    The input holds one comma-separated trait string per respondent per
    character column; the traits are split apart, whitespace-trimmed, and
    tallied for the requested character only.

    Args:
        char_df: Pre-fetched data.
            Expected columns: '_recordId', '<character_column>' (with comma-separated traits)
        character_column: Name of the character column to analyze (e.g., 'Bank Teller')

    Returns:
        tuple: (DataFrame with columns ['trait', 'count', 'is_original'], None)
            - 'trait': individual trait name
            - 'count': frequency count
            - 'is_original': boolean indicating if trait is in the original definition
    """
    from reference import ORIGINAL_CHARACTER_TRAITS

    if isinstance(char_df, pl.LazyFrame):
        char_df = char_df.collect()

    # Display-name -> reference-key mapping for the trait lookup table.
    display_to_ref = {
        'Bank Teller': 'the_bank_teller',
        'Familiar Friend': 'the_familiar_friend',
        'The Coach': 'the_coach',
        'Personal Assistant': 'the_personal_assistant',
    }
    # Unknown characters map to an empty trait set (is_original all False).
    reference_traits = set(
        ORIGINAL_CHARACTER_TRAITS.get(display_to_ref.get(character_column), [])
    )

    # Keep only respondents who answered for this character.
    answered = char_df.filter(pl.col(character_column).is_not_null())

    # One row per individual trait: split on commas, trim, drop empties.
    trait_rows = (
        answered
        .select(pl.col(character_column).str.split(',').alias('traits'))
        .explode('traits')
        .with_columns(pl.col('traits').str.strip_chars().alias('trait'))
        .filter(pl.col('trait') != '')
    )

    # Tally, order by popularity, and flag traits from the original definition.
    tallied = (
        trait_rows
        .group_by('trait')
        .agg(pl.len().alias('count'))
        .sort('count', descending=True)
        .with_columns(
            pl.col('trait').is_in(list(reference_traits)).alias('is_original')
        )
    )
    return tallied, None
def compute_pairwise_significance(
    self,
    data: pl.LazyFrame | pl.DataFrame,
    test_type: str = "auto",
    alpha: float = 0.05,
    correction: str = "bonferroni",
) -> tuple[pl.DataFrame, dict]:
    """Compute pairwise statistical significance tests between columns.

    Original use-case: "I need to test for statistical significance and present
    this in a logical manner. It should be a generalized function to work on
    many dataframes."

    This function performs pairwise statistical tests between all numeric columns
    (excluding '_recordId') to determine which groups differ significantly.

    Args:
        data: Pre-fetched data with numeric columns to compare.
            Expected format: rows are observations, columns are groups/categories.
            Example: Voice_Scale_1_10__V14, Voice_Scale_1_10__V04, etc.
        test_type: Statistical test to use:
            - "auto": Automatically chooses based on data (default)
            - "mannwhitney": Mann-Whitney U test (non-parametric, for continuous)
            - "ttest": Independent samples t-test (parametric, for continuous)
            - "chi2": Chi-square test (for count/frequency data)
        alpha: Significance level (default 0.05)
        correction: Multiple comparison correction method:
            - "bonferroni": Bonferroni correction (conservative)
            - "holm": Holm-Bonferroni (less conservative)
            - "none": No correction

    Returns:
        tuple: (pairwise_df, metadata)
            - pairwise_df: DataFrame with columns ['group1', 'group2', 'p_value',
              'p_adjusted', 'significant', 'effect_size', 'mean1', 'mean2', 'n1', 'n2']
            - metadata: dict with 'test_type', 'alpha', 'correction', 'n_comparisons',
              'overall_test_stat', 'overall_p_value'

    Raises:
        ValueError: if fewer than 2 numeric columns are found, or if
            `test_type` / `correction` is not one of the supported values.
    """
    from scipy import stats as scipy_stats
    import numpy as np

    if isinstance(data, pl.LazyFrame):
        df = data.collect()
    else:
        df = data

    # Fail fast on an unsupported correction method. Previously an unknown
    # value silently left p_adjusted as all-NaN and marked nothing significant.
    if correction not in ("bonferroni", "holm", "none"):
        raise ValueError(f"Unknown correction: {correction}")

    # Get numeric columns (exclude _recordId and other non-data columns)
    value_cols = [
        c for c in df.columns
        if c != '_recordId'
        and df[c].dtype in [pl.Float64, pl.Float32, pl.Int64, pl.Int32]
    ]
    if len(value_cols) < 2:
        raise ValueError(f"Need at least 2 numeric columns for comparison, found {len(value_cols)}")

    # Auto-detect test type based on data characteristics
    if test_type == "auto":
        # Check if data looks like counts (integers, small range) vs continuous
        sample_col = df[value_cols[0]].drop_nulls()
        if len(sample_col) > 0:
            is_integer = sample_col.dtype in [pl.Int64, pl.Int32]
            unique_ratio = sample_col.n_unique() / len(sample_col)
            if is_integer and unique_ratio < 0.1:
                test_type = "chi2"
            else:
                test_type = "mannwhitney"  # Default to non-parametric
        else:
            test_type = "mannwhitney"

    # Extract data as numpy arrays (dropping nulls for each column)
    group_data = {}
    for col in value_cols:
        group_data[col] = df[col].drop_nulls().to_numpy()

    # Compute overall test (Kruskal-Wallis for non-parametric, ANOVA for parametric).
    # Both scipy tests require at least 2 non-empty groups; fall back to None
    # instead of crashing when the data is too sparse.
    all_groups = [group_data[col] for col in value_cols if len(group_data[col]) > 0]
    if test_type == "mannwhitney" and len(all_groups) >= 2:
        overall_stat, overall_p = scipy_stats.kruskal(*all_groups)
        overall_test_name = "Kruskal-Wallis"
    elif test_type == "ttest" and len(all_groups) >= 2:
        overall_stat, overall_p = scipy_stats.f_oneway(*all_groups)
        overall_test_name = "One-way ANOVA"
    elif test_type == "chi2":
        overall_stat, overall_p = None, None
        overall_test_name = "N/A (Chi-square)"
    else:
        # Fewer than 2 populated groups: no meaningful omnibus test.
        overall_stat, overall_p = None, None
        overall_test_name = "N/A (insufficient data)"

    # Compute pairwise tests
    results = []
    n_comparisons = len(value_cols) * (len(value_cols) - 1) // 2
    for i, col1 in enumerate(value_cols):
        for col2 in value_cols[i + 1:]:
            data1 = group_data[col1]
            data2 = group_data[col2]
            n1, n2 = len(data1), len(data2)
            mean1 = float(np.mean(data1)) if n1 > 0 else None
            mean2 = float(np.mean(data2)) if n2 > 0 else None
            # Skip if either group has no data
            if n1 == 0 or n2 == 0:
                results.append({
                    'group1': self._clean_voice_label(col1),
                    'group2': self._clean_voice_label(col2),
                    'p_value': None,
                    'effect_size': None,
                    'mean1': mean1,
                    'mean2': mean2,
                    'n1': n1,
                    'n2': n2,
                })
                continue
            # Perform the appropriate test
            if test_type == "mannwhitney":
                stat, p_value = scipy_stats.mannwhitneyu(data1, data2, alternative='two-sided')
                # Effect size: rank-biserial correlation
                effect_size = 1 - (2 * stat) / (n1 * n2)
            elif test_type == "ttest":
                stat, p_value = scipy_stats.ttest_ind(data1, data2)
                # Effect size: Cohen's d. The pooled-SD formula weights by
                # (n-1), so it needs the SAMPLE standard deviation (ddof=1);
                # the previous ddof=0 underestimated the pooled SD.
                pooled_std = np.sqrt(
                    ((n1 - 1) * np.std(data1, ddof=1) ** 2 + (n2 - 1) * np.std(data2, ddof=1) ** 2)
                    / (n1 + n2 - 2)
                )
                effect_size = (mean1 - mean2) / pooled_std if pooled_std > 0 else 0
            elif test_type == "chi2":
                # Create contingency table from the two distributions
                # Bin the data for chi-square
                all_data = np.concatenate([data1, data2])
                bins = np.histogram_bin_edges(all_data, bins='auto')
                counts1, _ = np.histogram(data1, bins=bins)
                counts2, _ = np.histogram(data2, bins=bins)
                contingency = np.array([counts1, counts2])
                # Remove zero columns (chi2_contingency rejects all-zero margins)
                contingency = contingency[:, contingency.sum(axis=0) > 0]
                if contingency.shape[1] > 1:
                    stat, p_value, _, _ = scipy_stats.chi2_contingency(contingency)
                    # Effect size: Cramér's V
                    effect_size = np.sqrt(stat / (contingency.sum() * (min(contingency.shape) - 1)))
                else:
                    # Everything landed in one bin: the distributions are
                    # indistinguishable at this binning.
                    p_value, effect_size = 1.0, 0.0
            else:
                raise ValueError(f"Unknown test_type: {test_type}")
            results.append({
                'group1': self._clean_voice_label(col1),
                'group2': self._clean_voice_label(col2),
                'p_value': float(p_value),
                'effect_size': float(effect_size),
                'mean1': mean1,
                'mean2': mean2,
                'n1': n1,
                'n2': n2,
            })

    # Create DataFrame and apply multiple comparison correction
    results_df = pl.DataFrame(results)
    p_values = results_df['p_value'].to_numpy()
    # Pairs skipped for lack of data carry None -> NaN; leave them NaN.
    valid_mask = ~np.isnan(p_values.astype(float))
    p_adjusted = np.full_like(p_values, np.nan, dtype=float)
    if correction == "bonferroni":
        p_adjusted[valid_mask] = np.minimum(p_values[valid_mask] * n_comparisons, 1.0)
    elif correction == "holm":
        # Holm-Bonferroni step-down procedure
        valid_p = p_values[valid_mask]
        sorted_idx = np.argsort(valid_p)
        sorted_p = valid_p[sorted_idx]
        m = len(sorted_p)
        adjusted = np.zeros(m)
        for j in range(m):
            adjusted[j] = sorted_p[j] * (m - j)
        # Ensure monotonicity
        for j in range(1, m):
            adjusted[j] = max(adjusted[j], adjusted[j - 1])
        adjusted = np.minimum(adjusted, 1.0)
        # Restore original order
        p_adjusted[valid_mask] = adjusted[np.argsort(sorted_idx)]
    else:  # correction == "none" (validated above)
        p_adjusted = p_values.astype(float)

    results_df = results_df.with_columns([
        pl.Series('p_adjusted', p_adjusted),
        pl.Series('significant', p_adjusted < alpha),
    ])

    metadata = {
        'test_type': test_type,
        'alpha': alpha,
        'correction': correction,
        'n_comparisons': n_comparisons,
        'overall_test': overall_test_name,
        'overall_stat': overall_stat,
        'overall_p_value': overall_p,
    }
    return results_df, metadata
def compute_ranking_significance(
    self,
    data: pl.LazyFrame | pl.DataFrame,
    alpha: float = 0.05,
    correction: str = "bonferroni",
) -> tuple[pl.DataFrame, dict]:
    """Compute statistical significance for ranking data (e.g., Top 3 Voices).

    Original use-case: "Test whether voices are ranked significantly differently
    based on the distribution of 1st, 2nd, 3rd place votes."

    This function takes raw ranking data (rows = respondents, columns = voices,
    values = rank 1/2/3 or null) and performs:
    1. Overall chi-square test on the full contingency table
    2. Pairwise proportion tests comparing Rank 1 vote shares

    Args:
        data: Pre-fetched ranking data from get_top_3_voices() or get_character_ranking().
            Expected format: rows are respondents, columns are voices/characters,
            values are 1, 2, 3 (rank) or null (not ranked).
        alpha: Significance level (default 0.05)
        correction: Multiple comparison correction method:
            - "bonferroni": Bonferroni correction (conservative)
            - "holm": Holm-Bonferroni (less conservative)
            - "none": No correction

    Returns:
        tuple: (pairwise_df, metadata)
            - pairwise_df: DataFrame with columns ['group1', 'group2', 'p_value',
              'p_adjusted', 'significant', 'rank1_count1', 'rank1_count2',
              'rank1_pct1', 'rank1_pct2', 'total1', 'total2']
            - metadata: dict with 'alpha', 'correction', 'n_comparisons',
              'chi2_stat', 'chi2_p_value', 'contingency_table'

    Raises:
        ValueError: if fewer than 2 ranking columns are found, or if
            `correction` is not one of the supported values.

    Example:
        >>> ranking_data, _ = S.get_top_3_voices(data)
        >>> pairwise_df, meta = S.compute_ranking_significance(ranking_data)
        >>> # See which voices have significantly different Rank 1 proportions
        >>> print(pairwise_df.filter(pl.col('significant') == True))
    """
    from scipy import stats as scipy_stats
    import numpy as np

    if isinstance(data, pl.LazyFrame):
        df = data.collect()
    else:
        df = data

    # Fail fast on an unsupported correction method. Previously an unknown
    # value silently left p_adjusted as all-NaN and marked nothing significant.
    if correction not in ("bonferroni", "holm", "none"):
        raise ValueError(f"Unknown correction: {correction}")

    # Get ranking columns (exclude _recordId)
    ranking_cols = [c for c in df.columns if c != '_recordId']
    if len(ranking_cols) < 2:
        raise ValueError(f"Need at least 2 ranking columns, found {len(ranking_cols)}")

    # Build contingency table: rows = ranks (1, 2, 3), columns = voices.
    # Count how many times each voice received each rank.
    contingency_data = {}
    for col in ranking_cols:
        label = self._clean_voice_label(col)
        r1 = df.filter(pl.col(col) == 1).height
        r2 = df.filter(pl.col(col) == 2).height
        r3 = df.filter(pl.col(col) == 3).height
        contingency_data[label] = [r1, r2, r3]

    labels = list(contingency_data.keys())
    contingency_table = np.array([contingency_data[l] for l in labels]).T  # 3 x n_voices

    # Overall chi-square test: is the rank distribution independent of voice?
    chi2_stat, chi2_p, chi2_dof, _ = scipy_stats.chi2_contingency(contingency_table)

    # Pairwise two-proportion z-tests comparing Rank 1 shares.
    results = []
    n_comparisons = len(labels) * (len(labels) - 1) // 2
    for i, label1 in enumerate(labels):
        for label2 in labels[i + 1:]:
            r1_count1 = contingency_data[label1][0]  # Rank 1 votes for voice 1
            r1_count2 = contingency_data[label2][0]  # Rank 1 votes for voice 2
            # Total times each voice was ranked (1st + 2nd + 3rd)
            total1 = sum(contingency_data[label1])
            total2 = sum(contingency_data[label2])
            # Proportion of Rank 1 out of all rankings for each voice
            pct1 = r1_count1 / total1 if total1 > 0 else 0
            pct2 = r1_count2 / total2 if total2 > 0 else 0
            # Two-proportion z-test; H0: p1 = p2.
            if total1 > 0 and total2 > 0 and (r1_count1 + r1_count2) > 0:
                # Pooled proportion under H0
                p_pooled = (r1_count1 + r1_count2) / (total1 + total2)
                # Standard error of the difference in proportions
                se = np.sqrt(p_pooled * (1 - p_pooled) * (1 / total1 + 1 / total2))
                if se > 0:
                    z_stat = (pct1 - pct2) / se
                    p_value = 2 * (1 - scipy_stats.norm.cdf(abs(z_stat)))  # Two-tailed
                else:
                    p_value = 1.0
            else:
                # No Rank-1 votes at all (or an empty voice): nothing to compare.
                p_value = 1.0
            results.append({
                'group1': label1,
                'group2': label2,
                'p_value': float(p_value),
                'rank1_count1': r1_count1,
                'rank1_count2': r1_count2,
                'rank1_pct1': round(pct1 * 100, 1),
                'rank1_pct2': round(pct2 * 100, 1),
                'total1': total1,
                'total2': total2,
            })

    # Create DataFrame and apply multiple comparison correction
    results_df = pl.DataFrame(results)
    p_values = results_df['p_value'].to_numpy()
    if correction == "bonferroni":
        p_adjusted = np.minimum(p_values * n_comparisons, 1.0)
    elif correction == "holm":
        # Holm-Bonferroni step-down procedure
        sorted_idx = np.argsort(p_values)
        sorted_p = p_values[sorted_idx]
        m = len(sorted_p)
        adjusted = np.zeros(m)
        for j in range(m):
            adjusted[j] = sorted_p[j] * (m - j)
        # Ensure monotonicity
        for j in range(1, m):
            adjusted[j] = max(adjusted[j], adjusted[j - 1])
        adjusted = np.minimum(adjusted, 1.0)
        # Restore original order
        p_adjusted = adjusted[np.argsort(sorted_idx)]
    else:  # correction == "none" (validated above)
        p_adjusted = p_values.astype(float)

    results_df = results_df.with_columns([
        pl.Series('p_adjusted', p_adjusted),
        pl.Series('significant', p_adjusted < alpha),
    ])
    # Sort by p_value for easier inspection
    results_df = results_df.sort('p_value')

    metadata = {
        'test_type': 'proportion_z_test',
        'alpha': alpha,
        'correction': correction,
        'n_comparisons': n_comparisons,
        'chi2_stat': chi2_stat,
        'chi2_p_value': chi2_p,
        'chi2_dof': chi2_dof,
        'overall_test': 'Chi-square',
        'overall_stat': chi2_stat,
        'overall_p_value': chi2_p,
        'contingency_table': {label: contingency_data[label] for label in labels},
    }
    return results_df, metadata
def process_speaking_style_data(