946 lines
37 KiB
Python
946 lines
37 KiB
Python
"""Plotting functions for Voice Branding analysis using Altair."""
|
|
|
|
import re
|
|
import math
|
|
from pathlib import Path
|
|
|
|
import altair as alt
|
|
import pandas as pd
|
|
import polars as pl
|
|
from theme import ColorPalette
|
|
|
|
import hashlib
|
|
|
|
class JPMCPlotsMixin:
|
|
"""Mixin class for plotting functions in JPMCSurvey."""
|
|
|
|
def _process_title(self, title: str) -> str | list[str]:
|
|
"""Process title to handle <br> tags for Altair."""
|
|
if isinstance(title, str) and '<br>' in title:
|
|
return title.split('<br>')
|
|
return title
|
|
|
|
def _sanitize_filename(self, title: str) -> str:
|
|
"""Convert plot title to a safe filename."""
|
|
# Remove HTML tags
|
|
clean = re.sub(r'<[^>]+>', ' ', title)
|
|
# Replace special characters with underscores
|
|
clean = re.sub(r'[^\w\s-]', '', clean)
|
|
# Replace whitespace with underscores
|
|
clean = re.sub(r'\s+', '_', clean.strip())
|
|
# Remove consecutive underscores
|
|
clean = re.sub(r'_+', '_', clean)
|
|
# Lowercase and limit length
|
|
return clean.lower()[:100]
|
|
|
|
def _get_filter_slug(self) -> str:
|
|
"""Generate a directory-friendly slug based on active filters."""
|
|
parts = []
|
|
|
|
# Mapping of attribute name to (short_code, value, options_attr)
|
|
filters = [
|
|
('age', 'Age', getattr(self, 'filter_age', None), 'options_age'),
|
|
('gender', 'Gen', getattr(self, 'filter_gender', None), 'options_gender'),
|
|
('consumer', 'Cons', getattr(self, 'filter_consumer', None), 'options_consumer'),
|
|
('ethnicity', 'Eth', getattr(self, 'filter_ethnicity', None), 'options_ethnicity'),
|
|
('income', 'Inc', getattr(self, 'filter_income', None), 'options_income'),
|
|
]
|
|
|
|
for _, short_code, value, options_attr in filters:
|
|
if value is None:
|
|
continue
|
|
|
|
# Ensure value is a list for uniform handling
|
|
if not isinstance(value, list):
|
|
value = [value]
|
|
|
|
if len(value) == 0:
|
|
continue
|
|
|
|
# Check if all options are selected (equivalent to no filter)
|
|
# We compare the set of selected values to the set of all available options
|
|
master_list = getattr(self, options_attr, None)
|
|
if master_list and set(value) == set(master_list):
|
|
continue
|
|
|
|
if len(value) > 3:
|
|
# If more than 3 options selected, create a hash of the sorted values
|
|
# This ensures uniqueness properly while keeping the slug short
|
|
sorted_vals = sorted([str(v) for v in value])
|
|
vals_str = "".join(sorted_vals)
|
|
# Create short 6-char hash
|
|
val_hash = hashlib.md5(vals_str.encode()).hexdigest()[:6]
|
|
val_str = f"{len(value)}_grps_{val_hash}"
|
|
else:
|
|
# Join values with '+'
|
|
clean_values = []
|
|
for v in value:
|
|
# Simple sanitization: keep alphanum and hyphens/dots, remove others
|
|
s = str(v)
|
|
# Remove special chars that might be problematic in dir names
|
|
s = re.sub(r'[^\w\-\.]', '', s)
|
|
clean_values.append(s)
|
|
val_str = "+".join(clean_values)
|
|
|
|
parts.append(f"{short_code}-{val_str}")
|
|
|
|
if not parts:
|
|
return "All_Respondents"
|
|
|
|
return "_".join(parts)
|
|
|
|
def _get_filter_description(self) -> str:
|
|
"""Generate a human-readable description of active filters."""
|
|
parts = []
|
|
|
|
# Mapping of attribute name to (display_name, value, options_attr)
|
|
filters = [
|
|
('Age', getattr(self, 'filter_age', None), 'options_age'),
|
|
('Gender', getattr(self, 'filter_gender', None), 'options_gender'),
|
|
('Consumer', getattr(self, 'filter_consumer', None), 'options_consumer'),
|
|
('Ethnicity', getattr(self, 'filter_ethnicity', None), 'options_ethnicity'),
|
|
('Income', getattr(self, 'filter_income', None), 'options_income'),
|
|
]
|
|
|
|
for display_name, value, options_attr in filters:
|
|
if value is None:
|
|
continue
|
|
|
|
# Ensure value is a list for uniform handling
|
|
if not isinstance(value, list):
|
|
value = [value]
|
|
|
|
if len(value) == 0:
|
|
continue
|
|
|
|
# Check if all options are selected (equivalent to no filter)
|
|
master_list = getattr(self, options_attr, None)
|
|
if master_list and set(value) == set(master_list):
|
|
continue
|
|
|
|
# Use original values for display (full list)
|
|
clean_values = [str(v) for v in value]
|
|
val_str = ", ".join(clean_values)
|
|
# Use UPPERCASE for category name to distinguish from values
|
|
parts.append(f"{display_name.upper()}: {val_str}")
|
|
|
|
if not parts:
|
|
return ""
|
|
|
|
# Join with clear separator - double space for visual break
|
|
return "Filters: " + " — ".join(parts)
|
|
|
|
def _add_filter_footnote(self, chart: alt.Chart) -> alt.Chart:
    """Attach the active-filter description to the chart title as subtitle lines.

    The filter text from ``_get_filter_description`` is wrapped at roughly
    100 characters (never splitting a word) and written into the title's
    ``subtitle``. The title is used rather than a separate text layer to
    avoid layout issues with vconcat.

    A subtitle already present on the chart is kept: the filter lines are
    appended after it instead of replacing it. Applying the footnote to a
    chart that already ends with the same filter lines does not duplicate
    them.

    Args:
        chart: Chart whose title should carry the filter footnote.

    Returns:
        The chart with an updated title, or the original chart when no
        filter is active.
    """
    import textwrap  # local import: only this helper needs it

    filter_text = self._get_filter_description()
    if not filter_text:
        return chart

    # Word-boundary wrapping only; an over-long word stays on its own line.
    filter_lines = textwrap.wrap(
        filter_text,
        width=100,
        break_long_words=False,
        break_on_hyphens=False,
    )

    # The spec may carry the title as a string, a list of lines, or a dict.
    existing_title = chart.to_dict().get('title', '')
    if isinstance(existing_title, dict):
        title_config = dict(existing_title)
    elif isinstance(existing_title, (str, list)):
        title_config = {'text': existing_title}
    else:
        title_config = {'text': ''}

    # Normalise any subtitle the chart already has into a list of lines.
    prior = title_config.get('subtitle')
    if prior is None:
        subtitle_lines = []
    elif isinstance(prior, str):
        subtitle_lines = [prior]
    else:
        subtitle_lines = list(prior)

    # Append the footnote unless it is already the tail of the subtitle.
    if subtitle_lines[-len(filter_lines):] != filter_lines:
        subtitle_lines = subtitle_lines + filter_lines

    title_config.update(
        subtitle=subtitle_lines,
        subtitleColor='gray',
        subtitleFontSize=10,
        anchor='start',
    )
    return chart.properties(title=title_config)
