Files
JPMC-quant/XX_straight_liners.py

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Extra analyses of the straight-liners"""
# %% Imports
import utils
import polars as pl
import argparse
import json
import re
from pathlib import Path
from validation import check_straight_liners
# %% Fixed Variables
# Results export handed to utils.QualtricsSurvey. parse_cli_args() reuses the
# third path component of this value (currently "2-4-26") as the default
# figures sub-directory, so keep at least three leading path components
# (e.g. data/exports/<export folder>/<file>) when pointing at a new export.
RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv'
# QSF file handed to utils.QualtricsSurvey together with RESULTS_FILE.
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# %% CLI argument parsing for batch automation
# When run as script: uv run XX_straight_liners.py --age '["<age option>", ...]' --filter-name "<label>"
# Central filter configuration - add new filters here only
# Format: 'cli_arg_name': 'QualtricsSurvey.options_* attribute name'
# Maps each CLI flag name (exposed as ``--<name>``) to the name of the
# QualtricsSurvey attribute that is read for that filter's full option list.
# Insertion order is the order in which flags are registered and in which
# filters are listed in the generated description file.
FILTER_CONFIG = {
    "age": "options_age",
    "gender": "options_gender",
    "ethnicity": "options_ethnicity",
    "income": "options_income",
    "consumer": "options_consumer",
    "business_owner": "options_business_owner",
    "ai_user": "options_ai_user",
    "investable_assets": "options_investable_assets",
    "industry": "options_industry",
}
def parse_cli_args() -> argparse.Namespace:
    """Return this run's filter selection as an ``argparse.Namespace``.

    The namespace carries one attribute per key in ``FILTER_CONFIG`` (the
    decoded JSON value, or ``None`` when the flag was omitted or empty), plus
    ``filter_name`` and ``figures_dir``.

    When ``get_ipython`` is defined (an IPython/Jupyter session) the command
    line is not parsed: every filter is ``None``, ``filter_name`` is ``None``
    and ``figures_dir`` takes its default. Otherwise the command line is
    parsed and every supplied filter flag is decoded with ``json.loads``.

    Exits through ``parser.error`` (usage message, ``SystemExit``) when a
    filter flag does not contain valid JSON.
    """
    # Single definition of the default so the interactive and CLI paths
    # cannot drift apart. Uses the third path component of RESULTS_FILE.
    default_fig_dir = f'figures/straight-liner-analysis/{Path(RESULTS_FILE).parts[2]}'

    # Only parse if running as script (not in Jupyter/interactive).
    # The try body is limited to the probe itself so that a NameError raised
    # by anything else is never mistaken for "not running under IPython".
    try:
        get_ipython()  # noqa: F821  # type: ignore
    except NameError:
        pass
    else:
        # Interactive session: all filters unset, same defaults as argparse.
        return argparse.Namespace(
            **dict.fromkeys(FILTER_CONFIG),
            filter_name=None,
            figures_dir=default_fig_dir,
        )

    parser = argparse.ArgumentParser(description='Generate quant report with optional filters')
    # Dynamically add filter arguments from config
    for filter_name in FILTER_CONFIG:
        parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values')
    parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)')
    parser.add_argument('--figures-dir', type=str, default=default_fig_dir, help='Override the default figures directory')

    args = parser.parse_args()

    # Parse JSON strings to lists; an omitted or empty flag stays None.
    for filter_name in FILTER_CONFIG:
        raw_value = getattr(args, filter_name)
        if not raw_value:
            setattr(args, filter_name, None)
            continue
        try:
            decoded = json.loads(raw_value)
        except json.JSONDecodeError as exc:
            # Report bad input as a normal usage error instead of a traceback.
            parser.error(
                f"--{filter_name} is not valid JSON ({exc}); "
                "expected a JSON list such as '[\"A\", \"B\"]'"
            )
        setattr(args, filter_name, decoded)
    return args
# Resolve the filter selection once, at module execution time.
cli_args = parse_cli_args()

# %%
# Survey wrapper; figures_dir comes from the CLI (or its default).
S = utils.QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir)
data_all = S.load_data()

# %% Build filtered dataset based on CLI args
# CLI args: None means "no filter applied" - filter_data() will skip None filters
# Build filter values dict dynamically from FILTER_CONFIG
# (one entry per FILTER_CONFIG key, passed on as keyword arguments).
_active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG}
_d = S.filter_data(data_all, **_active_filters)
# Write filter description file if filter-name is provided
# (skipped entirely when no figures directory is configured).
if cli_args.filter_name and S.fig_save_dir:
    # Get the filter slug (e.g., "All_Respondents", "Cons-Starter", etc.)
    _filter_slug = S._get_filter_slug()
    _filter_slug_dir = S.fig_save_dir / _filter_slug
    _filter_slug_dir.mkdir(parents=True, exist_ok=True)

    # Build filter description: one line per configured filter, plus a short
    # one-line summary that lists only the filters actually narrowed.
    _filter_desc_lines = [
        f"Filter: {cli_args.filter_name}",
        "",
        "Applied Filters:",
    ]
    _short_desc_parts = []
    for filter_name, options_attr in FILTER_CONFIG.items():
        all_options = getattr(S, options_attr)
        values = _active_filters[filter_name]
        display_name = filter_name.replace('_', ' ').title()
        # None means no filter applied (same as "All")
        if values is not None and values != all_options:
            # str() each entry: values come from JSON and may be numbers,
            # which str.join would reject with a TypeError.
            _joined_values = ', '.join(str(value) for value in values)
            _short_desc_parts.append(f"{display_name}: {_joined_values}")
            _filter_desc_lines.append(f" {display_name}: {_joined_values}")
        else:
            _filter_desc_lines.append(f" {display_name}: All")

    # Write detailed description INSIDE the filter-slug directory
    # Sanitize filter name for filename usage (replace / and other chars)
    _safe_filter_name = re.sub(r'[^\w\s-]', '_', cli_args.filter_name)
    _filter_file = _filter_slug_dir / f"{_safe_filter_name}.txt"
    _filter_file.write_text('\n'.join(_filter_desc_lines))

    # Append to summary index file at figures/<export_date>/filter_index.txt
    _summary_file = S.fig_save_dir / "filter_index.txt"
    _index_columns = "Directory | Filter Name | Description"
    _short_desc = "; ".join(_short_desc_parts) if _short_desc_parts else "All Respondents"
    _summary_line = f"{_filter_slug} | {cli_args.filter_name} | {_short_desc}\n"

    # Append or create the summary file
    if _summary_file.exists():
        _existing = _summary_file.read_text()
        # Avoid duplicate entries for same slug. Compare against the first
        # column of each index row, not the raw file text: a substring test
        # would treat slug "Cons" as already indexed as soon as
        # "Cons-Starter" (or any name/description containing it) is present.
        _indexed_slugs = {
            _row.split(" | ", 1)[0]
            for _row in _existing.splitlines()
            if " | " in _row and _row != _index_columns
        }
        if _filter_slug not in _indexed_slugs:
            with _summary_file.open('a') as f:
                f.write(_summary_line)
    else:
        _header = "Filter Index\n" + "=" * 80 + "\n\n"
        _header += _index_columns + "\n"
        _header += "-" * 80 + "\n"
        _summary_file.write_text(_header + _summary_line)
# Save to logical variable name for further analysis
data = _d
# NOTE(review): the result of collect() is not assigned. Presumably this is
# the cell's trailing expression so the frame is displayed when the cell is
# run interactively; in a script run it only forces an extra evaluation,
# because later code calls data.select(...).collect() again - confirm intent.
data.collect()
# %% Determine straight-liner repeat offenders
# Extract question groups with renamed columns that check_straight_liners expects.
# The raw `data` has QID-based column names; the getter methods rename them to
# patterns like SS_Green_Blue__V14__Choice_1, Voice_Scale_1_10__V48, etc.
# Each getter returns a pair; only the first element is used here.
ss_or, _ = S.get_ss_orange_red(data)
ss_gb, _ = S.get_ss_green_blue(data)
vs, _ = S.get_voice_scale_1_10(data)

# Combine all question groups into one wide LazyFrame (joined on _recordId)
# NOTE(review): no `how=` is passed. If these are polars frames the join
# defaults to inner, so a record missing from any one group would drop out
# of the combined frame - confirm that is intended.
all_questions = ss_or.join(ss_gb, on='_recordId').join(vs, on='_recordId')

# Run straight-liner detection across all question groups
# max_score=5 catches all speaking-style straight-lining (1-5 scale)
# and voice-scale values ≤5 on the 1-10 scale
print("Running straight-liner detection across all question groups...")
# sl_df drives the analysis below; sl_report is not referenced again in the
# code visible here.
sl_report, sl_df = check_straight_liners(all_questions, max_score=5)
# %% Quantify repeat offenders
# sl_df has one row per (Record ID, Question Group) that was straight-lined.
# Group by Record ID to count how many question groups each person SL'd.
# sl_df may be None or empty, in which case nothing in this section runs.
if sl_df is not None and not sl_df.is_empty():
    total_respondents = data.select(pl.len()).collect().item()

    # Per-respondent count of straight-lined question groups.
    # group_by() makes no promise about row order, so ties on sl_count are
    # broken by Record ID; otherwise respondents with equal counts could be
    # listed in a different order on every run.
    respondent_sl_counts = (
        sl_df
        .group_by("Record ID")
        .agg(pl.len().alias("sl_count"))
        .sort(["sl_count", "Record ID"], descending=[True, False])
    )
    max_sl = respondent_sl_counts["sl_count"].max()

    print(f"\nTotal respondents: {total_respondents}")
    print(f"Respondents who straight-lined at least 1 question group: "
          f"{respondent_sl_counts.height}")
    print(f"Maximum question groups straight-lined by one person: {max_sl}")
    print()

    # Build cumulative distribution: for each threshold N, count respondents
    # who straight-lined >= N question groups
    cumulative_rows = []
    for threshold in range(1, max_sl + 1):
        count = respondent_sl_counts.filter(
            pl.col("sl_count") >= threshold
        ).height
        # Share of ALL respondents in the filtered dataset, not only of
        # those who straight-lined.
        pct = (count / total_respondents) * 100
        cumulative_rows.append({
            "threshold": threshold,
            "count": count,
            "pct": pct,
        })
        print(
            f"{threshold} question groups straight-lined: "
            f"{count} respondents ({pct:.1f}%)"
        )
    cumulative_df = pl.DataFrame(cumulative_rows)
    print(f"\n{cumulative_df}")

    # %% Save cumulative data to CSV
    # NOTE(review): unlike the filter-description section, S.fig_save_dir is
    # not checked for a falsy value here - confirm it is always set.
    _filter_slug = S._get_filter_slug()
    _csv_dir = Path(S.fig_save_dir) / _filter_slug
    _csv_dir.mkdir(parents=True, exist_ok=True)
    _csv_path = _csv_dir / "straight_liner_repeat_offenders.csv"
    cumulative_df.write_csv(_csv_path)
    print(f"Saved cumulative data to {_csv_path}")

    # %% Plot the cumulative distribution
    S.plot_straight_liner_repeat_offenders(
        cumulative_df,
        total_respondents=total_respondents,
    )
# %% Per-question straight-lining frequency
# Build human-readable question group names from the raw keys
def _humanise_question_group(key: str) -> str:
    """Turn an internal question-group key into a display label.

    Examples:
        SS_Green_Blue__V14  -> Green/Blue V14
        SS_Orange_Red__V48  -> Orange/Red V48
        Voice_Scale_1_10    -> Voice Scale (1-10)

    Any other key is returned with its underscores replaced by spaces.
    """
    if key == "Voice_Scale_1_10":
        return "Voice Scale (1-10)"

    # Speaking-style groups: "<prefix><voice>"; the label keeps only the
    # segment between the first and second "__".
    speaking_styles = (
        ("SS_Green_Blue__", "Green/Blue"),
        ("SS_Orange_Red__", "Orange/Red"),
    )
    for prefix, colour_label in speaking_styles:
        if key.startswith(prefix):
            voice = key.split("__")[1]
            return f"{colour_label} {voice}"

    # Fallback: replace underscores
    return key.replace("_", " ")
# Per-question frequency and top repeat offenders.
# This section reads total_respondents, respondent_sl_counts and _csv_dir,
# which the repeat-offender section only assigns when straight-liners were
# found, so it is gated on the same condition.
if sl_df is not None and not sl_df.is_empty():
    per_question_counts = (
        sl_df
        .group_by("Question Group")
        .agg(pl.col("Record ID").n_unique().alias("count"))
        # group_by() makes no promise about row order; break ties on the
        # group key so the CSV and plot input are ordered the same way on
        # every run.
        .sort(["count", "Question Group"], descending=[True, False])
        .with_columns(
            (pl.col("count") / total_respondents * 100).alias("pct")
        )
    )

    # Add human-readable names
    per_question_counts = per_question_counts.with_columns(
        pl.col("Question Group").map_elements(
            _humanise_question_group, return_dtype=pl.Utf8
        ).alias("question")
    )
    print("\n--- Per-Question Straight-Lining Frequency ---")
    print(per_question_counts)

    # Save per-question data to CSV
    _csv_path_pq = _csv_dir / "straight_liner_per_question.csv"
    per_question_counts.write_csv(_csv_path_pq)
    print(f"Saved per-question data to {_csv_path_pq}")

    # Plot
    S.plot_straight_liner_per_question(
        per_question_counts,
        total_respondents=total_respondents,
    )

    # %% Show the top repeat offenders (respondents with most SL'd groups)
    print("\n--- Top Repeat Offenders ---")
    print(respondent_sl_counts.head(20))
else:
    print("No straight-liners detected in the dataset.")