Compare commits

...

15 Commits

15 changed files with 4819 additions and 181 deletions

5
.vscode/extensions.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"recommendations": [
"wakatime.vscode-wakatime"
]
}

View File

@@ -26,7 +26,7 @@ def _():
@app.cell @app.cell
def _(): def _():
TAG_SOURCE = Path('/home/luigi/Documents/VoiceBranding/JPMC/Phase-3/data/reports/VOICE_Perception-Research-Report_3-2-26.pptx') TAG_SOURCE = Path('data/reports/VOICE_Perception-Research-Report_4-2-26_19-30.pptx')
# TAG_TARGET = Path('data/reports/Perception-Research-Report_2-2_tagged.pptx') # TAG_TARGET = Path('data/reports/Perception-Research-Report_2-2_tagged.pptx')
TAG_IMAGE_DIR = Path('figures/debug') TAG_IMAGE_DIR = Path('figures/debug')
return TAG_IMAGE_DIR, TAG_SOURCE return TAG_IMAGE_DIR, TAG_SOURCE
@@ -52,10 +52,10 @@ def _():
@app.cell @app.cell
def _(): def _():
REPLACE_SOURCE = Path('/home/luigi/Documents/VoiceBranding/JPMC/Phase-3/data/reports/VOICE_Perception-Research-Report_3-2-26.pptx') REPLACE_SOURCE = Path('data/reports/VOICE_Perception-Research-Report_4-2-26_19-30.pptx')
# REPLACE_TARGET = Path('data/reports/Perception-Research-Report_2-2_updated.pptx') # REPLACE_TARGET = Path('data/reports/Perception-Research-Report_2-2_updated.pptx')
NEW_IMAGES_DIR = Path('figures/debug') NEW_IMAGES_DIR = Path('figures/2-4-26')
return NEW_IMAGES_DIR, REPLACE_SOURCE return NEW_IMAGES_DIR, REPLACE_SOURCE

View File

@@ -0,0 +1,263 @@
"""Extra analyses of the traits"""
# %% Imports
import utils
import polars as pl
import argparse
import json
import re
from pathlib import Path
from validation import check_straight_liners
# %% Fixed Variables
RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %% CLI argument parsing for batch automation
# When run as script: uv run XX_statistical_significance.script.py --age '["18
# Central filter configuration - add new filters here only
# Format: 'cli_arg_name': 'QualtricsSurvey.options_* attribute name'
FILTER_CONFIG = {
'age': 'options_age',
'gender': 'options_gender',
'ethnicity': 'options_ethnicity',
'income': 'options_income',
'consumer': 'options_consumer',
'business_owner': 'options_business_owner',
'ai_user': 'options_ai_user',
'investable_assets': 'options_investable_assets',
'industry': 'options_industry',
}
def parse_cli_args():
    """Parse batch-automation CLI filters, or return no-op defaults interactively.

    Returns:
        argparse.Namespace with one attribute per FILTER_CONFIG key (each a
        list of allowed values, or None for "no filter"), plus ``filter_name``
        and ``figures_dir``. When running inside Jupyter/marimo (detected via
        ``get_ipython``), sys.argv is not parsed and all filters are None.
    """
    # Compute the default once so the argparse default and the interactive
    # fallback can never drift apart (previously this string was duplicated).
    default_fig_dir = f'figures/traits-likert-analysis/{Path(RESULTS_FILE).parts[2]}'
    parser = argparse.ArgumentParser(description='Generate quant report with optional filters')
    # Dynamically add filter arguments from config
    for filter_name in FILTER_CONFIG:
        parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values')
    parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)')
    parser.add_argument('--figures-dir', type=str, default=default_fig_dir, help='Override the default figures directory')
    # Only parse if running as script (not in Jupyter/interactive)
    try:
        # Check if running in Jupyter by looking for ipykernel
        get_ipython()  # noqa: F821 # type: ignore
        # Interactive session: return namespace with all filters set to None
        no_filters = {f: None for f in FILTER_CONFIG}
        return argparse.Namespace(**no_filters, filter_name=None, figures_dir=default_fig_dir)
    except NameError:
        args = parser.parse_args()
        # Parse JSON strings to lists ('' / None both mean "no filter")
        for filter_name in FILTER_CONFIG:
            val = getattr(args, filter_name)
            setattr(args, filter_name, json.loads(val) if val else None)
        return args
# Load data, apply CLI filters, and (optionally) write a human-readable
# description of the active filter combination next to the figures.
cli_args = parse_cli_args()
# %%
S = utils.QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir)
data_all = S.load_data()
# %% Build filtered dataset based on CLI args
# CLI args: None means "no filter applied" - filter_data() will skip None filters
# Build filter values dict dynamically from FILTER_CONFIG
_active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG}
_d = S.filter_data(data_all, **_active_filters)
# Write filter description file if filter-name is provided
if cli_args.filter_name and S.fig_save_dir:
    # Get the filter slug (e.g., "All_Respondents", "Cons-Starter", etc.)
    _filter_slug = S._get_filter_slug()
    _filter_slug_dir = S.fig_save_dir / _filter_slug
    _filter_slug_dir.mkdir(parents=True, exist_ok=True)
    # Build filter description
    _filter_desc_lines = [
        f"Filter: {cli_args.filter_name}",
        "",
        "Applied Filters:",
    ]
    _short_desc_parts = []
    for filter_name, options_attr in FILTER_CONFIG.items():
        all_options = getattr(S, options_attr)
        values = _active_filters[filter_name]
        display_name = filter_name.replace('_', ' ').title()
        # None means no filter applied (same as "All"); selecting every
        # option is also treated as "All"
        if values is not None and values != all_options:
            _short_desc_parts.append(f"{display_name}: {', '.join(values)}")
            _filter_desc_lines.append(f" {display_name}: {', '.join(values)}")
        else:
            _filter_desc_lines.append(f" {display_name}: All")
    # Write detailed description INSIDE the filter-slug directory
    # Sanitize filter name for filename usage (replace / and other chars)
    _safe_filter_name = re.sub(r'[^\w\s-]', '_', cli_args.filter_name)
    _filter_file = _filter_slug_dir / f"{_safe_filter_name}.txt"
    _filter_file.write_text('\n'.join(_filter_desc_lines))
    # Append to summary index file at figures/<export_date>/filter_index.txt
    _summary_file = S.fig_save_dir / "filter_index.txt"
    _short_desc = "; ".join(_short_desc_parts) if _short_desc_parts else "All Respondents"
    _summary_line = f"{_filter_slug} | {cli_args.filter_name} | {_short_desc}\n"
    # Append or create the summary file
    if _summary_file.exists():
        _existing = _summary_file.read_text()
        # Avoid duplicate entries for same slug (substring check on the file)
        if _filter_slug not in _existing:
            with _summary_file.open('a') as f:
                f.write(_summary_line)
    else:
        _header = "Filter Index\n" + "=" * 80 + "\n\n"
        _header += "Directory | Filter Name | Description\n"
        _header += "-" * 80 + "\n"
        _summary_file.write_text(_header + _summary_line)
# Save to logical variable name for further analysis
data = _d
data.collect()
# %% Voices per trait
# The speaking-style battery comes in two halves (Orange/Red and Green/Blue);
# join them per respondent before reshaping to long form for plotting.
ss_or, choice_map_or = S.get_ss_orange_red(data)
ss_gb, choice_map_gb = S.get_ss_green_blue(data)
# Combine the data
ss_all = ss_or.join(ss_gb, on='_recordId')
_d = ss_all.collect()
choice_map = {**choice_map_or, **choice_map_gb}
# print(_d.head())
# print(choice_map)
ss_long = utils.process_speaking_style_data(ss_all, choice_map)
# %% Create plots
# One plot per unique trait description; the ":" is stripped because the
# title doubles as (part of) the output filename.
for i, trait in enumerate(ss_long.select("Description").unique().to_series().to_list()):
    trait_d = ss_long.filter(pl.col("Description") == trait)
    S.plot_speaking_style_trait_scores(trait_d, title=trait.replace(":", ""), height=550, color_gender=True)
# %% Filter out straight-liner (PER TRAIT) and re-plot to see if any changes
# Save with different filename suffix so we can compare with/without straight-liners
print("\n--- Straight-lining Checks on TRAITS ---")
sl_report_traits, sl_traits_df = check_straight_liners(ss_all, max_score=5)
sl_traits_df
# %%
# Remove only the straight-lined question blocks (respondent x question-group
# pairs), not whole respondents, then re-plot each trait for comparison.
if sl_traits_df is not None and not sl_traits_df.is_empty():
    sl_ids = sl_traits_df.select(pl.col("Record ID").unique()).to_series().to_list()
    n_sl_groups = sl_traits_df.height
    print(f"\nExcluding {n_sl_groups} straight-lined question blocks from {len(sl_ids)} respondents.")
    # Create key in ss_long to match sl_traits_df for anti-join
    # Question Group key in sl_traits_df is like "SS_Orange_Red__V14"
    # ss_long has "Style_Group" and "Voice"
    ss_long_w_key = ss_long.with_columns(
        (pl.col("Style_Group") + "__" + pl.col("Voice")).alias("Question Group")
    )
    # Prepare filter table: Record ID + Question Group
    sl_filter = sl_traits_df.select([
        pl.col("Record ID").alias("_recordId"),
        pl.col("Question Group")
    ])
    # Anti-join to remove specific question blocks that were straight-lined
    ss_long_clean = ss_long_w_key.join(sl_filter, on=["_recordId", "Question Group"], how="anti").drop("Question Group")
    # Re-plot with suffix in title
    print("Re-plotting traits (Cleaned)...")
    for i, trait in enumerate(ss_long_clean.select("Description").unique().to_series().to_list()):
        trait_d = ss_long_clean.filter(pl.col("Description") == trait)
        # Modify title to create unique filename (and display title)
        title_clean = trait.replace(":", "") + " (Excl. Straight-Liners)"
        S.plot_speaking_style_trait_scores(trait_d, title=title_clean, height=550, color_gender=True)
else:
    print("No straight-liners found on traits.")
# %% Compare All vs Cleaned
# Side-by-side comparison plots of the full data vs the straight-liner-
# cleaned data, with verification prints that the anti-join removed exactly
# the flagged rows. NOTE(review): this re-derives ss_long_clean instead of
# reusing the variable from the previous cell — intentional for cell
# independence, but the logic is duplicated.
if sl_traits_df is not None and not sl_traits_df.is_empty():
    print("Generating Comparison Plots (All vs Cleaned)...")
    # Always apply the per-question-group filtering here to ensure consistency
    # (Matches the logic used in the re-plotting section above)
    print("Applying filter to remove straight-lined question blocks...")
    ss_long_w_key = ss_long.with_columns(
        (pl.col("Style_Group") + "__" + pl.col("Voice")).alias("Question Group")
    )
    sl_filter = sl_traits_df.select([
        pl.col("Record ID").alias("_recordId"),
        pl.col("Question Group")
    ])
    ss_long_clean = ss_long_w_key.join(sl_filter, on=["_recordId", "Question Group"], how="anti").drop("Question Group")
    sl_ids = sl_traits_df.select(pl.col("Record ID").unique()).to_series().to_list()
    # --- Verification Prints ---
    print(f"\n--- Verification of Filter ---")
    print(f"Original Row Count: {ss_long.height}")
    print(f"Number of Straight-Liner Question Blocks: {sl_traits_df.height}")
    print(f"Sample IDs affected: {sl_ids[:5]}")
    print(f"Cleaned Row Count: {ss_long_clean.height}")
    print(f"Rows Removed: {ss_long.height - ss_long_clean.height}")
    # Verify removal: the inner-join count must equal the rows actually
    # dropped by the anti-join (discrepancy should be zero).
    # Re-construct key to verify
    ss_long_check = ss_long.with_columns(
        (pl.col("Style_Group") + "__" + pl.col("Voice")).alias("Question Group")
    )
    sl_filter_check = sl_traits_df.select([
        pl.col("Record ID").alias("_recordId"),
        pl.col("Question Group")
    ])
    should_be_removed = ss_long_check.join(sl_filter_check, on=["_recordId", "Question Group"], how="inner").height
    print(f"Discrepancy Check (Should be 0): { (ss_long.height - ss_long_clean.height) - should_be_removed }")
    # Show what was removed (the straight lining behavior)
    print("\nSample of Straight-Liner Data (Values that caused removal):")
    print(sl_traits_df.head(5))
    print("-" * 30 + "\n")
    # ---------------------------
    for i, trait in enumerate(ss_long.select("Description").unique().to_series().to_list()):
        # Get data for this trait from both datasets
        trait_d_all = ss_long.filter(pl.col("Description") == trait)
        trait_d_clean = ss_long_clean.filter(pl.col("Description") == trait)
        # Plot comparison
        title_comp = trait.replace(":", "") + " (Impact of Straight-Liners)"
        S.plot_speaking_style_trait_scores_comparison(
            trait_d_all,
            trait_d_clean,
            title=title_comp,
            height=600  # Slightly taller for grouped bars
        )

View File