|
|
|
|
def _save_plot(self, chart: alt.Chart, title: str) -> alt.Chart:
    """Add the filter footnote and, when ``fig_save_dir`` is set, write a PNG.

    The file goes to ``<fig_save_dir>/<filter slug>/<sanitized title>.png``
    and is rendered through ``vl_convert`` with the project theme config.

    Args:
        chart: Chart to annotate and optionally export.
        title: Plot title; used only to derive the file name.

    Returns:
        The chart with the filter footnote applied (unchanged when no
        filter is active).
    """
    chart = self._add_filter_footnote(chart)

    save_root = getattr(self, 'fig_save_dir', None)
    if not save_root:
        return chart

    # One sub-folder per filter combination keeps exports separated.
    target_dir = Path(save_root) / self._get_filter_slug()
    if not target_dir.exists():
        target_dir.mkdir(parents=True, exist_ok=True)

    png_path = target_dir / f"{self._sanitize_filename(title)}.png"

    # Imported here so these modules are only needed when a PNG is written.
    import vl_convert as vlc
    from theme import jpmc_altair_theme

    # Render via vl_convert with the theme config passed explicitly.
    spec = chart.to_dict()
    theme_config = jpmc_altair_theme()['config']
    png_bytes = vlc.vegalite_to_png(
        vl_spec=spec,
        scale=2.0,
        ppi=72,
        config=theme_config,
    )
    png_path.write_bytes(png_bytes)

    return chart
|
|
|
|
def _ensure_dataframe(self, data: pl.LazyFrame | pl.DataFrame | None) -> pl.DataFrame:
    """Return an eager DataFrame for ``data``, falling back to ``self.data_filtered``.

    Args:
        data: Frame to use; ``None`` selects ``self.data_filtered`` if present.

    Returns:
        The frame itself, or its collected result when it is a ``LazyFrame``.

    Raises:
        ValueError: If ``data`` is ``None`` and ``self.data_filtered`` is
            missing or ``None``.
    """
    frame = getattr(self, 'data_filtered', None) if data is None else data
    if frame is None:
        raise ValueError("No data provided and self.data_filtered is None.")
    return frame.collect() if isinstance(frame, pl.LazyFrame) else frame
