"""Extra analyses of the straight-liners""" # %% Imports import utils import polars as pl import argparse import json import re from pathlib import Path from validation import check_straight_liners # %% Fixed Variables RESULTS_FILE = 'data/exports/2-4-26/JPMC_Chase Brand Personality_Quant Round 1_February 4, 2026_Labels.csv' QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf' # %% CLI argument parsing for batch automation # When run as script: uv run XX_statistical_significance.script.py --age '["18 # Central filter configuration - add new filters here only # Format: 'cli_arg_name': 'QualtricsSurvey.options_* attribute name' FILTER_CONFIG = { 'age': 'options_age', 'gender': 'options_gender', 'ethnicity': 'options_ethnicity', 'income': 'options_income', 'consumer': 'options_consumer', 'business_owner': 'options_business_owner', 'ai_user': 'options_ai_user', 'investable_assets': 'options_investable_assets', 'industry': 'options_industry', } def parse_cli_args(): parser = argparse.ArgumentParser(description='Generate quant report with optional filters') # Dynamically add filter arguments from config for filter_name in FILTER_CONFIG: parser.add_argument(f'--{filter_name}', type=str, default=None, help=f'JSON list of {filter_name} values') parser.add_argument('--filter-name', type=str, default=None, help='Name for this filter combination (used for .txt description file)') parser.add_argument('--figures-dir', type=str, default=f'figures/straight-liner-analysis/{Path(RESULTS_FILE).parts[2]}', help='Override the default figures directory') # Only parse if running as script (not in Jupyter/interactive) try: # Check if running in Jupyter by looking for ipykernel get_ipython() # noqa: F821 # type: ignore # Return namespace with all filters set to None no_filters = {f: None for f in FILTER_CONFIG} # Use the same default as argparse default_fig_dir = f'figures/straight-liner-analysis/{Path(RESULTS_FILE).parts[2]}' return 
argparse.Namespace(**no_filters, filter_name=None, figures_dir=default_fig_dir) except NameError: args = parser.parse_args() # Parse JSON strings to lists for filter_name in FILTER_CONFIG: val = getattr(args, filter_name) setattr(args, filter_name, json.loads(val) if val else None) return args cli_args = parse_cli_args() # %% S = utils.QualtricsSurvey(RESULTS_FILE, QSF_FILE, figures_dir=cli_args.figures_dir) data_all = S.load_data() # %% Build filtered dataset based on CLI args # CLI args: None means "no filter applied" - filter_data() will skip None filters # Build filter values dict dynamically from FILTER_CONFIG _active_filters = {filter_name: getattr(cli_args, filter_name) for filter_name in FILTER_CONFIG} _d = S.filter_data(data_all, **_active_filters) # Write filter description file if filter-name is provided if cli_args.filter_name and S.fig_save_dir: # Get the filter slug (e.g., "All_Respondents", "Cons-Starter", etc.) _filter_slug = S._get_filter_slug() _filter_slug_dir = S.fig_save_dir / _filter_slug _filter_slug_dir.mkdir(parents=True, exist_ok=True) # Build filter description _filter_desc_lines = [ f"Filter: {cli_args.filter_name}", "", "Applied Filters:", ] _short_desc_parts = [] for filter_name, options_attr in FILTER_CONFIG.items(): all_options = getattr(S, options_attr) values = _active_filters[filter_name] display_name = filter_name.replace('_', ' ').title() # None means no filter applied (same as "All") if values is not None and values != all_options: _short_desc_parts.append(f"{display_name}: {', '.join(values)}") _filter_desc_lines.append(f" {display_name}: {', '.join(values)}") else: _filter_desc_lines.append(f" {display_name}: All") # Write detailed description INSIDE the filter-slug directory # Sanitize filter name for filename usage (replace / and other chars) _safe_filter_name = re.sub(r'[^\w\s-]', '_', cli_args.filter_name) _filter_file = _filter_slug_dir / f"{_safe_filter_name}.txt" _filter_file.write_text('\n'.join(_filter_desc_lines)) # 
Append to summary index file at figures//filter_index.txt _summary_file = S.fig_save_dir / "filter_index.txt" _short_desc = "; ".join(_short_desc_parts) if _short_desc_parts else "All Respondents" _summary_line = f"{_filter_slug} | {cli_args.filter_name} | {_short_desc}\n" # Append or create the summary file if _summary_file.exists(): _existing = _summary_file.read_text() # Avoid duplicate entries for same slug if _filter_slug not in _existing: with _summary_file.open('a') as f: f.write(_summary_line) else: _header = "Filter Index\n" + "=" * 80 + "\n\n" _header += "Directory | Filter Name | Description\n" _header += "-" * 80 + "\n" _summary_file.write_text(_header + _summary_line) # Save to logical variable name for further analysis data = _d data.collect() # %% Determine straight-liner repeat offenders # Extract question groups with renamed columns that check_straight_liners expects. # The raw `data` has QID-based column names; the getter methods rename them to # patterns like SS_Green_Blue__V14__Choice_1, Voice_Scale_1_10__V48, etc. ss_or, _ = S.get_ss_orange_red(data) ss_gb, _ = S.get_ss_green_blue(data) vs, _ = S.get_voice_scale_1_10(data) # Combine all question groups into one wide LazyFrame (joined on _recordId) all_questions = ss_or.join(ss_gb, on='_recordId').join(vs, on='_recordId') # Run straight-liner detection across all question groups # max_score=5 catches all speaking-style straight-lining (1-5 scale) # and voice-scale values ≤5 on the 1-10 scale # Note: sl_threshold is NOT set on S here — this script analyses straight-liners, # it doesn't filter them out of the dataset. print("Running straight-liner detection across all question groups...") sl_report, sl_df = check_straight_liners(all_questions, max_score=5) # %% Quantify repeat offenders # sl_df has one row per (Record ID, Question Group) that was straight-lined. # Group by Record ID to count how many question groups each person SL'd. 
if sl_df is not None and not sl_df.is_empty():
    total_respondents = data.select(pl.len()).collect().item()

    # Per-respondent count of straight-lined question groups, worst first
    respondent_sl_counts = (
        sl_df.group_by("Record ID")
        .agg(pl.len().alias("sl_count"))
        .sort("sl_count", descending=True)
    )
    max_sl = respondent_sl_counts["sl_count"].max()

    print(f"\nTotal respondents: {total_respondents}")
    print(f"Respondents who straight-lined at least 1 question group: {respondent_sl_counts.height}")
    print(f"Maximum question groups straight-lined by one person: {max_sl}")
    print()

    # Cumulative distribution: for each threshold N, how many respondents
    # straight-lined >= N question groups
    cumulative_rows = []
    for threshold in range(1, max_sl + 1):
        count = respondent_sl_counts.filter(pl.col("sl_count") >= threshold).height
        pct = (count / total_respondents) * 100
        cumulative_rows.append({"threshold": threshold, "count": count, "pct": pct})
        print(f" ≥{threshold} question groups straight-lined: {count} respondents ({pct:.1f}%)")

    cumulative_df = pl.DataFrame(cumulative_rows)
    print(f"\n{cumulative_df}")

    # %% Save cumulative data to CSV
    _filter_slug = S._get_filter_slug()
    _csv_dir = Path(S.fig_save_dir) / _filter_slug
    _csv_dir.mkdir(parents=True, exist_ok=True)
    _csv_path = _csv_dir / "straight_liner_repeat_offenders.csv"
    cumulative_df.write_csv(_csv_path)
    print(f"Saved cumulative data to {_csv_path}")

    # %% Plot the cumulative distribution
    S.plot_straight_liner_repeat_offenders(
        cumulative_df,
        total_respondents=total_respondents,
    )

    # %% Per-question straight-lining frequency
    # Build human-readable question group names from the raw keys
    def _humanise_question_group(key: str) -> str:
        """Convert internal question group key to a readable label.

        Examples:
            SS_Green_Blue__V14 → Green/Blue – V14
            SS_Orange_Red__V48 → Orange/Red – V48
            Voice_Scale_1_10 → Voice Scale (1-10)
        """
        if key.startswith("SS_Green_Blue__"):
            return f"Green/Blue – {key.split('__')[1]}"
        if key.startswith("SS_Orange_Red__"):
            return f"Orange/Red – {key.split('__')[1]}"
        if key == "Voice_Scale_1_10":
            return "Voice Scale (1-10)"
        # Fallback: replace underscores
        return key.replace("_", " ")

    # Unique-respondent count per question group, plus percentage and a
    # human-readable name, built in one expression chain
    per_question_counts = (
        sl_df.group_by("Question Group")
        .agg(pl.col("Record ID").n_unique().alias("count"))
        .sort("count", descending=True)
        .with_columns((pl.col("count") / total_respondents * 100).alias("pct"))
        .with_columns(
            pl.col("Question Group")
            .map_elements(_humanise_question_group, return_dtype=pl.Utf8)
            .alias("question")
        )
    )

    print("\n--- Per-Question Straight-Lining Frequency ---")
    print(per_question_counts)

    # Save per-question data to CSV
    _csv_path_pq = _csv_dir / "straight_liner_per_question.csv"
    per_question_counts.write_csv(_csv_path_pq)
    print(f"Saved per-question data to {_csv_path_pq}")

    # Plot
    S.plot_straight_liner_per_question(
        per_question_counts,
        total_respondents=total_respondents,
    )

    # %% Show the top repeat offenders (respondents with most SL'd groups)
    print("\n--- Top Repeat Offenders ---")
    print(respondent_sl_counts.head(20))
else:
    print("No straight-liners detected in the dataset.")