@@ -14,6 +14,13 @@ import utils
from speaking_styles import SPEAKING_STYLES from speaking_styles import SPEAKING_STYLES
# %% Fixed Variables
RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv'
# RESULTS_FILE = 'data/exports/debug/JPMC_Chase Brand Personality_Quant Round 1_February 2, 2026_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %% # %%
# CLI argument parsing for batch automation # CLI argument parsing for batch automation
# When run as script: python 03_quant_report.script.py --age '["18 to 21 years"]' --consumer '["Starter"]' # When run as script: python 03_quant_report.script.py --age '["18 to 21 years"]' --consumer '["Starter"]'
@@ -41,13 +48,18 @@ def parse_cli_args():
parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values') parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values')
parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)') parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)')
parser.add_argument('--figures-dir', type=str, default=f'figures/{Path(RESULTS_FILE).parts[2]}', help='Override the default figures directory')
parser.add_argument('--best-character', type=str, default="the_coach", help='Slug of the best chosen character (default: "the_coach")')
parser.add_argument('--sl-threshold', type=int, default=None, help='Exclude respondents who straight-lined >= N question groups (e.g. 3 removes anyone with 3+ straight-lined groups)')
parser.add_argument('--voice-ranking-filter', type=str, default=None, choices=['only-missing', 'exclude-missing'], help='Filter by voice ranking completeness: "only-missing" keeps only respondents missing QID98 ranking data, "exclude-missing" removes them')
# Only parse if running as script (not in Jupyter/interactive) # Only parse if running as script (not in Jupyter/interactive)
try: try:
# Check if running in Jupyter by looking for ipykernel # Check if running in Jupyter by looking for ipykernel
get_ipython() # noqa: F821 get_ipython() # noqa: F821 # type: ignore
# Return namespace with all filters set to None # Return namespace with all filters set to None
return argparse.Namespace(**{f: None for f in FILTER_CONFIG}, filter_name=None) no_filters = {f: None for f in FILTER_CONFIG}
return argparse.Namespace(**no_filters, filter_name=None, figures_dir=f'figures/statistical_significance/{Path(RESULTS_FILE).parts[2]}', best_character="the_coach", sl_threshold=None, voice_ranking_filter=None)
except NameError: except NameError:
args = parser.parse_args() args = parser.parse_args()
# Parse JSON strings to lists # Parse JSON strings to lists
@@ -57,68 +69,26 @@ def parse_cli_args():
return args return args
cli_args = parse_cli_args() cli_args = parse_cli_args()
BEST_CHOSEN_CHARACTER = cli_args.best_character
# %% # %%
S = QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir)
# file_browser = mo.ui.file_browser(
# initial_path="./data/exports", multiple=False, restrict_navigation=True, filetypes=[".csv"], label="Select 'Labels' File"
# )
# file_browser
# # %%
# mo.stop(file_browser.path(index=0) is None, mo.md("**⚠️ Please select a `_Labels.csv` file above to proceed**"))
# RESULTS_FILE = Path(file_browser.path(index=0))
RESULTS_FILE = 'data/exports/debug/JPMC_Chase Brand Personality_Quant Round 1_February 2, 2026_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %%
S = QualtricsSurvey(RESULTS_FILE, QSF_FILE)
try: try:
data_all = S.load_data() data_all = S.load_data()
except NotImplementedError as e: except NotImplementedError as e:
mo.stop(True, mo.md(f"**⚠️ {str(e)}**")) mo.stop(True, mo.md(f"**⚠️ {str(e)}**"))
# %%
BEST_CHOSEN_CHARACTER = "the_coach"
# # %% # %% Build filtered dataset based on CLI args
# filter_form = mo.md('''
# {age}
# {gender}
# {ethnicity}
# {income}
# {consumer}
# '''
# ).batch(
# age=mo.ui.multiselect(options=S.options_age, value=S.options_age, label="Select Age Group(s):"),
# gender=mo.ui.multiselect(options=S.options_gender, value=S.options_gender, label="Select Gender(s):"),
# ethnicity=mo.ui.multiselect(options=S.options_ethnicity, value=S.options_ethnicity, label="Select Ethnicities:"),
# income=mo.ui.multiselect(options=S.options_income, value=S.options_income, label="Select Income Group(s):"),
# consumer=mo.ui.multiselect(options=S.options_consumer, value=S.options_consumer, label="Select Consumer Groups:")
# ).form()
# mo.md(f'''
# ---
# # Data Filter
# {filter_form}
# ''')
# %%
# mo.stop(filter_form.value is None, mo.md("**Please submit filter above to proceed**"))
# CLI args: None means "no filter applied" - filter_data() will skip None filters # CLI args: None means "no filter applied" - filter_data() will skip None filters
# Build filter values dict dynamically from FILTER_CONFIG # Build filter values dict dynamically from FILTER_CONFIG
_active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG} _active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG}
# %% # %% Apply filters
_d = S.filter_data(data_all, **_active_filters) _d = S.filter_data(data_all, **_active_filters)
# Write filter description file if filter-name is provided # Write filter description file if filter-name is provided
@@ -170,14 +140,65 @@ if cli_args.filter_name and S.fig_save_dir:
_header += "-" * 80 + "\n" _header += "-" * 80 + "\n"
_summary_file.write_text(_header + _summary_line) _summary_file.write_text(_header + _summary_line)
# Stop execution and prevent other cells from running if no data is selected # %% Apply straight-liner threshold filter (if specified)
# mo.stop(len(_d.collect()) == 0, mo.md("**No Data available for current filter combination**")) # Removes respondents who straight-lined >= N question groups across
data = _d # speaking style and voice scale questions.
if cli_args.sl_threshold is not None:
_sl_n = cli_args.sl_threshold
S.sl_threshold = _sl_n # Store on Survey so filter slug/description include it
print(f"Applying straight-liner filter: excluding respondents with ≥{_sl_n} straight-lined question groups...")
_n_before = _d.select(pl.len()).collect().item()
# data = data_validated # Extract question groups with renamed columns for check_straight_liners
_sl_ss_or, _ = S.get_ss_orange_red(_d)
_sl_ss_gb, _ = S.get_ss_green_blue(_d)
_sl_vs, _ = S.get_voice_scale_1_10(_d)
_sl_all_q = _sl_ss_or.join(_sl_ss_gb, on='_recordId').join(_sl_vs, on='_recordId')
_, _sl_df = check_straight_liners(_sl_all_q, max_score=5)
if _sl_df is not None and not _sl_df.is_empty():
# Count straight-lined question groups per respondent
_sl_counts = (
_sl_df
.group_by("Record ID")
.agg(pl.len().alias("sl_count"))
.filter(pl.col("sl_count") >= _sl_n)
.select(pl.col("Record ID").alias("_recordId"))
)
# Anti-join to remove offending respondents
_d = _d.collect().join(_sl_counts, on="_recordId", how="anti").lazy()
# Update filtered data on the Survey object so sample size is correct
S.data_filtered = _d
_n_after = _d.select(pl.len()).collect().item()
print(f" Removed {_n_before - _n_after} respondents ({_n_before}{_n_after})")
else:
print(" No straight-liners detected — no respondents removed.")
# %% Apply voice-ranking completeness filter (if specified)
# Keeps only / excludes respondents who are missing the explicit voice
# ranking question (QID98) despite completing the top-3 selection (QID36).
if cli_args.voice_ranking_filter is not None:
S.voice_ranking_filter = cli_args.voice_ranking_filter # Store on Survey so filter slug/description include it
_vr_missing = S.get_top_3_voices_missing_ranking(_d)
_vr_missing_ids = _vr_missing.select('_recordId')
_n_before = _d.select(pl.len()).collect().item()
if cli_args.voice_ranking_filter == 'only-missing':
print(f"Voice ranking filter: keeping ONLY respondents missing QID98 ranking data...")
_d = _d.collect().join(_vr_missing_ids, on='_recordId', how='inner').lazy()
elif cli_args.voice_ranking_filter == 'exclude-missing':
print(f"Voice ranking filter: EXCLUDING respondents missing QID98 ranking data...")
_d = _d.collect().join(_vr_missing_ids, on='_recordId', how='anti').lazy()
S.data_filtered = _d
_n_after = _d.select(pl.len()).collect().item()
print(f" {_n_before}{_n_after} respondents ({_vr_missing_ids.height} missing ranking data)")
# Save to logical variable name for further analysis
data = _d
data.collect() data.collect()
# %%
# %% # %%
@@ -560,6 +581,39 @@ S.plot_speaking_style_color_correlation(
title="Correlation: Speaking Style Colors and Voice Ranking Points" title="Correlation: Speaking Style Colors and Voice Ranking Points"
) )
# %%
# Gender-filtered correlation plots (Male vs Female voices)
from reference import VOICE_GENDER_MAPPING
MALE_VOICES = [v for v, g in VOICE_GENDER_MAPPING.items() if g == "Male"]
FEMALE_VOICES = [v for v, g in VOICE_GENDER_MAPPING.items() if g == "Female"]
# Filter joined data by voice gender
joined_scale_male = joined_scale.filter(pl.col("Voice").is_in(MALE_VOICES))
joined_scale_female = joined_scale.filter(pl.col("Voice").is_in(FEMALE_VOICES))
joined_ranking_male = joined_ranking.filter(pl.col("Voice").is_in(MALE_VOICES))
joined_ranking_female = joined_ranking.filter(pl.col("Voice").is_in(FEMALE_VOICES))
# Colors vs Scale 1-10 (grouped by voice gender)
S.plot_speaking_style_color_correlation_by_gender(
data_male=joined_scale_male,
data_female=joined_scale_female,
speaking_styles=SPEAKING_STYLES,
target_column="Voice_Scale_Score",
title="Correlation: Speaking Style Colors and Voice Scale 1-10 (by Voice Gender)",
filename="correlation_speaking_style_and_voice_scale_1-10_by_voice_gender_color",
)
# Colors vs Ranking Points (grouped by voice gender)
S.plot_speaking_style_color_correlation_by_gender(
data_male=joined_ranking_male,
data_female=joined_ranking_female,
speaking_styles=SPEAKING_STYLES,
target_column="Ranking_Points",
title="Correlation: Speaking Style Colors and Voice Ranking Points (by Voice Gender)",
filename="correlation_speaking_style_and_voice_ranking_points_by_voice_gender_color",
)
# %% # %%
mo.md(r""" mo.md(r"""
### Individual Traits vs Scale 1-10 ### Individual Traits vs Scale 1-10
@@ -608,6 +662,48 @@ for _style, _traits in SPEAKING_STYLES.items():
""" """
mo.md(_content) mo.md(_content)
# %%
# Individual Traits vs Scale 1-10 (grouped by voice gender)
_content = """### Individual Traits vs Scale 1-10 (by Voice Gender)\n\n"""
for _style, _traits in SPEAKING_STYLES.items():
_fig = S.plot_speaking_style_scale_correlation_by_gender(
data_male=joined_scale_male,
data_female=joined_scale_female,
style_color=_style,
style_traits=_traits,
title=f"Correlation: Speaking Style {_style} and Voice Scale 1-10 (by Voice Gender)",
filename=f"correlation_speaking_style_and_voice_scale_1-10_by_voice_gender_{_style.lower()}",
)
_content += f"""
#### Speaking Style **{_style}**:
{mo.ui.altair_chart(_fig)}
"""
mo.md(_content)
# %%
# Individual Traits vs Ranking Points (grouped by voice gender)
_content = """### Individual Traits vs Ranking Points (by Voice Gender)\n\n"""
for _style, _traits in SPEAKING_STYLES.items():
_fig = S.plot_speaking_style_ranking_correlation_by_gender(
data_male=joined_ranking_male,
data_female=joined_ranking_female,
style_color=_style,
style_traits=_traits,
title=f"Correlation: Speaking Style {_style} and Voice Ranking Points (by Voice Gender)",
filename=f"correlation_speaking_style_and_voice_ranking_points_by_voice_gender_{_style.lower()}",
)
_content += f"""
#### Speaking Style **{_style}**:
{mo.ui.altair_chart(_fig)}
"""
mo.md(_content)
# %% # %%
# ## Correlations when "Best Brand Character" is chosen # ## Correlations when "Best Brand Character" is chosen
# For each of the 4 brand characters, filter the dataset to only those respondents # For each of the 4 brand characters, filter the dataset to only those respondents

View File

@@ -0,0 +1,370 @@
"""Extra statistical significance analyses for quant report."""
# %% Imports
import utils
import polars as pl
import argparse
import json
import re
from pathlib import Path
# %% Fixed Variables
RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %% CLI argument parsing for batch automation
# When run as script: uv run XX_statistical_significance.script.py --age '["18
# Central filter configuration - add new filters here only
# Format: 'cli_arg_name': 'QualtricsSurvey.options_* attribute name'
FILTER_CONFIG = {
'age': 'options_age',
'gender': 'options_gender',
'ethnicity': 'options_ethnicity',
'income': 'options_income',
'consumer': 'options_consumer',
'business_owner': 'options_business_owner',
'ai_user': 'options_ai_user',
'investable_assets': 'options_investable_assets',
'industry': 'options_industry',
}
def parse_cli_args():
    """Parse batch-automation CLI filters, or return no-op defaults interactively.

    Returns:
        argparse.Namespace with one attribute per FILTER_CONFIG key (each a
        list of allowed values, or None for "no filter"), plus ``filter_name``
        and ``figures_dir``. When running inside Jupyter/marimo (detected via
        ``get_ipython``), sys.argv is not parsed and all filters are None.
    """
    # Compute the default once so the argparse default and the interactive
    # fallback can never drift apart (previously this string was duplicated).
    default_fig_dir = f'figures/statistical_significance/{Path(RESULTS_FILE).parts[2]}'
    parser = argparse.ArgumentParser(description='Generate quant report with optional filters')
    # Dynamically add filter arguments from config
    for filter_name in FILTER_CONFIG:
        parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values')
    parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)')
    parser.add_argument('--figures-dir', type=str, default=default_fig_dir, help='Override the default figures directory')
    # Only parse if running as script (not in Jupyter/interactive)
    try:
        # Check if running in Jupyter by looking for ipykernel
        get_ipython()  # noqa: F821 # type: ignore
        # Interactive session: return namespace with all filters set to None
        no_filters = {f: None for f in FILTER_CONFIG}
        return argparse.Namespace(**no_filters, filter_name=None, figures_dir=default_fig_dir)
    except NameError:
        args = parser.parse_args()
        # Parse JSON strings to lists ('' / None both mean "no filter")
        for filter_name in FILTER_CONFIG:
            val = getattr(args, filter_name)
            setattr(args, filter_name, json.loads(val) if val else None)
        return args
# Load data, apply CLI filters, and (optionally) write a human-readable
# description of the active filter combination next to the figures.
cli_args = parse_cli_args()
# %%
S = utils.QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir)
data_all = S.load_data()
# %% Build filtered dataset based on CLI args
# CLI args: None means "no filter applied" - filter_data() will skip None filters
# Build filter values dict dynamically from FILTER_CONFIG
_active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG}
_d = S.filter_data(data_all, **_active_filters)
# Write filter description file if filter-name is provided
if cli_args.filter_name and S.fig_save_dir:
    # Get the filter slug (e.g., "All_Respondents", "Cons-Starter", etc.)
    _filter_slug = S._get_filter_slug()
    _filter_slug_dir = S.fig_save_dir / _filter_slug
    _filter_slug_dir.mkdir(parents=True, exist_ok=True)
    # Build filter description
    _filter_desc_lines = [
        f"Filter: {cli_args.filter_name}",
        "",
        "Applied Filters:",
    ]
    _short_desc_parts = []
    for filter_name, options_attr in FILTER_CONFIG.items():
        all_options = getattr(S, options_attr)
        values = _active_filters[filter_name]
        display_name = filter_name.replace('_', ' ').title()
        # None means no filter applied (same as "All"); selecting every
        # option is also treated as "All"
        if values is not None and values != all_options:
            _short_desc_parts.append(f"{display_name}: {', '.join(values)}")
            _filter_desc_lines.append(f" {display_name}: {', '.join(values)}")
        else:
            _filter_desc_lines.append(f" {display_name}: All")
    # Write detailed description INSIDE the filter-slug directory
    # Sanitize filter name for filename usage (replace / and other chars)
    _safe_filter_name = re.sub(r'[^\w\s-]', '_', cli_args.filter_name)
    _filter_file = _filter_slug_dir / f"{_safe_filter_name}.txt"
    _filter_file.write_text('\n'.join(_filter_desc_lines))
    # Append to summary index file at figures/<export_date>/filter_index.txt
    _summary_file = S.fig_save_dir / "filter_index.txt"
    _short_desc = "; ".join(_short_desc_parts) if _short_desc_parts else "All Respondents"
    _summary_line = f"{_filter_slug} | {cli_args.filter_name} | {_short_desc}\n"
    # Append or create the summary file
    if _summary_file.exists():
        _existing = _summary_file.read_text()
        # Avoid duplicate entries for same slug (substring check on the file)
        if _filter_slug not in _existing:
            with _summary_file.open('a') as f:
                f.write(_summary_line)
    else:
        _header = "Filter Index\n" + "=" * 80 + "\n\n"
        _header += "Directory | Filter Name | Description\n"
        _header += "-" * 80 + "\n"
        _summary_file.write_text(_header + _summary_line)