|
|
|
|
def _clean_voice_label(self, col_name: str) -> str:
|
|
"""Extract and clean voice name from column name for display.
|
|
|
|
Handles patterns like:
|
|
- 'Voice_Scale__The_Coach' -> 'The Coach'
|
|
- 'Character_Ranking_The_Coach' -> 'The Coach'
|
|
- 'Top_3_Voices_ranking__Familiar_Friend' -> 'Familiar Friend'
|
|
"""
|
|
# First split by __ if present
|
|
label = col_name.split('__')[-1] if '__' in col_name else col_name
|
|
# Remove common prefixes
|
|
label = label.replace('Character_Ranking_', '')
|
|
label = label.replace('Top_3_Voices_ranking_', '')
|
|
# Replace underscores with spaces
|
|
label = label.replace('_', ' ').strip()
|
|
return label
|
|
|
|
def plot_average_scores_with_counts(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str = "General Impression (1-10)\nPer Voice with Number of Participants Who Rated It",
    x_label: str = "Stimuli",
    y_label: str = "Average General Impression Rating (1-10)",
    color: str = ColorPalette.PRIMARY,
    height: int | None = None,
    width: int | str | None = None,
    domain: list[float] | None = None,
) -> alt.Chart:
    """Bar chart of the mean of each column, labelled with its non-null count.

    Every column except ``_recordId`` is treated as one voice. Bars are
    sorted by average, descending, and the count is drawn above each bar.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        title: Chart title (also used to derive the saved file name).
        x_label: X axis title.
        y_label: Y axis title.
        color: Bar colour.
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.
        domain: Y scale domain; defaults to [min average, max average].

    Returns:
        The layered chart, or a "No data" text chart when the frame has no
        columns other than ``_recordId``.
    """
    df = self._ensure_dataframe(data)

    score_cols = [c for c in df.columns if c != '_recordId']
    if not score_cols:
        # Without this guard the sort below fails on a column-less frame.
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')

    # One record per voice: cleaned label, mean score, number of ratings.
    stats = [
        {
            'voice': self._clean_voice_label(col),
            'average': df[col].mean(),
            'count': df[col].drop_nulls().len(),
        }
        for col in score_cols
    ]

    # Convert to pandas for Altair (sorted by average, descending).
    stats_df = pl.DataFrame(stats).sort('average', descending=True).to_pandas()

    if domain is None:
        domain = [stats_df['average'].min(), stats_df['average'].max()]

    # Base bar chart
    bars = alt.Chart(stats_df).mark_bar(color=color).encode(
        x=alt.X('voice:N', title=x_label, sort='-y'),
        y=alt.Y('average:Q', title=y_label, scale=alt.Scale(domain=domain)),
        tooltip=[
            alt.Tooltip('voice:N', title='Voice'),
            alt.Tooltip('average:Q', title='Average', format='.2f'),
            alt.Tooltip('count:Q', title='Count'),
        ],
    )

    # Count labels, shifted slightly above the bar tops.
    text = alt.Chart(stats_df).mark_text(
        dy=-5,
        color='black',
        fontSize=10,
    ).encode(
        x=alt.X('voice:N', sort='-y'),
        y=alt.Y('average:Q'),
        text=alt.Text('count:Q'),
    )

    chart = (bars + text).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_top3_ranking_distribution(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str = "Top 3 Rankings Distribution\nCount of 1st, 2nd, and 3rd Place Votes per Voice",
    x_label: str = "Voices",
    y_label: str = "Number of Mentions in Top 3",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Stacked bars counting how often each voice received rank 1, 2 or 3.

    Every column except ``_recordId`` is one voice. Voices with no rank
    1-3 entries are omitted. Bars are ordered by total mentions and the
    legend is clickable to highlight a rank.
    """
    frame = self._ensure_dataframe(data)

    rank_labels = {
        1: 'Rank 1 (1st Choice)',
        2: 'Rank 2 (2nd Choice)',
        3: 'Rank 3 (3rd Choice)',
    }

    # Long format: one row per (voice, rank) carrying the voice total.
    rows = []
    for col in frame.columns:
        if col == '_recordId':
            continue
        votes = {rank: frame.filter(pl.col(col) == rank).height for rank in rank_labels}
        total = sum(votes.values())
        if total <= 0:
            continue
        voice = self._clean_voice_label(col)
        rows.extend(
            {'voice': voice, 'rank': rank_labels[rank], 'count': n, 'total': total}
            for rank, n in votes.items()
        )

    stats_df = pl.DataFrame(rows).to_pandas()

    # Clicking a legend entry selects that rank.
    legend_pick = alt.selection_point(fields=['rank'], bind='legend')

    x_enc = alt.X(
        'voice:N',
        title=x_label,
        sort=alt.EncodingSortField(field='total', op='sum', order='descending'),
    )
    y_enc = alt.Y('count:Q', title=y_label, stack='zero')
    color_enc = alt.Color(
        'rank:N',
        scale=alt.Scale(
            domain=list(rank_labels.values()),
            range=[ColorPalette.RANK_1, ColorPalette.RANK_2, ColorPalette.RANK_3],
        ),
        legend=alt.Legend(orient='top', direction='horizontal', title=None),
    )
    tooltips = [
        alt.Tooltip('voice:N', title='Voice'),
        alt.Tooltip('rank:N', title='Rank'),
        alt.Tooltip('count:Q', title='Count'),
    ]

    stacked = alt.Chart(stats_df).mark_bar().encode(
        x=x_enc,
        y=y_enc,
        color=color_enc,
        order=alt.Order('rank:N', sort='ascending'),
        opacity=alt.condition(legend_pick, alt.value(1), alt.value(0.2)),
        tooltip=tooltips,
    )
    chart = stacked.add_params(legend_pick).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    return self._save_plot(chart, title)
|
|
|
|
def plot_ranking_distribution(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str = "Rankings Distribution\n(1st to 4th Place)",
    x_label: str = "Item",
    y_label: str = "Number of Votes",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Stacked bars showing how many rank 1-4 votes each item received.

    Every column except ``_recordId`` is one item. Items with no rank 1-4
    entries are omitted; bars are ordered by the number of rank-1 votes.
    Returns a "No data" text chart when nothing remains to plot.
    """
    frame = self._ensure_dataframe(data)

    rank_labels = {
        1: 'Rank 1 (Best)',
        2: 'Rank 2',
        3: 'Rank 3',
        4: 'Rank 4 (Worst)',
    }

    rows = []
    for col in (c for c in frame.columns if c != '_recordId'):
        votes = {rank: frame.filter(pl.col(col) == rank).height for rank in rank_labels}
        if sum(votes.values()) <= 0:
            continue
        item = self._clean_voice_label(col)
        # Every row repeats the rank-1 count so the x axis can sort on it.
        rows.extend(
            {'item': item, 'rank': rank_labels[rank], 'count': n, 'rank1': votes[1]}
            for rank, n in votes.items()
        )

    if not rows:
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')

    stats_df = pl.DataFrame(rows).to_pandas()

    # Clicking a legend entry selects that rank.
    legend_pick = alt.selection_point(fields=['rank'], bind='legend')

    color_enc = alt.Color(
        'rank:N',
        scale=alt.Scale(
            domain=list(rank_labels.values()),
            range=[
                ColorPalette.RANK_1,
                ColorPalette.RANK_2,
                ColorPalette.RANK_3,
                ColorPalette.RANK_4,
            ],
        ),
        legend=alt.Legend(orient='top', direction='horizontal', title=None),
    )

    stacked = alt.Chart(stats_df).mark_bar().encode(
        x=alt.X(
            'item:N',
            title=x_label,
            sort=alt.EncodingSortField(field='rank1', order='descending'),
        ),
        y=alt.Y('count:Q', title=y_label, stack='zero'),
        color=color_enc,
        order=alt.Order('rank:N', sort='ascending'),
        opacity=alt.condition(legend_pick, alt.value(1), alt.value(0.2)),
        tooltip=[
            alt.Tooltip('item:N', title='Item'),
            alt.Tooltip('rank:N', title='Rank'),
            alt.Tooltip('count:Q', title='Count'),
        ],
    )
    chart = stacked.add_params(legend_pick).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    return self._save_plot(chart, title)
|
|
|
|
def plot_most_ranked_1(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str = "Most Popular Choice\n(Number of Times Ranked 1st)",
    x_label: str = "Item",
    y_label: str = "Count of 1st Place Rankings",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Bar chart of how often each item was ranked first, top three highlighted.

    Every column except ``_recordId`` is one item. After sorting by the
    number of rank-1 entries, the first three rows are coloured with
    ``ColorPalette.PRIMARY`` and the rest with ``ColorPalette.NEUTRAL``.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        title: Chart title (also used to derive the saved file name).
        x_label: X axis title.
        y_label: Y axis title.
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.

    Returns:
        The bar chart, or a "No data" text chart when the frame has no
        columns other than ``_recordId``.
    """
    df = self._ensure_dataframe(data)

    ranking_cols = [c for c in df.columns if c != '_recordId']
    if not ranking_cols:
        # Without this guard the sort below fails on a column-less frame.
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')

    stats = [
        {
            'item': self._clean_voice_label(col),
            'count': df.filter(pl.col(col) == 1).height,
        }
        for col in ranking_cols
    ]

    # Sort by votes, then tag the first three rows for highlighting.
    stats_df = (
        pl.DataFrame(stats)
        .sort('count', descending=True)
        .with_row_index('rank_index')
        .with_columns(
            pl.when(pl.col('rank_index') < 3)
            .then(pl.lit('Top 3'))
            .otherwise(pl.lit('Other'))
            .alias('category')
        )
        .to_pandas()
    )

    chart = alt.Chart(stats_df).mark_bar().encode(
        x=alt.X('item:N', title=x_label, sort='-y'),
        y=alt.Y('count:Q', title=y_label),
        color=alt.Color(
            'category:N',
            scale=alt.Scale(
                domain=['Top 3', 'Other'],
                range=[ColorPalette.PRIMARY, ColorPalette.NEUTRAL],
            ),
            legend=None,
        ),
        tooltip=[
            alt.Tooltip('item:N', title='Item'),
            alt.Tooltip('count:Q', title='1st Place Votes'),
        ],
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_weighted_ranking_score(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str = "Weighted Popularity Score\n(1st=3pts, 2nd=2pts, 3rd=1pt)",
    x_label: str = "Character Personality",
    y_label: str = "Total Weighted Score",
    color: str = ColorPalette.PRIMARY,
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Bar chart of the weighted ranking score per character, with value labels.

    The chart encodes the ``Character`` and ``Weighted Score`` columns of
    the input frame; the scores are plotted as given, not computed here.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        title: Chart title (also used to derive the saved file name).
        x_label: X axis title.
        y_label: Y axis title.
        color: Bar colour.
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.

    Returns:
        The layered bar + label chart.
    """
    weighted_df = self._ensure_dataframe(data).to_pandas()

    bars = alt.Chart(weighted_df).mark_bar(color=color).encode(
        x=alt.X('Character:N', title=x_label, sort='-y'),
        y=alt.Y('Weighted Score:Q', title=y_label),
        tooltip=[
            alt.Tooltip('Character:N'),
            alt.Tooltip('Weighted Score:Q', title='Score'),
        ],
    )

    # The label is shifted above the bar top, so it is drawn over the chart
    # background rather than the bar: use the palette text colour, as the
    # other outside-bar labels in this file do, instead of white.
    text = bars.mark_text(
        dy=-5,
        color=ColorPalette.TEXT,
        fontSize=11,
    ).encode(
        text='Weighted Score:Q',
    )

    chart = (bars + text).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_voice_selection_counts(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    target_column: str = "8_Combined",
    title: str = "Most Frequently Chosen Voices\n(Top 8 Highlighted)",
    x_label: str = "Voice",
    y_label: str = "Number of Times Chosen",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Bar chart of how often each voice appears in a comma-separated column.

    Each non-null cell of ``target_column`` is split on commas, the pieces
    are stripped, empty pieces are dropped and occurrences are counted. The
    eight most frequent voices are highlighted.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        target_column: Column holding the comma-separated selections.
        title: Chart title (also used to derive the saved file name).
        x_label: X axis title.
        y_label: Y axis title.
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.

    Returns:
        The bar chart, or a text chart with a "not found" message when
        ``target_column`` is missing.
    """
    df = self._ensure_dataframe(data)

    if target_column not in df.columns:
        return alt.Chart(pd.DataFrame({'text': [f"Column '{target_column}' not found"]})).mark_text().encode(text='text:N')

    # Split, explode, clean and count the individual selections.
    counts = (
        df.select(pl.col(target_column))
        .drop_nulls()
        .with_columns(pl.col(target_column).str.split(","))
        .explode(target_column)
        .with_columns(pl.col(target_column).str.strip_chars())
        .filter(pl.col(target_column) != "")
        .group_by(target_column)
        .agg(pl.len().alias("count"))
    )

    # Group order is not guaranteed, so ties are broken by name; otherwise
    # the voices that land in the "Top 8" bucket could differ between runs.
    stats_df = (
        counts.sort(["count", target_column], descending=[True, False])
        .with_row_index('rank_index')
        .with_columns(
            pl.when(pl.col('rank_index') < 8)
            .then(pl.lit('Top 8'))
            .otherwise(pl.lit('Other'))
            .alias('category')
        )
        .to_pandas()
    )

    chart = alt.Chart(stats_df).mark_bar().encode(
        x=alt.X(f'{target_column}:N', title=x_label, sort='-y'),
        y=alt.Y('count:Q', title=y_label),
        color=alt.Color(
            'category:N',
            scale=alt.Scale(
                domain=['Top 8', 'Other'],
                range=[ColorPalette.PRIMARY, ColorPalette.NEUTRAL],
            ),
            legend=None,
        ),
        tooltip=[
            alt.Tooltip(f'{target_column}:N', title='Voice'),
            alt.Tooltip('count:Q', title='Selections'),
        ],
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_top3_selection_counts(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    target_column: str = "3_Ranked",
    title: str = "Most Frequently Chosen Top 3 Voices\n(Top 3 Highlighted)",
    x_label: str = "Voice",
    y_label: str = "Count of Mentions in Top 3",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Bar chart answering: which voices are chosen most often in the top 3?

    Each non-null cell of ``target_column`` is split on commas, the pieces
    are stripped, empty pieces are dropped and occurrences are counted. The
    three most frequent voices are highlighted.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        target_column: Column holding the comma-separated selections.
        title: Chart title (also used to derive the saved file name).
        x_label: X axis title.
        y_label: Y axis title.
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.

    Returns:
        The bar chart, or a text chart with a "not found" message when
        ``target_column`` is missing.
    """
    df = self._ensure_dataframe(data)

    if target_column not in df.columns:
        return alt.Chart(pd.DataFrame({'text': [f"Column '{target_column}' not found"]})).mark_text().encode(text='text:N')

    # Split, explode, clean and count the individual selections.
    counts = (
        df.select(pl.col(target_column))
        .drop_nulls()
        .with_columns(pl.col(target_column).str.split(","))
        .explode(target_column)
        .with_columns(pl.col(target_column).str.strip_chars())
        .filter(pl.col(target_column) != "")
        .group_by(target_column)
        .agg(pl.len().alias("count"))
    )

    # Group order is not guaranteed, so ties are broken by name; otherwise
    # the voices that land in the "Top 3" bucket could differ between runs.
    stats_df = (
        counts.sort(["count", target_column], descending=[True, False])
        .with_row_index('rank_index')
        .with_columns(
            pl.when(pl.col('rank_index') < 3)
            .then(pl.lit('Top 3'))
            .otherwise(pl.lit('Other'))
            .alias('category')
        )
        .to_pandas()
    )

    chart = alt.Chart(stats_df).mark_bar().encode(
        x=alt.X(f'{target_column}:N', title=x_label, sort='-y'),
        y=alt.Y('count:Q', title=y_label),
        color=alt.Color(
            'category:N',
            scale=alt.Scale(
                domain=['Top 3', 'Other'],
                range=[ColorPalette.PRIMARY, ColorPalette.NEUTRAL],
            ),
            legend=None,
        ),
        tooltip=[
            alt.Tooltip(f'{target_column}:N', title='Voice'),
            alt.Tooltip('count:Q', title='In Top 3'),
        ],
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_speaking_style_trait_scores(
    self,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    trait_description: str | None = None,
    left_anchor: str | None = None,
    right_anchor: str | None = None,
    title: str = "Speaking Style Trait Analysis",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Horizontal bars of the mean ``score`` per ``Voice`` for one trait.

    The frame must contain ``Voice`` and ``score`` columns. Missing anchors
    are read from the first row whose ``Left_Anchor`` is non-null (when
    those columns exist). A missing ``trait_description`` is built from the
    anchors ("<left> vs. <right>", using the text before any ``|``), else
    taken from the first non-null ``Description``, else left empty. The
    respondent count is drawn inside each bar.

    Args:
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        trait_description: Subtitle text describing the trait.
        left_anchor: Label for the low end of the scale.
        right_anchor: Label for the high end of the scale.
        title: Chart title (also used to derive the saved file name).
        height: Chart height; defaults to ``self.plot_height`` or 400.
        width: Chart width; defaults to 800.

    Returns:
        The layered chart, or a text chart with a message when the frame is
        empty or lacks the required columns.
    """
    df = self._ensure_dataframe(data)

    if df.is_empty():
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')

    required_cols = ["Voice", "score"]
    if not all(col in df.columns for col in required_cols):
        return alt.Chart(pd.DataFrame({'text': ['Missing required columns']})).mark_text().encode(text='text:N')

    # Mean and number of non-null scores per voice.
    stats = (
        df.filter(pl.col("score").is_not_null())
        .group_by("Voice")
        .agg([
            pl.col("score").mean().alias("mean_score"),
            pl.col("score").count().alias("count"),
        ])
        .sort("mean_score", descending=False)
        .to_pandas()
    )

    # Fill missing anchors from the data. Both come from the same row so
    # they belong to the same trait.
    if (left_anchor is None or right_anchor is None) and "Left_Anchor" in df.columns:
        head = df.filter(pl.col("Left_Anchor").is_not_null()).head(1)
        if not head.is_empty():
            if left_anchor is None:
                left_anchor = head["Left_Anchor"][0]
            # Right_Anchor may be absent even when Left_Anchor exists.
            if right_anchor is None and "Right_Anchor" in df.columns:
                right_anchor = head["Right_Anchor"][0]

    if trait_description is None:
        if left_anchor and right_anchor:
            trait_description = f"{left_anchor.split('|')[0]} vs. {right_anchor.split('|')[0]}"
        elif "Description" in df.columns:
            head = df.filter(pl.col("Description").is_not_null()).head(1)
            trait_description = head["Description"][0] if not head.is_empty() else ""
        else:
            trait_description = ""

    # Horizontal bars; x2 pins the bar start to 1, the low end of the domain.
    bars = alt.Chart(stats).mark_bar(color=ColorPalette.PRIMARY).encode(
        x=alt.X('mean_score:Q', title='Average Score (1-5)', scale=alt.Scale(domain=[1, 5])),
        x2=alt.datum(1),
        y=alt.Y('Voice:N', title='Voice', sort='-x'),
        tooltip=[
            alt.Tooltip('Voice:N'),
            alt.Tooltip('mean_score:Q', title='Average', format='.2f'),
            alt.Tooltip('count:Q', title='Count'),
        ],
    )

    # Respondent count, right-aligned just inside the end of each bar.
    text = alt.Chart(stats).mark_text(
        align='right',
        baseline='middle',
        color='white',
        fontSize=12,
        dx=-5,
    ).encode(
        x='mean_score:Q',
        y=alt.Y('Voice:N', sort='-x'),
        text='count:Q',
    )

    chart = (bars + text).properties(
        title={
            "text": self._process_title(title),
            "subtitle": [trait_description, "(Numbers on bars indicate respondent count)"],
        },
        width=width or 800,
        height=height or getattr(self, 'plot_height', 400),
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_speaking_style_correlation(
    self,
    style_color: str,
    style_traits: list[str],
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str | None = None,
    width: int | str | None = None,
    height: int | None = None,
) -> alt.Chart:
    """Bar chart of the correlation between trait ``score`` and ``Voice_Scale_Score``.

    For each entry of ``style_traits`` the rows whose ``Right_Anchor``
    equals that trait are selected and the correlation between ``score``
    and ``Voice_Scale_Score`` is computed on the rows where both are
    non-null. Traits with fewer than two such rows are skipped. Bars are
    green for non-negative and red for negative correlations.

    Args:
        style_color: Name of the speaking style; used in the default title
            and in the "no data" message.
        style_traits: ``Right_Anchor`` values to analyse.
        data: Frame to plot; ``None`` falls back to ``self.data_filtered``.
        title: Chart title (also used to derive the saved file name).
        width: Chart width; defaults to 800.
        height: Chart height; defaults to 350.

    Returns:
        The bar chart, or a text chart when no trait had enough data.
    """
    df = self._ensure_dataframe(data)

    if title is None:
        # The style name must be part of the default title: the saved file
        # name is derived from it, so a shared title would make every style
        # overwrite the same PNG.
        title = f"Speaking style {style_color} and voice scale 1-10 correlations"

    trait_correlations = []
    for i, trait in enumerate(style_traits):
        subset = df.filter(pl.col("Right_Anchor") == trait)
        valid_data = subset.select(["score", "Voice_Scale_Score"]).drop_nulls()

        # A correlation needs at least two paired observations.
        if valid_data.height <= 1:
            continue

        corr_val = valid_data.select(pl.corr("score", "Voice_Scale_Score")).item()
        trait_correlations.append({
            # '|' in the trait text becomes a line break for display.
            "trait_display": trait.replace('|', '\n'),
            "trait_index": f"Trait {i+1}",
            "correlation": corr_val if corr_val is not None else 0.0,
        })

    if not trait_correlations:
        return alt.Chart(pd.DataFrame({'text': [f"No data for {style_color} Style"]})).mark_text().encode(text='text:N')

    plot_df = pl.DataFrame(trait_correlations).to_pandas()

    # Bar colour depends on the sign of the correlation.
    chart = alt.Chart(plot_df).mark_bar().encode(
        x=alt.X('trait_display:N', title=None, axis=alt.Axis(labelAngle=0)),
        y=alt.Y('correlation:Q', title='Correlation', scale=alt.Scale(domain=[-1, 1])),
        color=alt.condition(
            alt.datum.correlation >= 0,
            alt.value('green'),
            alt.value('red'),
        ),
        tooltip=[
            alt.Tooltip('trait_display:N', title='Trait'),
            alt.Tooltip('correlation:Q', format='.2f'),
        ],
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or 350,
    )

    chart = self._save_plot(chart, title)
    return chart
|
|
|
|
def plot_demographic_distribution(
    self,
    column: str,
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str | None = None,
    height: int | None = None,
    width: int | str | None = None,
    show_counts: bool = True,
) -> alt.Chart:
    """Compact horizontal bar chart of respondent counts for one demographic column.

    Null values are counted under "(No Response)". Categories sit on the Y
    axis sorted by count; a percentage of the total is added for the tooltip.

    Parameters:
        column: Name of the column to summarise (e.g. 'Age', 'Gender').
        data: Optional frame; ``None`` falls back to ``self.data_filtered``.
        title: Chart title; defaults to "Distribution: <column>" with
            underscores shown as spaces and '/' padded with spaces.
        height: Chart height in pixels; defaults to 18 px per category plus
            40, with a minimum of 120.
        width: Chart width in pixels; defaults to 200.
        show_counts: When True, the count is printed next to each bar end.

    Returns:
        alt.Chart: The bar chart, or a text chart with a message when the
        column is missing or there is nothing to count.
    """
    frame = self._ensure_dataframe(data)

    if column not in frame.columns:
        return alt.Chart(pd.DataFrame({'text': [f"Column '{column}' not found"]})).mark_text().encode(text='text:N')

    # Frequency table, nulls included under a placeholder label.
    labelled = frame.select(pl.col(column)).with_columns(
        pl.col(column).fill_null("(No Response)")
    )
    counts = (
        labelled.group_by(column)
        .agg(pl.len().alias("count"))
        .sort("count", descending=True)
        .to_pandas()
    )

    if counts.empty:
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')

    # Share of all respondents, rounded to one decimal.
    grand_total = counts['count'].sum()
    counts['percentage'] = (counts['count'] / grand_total * 100).round(1)

    if title is None:
        title = "Distribution: " + column.replace('_', ' ').replace('/', ' / ')

    # 18 px per bar plus 40 px for title/padding, never below 120 px.
    px_per_bar = 18
    auto_height = max(120, len(counts) * px_per_bar + 40)

    category_field = f'{column}:N'

    layered = alt.Chart(counts).mark_bar(color=ColorPalette.PRIMARY).encode(
        x=alt.X('count:Q', title='Count', axis=alt.Axis(grid=False)),
        y=alt.Y(category_field, title=None, sort='-x', axis=alt.Axis(labelLimit=150)),
        tooltip=[
            alt.Tooltip(category_field, title=column.replace('_', ' ')),
            alt.Tooltip('count:Q', title='Count'),
            alt.Tooltip('percentage:Q', title='Percentage', format='.1f'),
        ],
    )

    if show_counts:
        # Count label placed just past the end of each bar.
        count_labels = alt.Chart(counts).mark_text(
            align='left',
            baseline='middle',
            dx=3,
            fontSize=9,
            color=ColorPalette.TEXT,
        ).encode(
            x='count:Q',
            y=alt.Y(category_field, sort='-x'),
            text='count:Q',
        )
        layered = layered + count_labels

    chart = layered.properties(
        title=self._process_title(title),
        width=width or 200,
        height=height or auto_height,
    )

    return self._save_plot(chart, title)
|
|
|
|
def plot_speaking_style_ranking_correlation(
    self,
    style_color: str,
    style_traits: list[str],
    data: pl.LazyFrame | pl.DataFrame | None = None,
    title: str | None = None,
    width: int | str | None = None,
    height: int | None = None,
) -> alt.Chart:
    """Bar chart of the correlation between trait ``score`` and ``Ranking_Points``.

    For each entry of ``style_traits`` the rows whose ``Right_Anchor``
    equals that trait are selected and the correlation is computed on the
    rows where both columns are non-null. Traits with fewer than two such
    rows are skipped; if none remain a text chart is returned instead.
    Bars are green for non-negative and red for negative correlations.
    """
    frame = self._ensure_dataframe(data)

    if title is None:
        title = f"Speaking style {style_color} and voice ranking points correlations"

    pair_cols = ["score", "Ranking_Points"]
    records = []
    for position, trait in enumerate(style_traits, start=1):
        paired = (
            frame.filter(pl.col("Right_Anchor") == trait)
            .select(pair_cols)
            .drop_nulls()
        )
        # A correlation needs at least two paired observations.
        if paired.height <= 1:
            continue

        coefficient = paired.select(pl.corr("score", "Ranking_Points")).item()
        if coefficient is None:
            coefficient = 0.0
        records.append({
            # '|' in the trait text becomes a line break for display.
            "trait_display": trait.replace('|', '\n'),
            "trait_index": f"Trait {position}",
            "correlation": coefficient,
        })

    if not records:
        return alt.Chart(pd.DataFrame({'text': [f"No data for {style_color} Style"]})).mark_text().encode(text='text:N')

    plot_df = pl.DataFrame(records).to_pandas()

    # Bar colour depends on the sign of the correlation.
    sign_color = alt.condition(
        alt.datum.correlation >= 0,
        alt.value('green'),
        alt.value('red'),
    )
    bars = alt.Chart(plot_df).mark_bar().encode(
        x=alt.X('trait_display:N', title=None, axis=alt.Axis(labelAngle=0)),
        y=alt.Y('correlation:Q', title='Correlation', scale=alt.Scale(domain=[-1, 1])),
        color=sign_color,
        tooltip=[
            alt.Tooltip('trait_display:N', title='Trait'),
            alt.Tooltip('correlation:Q', format='.2f'),
        ],
    )
    chart = bars.properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or 350,
    )

    return self._save_plot(chart, title)
|