# Save to logical variable name for further analysis
data = _d
data.collect()
# %% Character coach significantly higher than others
# Pairwise z-tests on "Rank 1 share" across the four brand characters.
char_rank = S.get_character_ranking(data)[0]
_pairwise_df, _meta = S.compute_ranking_significance(
    char_rank,
    alpha=0.05,
    correction="none",
)
# %% [markdown]
"""
### Methodology Analysis
**Input Data (`char_rank`)**:
* Generated by `S.get_character_ranking(data)`.
* Contains the ranking values (1st, 2nd, 3rd, 4th) assigned by each respondent to the four options ("The Coach", etc.).
* Columns represent the characters; rows represent individual respondents; values are the numerical rank (1 = Top Choice).
**Processing**:
* The function `compute_ranking_significance` aggregates these rankings to find the **"Rank 1 Share"** (the percentage of respondents who picked that character as their #1 favorite).
* It builds a contingency table of how many times each character was ranked 1st vs. not 1st (or 1st v 2nd v 3rd).
**Statistical Test**:
* **Test Used**: Pairwise Z-test for two proportions (uncorrected).
* **Comparison**: It compares the **Rank 1 Share** of every pair of characters.
* *Example*: "Is the 42% of people who chose 'Coach' significantly different from the 29% who chose 'Familiar Friend'?"
* **Significance**: A result of `p < 0.05` means the difference in popularity (top-choice preference) is statistically significant and not due to random chance.
"""
# %% Plot heatmap of pairwise significance
S.plot_significance_heatmap(_pairwise_df, metadata=_meta, title="Statistical Significance: Character Top Choice Preference")
# %% Plot summary of significant differences (e.g., which characters are significantly higher than others)
# S.plot_significance_summary(_pairwise_df, metadata=_meta)
# %% [markdown]
"""
# Analysis: Significance of "The Coach"
**Parameters**: `alpha=0.05`, `correction='none'`
* **Rationale**: No correction was applied to allow for detection of all potential pairwise differences (uncorrected p < 0.05). If strict control for family-wise error rate were required (e.g., Bonferroni), the significance threshold would be lower (p < 0.0083).
**Results**:
"The Coach" is the top-ranked option (42.0% Rank 1 share) and shows strong separation from the field.
* **Vs. Bottom Two**: "The Coach" is significantly higher than both "The Bank Teller" (26.9%, p < 0.001) and "Familiar Friend" (29.4%, p < 0.001).
* **Vs. Runner-Up**: "The Coach" is widely preferred over "The Personal Assistant" (33.4%). The difference of **8.6 percentage points** is statistically significant (p = 0.017) at the standard 0.05 level.
* *Note*: While p=0.017 is significant in isolation, it would not meet the stricter Bonferroni threshold (0.0083). However, the effect size (+8.6%) is commercially meaningful.
**Conclusion**:
Yes, "The Coach" can be considered statistically more significant than the other options. It is clearly superior to the bottom two options and holds a statistically significant lead over the runner-up ("Personal Assistant") in direct comparison.
"""
# %% Mentions significance analysis
# "Mentions" = appearing anywhere in the top 3 (visibility), as opposed to
# being the single top choice (preference).
char_pairwise_df_mentions, _meta_mentions = S.compute_mentions_significance(
    char_rank,
    alpha=0.05,
    correction="none",
)
S.plot_significance_heatmap(
    char_pairwise_df_mentions,
    metadata=_meta_mentions,
    title="Statistical Significance: Character Total Mentions (Top 3 Visibility)"
)
# %% voices analysis
# Same pair of tests (preference + visibility), but on voice rankings.
top3_voices = S.get_top_3_voices(data)[0]
_pairwise_df_voice, _metadata = S.compute_ranking_significance(
    top3_voices,alpha=0.05,correction="none")
S.plot_significance_heatmap(
    _pairwise_df_voice,
    metadata=_metadata,
    title="Statistical Significance: Voice Top Choice Preference"
)
# %% Total Mentions Significance (Rank 1+2+3 Combined)
# This tests "Quantity" (Visibility) instead of "Quality" (Preference)
_pairwise_df_mentions, _meta_mentions = S.compute_mentions_significance(
    top3_voices,
    alpha=0.05,
    correction="none"
)
S.plot_significance_heatmap(
    _pairwise_df_mentions,
    metadata=_meta_mentions,
    title="Statistical Significance: Voice Total Mentions (Top 3 Visibility)"
)
# %% Male Voices Only Analysis
import reference
def filter_voices_by_gender(df: pl.DataFrame, target_gender: str) -> pl.DataFrame:
    """Return *df* restricted to voice ranking columns of one gender.

    Keeps the ``_recordId`` identifier column (when present) plus every
    column whose embedded voice id — e.g. the ``V14`` in
    ``Top_3_Voices_ranking__V14`` — maps to *target_gender* in
    ``reference.VOICE_GENDER_MAPPING``. All other columns are dropped.
    """
    def wanted(column: str) -> bool:
        # Voice columns carry their id after a double underscore.
        if '__V' not in column:
            return False
        voice_id = column.split('__')[1]
        return reference.VOICE_GENDER_MAPPING.get(voice_id) == target_gender

    selected = ['_recordId'] if '_recordId' in df.columns else []
    selected.extend(column for column in df.columns if wanted(column))
    return df.select(selected)
# Get full ranking data as DataFrame
df_voices = top3_voices.collect()
# Filter for Male voices
df_male_voices = filter_voices_by_gender(df_voices, 'Male')
# 1. Male Voices: Top Choice Preference (Rank 1)
_pairwise_male_pref, _meta_male_pref = S.compute_ranking_significance(
df_male_voices,
alpha=0.05,
correction="none"
)
S.plot_significance_heatmap(
_pairwise_male_pref,
metadata=_meta_male_pref,
title="Male Voices Only: Top Choice Preference Significance"
)
# 2. Male Voices: Total Mentions (Visibility)
_pairwise_male_vis, _meta_male_vis = S.compute_mentions_significance(
df_male_voices,
alpha=0.05,
correction="none"
)
S.plot_significance_heatmap(
_pairwise_male_vis,
metadata=_meta_male_vis,
title="Male Voices Only: Total Mentions Significance"
)
# %% Male Voices (Excluding Bottom 3: V88, V86, V81)
# Start with the male voices dataframe from the previous step
voices_to_exclude = ['V88', 'V86', 'V81']
def filter_exclude_voices(df: pl.DataFrame, exclude_list: list[str]) -> pl.DataFrame:
    """Return *df* without the ranking columns of the excluded voices.

    Keeps the ``_recordId`` identifier column (when present) plus every
    voice column (one containing ``__V``) whose voice id — e.g. the
    ``V88`` in ``Top_3_Voices_ranking__V88`` — is NOT in *exclude_list*.
    Non-voice columns other than ``_recordId`` are dropped.
    """
    keep = ['_recordId'] if '_recordId' in df.columns else []
    keep += [
        column
        for column in df.columns
        if '__V' in column and column.split('__')[1] not in exclude_list
    ]
    return df.select(keep)
df_male_top = filter_exclude_voices(df_male_voices, voices_to_exclude)
# 1. Male Top Candidates: Top Choice Preference
_pairwise_male_top_pref, _meta_male_top_pref = S.compute_ranking_significance(
df_male_top,
alpha=0.05,
correction="none"
)
S.plot_significance_heatmap(
_pairwise_male_top_pref,
metadata=_meta_male_top_pref,
title="Male Voices (Excl. Bottom 3): Top Choice Preference Significance"
)
# 2. Male Top Candidates: Total Mentions
_pairwise_male_top_vis, _meta_male_top_vis = S.compute_mentions_significance(
df_male_top,
alpha=0.05,
correction="none"
)
S.plot_significance_heatmap(
_pairwise_male_top_vis,
metadata=_meta_male_top_vis,
title="Male Voices (Excl. Bottom 3): Total Mentions Significance"
)
# %% [markdown]
"""
# Rank 1 Selection Significance (Voice Level)
Similar to the Total Mentions significance analysis above, but counting
only how many times each voice was ranked **1st** (out of all respondents).
This isolates first-choice preference rather than overall top-3 visibility.
"""
# %% Rank 1 Significance: All Voices
_pairwise_df_rank1, _meta_rank1 = S.compute_rank1_significance(
top3_voices,
alpha=0.05,
correction="none",
)
S.plot_significance_heatmap(
_pairwise_df_rank1,
metadata=_meta_rank1,
title="Statistical Significance: Voice Rank 1 Selection"
)
# %% Rank 1 Significance: Male Voices Only
_pairwise_df_rank1_male, _meta_rank1_male = S.compute_rank1_significance(
df_male_voices,
alpha=0.05,
correction="none",
)
S.plot_significance_heatmap(
_pairwise_df_rank1_male,
metadata=_meta_rank1_male,
title="Male Voices Only: Rank 1 Selection Significance"
)
# %%

267
XX_straight_liners.py Normal file
View File

@@ -0,0 +1,267 @@
"""Extra analyses of the straight-liners"""
# %% Imports
import utils
import polars as pl
import argparse
import json
import re
from pathlib import Path
from validation import check_straight_liners
# %% Fixed Variables
RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %% CLI argument parsing for batch automation
# When run as script: uv run XX_statistical_significance.script.py --age '["18
# Central filter configuration - add new filters here only
# Format: 'cli_arg_name': 'QualtricsSurvey.options_* attribute name'
FILTER_CONFIG = {
'age': 'options_age',
'gender': 'options_gender',
'ethnicity': 'options_ethnicity',
'income': 'options_income',
'consumer': 'options_consumer',
'business_owner': 'options_business_owner',
'ai_user': 'options_ai_user',
'investable_assets': 'options_investable_assets',
'industry': 'options_industry',
}
def parse_cli_args():
    """Parse optional demographic-filter CLI arguments.

    Returns an ``argparse.Namespace`` with one attribute per key in
    ``FILTER_CONFIG`` (each a list of values, or ``None`` when the filter
    was not supplied), plus ``filter_name`` and ``figures_dir``.

    When running interactively (detected via ``get_ipython``), ``sys.argv``
    belongs to the kernel, so parsing is skipped and a namespace with all
    filters set to ``None`` is returned.
    """
    # Single source of truth for the default output directory (previously
    # duplicated between the argparse default and the interactive fallback,
    # which risked the two silently drifting apart).
    default_fig_dir = f'figures/straight-liner-analysis/{Path(RESULTS_FILE).parts[2]}'

    parser = argparse.ArgumentParser(description='Generate quant report with optional filters')
    # Dynamically add one --<filter> option per configured filter.
    for filter_name in FILTER_CONFIG:
        parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values')
    parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)')
    parser.add_argument('--figures-dir', type=str, default=default_fig_dir, help='Override the default figures directory')
    # Only parse if running as script (not in Jupyter/interactive)
    try:
        # get_ipython exists only in interactive kernels; NameError means we
        # are a plain script and should parse argv.
        get_ipython()  # noqa: F821 # type: ignore
        # Return namespace with all filters set to None
        no_filters = {f: None for f in FILTER_CONFIG}
        return argparse.Namespace(**no_filters, filter_name=None, figures_dir=default_fig_dir)
    except NameError:
        args = parser.parse_args()
        # CLI filter values arrive as JSON-encoded lists; decode in place.
        for filter_name in FILTER_CONFIG:
            val = getattr(args, filter_name)
            setattr(args, filter_name, json.loads(val) if val else None)
        return args
cli_args = parse_cli_args()
# %%
S = utils.QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir)
data_all = S.load_data()
# %% Build filtered dataset based on CLI args
# CLI args: None means "no filter applied" - filter_data() will skip None filters
# Build filter values dict dynamically from FILTER_CONFIG
_active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG}
_d = S.filter_data(data_all, **_active_filters)
# Write filter description file if filter-name is provided
if cli_args.filter_name and S.fig_save_dir:
# Get the filter slug (e.g., "All_Respondents", "Cons-Starter", etc.)
_filter_slug = S._get_filter_slug()
_filter_slug_dir = S.fig_save_dir / _filter_slug
_filter_slug_dir.mkdir(parents=True, exist_ok=True)
# Build filter description
_filter_desc_lines = [
f"Filter: {cli_args.filter_name}",
"",
"Applied Filters:",
]
_short_desc_parts = []
for filter_name, options_attr in FILTER_CONFIG.items():
all_options = getattr(S, options_attr)
values = _active_filters[filter_name]
display_name = filter_name.replace('_', ' ').title()
# None means no filter applied (same as "All")
if values is not None and values != all_options:
_short_desc_parts.append(f"{display_name}: {', '.join(values)}")
_filter_desc_lines.append(f" {display_name}: {', '.join(values)}")
else:
_filter_desc_lines.append(f" {display_name}: All")
# Write detailed description INSIDE the filter-slug directory
# Sanitize filter name for filename usage (replace / and other chars)
_safe_filter_name = re.sub(r'[^\w\s-]', '_', cli_args.filter_name)
_filter_file = _filter_slug_dir / f"{_safe_filter_name}.txt"
_filter_file.write_text('\n'.join(_filter_desc_lines))
# Append to summary index file at figures/<export_date>/filter_index.txt
_summary_file = S.fig_save_dir / "filter_index.txt"
_short_desc = "; ".join(_short_desc_parts) if _short_desc_parts else "All Respondents"
_summary_line = f"{_filter_slug} | {cli_args.filter_name} | {_short_desc}\n"
# Append or create the summary file
if _summary_file.exists():
_existing = _summary_file.read_text()
# Avoid duplicate entries for same slug
if _filter_slug not in _existing:
with _summary_file.open('a') as f:
f.write(_summary_line)
else:
_header = "Filter Index\n" + "=" * 80 + "\n\n"
_header += "Directory | Filter Name | Description\n"
_header += "-" * 80 + "\n"
_summary_file.write_text(_header + _summary_line)
# Save to logical variable name for further analysis
data = _d
data.collect()
# %% Determine straight-liner repeat offenders
# Extract question groups with renamed columns that check_straight_liners expects.
# The raw `data` has QID-based column names; the getter methods rename them to
# patterns like SS_Green_Blue__V14__Choice_1, Voice_Scale_1_10__V48, etc.
ss_or, _ = S.get_ss_orange_red(data)
ss_gb, _ = S.get_ss_green_blue(data)
vs, _ = S.get_voice_scale_1_10(data)
# Combine all question groups into one wide LazyFrame (joined on _recordId)
all_questions = ss_or.join(ss_gb, on='_recordId').join(vs, on='_recordId')
# Run straight-liner detection across all question groups
# max_score=5 catches all speaking-style straight-lining (1-5 scale)
# and voice-scale values ≤5 on the 1-10 scale
# Note: sl_threshold is NOT set on S here — this script analyses straight-liners,
# it doesn't filter them out of the dataset.
print("Running straight-liner detection across all question groups...")
sl_report, sl_df = check_straight_liners(all_questions, max_score=5)
# %% Quantify repeat offenders
# sl_df has one row per (Record ID, Question Group) that was straight-lined.
# Group by Record ID to count how many question groups each person SL'd.
if sl_df is not None and not sl_df.is_empty():
total_respondents = data.select(pl.len()).collect().item()
# Per-respondent count of straight-lined question groups
respondent_sl_counts = (
sl_df
.group_by("Record ID")
.agg(pl.len().alias("sl_count"))
.sort("sl_count", descending=True)
)
max_sl = respondent_sl_counts["sl_count"].max()
print(f"\nTotal respondents: {total_respondents}")
print(f"Respondents who straight-lined at least 1 question group: "
f"{respondent_sl_counts.height}")
print(f"Maximum question groups straight-lined by one person: {max_sl}")
print()
# Build cumulative distribution: for each threshold N, count respondents
# who straight-lined >= N question groups
cumulative_rows = []
for threshold in range(1, max_sl + 1):
count = respondent_sl_counts.filter(
pl.col("sl_count") >= threshold
).height
pct = (count / total_respondents) * 100
cumulative_rows.append({
"threshold": threshold,
"count": count,
"pct": pct,
})
print(
f"{threshold} question groups straight-lined: "
f"{count} respondents ({pct:.1f}%)"
)
cumulative_df = pl.DataFrame(cumulative_rows)
print(f"\n{cumulative_df}")
# %% Save cumulative data to CSV
_filter_slug = S._get_filter_slug()
_csv_dir = Path(S.fig_save_dir) / _filter_slug
_csv_dir.mkdir(parents=True, exist_ok=True)
_csv_path = _csv_dir / "straight_liner_repeat_offenders.csv"
cumulative_df.write_csv(_csv_path)
print(f"Saved cumulative data to {_csv_path}")
# %% Plot the cumulative distribution
S.plot_straight_liner_repeat_offenders(
cumulative_df,
total_respondents=total_respondents,
)
# %% Per-question straight-lining frequency
# Build human-readable question group names from the raw keys
def _humanise_question_group(key: str) -> str:
"""Convert internal question group key to a readable label.
Examples:
SS_Green_Blue__V14 → Green/Blue V14
SS_Orange_Red__V48 → Orange/Red V48
Voice_Scale_1_10 → Voice Scale (1-10)
"""
if key.startswith("SS_Green_Blue__"):
voice = key.split("__")[1]
return f"Green/Blue {voice}"
if key.startswith("SS_Orange_Red__"):
voice = key.split("__")[1]
return f"Orange/Red {voice}"
if key == "Voice_Scale_1_10":
return "Voice Scale (1-10)"
# Fallback: replace underscores
return key.replace("_", " ")
per_question_counts = (
sl_df
.group_by("Question Group")
.agg(pl.col("Record ID").n_unique().alias("count"))
.sort("count", descending=True)
.with_columns(
(pl.col("count") / total_respondents * 100).alias("pct")
)
)
# Add human-readable names
per_question_counts = per_question_counts.with_columns(
pl.col("Question Group").map_elements(
_humanise_question_group, return_dtype=pl.Utf8
).alias("question")
)
print("\n--- Per-Question Straight-Lining Frequency ---")
print(per_question_counts)
# Save per-question data to CSV
_csv_path_pq = _csv_dir / "straight_liner_per_question.csv"
per_question_counts.write_csv(_csv_path_pq)
print(f"Saved per-question data to {_csv_path_pq}")
# Plot
S.plot_straight_liner_per_question(
per_question_counts,
total_respondents=total_respondents,
)
# %% Show the top repeat offenders (respondents with most SL'd groups)
print("\n--- Top Repeat Offenders ---")
print(respondent_sl_counts.head(20))
else:
print("No straight-liners detected in the dataset.")

File diff suppressed because one or more lines are too long

BIN
docs/README.pdf Normal file

Binary file not shown.

View File

@@ -0,0 +1,104 @@
# Appendix: Quantitative Analysis Plots - Folder Structure Manual
This folder contains all the quantitative analysis plots, sorted by the filters applied to the dataset. Each folder corresponds to a specific demographic cut.
## Folder Overview
* `All_Respondents/`: Analysis of the full dataset (no filters).
* `filter_index.txt`: A master list of every folder code and its corresponding demographic filter.
* **Filter Folders**: All other folders represent specific demographic cuts (e.g., `Age-18to21years`, `Gen-Woman`).
## How to Navigate
Each folder contains the same set of charts generated for that specific filter.
## Directory Reference Table
Below is the complete list of folder names. These names are encodings of the filters applied to the dataset, which we use to maintain consistency across our analysis.
| Directory Code | Filter Description |
| :--- | :--- |
| All_Respondents | All Respondents |
| Age-18to21years | Age: 18 to 21 years |
| Age-22to24years | Age: 22 to 24 years |
| Age-25to34years | Age: 25 to 34 years |
| Age-35to40years | Age: 35 to 40 years |
| Age-41to50years | Age: 41 to 50 years |
| Age-51to59years | Age: 51 to 59 years |
| Age-60to70years | Age: 60 to 70 years |
| Age-70yearsormore | Age: 70 years or more |
| Gen-Man | Gender: Man |
| Gen-Prefernottosay | Gender: Prefer not to say |
| Gen-Woman | Gender: Woman |
| Eth-6_grps_c64411 | Ethnicity: All options containing 'Alaska Native or Indigenous American' |
| Eth-6_grps_8f145b | Ethnicity: All options containing 'Asian or Asian American' |
| Eth-8_grps_71ac47 | Ethnicity: All options containing 'Black or African American' |
| Eth-7_grps_c5b3ce | Ethnicity: All options containing 'Hispanic or Latinx' |
| Eth-BlackorAfricanAmerican<br>MiddleEasternorNorthAfrican<br>WhiteorCaucasian+<br>MiddleEasternorNorthAfrican | Ethnicity: Middle Eastern or North African |
| Eth-AsianorAsianAmericanBlackorAfricanAmerican<br>NativeHawaiianorOtherPacificIslander+<br>NativeHawaiianorOtherPacificIslander | Ethnicity: Native Hawaiian or Other Pacific Islander |
| Eth-10_grps_cef760 | Ethnicity: All options containing 'White or Caucasian' |
| Inc-100000to149999 | Income: $100,000 to $149,999 |
| Inc-150000to199999 | Income: $150,000 to $199,999 |
| Inc-200000ormore | Income: $200,000 or more |
| Inc-25000to34999 | Income: $25,000 to $34,999 |
| Inc-35000to54999 | Income: $35,000 to $54,999 |
| Inc-55000to79999 | Income: $55,000 to $79,999 |
| Inc-80000to99999 | Income: $80,000 to $99,999 |
| Inc-Lessthan25000 | Income: Less than $25,000 |
| Cons-Lower_Mass_A+Lower_Mass_B | Consumer: Lower_Mass_A, Lower_Mass_B |
| Cons-MassAffluent_A+MassAffluent_B | Consumer: MassAffluent_A, MassAffluent_B |
| Cons-Mass_A+Mass_B | Consumer: Mass_A, Mass_B |
| Cons-Mix_of_Affluent_Wealth__<br>High_Net_Woth_A+<br>Mix_of_Affluent_Wealth__<br>High_Net_Woth_B | Consumer: Mix_of_Affluent_Wealth_&_High_Net_Woth_A, Mix_of_Affluent_Wealth_&_High_Net_Woth_B |
| Cons-Early_Professional | Consumer: Early_Professional |
| Cons-Lower_Mass_B | Consumer: Lower_Mass_B |
| Cons-MassAffluent_B | Consumer: MassAffluent_B |
| Cons-Mass_B | Consumer: Mass_B |
| Cons-Mix_of_Affluent_Wealth__<br>High_Net_Woth_B | Consumer: Mix_of_Affluent_Wealth_&_High_Net_Woth_B |
| Cons-Starter | Consumer: Starter |
| BizOwn-No | Business Owner: No |
| BizOwn-Yes | Business Owner: Yes |
| AI-Daily | Ai User: Daily |
| AI-Lessthanonceamonth | Ai User: Less than once a month |
| AI-Morethanoncedaily | Ai User: More than once daily |
| AI-Multipletimesperweek | Ai User: Multiple times per week |
| AI-Onceamonth | Ai User: Once a month |
| AI-Onceaweek | Ai User: Once a week |
| AI-RarelyNever | Ai User: Rarely/Never |
| AI-Daily+<br>Morethanoncedaily+<br>Multipletimesperweek | Ai User: Daily, More than once daily, Multiple times per week |
| AI-4_grps_d4f57a | Ai User: Once a week, Once a month, Less than once a month, Rarely/Never |
| InvAsts-0to24999 | Investable Assets: $0 to $24,999 |
| InvAsts-150000to249999 | Investable Assets: $150,000 to $249,999 |
| InvAsts-1Mto4.9M | Investable Assets: $1M to $4.9M |
| InvAsts-25000to49999 | Investable Assets: $25,000 to $49,999 |
| InvAsts-250000to499999 | Investable Assets: $250,000 to $499,999 |
| InvAsts-50000to149999 | Investable Assets: $50,000 to $149,999 |
| InvAsts-500000to999999 | Investable Assets: $500,000 to $999,999 |
| InvAsts-5Mormore | Investable Assets: $5M or more |
| InvAsts-Prefernottoanswer | Investable Assets: Prefer not to answer |
| Ind-Agricultureforestryfishingorhunting | Industry: Agriculture, forestry, fishing, or hunting |
| Ind-Artsentertainmentorrecreation | Industry: Arts, entertainment, or recreation |
| Ind-Broadcasting | Industry: Broadcasting |
| Ind-Construction | Industry: Construction |
| Ind-EducationCollegeuniversityoradult | Industry: Education College, university, or adult |
| Ind-EducationOther | Industry: Education Other |
| Ind-EducationPrimarysecondaryK-12 | Industry: Education Primary/secondary (K-12) |
| Ind-Governmentandpublicadministration | Industry: Government and public administration |
| Ind-Hotelandfoodservices | Industry: Hotel and food services |
| Ind-InformationOther | Industry: Information Other |
| Ind-InformationServicesanddata | Industry: Information Services and data |
| Ind-Legalservices | Industry: Legal services |
| Ind-ManufacturingComputerandelectronics | Industry: Manufacturing Computer and electronics |
| Ind-ManufacturingOther | Industry: Manufacturing Other |
| Ind-Notemployed | Industry: Not employed |
| Ind-Otherindustrypleasespecify | Industry: Other industry (please specify) |
| Ind-Processing | Industry: Processing |
| Ind-Publishing | Industry: Publishing |
| Ind-Realestaterentalorleasing | Industry: Real estate, rental, or leasing |
| Ind-Retired | Industry: Retired |
| Ind-Scientificortechnicalservices | Industry: Scientific or technical services |
| Ind-Software | Industry: Software |
| Ind-Telecommunications | Industry: Telecommunications |
| Ind-Transportationandwarehousing | Industry: Transportation and warehousing |
| Ind-Utilities | Industry: Utilities |
| Ind-Wholesale | Industry: Wholesale |

831
plots.py
View File

@@ -92,6 +92,16 @@ class QualtricsPlotsMixin:
parts.append(f"{short_code}-{val_str}") parts.append(f"{short_code}-{val_str}")
# Append straight-liner threshold if set
sl_threshold = getattr(self, 'sl_threshold', None)
if sl_threshold is not None:
parts.append(f"SL-gte{sl_threshold}")
# Append voice ranking filter if set
vr_filter = getattr(self, 'voice_ranking_filter', None)
if vr_filter is not None:
parts.append(f"VR-{vr_filter}")
if not parts: if not parts:
return "All_Respondents" return "All_Respondents"
@@ -182,6 +192,20 @@ class QualtricsPlotsMixin:
sample_size = self._get_filtered_sample_size() sample_size = self._get_filtered_sample_size()
sample_prefix = f"Sample size: {sample_size}" if sample_size is not None else "" sample_prefix = f"Sample size: {sample_size}" if sample_size is not None else ""
# Append straight-liner threshold if set
sl_threshold = getattr(self, 'sl_threshold', None)
if sl_threshold is not None:
parts.append(f"STRAIGHT-LINER EXCL: ≥{sl_threshold} question groups")
# Append voice ranking filter if set
vr_filter = getattr(self, 'voice_ranking_filter', None)
if vr_filter is not None:
vr_labels = {
'only-missing': 'ONLY respondents missing voice ranking (QID98)',
'exclude-missing': 'EXCLUDING respondents missing voice ranking (QID98)',
}
parts.append(f"VOICE RANKING: {vr_labels.get(vr_filter, vr_filter)}")
if not parts: if not parts:
# No filters active - return just sample size (or empty string if no sample size) # No filters active - return just sample size (or empty string if no sample size)
return sample_prefix return sample_prefix
@@ -253,7 +277,7 @@ class QualtricsPlotsMixin:
return chart.properties(title=title_config) return chart.properties(title=title_config)
def _save_plot(self, chart: alt.Chart, title: str, filename: str | None = None) -> alt.Chart: def _save_plot(self, chart: alt.Chart, title: str, filename: str | None = None, skip_footnote: bool = False) -> alt.Chart:
"""Save chart to PNG file if fig_save_dir is set. """Save chart to PNG file if fig_save_dir is set.
Args: Args:
@@ -261,10 +285,13 @@ class QualtricsPlotsMixin:
title: Chart title (used for filename if filename not provided) title: Chart title (used for filename if filename not provided)
filename: Optional explicit filename (without extension). If provided, filename: Optional explicit filename (without extension). If provided,
this is used instead of deriving from title. this is used instead of deriving from title.
skip_footnote: If True, skip adding filter footnote (use when footnote
was already added to a sub-chart before vconcat).
Returns the (potentially modified) chart with filter footnote added. Returns the (potentially modified) chart with filter footnote added.
""" """
# Add filter footnote - returns combined chart if filters active # Add filter footnote - returns combined chart if filters active
if not skip_footnote:
chart = self._add_filter_footnote(chart) chart = self._add_filter_footnote(chart)
if hasattr(self, 'fig_save_dir') and self.fig_save_dir: if hasattr(self, 'fig_save_dir') and self.fig_save_dir:
@@ -1112,6 +1139,7 @@ class QualtricsPlotsMixin:
title: str = "Speaking Style Trait Analysis", title: str = "Speaking Style Trait Analysis",
height: int | None = None, height: int | None = None,
width: int | str | None = None, width: int | str | None = None,
color_gender: bool = False,
) -> alt.Chart: ) -> alt.Chart:
"""Plot scores for a single speaking style trait across multiple voices.""" """Plot scores for a single speaking style trait across multiple voices."""
df = self._ensure_dataframe(data) df = self._ensure_dataframe(data)
@@ -1153,6 +1181,41 @@ class QualtricsPlotsMixin:
else: else:
trait_description = "" trait_description = ""
if color_gender:
stats['gender'] = stats['Voice'].apply(self._get_voice_gender)
bars = alt.Chart(stats).mark_bar().encode(
x=alt.X('mean_score:Q', title='Average Score (1-5)', scale=alt.Scale(domain=[1, 5]), axis=alt.Axis(grid=True)),
x2=alt.datum(1), # Bars start at x=1 (left edge of domain)
y=alt.Y('Voice:N', title='Voice', sort='-x', axis=alt.Axis(grid=False)),
color=alt.Color('gender:N',
scale=alt.Scale(domain=['Male', 'Female'],
range=[ColorPalette.GENDER_MALE, ColorPalette.GENDER_FEMALE]),
legend=alt.Legend(orient='top', direction='horizontal', title='Gender')),
tooltip=[
alt.Tooltip('Voice:N'),
alt.Tooltip('mean_score:Q', title='Average', format='.2f'),
alt.Tooltip('count:Q', title='Count'),
alt.Tooltip('gender:N', title='Gender')
]
)
text = alt.Chart(stats).mark_text(
align='left',
baseline='middle',
dx=5,
fontSize=12
).encode(
x='mean_score:Q',
y=alt.Y('Voice:N', sort='-x'),
text='count:Q',
color=alt.condition(
alt.datum.gender == 'Female',
alt.value(ColorPalette.GENDER_FEMALE),
alt.value(ColorPalette.GENDER_MALE)
)
)
else:
# Horizontal bar chart - use x2 to explicitly start bars at x=1 # Horizontal bar chart - use x2 to explicitly start bars at x=1
bars = alt.Chart(stats).mark_bar(color=ColorPalette.PRIMARY).encode( bars = alt.Chart(stats).mark_bar(color=ColorPalette.PRIMARY).encode(
x=alt.X('mean_score:Q', title='Average Score (1-5)', scale=alt.Scale(domain=[1, 5]), axis=alt.Axis(grid=True)), x=alt.X('mean_score:Q', title='Average Score (1-5)', scale=alt.Scale(domain=[1, 5]), axis=alt.Axis(grid=True)),
@@ -1165,13 +1228,13 @@ class QualtricsPlotsMixin:
] ]
) )
# Count text at end of bars (right-aligned inside bar) # Count text at end of bars
text = alt.Chart(stats).mark_text( text = alt.Chart(stats).mark_text(
align='right', align='left',
baseline='middle', baseline='middle',
color='white', color='black',
fontSize=12, fontSize=12,
dx=-5 # Slight padding from bar end dx=5
).encode( ).encode(
x='mean_score:Q', x='mean_score:Q',
y=alt.Y('Voice:N', sort='-x'), y=alt.Y('Voice:N', sort='-x'),
@@ -1182,7 +1245,7 @@ class QualtricsPlotsMixin:
chart = (bars + text).properties( chart = (bars + text).properties(
title={ title={
"text": self._process_title(title), "text": self._process_title(title),
"subtitle": [trait_description, "(Numbers on bars indicate respondent count)"] "subtitle": [trait_description, "(Numbers near bars indicate respondent count)"]
}, },
width=width or 800, width=width or 800,
height=height or getattr(self, 'plot_height', 400) height=height or getattr(self, 'plot_height', 400)
@@ -1191,6 +1254,101 @@ class QualtricsPlotsMixin:
chart = self._save_plot(chart, title) chart = self._save_plot(chart, title)
return chart return chart
def plot_speaking_style_trait_scores_comparison(
    self,
    data_all: pl.LazyFrame | pl.DataFrame,
    data_clean: pl.LazyFrame | pl.DataFrame,
    trait_description: str | None = None,
    title: str = "Speaking Style Trait Analysis (Comparison)",
    height: int | None = None,
    width: int | str | None = None,
) -> alt.Chart:
    """Plot scores comparing All Respondents vs Cleaned data (excl. straight-liners) in grouped bars.

    Args:
        data_all: Trait-score rows for the full dataset; must provide
            "Voice" and "score" columns.
        data_clean: Same shape as ``data_all``, with straight-liners removed.
        trait_description: Optional subtitle line describing the trait.
        title: Chart title; also used by ``_save_plot`` to derive the filename.
        height: Chart height in pixels (defaults to ``self.plot_height`` or 600).
        width: Chart width in pixels (defaults to 800).

    Returns:
        Layered Altair chart (grouped bars + count labels), already passed
        through ``_save_plot``.
    """
    # Helper to process each dataframe
    def get_stats(d, group_label):
        # Per-voice mean score and respondent count, tagged with the
        # dataset label so both frames can be concatenated for grouped bars.
        df = self._ensure_dataframe(d)
        if df.is_empty(): return None
        return (
            df.filter(pl.col("score").is_not_null())
            .group_by("Voice")
            .agg([
                pl.col("score").mean().alias("mean_score"),
                pl.col("score").count().alias("count")
            ])
            .with_columns(pl.lit(group_label).alias("dataset"))
            .to_pandas()
        )
    stats_all = get_stats(data_all, "All Respondents")
    stats_clean = get_stats(data_clean, "Excl. Straight-Liners")
    if stats_all is None or stats_clean is None:
        # Either input was empty — render a placeholder text chart instead of failing.
        return alt.Chart(pd.DataFrame({'text': ['No data']})).mark_text().encode(text='text:N')
    # Combine
    stats = pd.concat([stats_all, stats_clean])
    # Determine sort order using "All Respondents" data (Desc)
    sort_order = stats_all.sort_values('mean_score', ascending=False)['Voice'].tolist()
    # Add gender and combined category for color
    stats['gender'] = stats['Voice'].apply(self._get_voice_gender)
    stats['color_group'] = stats.apply(
        lambda x: f"{x['gender']} - {x['dataset']}", axis=1
    )
    # Define Color Scale: full-strength shades for the full dataset,
    # lighter RANK_3 shades for the cleaned dataset, per gender.
    domain = [
        'Male - All Respondents', 'Male - Excl. Straight-Liners',
        'Female - All Respondents', 'Female - Excl. Straight-Liners'
    ]
    range_colors = [
        ColorPalette.GENDER_MALE, ColorPalette.GENDER_MALE_RANK_3,
        ColorPalette.GENDER_FEMALE, ColorPalette.GENDER_FEMALE_RANK_3
    ]
    # Base chart
    base = alt.Chart(stats).encode(
        y=alt.Y('Voice:N', title='Voice', sort=sort_order, axis=alt.Axis(grid=False)),
    )
    bars = base.mark_bar().encode(
        # x2=1 makes bars start at the scale minimum (scores run 1-5).
        x=alt.X('mean_score:Q', title='Average Score (1-5)', scale=alt.Scale(domain=[1, 5]), axis=alt.Axis(grid=True)),
        x2=alt.datum(1),
        yOffset=alt.YOffset('dataset:N', sort=['All Respondents', 'Excl. Straight-Liners']),
        color=alt.Color('color_group:N',
                        scale=alt.Scale(domain=domain, range=range_colors),
                        legend=alt.Legend(title='Dataset', orient='top', columns=2)),
        tooltip=[
            alt.Tooltip('Voice:N'),
            alt.Tooltip('dataset:N', title='Dataset'),
            alt.Tooltip('mean_score:Q', title='Average', format='.2f'),
            alt.Tooltip('count:Q', title='Count'),
            alt.Tooltip('gender:N', title='Gender')
        ]
    )
    # Respondent-count labels just past the end of each bar.
    text = base.mark_text(align='left', baseline='middle', dx=5, fontSize=9).encode(
        x=alt.X('mean_score:Q'),
        yOffset=alt.YOffset('dataset:N', sort=['All Respondents', 'Excl. Straight-Liners']),
        text=alt.Text('count:Q'),
        color=alt.Color('color_group:N', scale=alt.Scale(domain=domain, range=range_colors), legend=None)
    )
    chart = (bars + text).properties(
        title={
            "text": self._process_title(title),
            "subtitle": [trait_description if trait_description else "", "(Lighter shade = Straight-liners removed)"]
        },
        width=width or 800,
        height=height or getattr(self, 'plot_height', 600)
    )
    chart = self._save_plot(chart, title)
    return chart
def plot_speaking_style_scale_correlation( def plot_speaking_style_scale_correlation(
self, self,
style_color: str, style_color: str,
@@ -1256,6 +1414,243 @@ class QualtricsPlotsMixin:
chart = self._save_plot(chart, title, filename=filename) chart = self._save_plot(chart, title, filename=filename)
return chart return chart
def _create_gender_correlation_legend(self) -> alt.Chart:
    """Build the dual-swatch gender legend shown under correlation plots.

    Horizontal layout below the chart:
        [■][■] Male    [■][■] Female
    Each gender gets a positive- and a negative-correlation swatch.
    """
    # Swatch positions: Male pair at x=0/1, Female pair at x=5/6, leaving
    # a gap between the two groups for whitespace.
    swatches = pd.DataFrame({
        "x": [0, 1, 5, 6],
        "color": [
            ColorPalette.CORR_MALE_POSITIVE,
            ColorPalette.CORR_MALE_NEGATIVE,
            ColorPalette.CORR_FEMALE_POSITIVE,
            ColorPalette.CORR_FEMALE_NEGATIVE,
        ],
    })
    rects = alt.Chart(swatches).mark_rect(width=12, height=12).encode(
        x=alt.X('x:Q', axis=None, scale=alt.Scale(domain=[0, 9])),
        y=alt.value(6),
        color=alt.Color('color:N', scale=None),
    )
    # Text captions positioned just after each pair of swatches.
    captions = pd.DataFrame({
        "x": [2.3, 7.3],
        "label": ["Male", "Female"],
    })
    texts = alt.Chart(captions).mark_text(align='left', baseline='middle', fontSize=11).encode(
        x=alt.X('x:Q', axis=None, scale=alt.Scale(domain=[0, 9])),
        y=alt.value(6),
        text='label:N'
    )
    return (rects + texts).properties(width=200, height=20)
def plot_speaking_style_scale_correlation_by_gender(
    self,
    style_color: str,
    style_traits: list[str],
    data_male: pl.LazyFrame | pl.DataFrame,
    data_female: pl.LazyFrame | pl.DataFrame,
    title: str | None = None,
    filename: str | None = None,
    width: int | str | None = None,
    height: int | None = None,
) -> alt.Chart:
    """Plots correlation between Speaking Style Trait Scores and Voice Scale,
    with grouped bars comparing male vs female voices.

    Args:
        style_color: The speaking style color (e.g., "Green", "Blue")
        style_traits: List of traits for this style
        data_male: DataFrame filtered to male voices only
        data_female: DataFrame filtered to female voices only
        title: Chart title
        filename: Optional explicit filename for saving
        width: Chart width in pixels
        height: Chart height in pixels

    Returns:
        Altair chart with grouped bars (male/female) per trait
    """
    df_male = self._ensure_dataframe(data_male)
    df_female = self._ensure_dataframe(data_female)
    if title is None:
        title = f"Speaking style {style_color} and voice scale 1-10 correlations (by Voice Gender)"
    trait_correlations = []
    for i, trait in enumerate(style_traits):
        # '|' in trait labels marks a manual line break for axis display.
        trait_display = trait.replace('|', '\n')
        # Male correlation
        subset_m = df_male.filter(pl.col("Right_Anchor") == trait)
        valid_m = subset_m.select(["score", "Voice_Scale_Score"]).drop_nulls()
        # Need at least 2 valid pairs for a correlation; otherwise the
        # gender is simply omitted for this trait.
        if valid_m.height > 1:
            # pl.corr can return None (e.g. zero variance); plot that as 0.
            corr_m = valid_m.select(pl.corr("score", "Voice_Scale_Score")).item()
            corr_val = corr_m if corr_m is not None else 0.0
            trait_correlations.append({
                "trait_display": trait_display,
                "Gender": "Male",
                "correlation": corr_val,
                # color_key encodes gender x sign so positive and negative
                # bars get different shades.
                "color_key": "Male_Pos" if corr_val >= 0 else "Male_Neg"
            })
        # Female correlation
        subset_f = df_female.filter(pl.col("Right_Anchor") == trait)
        valid_f = subset_f.select(["score", "Voice_Scale_Score"]).drop_nulls()
        if valid_f.height > 1:
            corr_f = valid_f.select(pl.corr("score", "Voice_Scale_Score")).item()
            corr_val = corr_f if corr_f is not None else 0.0
            trait_correlations.append({
                "trait_display": trait_display,
                "Gender": "Female",
                "correlation": corr_val,
                "color_key": "Female_Pos" if corr_val >= 0 else "Female_Neg"
            })
    if not trait_correlations:
        # No trait had enough valid pairs — render a placeholder chart.
        return alt.Chart(pd.DataFrame({'text': [f"No data for {style_color} Style"]})).mark_text().encode(text='text:N')
    plot_df = pl.DataFrame(trait_correlations).to_pandas()
    main_chart = alt.Chart(plot_df).mark_bar().encode(
        x=alt.X('trait_display:N', title=None, axis=alt.Axis(labelAngle=0, grid=False)),
        xOffset='Gender:N',
        y=alt.Y('correlation:Q', title='Correlation', scale=alt.Scale(domain=[-1, 1]), axis=alt.Axis(grid=True)),
        # Built-in legend is disabled; a custom dual-swatch legend is
        # concatenated below instead.
        color=alt.Color('color_key:N',
                        scale=alt.Scale(
                            domain=['Male_Pos', 'Female_Pos', 'Male_Neg', 'Female_Neg'],
                            range=[ColorPalette.CORR_MALE_POSITIVE, ColorPalette.CORR_FEMALE_POSITIVE,
                                   ColorPalette.CORR_MALE_NEGATIVE, ColorPalette.CORR_FEMALE_NEGATIVE]
                        ),
                        legend=None),
        tooltip=[
            alt.Tooltip('trait_display:N', title='Trait'),
            alt.Tooltip('Gender:N'),
            alt.Tooltip('correlation:Q', format='.3f')
        ]
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or 350
    )
    # Add filter footnote to main chart before combining with legend
    main_chart = self._add_filter_footnote(main_chart)
    # Add custom legend below the chart
    legend = self._create_gender_correlation_legend()
    chart = alt.vconcat(main_chart, legend, spacing=10).resolve_scale(color='independent')
    # skip_footnote=True: the footnote was already attached to main_chart above.
    chart = self._save_plot(chart, title, filename=filename, skip_footnote=True)
    return chart
def plot_speaking_style_ranking_correlation_by_gender(
    self,
    style_color: str,
    style_traits: list[str],
    data_male: pl.LazyFrame | pl.DataFrame,
    data_female: pl.LazyFrame | pl.DataFrame,
    title: str | None = None,
    filename: str | None = None,
    width: int | str | None = None,
    height: int | None = None,
) -> alt.Chart:
    """Plot correlation between Speaking Style Trait Scores and Voice
    Ranking Points, with grouped bars comparing male vs female voices.

    Args:
        style_color: The speaking style color (e.g., "Green", "Blue").
        style_traits: List of traits for this style.
        data_male: DataFrame filtered to male voices only.
        data_female: DataFrame filtered to female voices only.
        title: Chart title; a default mentioning ``style_color`` is used
            when None.
        filename: Optional explicit filename for saving.
        width: Chart width in pixels.
        height: Chart height in pixels.

    Returns:
        Altair chart with grouped bars (male/female) per trait.
    """
    df_male = self._ensure_dataframe(data_male)
    df_female = self._ensure_dataframe(data_female)
    if title is None:
        title = f"Speaking style {style_color} and voice ranking points correlations (by Voice Gender)"
    trait_correlations = []
    for trait in style_traits:
        # '|' in trait names marks a forced line break for axis labels.
        trait_display = trait.replace('|', '\n')
        # Same computation for both genders; loop instead of the previous
        # duplicated Male/Female copy-paste blocks.
        for gender, df in (("Male", df_male), ("Female", df_female)):
            valid = (
                df.filter(pl.col("Right_Anchor") == trait)
                .select(["score", "Ranking_Points"])
                .drop_nulls()
            )
            # Pearson r is undefined with fewer than 2 observations.
            if valid.height > 1:
                corr = valid.select(pl.corr("score", "Ranking_Points")).item()
                # pl.corr yields null for constant columns; treat as 0.
                corr_val = corr if corr is not None else 0.0
                trait_correlations.append({
                    "trait_display": trait_display,
                    "Gender": gender,
                    "correlation": corr_val,
                    "color_key": f"{gender}_Pos" if corr_val >= 0 else f"{gender}_Neg",
                })
    if not trait_correlations:
        # Placeholder chart so callers always get a renderable object.
        return alt.Chart(pd.DataFrame({'text': [f"No data for {style_color} Style"]})).mark_text().encode(text='text:N')
    plot_df = pl.DataFrame(trait_correlations).to_pandas()
    main_chart = alt.Chart(plot_df).mark_bar().encode(
        x=alt.X('trait_display:N', title='Speaking Style Trait', axis=alt.Axis(labelAngle=0, grid=False)),
        xOffset='Gender:N',
        y=alt.Y('correlation:Q', title='Correlation', scale=alt.Scale(domain=[-1, 1]), axis=alt.Axis(grid=True)),
        color=alt.Color('color_key:N',
                        scale=alt.Scale(
                            domain=['Male_Pos', 'Female_Pos', 'Male_Neg', 'Female_Neg'],
                            range=[ColorPalette.CORR_MALE_POSITIVE, ColorPalette.CORR_FEMALE_POSITIVE,
                                   ColorPalette.CORR_MALE_NEGATIVE, ColorPalette.CORR_FEMALE_NEGATIVE]
                        ),
                        legend=None),
        tooltip=[
            alt.Tooltip('trait_display:N', title='Trait'),
            alt.Tooltip('Gender:N'),
            alt.Tooltip('correlation:Q', format='.3f')
        ]
    ).properties(
        title=self._process_title(title),
        width=width or 800,
        height=height or 350
    )
    # Add filter footnote to main chart before combining with legend
    main_chart = self._add_filter_footnote(main_chart)
    # Add custom legend below the chart
    legend = self._create_gender_correlation_legend()
    chart = alt.vconcat(main_chart, legend, spacing=10).resolve_scale(color='independent')
    chart = self._save_plot(chart, title, filename=filename, skip_footnote=True)
    return chart
def plot_speaking_style_color_correlation( def plot_speaking_style_color_correlation(
self, self,
data: pl.LazyFrame | pl.DataFrame | None = None, data: pl.LazyFrame | pl.DataFrame | None = None,
@@ -1313,6 +1708,101 @@ class QualtricsPlotsMixin:
chart = self._save_plot(chart, title, filename=filename) chart = self._save_plot(chart, title, filename=filename)
return chart return chart
def plot_speaking_style_color_correlation_by_gender(
    self,
    data_male: pl.LazyFrame | pl.DataFrame,
    data_female: pl.LazyFrame | pl.DataFrame,
    speaking_styles: dict[str, list[str]],
    target_column: str = "Voice_Scale_Score",
    title: str = "Speaking Style Colors Correlation (by Voice Gender)",
    filename: str | None = None,
    width: int | str | None = None,
    height: int | None = None,
) -> alt.Chart:
    """Plot per-style-color correlations as grouped bars, male vs female voices.

    Args:
        data_male: DataFrame filtered to male voices only.
        data_female: DataFrame filtered to female voices only.
        speaking_styles: Dictionary mapping color names to their constituent traits.
        target_column: The column to correlate against ("Voice_Scale_Score" or "Ranking_Points").
        title: Chart title.
        filename: Optional explicit filename for saving.
        width: Chart width in pixels.
        height: Chart height in pixels.

    Returns:
        Altair chart with grouped bars (male/female) per color.
    """
    import utils

    # Compute the per-color correlations once per gender and tag each
    # frame with its gender plus a sign-aware palette key.
    labeled_frames = []
    for gender, raw in (("Male", data_male), ("Female", data_female)):
        frame = self._ensure_dataframe(raw)
        corr_frame, _ = utils.transform_speaking_style_color_correlation(
            frame, speaking_styles, target_column=target_column
        )
        labeled_frames.append(
            corr_frame.with_columns([
                pl.lit(gender).alias("Gender"),
                pl.when(pl.col("correlation") >= 0)
                .then(pl.lit(f"{gender}_Pos"))
                .otherwise(pl.lit(f"{gender}_Neg"))
                .alias("color_key"),
            ])
        )
    combined = pl.concat(labeled_frames)

    main_chart = alt.Chart(combined.to_pandas()).mark_bar().encode(
        x=alt.X('Color:N',
                title='Speaking Style Color',
                axis=alt.Axis(labelAngle=0, grid=False),
                sort=["Green", "Blue", "Orange", "Red"]),
        xOffset='Gender:N',
        y=alt.Y('correlation:Q',
                title='Average Correlation',
                scale=alt.Scale(domain=[-1, 1]),
                axis=alt.Axis(grid=True)),
        color=alt.Color('color_key:N',
                        scale=alt.Scale(
                            domain=['Male_Pos', 'Female_Pos', 'Male_Neg', 'Female_Neg'],
                            range=[ColorPalette.CORR_MALE_POSITIVE, ColorPalette.CORR_FEMALE_POSITIVE,
                                   ColorPalette.CORR_MALE_NEGATIVE, ColorPalette.CORR_FEMALE_NEGATIVE]
                        ),
                        legend=None),
        tooltip=[
            alt.Tooltip('Color:N', title='Speaking Style'),
            alt.Tooltip('Gender:N'),
            alt.Tooltip('correlation:Q', format='.3f', title='Avg Correlation'),
            alt.Tooltip('n_traits:Q', title='# Traits')
        ]
    ).properties(
        title=self._process_title(title),
        width=width or 400,
        height=height or 350
    )
    # Footnote must be attached before concatenating with the legend.
    main_chart = self._add_filter_footnote(main_chart)
    # Custom gender legend rendered beneath the bars.
    legend = self._create_gender_correlation_legend()
    chart = alt.vconcat(main_chart, legend, spacing=10).resolve_scale(color='independent')
    chart = self._save_plot(chart, title, filename=filename, skip_footnote=True)
    return chart
def plot_demographic_distribution( def plot_demographic_distribution(
self, self,
column: str, column: str,
@@ -1973,9 +2463,9 @@ class QualtricsPlotsMixin:
# Base heatmap # Base heatmap
heatmap = alt.Chart(heatmap_df).mark_rect(stroke='white', strokeWidth=1).encode( heatmap = alt.Chart(heatmap_df).mark_rect(stroke='white', strokeWidth=1).encode(
x=alt.X('col:N', title=None, sort=all_groups, x=alt.X('col:N', title=None, sort=all_groups,
axis=alt.Axis(labelAngle=-45, labelLimit=150)), axis=alt.Axis(labelAngle=-45, labelLimit=150, grid=False)),
y=alt.Y('row:N', title=None, sort=all_groups, y=alt.Y('row:N', title=None, sort=all_groups,
axis=alt.Axis(labelLimit=150)), axis=alt.Axis(labelLimit=150, grid=False)),
color=alt.Color('sig_category:N', color=alt.Color('sig_category:N',
scale=alt.Scale(domain=sig_domain, range=sig_range), scale=alt.Scale(domain=sig_domain, range=sig_range),
legend=alt.Legend( legend=alt.Legend(
@@ -2162,3 +2652,328 @@ class QualtricsPlotsMixin:
chart = self._save_plot(chart, title) chart = self._save_plot(chart, title)
return chart return chart
def plot_straight_liner_repeat_offenders(
    self,
    cumulative_df: pl.DataFrame | pd.DataFrame,
    title: str = "Straight-Liner Repeat Offenders\n(Cumulative Distribution)",
    height: int | None = None,
    width: int | str | None = None,
    total_respondents: int | None = None,
) -> alt.Chart:
    """Plot the cumulative distribution of straight-liner repeat offenders.

    Shows how many respondents straight-lined at N or more question
    groups, for every observed threshold.

    Parameters:
        cumulative_df: DataFrame with columns ``threshold`` (int),
            ``count`` (int) and ``pct`` (float, 0-100). Each row
            represents "≥ threshold question groups".
        title: Chart title.
        height: Chart height in pixels.
        width: Chart width in pixels.
        total_respondents: If provided, shown in the subtitle for
            context.

    Returns:
        The Altair chart object (already saved if ``fig_save_dir``
        is configured).
    """
    if isinstance(cumulative_df, pl.DataFrame):
        plot_df = cumulative_df.to_pandas()
    else:
        plot_df = cumulative_df.copy()

    # X-axis labels are the plain threshold values as strings.
    plot_df["label"] = plot_df["threshold"].astype(str)
    # Combined bar label "N (xx.x%)", needed before building the text layer.
    plot_df["count_label"] = plot_df.apply(
        lambda r: f"{int(r['count'])} ({r['pct']:.1f}%)", axis=1
    )
    # Explicit sort order so Altair keeps ascending threshold
    sort_order = plot_df.sort_values("threshold")["label"].tolist()

    # --- Bars: respondent count ---
    bars = alt.Chart(plot_df).mark_bar(
        color=ColorPalette.PRIMARY
    ).encode(
        x=alt.X(
            "label:N",
            title="Number of Straight-Lined Question Groups",
            sort=sort_order,
            axis=alt.Axis(grid=False),
        ),
        y=alt.Y(
            "count:Q",
            title="Number of Respondents",
            axis=alt.Axis(grid=True),
        ),
        tooltip=[
            alt.Tooltip("label:N", title="Threshold"),
            alt.Tooltip("count:Q", title="Respondents"),
            alt.Tooltip("pct:Q", title="% of Total", format=".1f"),
        ],
    )

    # --- Text: count + percentage above each bar ---
    # Built once, after count_label exists (a previous version built this
    # layer twice, the first time against a not-yet-existing column).
    text = alt.Chart(plot_df).mark_text(
        dy=-10, color="black", fontSize=11
    ).encode(
        x=alt.X("label:N", sort=sort_order),
        y=alt.Y("count:Q"),
        text=alt.Text("count_label:N"),
    )

    # --- Subtitle ---
    subtitle_parts = []
    if total_respondents is not None:
        subtitle_parts.append(
            f"Total respondents: {total_respondents}"
        )
    subtitle_parts.append(
        "Each bar shows how many respondents straight-lined "
        "at least that many question groups"
    )
    subtitle = " | ".join(subtitle_parts)
    title_config = {
        "text": self._process_title(title),
        "subtitle": subtitle,
        "subtitleColor": "gray",
        "subtitleFontSize": 10,
        "anchor": "start",
    }
    chart = alt.layer(bars, text).properties(
        title=title_config,
        width=width or 800,
        height=height or getattr(self, "plot_height", 400),
    )
    chart = self._save_plot(chart, title)
    return chart
def plot_straight_liner_per_question(
    self,
    per_question_df: pl.DataFrame | pd.DataFrame,
    title: str = "Straight-Lining Frequency per Question Group",
    height: int | None = None,
    width: int | str | None = None,
    total_respondents: int | None = None,
) -> alt.Chart:
    """Plot how often each question group is straight-lined.

    Parameters:
        per_question_df: DataFrame with columns ``question`` (str,
            human-readable name), ``count`` (int) and ``pct``
            (float, 0-100). Sorted descending by count.
        title: Chart title.
        height: Chart height in pixels.
        width: Chart width in pixels.
        total_respondents: Shown in subtitle for context.

    Returns:
        The Altair chart (saved if ``fig_save_dir`` is set).
    """
    df = (
        per_question_df.to_pandas()
        if isinstance(per_question_df, pl.DataFrame)
        else per_question_df.copy()
    )

    # Descending counts: Altair places the first list element at the top
    # of a nominal y-axis, so the busiest question group leads.
    question_order = df.sort_values("count", ascending=False)["question"].tolist()

    # Combined label "N (xx.x%)"
    df["count_label"] = df.apply(
        lambda r: f"{int(r['count'])} ({r['pct']:.1f}%)", axis=1
    )

    # --- Horizontal Bars ---
    bars = alt.Chart(df).mark_bar(
        color=ColorPalette.PRIMARY,
    ).encode(
        y=alt.Y(
            "question:N",
            title=None,
            sort=question_order,
            axis=alt.Axis(grid=False, labelLimit=250, labelAngle=0),
        ),
        x=alt.X(
            "count:Q",
            title="Number of Straight-Liners",
            axis=alt.Axis(grid=True),
        ),
        tooltip=[
            alt.Tooltip("question:N", title="Question"),
            alt.Tooltip("count:Q", title="Straight-Liners"),
            alt.Tooltip("pct:Q", title="% of Respondents", format=".1f"),
        ],
    )

    # --- Text labels to the right of bars ---
    labels = alt.Chart(df).mark_text(
        align="left", dx=4, color="black", fontSize=10,
    ).encode(
        y=alt.Y("question:N", sort=question_order),
        x=alt.X("count:Q"),
        text=alt.Text("count_label:N"),
    )

    # --- Subtitle ---
    subtitle_bits = []
    if total_respondents is not None:
        subtitle_bits.append(f"Total respondents: {total_respondents}")
    subtitle_bits.append(
        "Count and share of respondents who straight-lined each question group"
    )

    # Height grows with the number of questions to keep bars readable.
    fallback_height = max(400, len(df) * 22)

    chart = alt.layer(bars, labels).properties(
        title={
            "text": self._process_title(title),
            "subtitle": " | ".join(subtitle_bits),
            "subtitleColor": "gray",
            "subtitleFontSize": 10,
            "anchor": "start",
        },
        width=width or 700,
        height=height or fallback_height,
    )
    chart = self._save_plot(chart, title)
    return chart
def plot_speech_attribute_correlation(
    self,
    corr_df: pl.DataFrame | pd.DataFrame,
    title: str = "Speech Attributes vs Survey Metrics<br>Pearson Correlation",
    filename: str | None = None,
    height: int | None = None,
    width: int | None = None,
    show_values: bool = True,
    color_scheme: str | None = None,
) -> alt.Chart:
    """Plot a correlation heatmap between speech attributes and survey metrics.

    Expects a long-form DataFrame with columns:
    - metric: row label (e.g. "Weighted Rank", "Avg Voice Score")
    - attribute: column label (speech characteristic name)
    - correlation: Pearson r value

    Args:
        corr_df: Long-form correlation DataFrame.
        title: Chart title (supports <br> for line breaks).
        filename: Optional explicit filename (without extension).
        height: Chart height in pixels.
        width: Chart width in pixels.
        show_values: Whether to display correlation values as text.
        color_scheme: Optional Altair diverging color scheme name.

    Returns:
        alt.Chart: Altair heatmap chart.
    """
    df = corr_df.to_pandas() if isinstance(corr_df, pl.DataFrame) else corr_df

    attr_order = df["attribute"].unique().tolist()
    metric_order = df["metric"].unique().tolist()
    # Size the canvas from the grid dimensions unless the caller overrides.
    final_width = width or max(600, len(attr_order) * 55)
    final_height = height or max(120, len(metric_order) * 50 + 60)

    heatmap = (
        alt.Chart(df)
        .mark_rect(stroke="white", strokeWidth=1)
        .encode(
            x=alt.X(
                "attribute:N",
                title=None,
                sort=attr_order,
                axis=alt.Axis(labelAngle=-45, labelLimit=180, grid=False),
            ),
            y=alt.Y(
                "metric:N",
                title=None,
                sort=metric_order,
                axis=alt.Axis(labelLimit=200, grid=False),
            ),
            color=alt.Color(
                "correlation:Q",
                scale=alt.Scale(
                    domain=[-1, 1],
                    scheme=color_scheme or "redblue",
                ),
                legend=alt.Legend(title="Pearson r"),
            ),
            tooltip=[
                alt.Tooltip("metric:N", title="Metric"),
                alt.Tooltip("attribute:N", title="Attribute"),
                alt.Tooltip("correlation:Q", title="r", format=".3f"),
            ],
        )
    )

    layers = [heatmap]
    if show_values:
        # Two fixed-color text layers (black on pale cells, white on
        # saturated ones) instead of a color encoding, which would break
        # vl_convert PNG export.
        for subset, text_color in (
            (df[df["correlation"].abs() <= 0.45], "black"),
            (df[df["correlation"].abs() > 0.45], "white"),
        ):
            if subset.empty:
                continue
            layers.append(
                alt.Chart(subset)
                .mark_text(fontSize=11, fontWeight="normal", color=text_color)
                .encode(
                    x=alt.X("attribute:N", sort=attr_order),
                    y=alt.Y("metric:N", sort=metric_order),
                    text=alt.Text("correlation:Q", format=".2f"),
                )
            )

    chart = layers[0] if len(layers) == 1 else alt.layer(*layers)
    chart = chart.properties(
        title=self._process_title(title),
        width=final_width,
        height=final_height,
    )
    chart = self._save_plot(chart, title, filename=filename)
    return chart

View File

@@ -0,0 +1,3 @@
- V46 not in scale 1-10. Qualtrics
- Straightliners
- V45 goed in qual maar slecht in quant

View File

@@ -179,10 +179,25 @@ def get_filter_combinations(survey: QualtricsSurvey, category: str = None) -> li
'filters': {'industry': [industry]} 'filters': {'industry': [industry]}
}) })
# Voice ranking completeness filter
# These use a special flag rather than demographic filters, so we store
# the mode in a dedicated key that run_report passes as --voice-ranking-filter.
if not category or category in ['all_filters', 'voice_ranking']:
combinations.append({
'name': 'VoiceRanking-OnlyMissing',
'filters': {},
'voice_ranking_filter': 'only-missing',
})
combinations.append({
'name': 'VoiceRanking-ExcludeMissing',
'filters': {},
'voice_ranking_filter': 'exclude-missing',
})
return combinations return combinations
def run_report(filters: dict, name: str = None, dry_run: bool = False) -> bool: def run_report(filters: dict, name: str = None, dry_run: bool = False, sl_threshold: int = None, voice_ranking_filter: str = None) -> bool:
""" """
Run the report script with given filters. Run the report script with given filters.
@@ -190,6 +205,10 @@ def run_report(filters: dict, name: str = None, dry_run: bool = False) -> bool:
filters: Dict of filter_name -> list of values filters: Dict of filter_name -> list of values
name: Name for this filter combination (used for .txt description file) name: Name for this filter combination (used for .txt description file)
dry_run: If True, just print command without running dry_run: If True, just print command without running
sl_threshold: If set, exclude respondents with >= N straight-lined question groups
voice_ranking_filter: If set, filter by voice ranking completeness.
'only-missing' keeps only respondents missing QID98 data,
'exclude-missing' removes them.
Returns: Returns:
True if successful, False otherwise True if successful, False otherwise
@@ -200,6 +219,14 @@ def run_report(filters: dict, name: str = None, dry_run: bool = False) -> bool:
if name: if name:
cmd.extend(['--filter-name', name]) cmd.extend(['--filter-name', name])
# Pass straight-liner threshold if specified
if sl_threshold is not None:
cmd.extend(['--sl-threshold', str(sl_threshold)])
# Pass voice ranking filter if specified
if voice_ranking_filter is not None:
cmd.extend(['--voice-ranking-filter', voice_ranking_filter])
for filter_name, values in filters.items(): for filter_name, values in filters.items():
if values: if values:
cmd.extend([f'--{filter_name}', json.dumps(values)]) cmd.extend([f'--{filter_name}', json.dumps(values)])
@@ -230,10 +257,11 @@ def main():
parser.add_argument('--dry-run', action='store_true', help='Preview combinations without running') parser.add_argument('--dry-run', action='store_true', help='Preview combinations without running')
parser.add_argument( parser.add_argument(
'--category', '--category',
choices=['all_filters', 'all', 'age', 'gender', 'ethnicity', 'income', 'consumer', 'business_owner', 'ai_user', 'investable_assets', 'industry'], choices=['all_filters', 'all', 'age', 'gender', 'ethnicity', 'income', 'consumer', 'business_owner', 'ai_user', 'investable_assets', 'industry', 'voice_ranking'],
default='all_filters', default='all_filters',
help='Filter category to run combinations for (default: all_filters)' help='Filter category to run combinations for (default: all_filters)'
) )
parser.add_argument('--sl-threshold', type=int, default=None, help='Exclude respondents who straight-lined >= N question groups (passed to report script)')
args = parser.parse_args() args = parser.parse_args()
# Load survey to get available filter options # Load survey to get available filter options
@@ -246,11 +274,14 @@ def main():
category_desc = f" for category '{args.category}'" if args.category != 'all' else '' category_desc = f" for category '{args.category}'" if args.category != 'all' else ''
print(f"Generated {len(combinations)} filter combinations{category_desc}") print(f"Generated {len(combinations)} filter combinations{category_desc}")
if args.sl_threshold is not None:
print(f"Straight-liner threshold: excluding respondents with ≥{args.sl_threshold} straight-lined question groups")
if args.dry_run: if args.dry_run:
print("\nDRY RUN - Commands that would be executed:") print("\nDRY RUN - Commands that would be executed:")
for combo in combinations: for combo in combinations:
print(f"\n{combo['name']}:") print(f"\n{combo['name']}:")
run_report(combo['filters'], name=combo['name'], dry_run=True) run_report(combo['filters'], name=combo['name'], dry_run=True, sl_threshold=args.sl_threshold, voice_ranking_filter=combo.get('voice_ranking_filter'))
return return
# Run each combination with progress bar # Run each combination with progress bar
@@ -259,7 +290,7 @@ def main():
for combo in tqdm(combinations, desc="Running reports", unit="filter"): for combo in tqdm(combinations, desc="Running reports", unit="filter"):
tqdm.write(f"Running: {combo['name']}") tqdm.write(f"Running: {combo['name']}")
if run_report(combo['filters'], name=combo['name']): if run_report(combo['filters'], name=combo['name'], sl_threshold=args.sl_threshold, voice_ranking_filter=combo.get('voice_ranking_filter')):
successful += 1 successful += 1
else: else:
failed.append(combo['name']) failed.append(combo['name'])

File diff suppressed because one or more lines are too long

View File

@@ -77,6 +77,13 @@ class ColorPalette:
GENDER_MALE_NEUTRAL = "#B8C9D9" # Grey-Blue GENDER_MALE_NEUTRAL = "#B8C9D9" # Grey-Blue
GENDER_FEMALE_NEUTRAL = "#D9B8C9" # Grey-Pink GENDER_FEMALE_NEUTRAL = "#D9B8C9" # Grey-Pink
# Gender colors for correlation plots (green/red indicate +/- correlation)
# Male = darker shade, Female = lighter shade
CORR_MALE_POSITIVE = "#1B5E20" # Dark Green
CORR_FEMALE_POSITIVE = "#81C784" # Light Green
CORR_MALE_NEGATIVE = "#B71C1C" # Dark Red
CORR_FEMALE_NEGATIVE = "#E57373" # Light Red
# Speaking Style Colors (named after the style quadrant colors) # Speaking Style Colors (named after the style quadrant colors)
STYLE_GREEN = "#2E7D32" # Forest Green STYLE_GREEN = "#2E7D32" # Forest Green
STYLE_BLUE = "#1565C0" # Strong Blue STYLE_BLUE = "#1565C0" # Strong Blue

448
utils.py
View File

@@ -413,10 +413,30 @@ def _iter_picture_shapes(shapes):
yield shape yield shape
def _set_shape_alt_text(shape, alt_text: str):
"""
Set alt text (descr attribute) for a PowerPoint shape.
"""
nvPr = None
# Check for common property names used by python-pptx elements
for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
if hasattr(shape._element, attr):
nvPr = getattr(shape._element, attr)
break
if nvPr and hasattr(nvPr, 'cNvPr'):
nvPr.cNvPr.set("descr", alt_text)
def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str, Path], output_path: Union[str, Path] = None, use_perceptual_hash: bool = True): def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str, Path], output_path: Union[str, Path] = None, use_perceptual_hash: bool = True):
""" """
Updates the alt text of images in a PowerPoint presentation by matching Updates the alt text of images in a PowerPoint presentation.
their content with images in a source directory.
1. First pass: Validates existing alt-text format (<filter>/<filename>).
- Fixes full paths by keeping only the last two parts.
- Clears invalid alt-text.
2. Second pass: If images are missing alt-text, matches them against source directory
using perceptual hash or SHA1.
Args: Args:
ppt_path (str/Path): Path to the PowerPoint file. ppt_path (str/Path): Path to the PowerPoint file.
@@ -430,10 +450,7 @@ def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str,
if output_path is None: if output_path is None:
output_path = ppt_path output_path = ppt_path
# 1. Build lookup map of {hash: file_path} from the source directory # Open Presentation
image_hash_map = _build_image_hash_map(image_source_dir, use_perceptual_hash=use_perceptual_hash)
# 2. Open Presentation
try: try:
prs = Presentation(ppt_path) prs = Presentation(ppt_path)
except Exception as e: except Exception as e:
@@ -441,24 +458,84 @@ def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str,
return return
updates_count = 0 updates_count = 0
unmatched_images = [] # Collect unmatched images to report at the end images_needing_match = []
slides = list(prs.slides) slides = list(prs.slides)
total_slides = len(slides) total_slides = len(slides)
print(f"Processing {total_slides} slides...") print(f"Scanning {total_slides} slides for existing alt-text...")
# Pass 1: Scan and clean existing alt-text
for i, slide in enumerate(slides): for i, slide in enumerate(slides):
# Use recursive iterator to find all pictures including those in groups/placeholders
picture_shapes = list(_iter_picture_shapes(slide.shapes)) picture_shapes = list(_iter_picture_shapes(slide.shapes))
for shape in picture_shapes: for shape in picture_shapes:
alt_text = _get_shape_alt_text(shape)
has_valid_alt = False
if alt_text:
# Handle potential path separators and whitespace
clean_alt = alt_text.strip().replace('\\', '/')
parts = clean_alt.split('/')
# Check if it looks like a path/file reference (at least 2 parts like dir/file)
if len(parts) >= 2:
# Enforce format: keep last 2 parts (e.g. filter/image.png)
new_alt = '/'.join(parts[-2:])
if new_alt != alt_text:
print(f"Slide {i+1}: Fixing alt-text format: '{alt_text}' -> '{new_alt}'")
_set_shape_alt_text(shape, new_alt)
updates_count += 1
has_valid_alt = True
else:
# User requested deleting other cases that do not meet format
# If it's single word or doesn't look like our path format
pass # logic below handles this
if not has_valid_alt:
if alt_text:
print(f"Slide {i+1}: Invalid/Legacy alt-text '{alt_text}'. Clearing for re-matching.")
_set_shape_alt_text(shape, "")
updates_count += 1
# Queue for hash matching
shape_id = getattr(shape, 'shape_id', getattr(shape, 'id', 'Unknown ID'))
shape_name = shape.name if shape.name else f"Unnamed Shape (ID: {shape_id})"
images_needing_match.append({
'slide_idx': i, # 0-based
'slide_num': i+1,
'shape': shape,
'shape_name': shape_name
})
if not images_needing_match:
print("\nAll images have valid alt-text format. No hash matching needed.")
if updates_count > 0:
prs.save(output_path)
print(f"✓ Saved updated presentation to {output_path} with {updates_count} updates.")
else:
print("Presentation is up to date.")
return
# Pass 2: Hash Matching
print(f"\n{len(images_needing_match)} images missing proper alt-text. Proceeding with hash matching...")
# Build lookup map of {hash: file_path} only if needed
image_hash_map = _build_image_hash_map(image_source_dir, use_perceptual_hash=use_perceptual_hash)
unmatched_images = []
for item in images_needing_match:
shape = item['shape']
slide_num = item['slide_num']
try: try:
# Get image hash based on selected method # Get image hash
if use_perceptual_hash: if use_perceptual_hash:
# Use perceptual hash of the image blob for visual content matching
current_hash = _calculate_perceptual_hash(shape.image.blob) current_hash = _calculate_perceptual_hash(shape.image.blob)
else: else:
# Use SHA1 hash from python-pptx (exact byte match)
current_hash = shape.image.sha1 current_hash = shape.image.sha1
if current_hash in image_hash_map: if current_hash in image_hash_map:
@@ -466,7 +543,6 @@ def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str,
# Generate Alt Text # Generate Alt Text
try: try:
# Prepare path for generator.
# Try to relativize to CWD if capable # Try to relativize to CWD if capable
pass_path = original_path pass_path = original_path
try: try:
@@ -476,75 +552,38 @@ def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str,
new_alt_text = image_alt_text_generator(pass_path) new_alt_text = image_alt_text_generator(pass_path)
# Check existing alt text to avoid redundant updates/log them print(f"Slide {slide_num}: Match found! Assigning alt-text '{new_alt_text}'")
# Accessing alt text via cNvPr _set_shape_alt_text(shape, new_alt_text)
# Note: Different shape types might store non-visual props differently
# Picture: nvPicPr.cNvPr
# GraphicFrame: nvGraphicFramePr.cNvPr
# Group: nvGrpSpPr.cNvPr
# Shape/Placeholder: nvSpPr.cNvPr
nvPr = None
for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
if hasattr(shape._element, attr):
nvPr = getattr(shape._element, attr)
break
if nvPr and hasattr(nvPr, 'cNvPr'):
cNvPr = nvPr.cNvPr
existing_alt_text = cNvPr.get("descr", "")
if existing_alt_text != new_alt_text:
print(f"Slide {i+1}: Updating alt text for image matches '{pass_path}'")
print(f" Old: '{existing_alt_text}' -> New: '{new_alt_text}'")
cNvPr.set("descr", new_alt_text)
updates_count += 1 updates_count += 1
else:
print(f"Could not find cNvPr for shape on slide {i+1}")
except AssertionError as e:
print(f"Skipping match for {original_path} due to generator error: {e}")
except Exception as e: except Exception as e:
print(f"Error updating alt text for {original_path}: {e}") print(f"Error generating alt text for {original_path}: {e}")
else: else:
# Check if image already has alt text set - if so, skip reporting as unmatched
existing_alt = _get_shape_alt_text(shape)
if existing_alt:
# Image already has alt text, no need to report as unmatched
continue
shape_id = getattr(shape, 'shape_id', getattr(shape, 'id', 'Unknown ID'))
shape_name = shape.name if shape.name else f"Unnamed Shape (ID: {shape_id})"
hash_type = "pHash" if use_perceptual_hash else "SHA1" hash_type = "pHash" if use_perceptual_hash else "SHA1"
unmatched_images.append({ unmatched_images.append({
'slide': i+1, 'slide': slide_num,
'shape_name': shape_name, 'shape_name': item['shape_name'],
'hash_type': hash_type, 'hash_type': hash_type,
'hash': current_hash 'hash': current_hash
}) })
except AttributeError:
continue
except Exception as e: except Exception as e:
print(f"Error processing shape on slide {i+1}: {e}") print(f"Error processing shape on slide {slide_num}: {e}")
# Print summary # Save and Print Summary
print("\n" + "="*80) print("\n" + "="*80)
if updates_count > 0: if updates_count > 0:
prs.save(output_path) prs.save(output_path)
print(f"✓ Saved updated presentation to {output_path} with {updates_count} updates.") print(f"✓ Saved updated presentation to {output_path} with {updates_count} updates.")
else: else:
print("No images matched or required updates.") print("No matches found for missing images.")
# List unmatched images at the end
if unmatched_images: if unmatched_images:
print(f"\n{len(unmatched_images)} image(s) not found in source directory:") print(f"\n{len(unmatched_images)} image(s) could not be matched:")
for img in unmatched_images: for img in unmatched_images:
print(f" • Slide {img['slide']}: '{img['shape_name']}' ({img['hash_type']}: {img['hash']})") print(f" • Slide {img['slide']}: '{img['shape_name']}' ({img['hash_type']}: {img['hash']})")
else: else:
print("\n✓ All images matched successfully!") print("\n✓ All images processed successfully!")
print("="*80) print("="*80)
@@ -723,7 +762,7 @@ def normalize_global_values(df: pl.DataFrame, target_cols: list[str]) -> pl.Data
class QualtricsSurvey(QualtricsPlotsMixin): class QualtricsSurvey(QualtricsPlotsMixin):
"""Class to handle Qualtrics survey data.""" """Class to handle Qualtrics survey data."""
def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]): def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path], figures_dir: Union[str, Path] = None):
if isinstance(data_path, str): if isinstance(data_path, str):
data_path = Path(data_path) data_path = Path(data_path)
@@ -735,8 +774,12 @@ class QualtricsSurvey(QualtricsPlotsMixin):
self.qid_descr_map = self._extract_qid_descr_map() self.qid_descr_map = self._extract_qid_descr_map()
self.qsf:dict = self._load_qsf() self.qsf:dict = self._load_qsf()
if figures_dir:
self.fig_save_dir = Path(figures_dir)
else:
# get export directory name for saving figures ie if data_path='data/exports/OneDrive_2026-01-21/...' should be 'figures/OneDrive_2026-01-21' # get export directory name for saving figures ie if data_path='data/exports/OneDrive_2026-01-21/...' should be 'figures/OneDrive_2026-01-21'
self.fig_save_dir = Path('figures') / self.data_filepath.parts[2] self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
if not self.fig_save_dir.exists(): if not self.fig_save_dir.exists():
self.fig_save_dir.mkdir(parents=True, exist_ok=True) self.fig_save_dir.mkdir(parents=True, exist_ok=True)
@@ -1072,6 +1115,60 @@ class QualtricsSurvey(QualtricsPlotsMixin):
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
def get_top_3_voices_missing_ranking(
    self, q: pl.LazyFrame
) -> pl.DataFrame:
    """Identify respondents who completed the top-3 voice selection (QID36)
    but are missing the explicit ranking question (QID98).

    These respondents picked 3 voices in the selection step and have
    selection-order data in ``QID36_G0_*_RANK``, but all 18 ``QID98_*``
    ranking columns are null. This means ``get_top_3_voices()`` will
    return all-null rows for them, causing plots like
    ``plot_most_ranked_1`` to undercount.

    Parameters:
        q: The (optionally filtered) LazyFrame from ``load_data()``.

    Returns:
        A collected ``pl.DataFrame`` with columns:
            - ``_recordId``: the respondent identifier
            - ``3_Ranked``: comma-separated text of the 3 voices they
              selected (null if the respondent has no match in the
              18->8->3 data)
    """
    # Get the top-3 ranking data (QID98-based)
    top3, _ = self.get_top_3_voices(q)
    top3_df = top3.collect()
    ranking_cols = [c for c in top3_df.columns if c != '_recordId']

    # Respondents where every QID98 ranking column is null.
    # Fold starts at True so an (unexpected) empty column list selects
    # everyone rather than raising.
    all_null_expr = pl.lit(True)
    for col in ranking_cols:
        all_null_expr = all_null_expr & pl.col(col).is_null()
    missing_ids = top3_df.filter(all_null_expr).select('_recordId')

    if missing_ids.height == 0:
        # Keep a stable schema for downstream consumers even when
        # nobody is missing the ranking question.
        return pl.DataFrame(schema={
            '_recordId': pl.Utf8,
            '3_Ranked': pl.Utf8,
        })

    # Enrich with the 3_Ranked text from the 18->8->3 question
    v_18_8_3, _ = self.get_18_8_3(q)
    v_df = v_18_8_3.collect()
    return missing_ids.join(
        v_df.select(['_recordId', '3_Ranked']),
        on='_recordId',
        how='left',
    )
def get_ss_orange_red(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]: def get_ss_orange_red(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
"""Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant. """Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant.
@@ -1545,6 +1642,235 @@ class QualtricsSurvey(QualtricsPlotsMixin):
return results_df, metadata return results_df, metadata
def compute_mentions_significance(
    self,
    data: pl.LazyFrame | pl.DataFrame,
    alpha: float = 0.05,
    correction: str = "bonferroni",
) -> tuple[pl.DataFrame, dict]:
    """Compute statistical significance for Total Mentions (Rank 1+2+3).

    Tests whether the proportion of respondents who included a voice in
    their Top 3 is significantly different between voices, using a
    pairwise two-proportion z-test over every pair of voices.

    Args:
        data: Ranking data (rows=respondents, cols=voices, values=rank).
        alpha: Significance level.
        correction: Multiple comparison correction method, one of
            ``"bonferroni"``, ``"holm"``, or ``"none"``.

    Returns:
        tuple: (pairwise_df, metadata). ``pairwise_df`` has one row per
        voice pair, sorted by raw p-value, with counts, percentages,
        adjusted p-values and a ``significant`` flag.

    Raises:
        ValueError: If fewer than 2 ranking columns are present, the
            data has no respondents, or ``correction`` is unknown.
    """
    from scipy import stats as scipy_stats
    import numpy as np

    df = data.collect() if isinstance(data, pl.LazyFrame) else data

    ranking_cols = [c for c in df.columns if c != '_recordId']
    if len(ranking_cols) < 2:
        raise ValueError("Need at least 2 ranking columns")
    total_respondents = df.height
    if total_respondents == 0:
        # Fail clearly instead of hitting ZeroDivisionError below.
        raise ValueError("Ranking data contains no respondents")
    if correction not in ("bonferroni", "holm", "none"):
        # An unrecognized method previously left p_adjusted as NaN,
        # silently marking every pair non-significant.
        raise ValueError(f"Unknown correction method: {correction!r}")

    # Count mentions (any non-null rank) for each voice
    mentions_data: dict[str, int] = {}
    for col in ranking_cols:
        label = self._clean_voice_label(col)
        mentions_data[label] = df.filter(pl.col(col).is_not_null()).height

    labels = sorted(mentions_data.keys())
    n_comparisons = len(labels) * (len(labels) - 1) // 2
    results = []
    for i, label1 in enumerate(labels):
        for label2 in labels[i+1:]:
            count1 = mentions_data[label1]
            count2 = mentions_data[label2]
            pct1 = count1 / total_respondents
            pct2 = count2 / total_respondents
            # Z-test for two proportions; both shares use the same
            # denominator because every respondent saw every voice.
            n1 = total_respondents
            n2 = total_respondents
            p_pooled = (count1 + count2) / (n1 + n2)
            se = np.sqrt(p_pooled * (1 - p_pooled) * (1/n1 + 1/n2))
            if se > 0:
                z_stat = (pct1 - pct2) / se
                # norm.sf is numerically stabler than 1 - norm.cdf for
                # large |z| (avoids catastrophic cancellation).
                p_value = 2 * scipy_stats.norm.sf(abs(z_stat))
            else:
                p_value = 1.0
            results.append({
                'group1': label1,
                'group2': label2,
                'p_value': float(p_value),
                'rank1_count1': count1,  # Reusing column names for compatibility with heatmap plotting
                'rank1_count2': count2,
                'rank1_pct1': round(pct1 * 100, 1),
                'rank1_pct2': round(pct2 * 100, 1),
                'total1': n1,
                'total2': n2,
                'effect_size': pct1 - pct2  # Difference in proportions
            })

    results_df = pl.DataFrame(results)
    p_values = results_df['p_value'].to_numpy()
    if correction == "bonferroni":
        p_adjusted = np.minimum(p_values * n_comparisons, 1.0)
    elif correction == "holm":
        # Holm step-down: scale the j-th smallest p by (m - j), enforce
        # monotonicity with a running max, cap at 1, restore order.
        sorted_idx = np.argsort(p_values)
        m = len(p_values)
        scaled = p_values[sorted_idx] * (m - np.arange(m))
        adjusted = np.minimum(np.maximum.accumulate(scaled), 1.0)
        p_adjusted = adjusted[np.argsort(sorted_idx)]
    else:  # correction == "none" (validated above)
        p_adjusted = p_values.astype(float)

    results_df = results_df.with_columns([
        pl.Series('p_adjusted', p_adjusted),
        pl.Series('significant', p_adjusted < alpha),
    ]).sort('p_value')

    metadata = {
        'test_type': 'proportion_z_test_mentions',
        'alpha': alpha,
        'correction': correction,
        'n_comparisons': n_comparisons,
    }
    return results_df, metadata
def compute_rank1_significance(
    self,
    data: pl.LazyFrame | pl.DataFrame,
    alpha: float = 0.05,
    correction: str = "bonferroni",
) -> tuple[pl.DataFrame, dict]:
    """Compute statistical significance for Rank 1 selections only.

    Like :meth:`compute_mentions_significance` but counts only how many
    times each voice/character was ranked **1st**, using total
    respondents as the denominator. This tests whether first-choice
    preference differs significantly between voices.

    Args:
        data: Ranking data (rows=respondents, cols=voices, values=rank).
        alpha: Significance level.
        correction: Multiple comparison correction method, one of
            ``"bonferroni"``, ``"holm"``, or ``"none"``.

    Returns:
        tuple: (pairwise_df, metadata). ``pairwise_df`` has one row per
        voice pair, sorted by raw p-value, with counts, percentages,
        adjusted p-values and a ``significant`` flag.

    Raises:
        ValueError: If fewer than 2 ranking columns are present, the
            data has no respondents, or ``correction`` is unknown.
    """
    from scipy import stats as scipy_stats
    import numpy as np

    df = data.collect() if isinstance(data, pl.LazyFrame) else data

    ranking_cols = [c for c in df.columns if c != '_recordId']
    if len(ranking_cols) < 2:
        raise ValueError("Need at least 2 ranking columns")
    total_respondents = df.height
    if total_respondents == 0:
        # Fail clearly instead of hitting ZeroDivisionError below.
        raise ValueError("Ranking data contains no respondents")
    if correction not in ("bonferroni", "holm", "none"):
        # An unrecognized method previously left p_adjusted as NaN,
        # silently marking every pair non-significant.
        raise ValueError(f"Unknown correction method: {correction!r}")

    # Count rank-1 selections for each voice
    rank1_data: dict[str, int] = {}
    for col in ranking_cols:
        label = self._clean_voice_label(col)
        rank1_data[label] = df.filter(pl.col(col) == 1).height

    labels = sorted(rank1_data.keys())
    n_comparisons = len(labels) * (len(labels) - 1) // 2
    results = []
    for i, label1 in enumerate(labels):
        for label2 in labels[i+1:]:
            count1 = rank1_data[label1]
            count2 = rank1_data[label2]
            pct1 = count1 / total_respondents
            pct2 = count2 / total_respondents
            # Z-test for two proportions (same denominator for both)
            n1 = total_respondents
            n2 = total_respondents
            p_pooled = (count1 + count2) / (n1 + n2)
            se = np.sqrt(p_pooled * (1 - p_pooled) * (1/n1 + 1/n2))
            if se > 0:
                z_stat = (pct1 - pct2) / se
                # norm.sf is numerically stabler than 1 - norm.cdf for
                # large |z| (avoids catastrophic cancellation).
                p_value = 2 * scipy_stats.norm.sf(abs(z_stat))
            else:
                p_value = 1.0
            results.append({
                'group1': label1,
                'group2': label2,
                'p_value': float(p_value),
                'rank1_count1': count1,
                'rank1_count2': count2,
                'rank1_pct1': round(pct1 * 100, 1),
                'rank1_pct2': round(pct2 * 100, 1),
                'total1': n1,
                'total2': n2,
                'effect_size': pct1 - pct2,
            })

    results_df = pl.DataFrame(results)
    p_values = results_df['p_value'].to_numpy()
    if correction == "bonferroni":
        p_adjusted = np.minimum(p_values * n_comparisons, 1.0)
    elif correction == "holm":
        # Holm step-down: scale the j-th smallest p by (m - j), enforce
        # monotonicity with a running max, cap at 1, restore order.
        sorted_idx = np.argsort(p_values)
        m = len(p_values)
        scaled = p_values[sorted_idx] * (m - np.arange(m))
        adjusted = np.minimum(np.maximum.accumulate(scaled), 1.0)
        p_adjusted = adjusted[np.argsort(sorted_idx)]
    else:  # correction == "none" (validated above)
        p_adjusted = p_values.astype(float)

    results_df = results_df.with_columns([
        pl.Series('p_adjusted', p_adjusted),
        pl.Series('significant', p_adjusted < alpha),
    ]).sort('p_value')

    metadata = {
        'test_type': 'proportion_z_test_rank1',
        'alpha': alpha,
        'correction': correction,
        'n_comparisons': n_comparisons,
    }
    return results_df, metadata
def process_speaking_style_data( def process_speaking_style_data(
df: Union[pl.LazyFrame, pl.DataFrame], df: Union[pl.LazyFrame, pl.DataFrame],