Compare commits

...

6 Commits

6 changed files with 569 additions and 84 deletions

View File

@@ -1,7 +1,7 @@
import marimo
__generated_with = "0.19.2"
app = marimo.App(width="medium")
app = marimo.App(width="full")
@app.cell
@@ -21,6 +21,7 @@ def _():
SPEAKING_STYLES,
calculate_weighted_ranking_scores,
check_progress,
check_straight_liners,
duration_validation,
mo,
pl,
@@ -58,7 +59,7 @@ def _(JPMCSurvey, QSF_FILE, RESULTS_FILE, mo):
@app.cell
def _(Path, RESULTS_FILE, mo):
def _(Path, RESULTS_FILE, data_all, mo):
mo.md(f"""
---
@@ -66,15 +67,42 @@ def _(Path, RESULTS_FILE, mo):
**Dataset:** `{Path(RESULTS_FILE).name}`
**Responses**: `{data_all.collect().shape[0]}`
""")
return
@app.cell
def _(check_progress, data_all, duration_validation, mo):
def _():
sl_ss_max_score = 5
sl_v1_10_max_score = 10
return sl_ss_max_score, sl_v1_10_max_score
@app.cell
def _(
S,
check_progress,
check_straight_liners,
data_all,
duration_validation,
mo,
sl_ss_max_score,
sl_v1_10_max_score,
):
_ss_all = S.get_ss_green_blue(data_all)[0].join(S.get_ss_orange_red(data_all)[0], on='_recordId')
_sl_ss_c, sl_ss_df = check_straight_liners(_ss_all, max_score=sl_ss_max_score)
_sl_v1_10_c, sl_v1_10_df = check_straight_liners(
S.get_voice_scale_1_10(data_all)[0],
max_score=sl_v1_10_max_score
)
mo.md(f"""
## Data Validation
# Data Validation
{check_progress(data_all)}
@@ -83,29 +111,28 @@ def _(check_progress, data_all, duration_validation, mo):
{duration_validation(data_all)}
## Speaking Style - Straight Liners
{_sl_ss_c}
## Voice Score Scale 1-10 - Straight Liners
{_sl_v1_10_c}
""")
return
@app.cell
def _(mo):
mo.md(r"""
### ⚠️ ToDo: "straight-liner" detection and removal
""")
return
def _(data_all):
# # Drop any Voice Scale 1-10 responses with straight-lining, using sl_v1_10_df _responseId values
# records_to_drop = sl_v1_10_df.select('Record ID').to_series().to_list()
# data_validated = data_all.filter(~pl.col('_recordId').is_in(records_to_drop))
@app.cell
def _(mo):
mo.md(r"""
---
# Data Filter
Use to select a subset of the data for the following analysis
""")
return
# mo.md(f"""
# Dropped `{len(records_to_drop)}` responses with straight-lining in Voice Scale 1-10 evaluation.
# """)
data_validated = data_all
return (data_validated,)
@app.cell(hide_code=True)
@@ -140,17 +167,19 @@ def _(S, mo):
''')
return (filter_form,)
return
@app.cell
def _(S, data_all, filter_form, mo):
mo.stop(filter_form.value is None, mo.md("**Please submit filter above to proceed**"))
_d = S.filter_data(data_all, age=filter_form.value['age'], gender=filter_form.value['gender'], income=filter_form.value['income'], ethnicity=filter_form.value['ethnicity'], consumer=filter_form.value['consumer'])
def _(data_validated):
# mo.stop(filter_form.value is None, mo.md("**Please submit filter above to proceed**"))
# _d = S.filter_data(data_validated, age=filter_form.value['age'], gender=filter_form.value['gender'], income=filter_form.value['income'], ethnicity=filter_form.value['ethnicity'], consumer=filter_form.value['consumer'])
# Stop execution and prevent other cells from running if no data is selected
mo.stop(len(_d.collect()) == 0, mo.md("**No Data available for current filter combination**"))
data = _d
# # Stop execution and prevent other cells from running if no data is selected
# mo.stop(len(_d.collect()) == 0, mo.md("**No Data available for current filter combination**"))
# data = _d
data = data_validated
data.collect()
return (data,)
@@ -359,33 +388,48 @@ def _(S, data, mo):
return (vscales,)
@app.cell
def _(pl, vscales):
    # Count how many voices each respondent actually rated (non-null per row).
    # FIX: the id column is spelled `_recordId` (lowercase d) everywhere else
    # in this project; the previous "_recordID" matched nothing, so the record
    # id column was wrongly included and inflated every count by one.
    nn_vscale = vscales.with_columns(
        non_null_count=pl.sum_horizontal(pl.all().exclude("_recordId").is_not_null())
    )
    nn_vscale.collect()['non_null_count'].describe()
    return
@app.cell(hide_code=True)
def _(S, mo, vscales):
mo.md(f"""
### How does each voice score on a scale from 1-10?
{mo.ui.altair_chart(S.plot_average_scores_with_counts(vscales, x_label='Voice', width=1000))}
""")
return
@app.cell(hide_code=True)
def _():
return
@app.cell
def _(mo):
mo.md(r"""
{mo.ui.altair_chart(S.plot_average_scores_with_counts(vscales, x_label='Voice', width=1000, domain=[1,10], title="Voice General Impression (Scale 1-10)"))}
""")
return
@app.cell
def _(mo):
mo.md(r"""
def _(S, mo, utils, vscales):
_target_cols=[c for c in vscales.collect().columns if c not in ['_recordId']]
vscales_row_norm = utils.normalize_row_values(vscales.collect(), target_cols=_target_cols)
mo.md(f"""
### Voice scale 1-10 normalized per respondent?
{mo.ui.altair_chart(S.plot_average_scores_with_counts(vscales_row_norm, x_label='Voice', width=1000, domain=[1,10], title="Voice General Impression (Scale 1-10) - Normalized per Respondent"))}
""")
return
@app.cell
def _(S, mo, utils, vscales):
    # Globally min-max normalized voice scores (0-10 across all respondents).
    _target_cols = [c for c in vscales.collect().columns if c not in ['_recordId']]
    vscales_global_norm = utils.normalize_global_values(vscales.collect(), target_cols=_target_cols)
    # FIX: heading previously said "per respondent" (copy-paste from the cell
    # above) although this cell plots the global normalization. Domain widened
    # to [0, 10] because normalize_global_values rescales onto 0-10, so a
    # lower bound of 1 would clip the smallest bars.
    mo.md(f"""
    ### Voice scale 1-10 normalized across all respondents?
    {mo.ui.altair_chart(S.plot_average_scores_with_counts(vscales_global_norm, x_label='Voice', width=1000, domain=[0, 10], title="Voice General Impression (Scale 1-10) - Normalized Across All Respondents"))}
    """)
    return

154
03_quant_report.py Normal file
View File

@@ -0,0 +1,154 @@
import marimo
__generated_with = "0.19.2"
app = marimo.App(width="medium")
with app.setup:
import marimo as mo
import polars as pl
from pathlib import Path
from validation import check_progress, duration_validation, check_straight_liners
from utils import JPMCSurvey, combine_exclusive_columns, calculate_weighted_ranking_scores
import utils
from speaking_styles import SPEAKING_STYLES
@app.cell
def _():
    # Let the user pick the exported "_Labels" CSV. Navigation is locked to
    # the exports directory and only .csv files are offered.
    file_browser = mo.ui.file_browser(
        initial_path="./data/exports", multiple=False, restrict_navigation=True, filetypes=[".csv"], label="Select 'Labels' File"
    )
    file_browser
    return (file_browser,)


@app.cell
def _(file_browser):
    # Halt this cell and everything downstream until a file is selected.
    mo.stop(file_browser.path(index=0) is None, mo.md("**⚠️ Please select a `_Labels.csv` file above to proceed**"))
    RESULTS_FILE = Path(file_browser.path(index=0))
    # The QSF survey definition is a fixed path; only the results CSV varies.
    QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
    return QSF_FILE, RESULTS_FILE
@app.cell
def _(QSF_FILE, RESULTS_FILE):
    # Build the survey wrapper and load the raw responses lazily.
    S = JPMCSurvey(RESULTS_FILE, QSF_FILE)
    try:
        data_all = S.load_data()
    except NotImplementedError as e:
        # mo.stop raises, so execution never reaches the return with an
        # unbound data_all; the message is shown in place of the cell output.
        mo.stop(True, mo.md(f"**⚠️ {str(e)}**"))
    return S, data_all
@app.cell(hide_code=True)
def _(RESULTS_FILE, data_all):
    # Dataset summary header for the report.
    # FIX: this was mo.md(r"""...""") with no cell dependencies, so the
    # {Path(...)} / {data_all...} placeholders rendered literally. Using an
    # f-string and declaring RESULTS_FILE/data_all (as the filter notebook's
    # equivalent cell does) makes marimo run this after the data is loaded
    # and interpolate the real values.
    mo.md(f"""
    ---
    # Load Data
    **Dataset:** `{Path(RESULTS_FILE).name}`
    **Responses**: `{data_all.collect().shape[0]}`
    """)
    return
@app.cell
def _(S, data_all):
    # Straight-lining thresholds: a respondent is flagged when they gave the
    # same value to every attribute in a group AND that value is <= max_score.
    sl_ss_max_score = 5
    sl_v1_10_max_score = 10
    # The two speaking-style blocks are joined on record id and checked as one.
    _ss_all = S.get_ss_green_blue(data_all)[0].join(S.get_ss_orange_red(data_all)[0], on='_recordId')
    _sl_ss_c, sl_ss_df = check_straight_liners(_ss_all, max_score=sl_ss_max_score)
    _sl_v1_10_c, sl_v1_10_df = check_straight_liners(
        S.get_voice_scale_1_10(data_all)[0],
        max_score=sl_v1_10_max_score
    )
    # Render progress/duration checks plus both straight-liner reports.
    mo.md(f"""
    # Data Validation
    {check_progress(data_all)}
    {duration_validation(data_all)}
    ## Speaking Style - Straight Liners
    {_sl_ss_c}
    ## Voice Score Scale 1-10 - Straight Liners
    {_sl_v1_10_c}
    """)
    return
@app.cell
def _(data_all):
    # # Drop any Voice Scale 1-10 responses with straight-lining, using sl_v1_10_df _responseId values
    # records_to_drop = sl_v1_10_df.select('Record ID').to_series().to_list()
    # data_validated = data_all.filter(~pl.col('_recordId').is_in(records_to_drop))
    # mo.md(f"""
    # Dropped `{len(records_to_drop)}` responses with straight-lining in Voice Scale 1-10 evaluation.
    # """)
    # NOTE(review): dropping straight-liners is intentionally disabled for
    # now — all responses pass through unchanged until a removal policy is
    # agreed on.
    data_validated = data_all
    return (data_validated,)
@app.cell(hide_code=True)
def _():
    # Empty placeholder cell (kept by marimo).
    return


@app.cell
def _(data_validated):
    # Materialize and display the validated dataset.
    # NOTE(review): `data` is not returned, so no downstream cell can depend
    # on it — confirm whether `return (data,)` was intended (the filter
    # notebook's equivalent cell does export it).
    data = data_validated
    data.collect()
    return
@app.cell(hide_code=True)
def _():
    # Report section header (content to follow).
    mo.md(r"""
    ---
    # Introduction (Respondent Demographics)
    """)
    return


@app.cell(hide_code=True)
def _():
    # Report section header (content to follow).
    mo.md(r"""
    ---
    # Brand Character Results
    """)
    return


@app.cell(hide_code=True)
def _():
    # Report section header (content to follow).
    mo.md(r"""
    ---
    # Spoken Voice Results
    """)
    return


if __name__ == "__main__":
    app.run()

View File

@@ -205,7 +205,7 @@ def _(mo):
@app.cell
def _(data, survey):
vscales = survey.get_voice_scale_1_10(data)[0].collect()
vscales
print(vscales.head())
return (vscales,)

View File

@@ -13,6 +13,12 @@ import hashlib
class JPMCPlotsMixin:
"""Mixin class for plotting functions in JPMCSurvey."""
def _process_title(self, title: str) -> str | list[str]:
    """Split a title on ``<br>`` tags into a list for Altair.

    Altair renders a list of strings as a multi-line title. Non-string
    titles, and strings without ``<br>``, pass through unchanged.
    """
    if not isinstance(title, str):
        return title
    return title.split('<br>') if '<br>' in title else title
def _sanitize_filename(self, title: str) -> str:
"""Convert plot title to a safe filename."""
# Remove HTML tags
@@ -156,8 +162,8 @@ class JPMCPlotsMixin:
chart_spec = chart.to_dict()
existing_title = chart_spec.get('title', '')
# Handle different title formats (string vs dict)
if isinstance(existing_title, str):
# Handle different title formats (string vs dict vs list)
if isinstance(existing_title, (str, list)):
title_config = {
'text': existing_title,
'subtitle': lines,
@@ -260,6 +266,7 @@ class JPMCPlotsMixin:
color: str = ColorPalette.PRIMARY,
height: int | None = None,
width: int | str | None = None,
domain: list[float] | None = None,
) -> alt.Chart:
"""Create a bar plot showing average scores and count of non-null values for each column."""
df = self._ensure_dataframe(data)
@@ -278,11 +285,14 @@ class JPMCPlotsMixin:
# Convert to pandas for Altair (sort by average descending)
stats_df = pl.DataFrame(stats).sort('average', descending=True).to_pandas()
if domain is None:
domain = [stats_df['average'].min(), stats_df['average'].max()]
# Base bar chart
bars = alt.Chart(stats_df).mark_bar(color=color).encode(
x=alt.X('voice:N', title=x_label, sort='-y'),
y=alt.Y('average:Q', title=y_label, scale=alt.Scale(domain=[0, 10])),
y=alt.Y('average:Q', title=y_label, scale=alt.Scale(domain=domain)),
tooltip=[
alt.Tooltip('voice:N', title='Voice'),
alt.Tooltip('average:Q', title='Average', format='.2f'),
@@ -303,7 +313,7 @@ class JPMCPlotsMixin:
# Combine layers
chart = (bars + text).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -360,7 +370,7 @@ class JPMCPlotsMixin:
alt.Tooltip('count:Q', title='Count')
]
).add_params(selection).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -420,7 +430,7 @@ class JPMCPlotsMixin:
alt.Tooltip('count:Q', title='Count')
]
).add_params(selection).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -473,7 +483,7 @@ class JPMCPlotsMixin:
alt.Tooltip('count:Q', title='1st Place Votes')
]
).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -514,7 +524,7 @@ class JPMCPlotsMixin:
)
chart = (bars + text).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -571,7 +581,7 @@ class JPMCPlotsMixin:
alt.Tooltip('count:Q', title='Selections')
]
).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -627,7 +637,7 @@ class JPMCPlotsMixin:
alt.Tooltip('count:Q', title='In Top 3')
]
).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or getattr(self, 'plot_height', 400)
)
@@ -713,7 +723,7 @@ class JPMCPlotsMixin:
# Combine layers
chart = (bars + text).properties(
title={
"text": title,
"text": self._process_title(title),
"subtitle": [trait_description, "(Numbers on bars indicate respondent count)"]
},
width=width or 800,
@@ -776,7 +786,7 @@ class JPMCPlotsMixin:
alt.Tooltip('correlation:Q', format='.2f')
]
).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or 350
)
@@ -832,7 +842,7 @@ class JPMCPlotsMixin:
alt.Tooltip('correlation:Q', format='.2f')
]
).properties(
title=title,
title=self._process_title(title),
width=width or 800,
height=height or 350
)

View File

@@ -349,6 +349,87 @@ def calculate_weighted_ranking_scores(df: pl.LazyFrame) -> pl.DataFrame:
return pl.DataFrame(scores).sort('Weighted Score', descending=True)
def normalize_row_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
    """
    Min-max normalize the given columns row-wise onto a 0-10 scale.

    Each value becomes ((x - row_min) / (row_max - row_min)) * 10, where the
    min/max are taken across ``target_cols`` within the same row, ignoring
    nulls. Nulls are preserved as nulls. When every non-null value in a row
    is identical (zero range), those values map to 5.0 — the scale midpoint.

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe.
    target_cols : list[str]
        List of column names to normalize.

    Returns
    -------
    pl.DataFrame
        DataFrame with target columns normalized row-wise.
    """
    # Row-wise bounds over the target columns (horizontal ops skip nulls).
    as_float = [pl.col(c).cast(pl.Float64) for c in target_cols]
    lo = pl.min_horizontal(as_float)
    hi = pl.max_horizontal(as_float)
    span = hi - lo

    # One expression per column: midpoint on a degenerate row, scaled otherwise.
    scaled = [
        pl.when(span == 0)
        .then(pl.when(pl.col(c).is_null()).then(None).otherwise(5.0))
        .otherwise(((pl.col(c).cast(pl.Float64) - lo) / span) * 10)
        .alias(c)
        for c in target_cols
    ]
    return df.with_columns(scaled)
def normalize_global_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
    """
    Normalizes values in the specified columns globally to 0-10 scale.

    Formula: ((x - global_min) / (global_max - global_min)) * 10

    Nulls are ignored when computing the global min/max and preserved in the
    output. If there are no non-null values, or all values are equal (zero
    range), the frame is returned unchanged.

    Parameters
    ----------
    df : pl.DataFrame | pl.LazyFrame
        Input frame; a LazyFrame is collected and the result is re-lazified.
    target_cols : list[str]
        List of column names to normalize.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        Frame with target columns rescaled, same laziness as the input.
    """
    # Ensure eager for scalar extraction
    was_lazy = isinstance(df, pl.LazyFrame)
    if was_lazy:
        df = df.collect()

    if len(target_cols) == 0:
        return df.lazy() if was_lazy else df

    # Global bounds across all target columns and rows (nulls skipped).
    # FIX: replaces the deprecated DataFrame.melt() (renamed `unpivot` in
    # polars 1.0) with horizontal min/max followed by a column aggregate.
    as_float = [pl.col(c).cast(pl.Float64) for c in target_cols]
    stats = df.select(
        pl.min_horizontal(as_float).min().alias("min"),
        pl.max_horizontal(as_float).max().alias("max"),
    )
    global_min = stats["min"][0]
    global_max = stats["max"][0]

    # Handle edge case where all values are the same or none exist
    if global_min is None or global_max is None or global_max == global_min:
        return df.lazy() if was_lazy else df

    global_range = global_max - global_min
    res = df.with_columns([
        (((pl.col(col).cast(pl.Float64) - global_min) / global_range) * 10).alias(col)
        for col in target_cols
    ])
    return res.lazy() if was_lazy else res
class JPMCSurvey(JPMCPlotsMixin):
"""Class to handle JPMorgan Chase survey data."""
@@ -589,10 +670,12 @@ class JPMCSurvey(JPMCPlotsMixin):
return subset, None
def get_voice_scale_1_10(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
def get_voice_scale_1_10(self, q: pl.LazyFrame, drop_cols=['Voice_Scale_1_10__V46']) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
Drops scores for V46 as it was improperly configured in the survey and thus did not show up for respondents.
"""
QIDs_map = {}
@@ -602,6 +685,12 @@ class JPMCSurvey(JPMCPlotsMixin):
# Convert "Voice 16 Scale 1-10_1" to "Scale_1_10__Voice_16"
QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"
for col in drop_cols:
if col in QIDs_map.values():
# remove from QIDs_map
qid_to_remove = [k for k,v in QIDs_map.items() if v == col][0]
del QIDs_map[qid_to_remove]
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None

View File

@@ -1,13 +1,14 @@
import marimo as mo
import polars as pl
import altair as alt
from theme import ColorPalette
def check_progress(data):
"""Check if all responses are complete based on 'progress' column."""
if data.collect().select(pl.col('progress').unique()).shape[0] == 1:
return """### Responses Complete: \n\n✅ All responses are complete (progress = 100) """
return """## Responses Complete: \n\n✅ All responses are complete (progress = 100) """
return "### Responses Complete: \n\n⚠️ There are incomplete responses (progress < 100) ⚠️"
return "## Responses Complete: \n\n⚠️ There are incomplete responses (progress < 100) ⚠️"
def duration_validation(data):
@@ -30,9 +31,9 @@ def duration_validation(data):
outlier_data = _d.filter(pl.col('outlier_duration') == True).collect()
if outlier_data.shape[0] == 0:
return "### Duration Outliers: \n\n✅ No duration outliers detected"
return "## Duration Outliers: \n\n✅ No duration outliers detected"
return f"""### Duration Outliers:
return f"""## Duration Outliers:
**⚠️ Potential outliers detected based on response duration ⚠️**
@@ -68,13 +69,25 @@ def check_straight_liners(data, max_score=3):
schema_names = data.collect_schema().names()
# regex groupings
pattern = re.compile(r"(.*__V\d+)__Choice_\d+")
pattern_choice = re.compile(r"(.*__V\d+)__Choice_\d+")
pattern_scale = re.compile(r"Voice_Scale_1_10__V\d+")
groups = {}
for col in schema_names:
match = pattern.search(col)
if match:
group_key = match.group(1)
# Check for Choice pattern (SS_...__Vxx__Choice_y)
match_choice = pattern_choice.search(col)
if match_choice:
group_key = match_choice.group(1)
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(col)
continue
# Check for Voice Scale pattern (Voice_Scale_1_10__Vxx)
# All of these form a single group "Voice_Scale_1_10"
if pattern_scale.search(col):
group_key = "Voice_Scale_1_10"
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(col)
@@ -85,6 +98,13 @@ def check_straight_liners(data, max_score=3):
if not multi_attribute_groups:
return "### Straight-lining Checks: \n\n No multi-attribute question groups found."
# Cast all involved columns to Float64 (strict=False) to handle potential string columns
# and 1-10 scale floats (e.g. 5.5). Float64 covers integers as well.
all_group_cols = [col for cols in multi_attribute_groups.values() for col in cols]
data = data.with_columns([
pl.col(col).cast(pl.Float64, strict=False) for col in all_group_cols
])
# Build expressions
expressions = []
@@ -108,8 +128,9 @@ def check_straight_liners(data, max_score=3):
).alias(f"__is_straight__{key}")
value_expr = safe_val.alias(f"__val__{key}")
has_data = (list_expr.list.len() > 0).alias(f"__has_data__{key}")
expressions.extend([is_straight, value_expr])
expressions.extend([is_straight, value_expr, has_data])
# collect data with checks
# We only need _recordId and the check columns
@@ -120,33 +141,200 @@ def check_straight_liners(data, max_score=3):
# Process results into a nice table
outliers = []
for key in multi_attribute_groups.keys():
for key, group_cols in multi_attribute_groups.items():
flag_col = f"__is_straight__{key}"
val_col = f"__val__{key}"
filtered = checked_data.filter(pl.col(flag_col))
if filtered.height > 0:
rows = filtered.select(["_recordId", val_col]).rows()
for row in rows:
# Sort group_cols logic
# If Choice columns, sort by choice number.
# If Voice Scale columns (no Choice_), sort by Voice ID (Vxx)
if all("__Choice_" in c for c in group_cols):
key_func = lambda c: int(c.split('__Choice_')[-1])
else:
# Extract digits from Vxx
def key_func(c):
m = re.search(r"__V(\d+)", c)
return int(m.group(1)) if m else 0
sorted_group_cols = sorted(group_cols, key=key_func)
# Select relevant columns: Record ID, Value, and the sorted group columns
subset = filtered.select(["_recordId", val_col] + sorted_group_cols)
for row in subset.iter_rows(named=True):
# Create ordered list of values, using 'NaN' for missing data
resp_list = [row[c] if row[c] is not None else 'NaN' for c in sorted_group_cols]
outliers.append({
"Record ID": row[0],
"Record ID": row["_recordId"],
"Question Group": key,
"Value": row[1]
"Value": row[val_col],
"Responses": str(resp_list)
})
if not outliers:
return f"### Straight-lining Checks: \n\n✅ No straight-liners detected (value <= {max_score})"
return f"### Straight-lining Checks: \n\n✅ No straight-liners detected (value <= {max_score})", None
outlier_df = pl.DataFrame(outliers)
return f"""### Straight-lining Checks:
**⚠️ Potential straight-liners detected ⚠️**
Respondents selected the same value (<= {max_score}) for all attributes in the following groups:
{mo.ui.table(outlier_df)}
"""
# --- Analysis & Visualization ---
total_respondents = checked_data.height
# 1. & 3. Percentage Calculation
group_stats = []
value_dist_data = []
# Calculate Straight-Liners for ALL groups found in Data
# Condition: Respondent straight-lined ALL questions that they actually answered (ignoring empty/skipped questions)
# Logic: For every group G: if G has data (len > 0), then G must be straight.
# Also, the respondent must have answered at least one question group.
conditions = []
has_any_data_exprs = []
for key in multi_attribute_groups.keys():
flag_col = f"__is_straight__{key}"
data_col = f"__has_data__{key}"
# If has_data is True, is_straight MUST be True for it to count as valid straight-lining behavior for that user.
# Equivalent: (not has_data) OR is_straight
cond = (~pl.col(data_col)) | pl.col(flag_col)
conditions.append(cond)
has_any_data_exprs.append(pl.col(data_col))
all_straight_count = checked_data.filter(
pl.all_horizontal(conditions) & pl.any_horizontal(has_any_data_exprs)
).height
all_straight_pct = (all_straight_count / total_respondents) * 100
for key in multi_attribute_groups.keys():
flag_col = f"__is_straight__{key}"
val_col = f"__val__{key}"
# Filter for straight-liners in this specific group
sl_sub = checked_data.filter(pl.col(flag_col))
count = sl_sub.height
pct = (count / total_respondents) * 100
group_stats.append({
"Question Group": key,
"Straight-Liner %": pct,
"Count": count
})
# Get Value Distribution for this group's straight-liners
if count > 0:
# Group by the Value they straight-lined
dist = sl_sub.group_by(val_col).agg(pl.len().alias("count"))
for row in dist.iter_rows(named=True):
value_dist_data.append({
"Question Group": key,
"Value": row[val_col],
"Count": row["count"]
})
stats_df = pl.DataFrame(group_stats)
dist_df = pl.DataFrame(value_dist_data)
# Plot 1: % of Responses with Straight-Liners per Question
# Vertical bars with Count label on top
base_pct = alt.Chart(stats_df).encode(
x=alt.X("Question Group", sort=alt.EncodingSortField(field="Straight-Liner %", order="descending"))
)
bars_pct = base_pct.mark_bar(color=ColorPalette.PRIMARY).encode(
y=alt.Y("Straight-Liner %:Q", axis=alt.Axis(format=".1f", title="Share of all responses [%]")),
tooltip=["Question Group", alt.Tooltip("Straight-Liner %:Q", format=".1f"), "Count"]
)
text_pct = base_pct.mark_text(dy=-10).encode(
y=alt.Y("Straight-Liner %:Q"),
text=alt.Text("Count")
)
chart_pct = (bars_pct + text_pct).properties(
title="Share of Responses with Straight-Liners per Question",
width=800,
height=300
)
# Plot 2: Value Distribution (Horizontal Stacked Normalized Bar)
# Question Groups sorted by Total Count
# Values stacked 1 (left) -> 5 (right)
# Legend on top
# Total count at bar end
# Sort order for Y axis (Question Group) based on total Count (descending)
# Explicitly calculate sort order from stats_df to ensure consistency across layers
# High counts at the top
sorted_groups = stats_df.sort("Count", descending=True)["Question Group"].to_list()
# Base chart for Bars
# Use JPMC-aligned colors (blues) instead of default categorical rainbow
# Remove legend title as per plots.py style
bars_dist = alt.Chart(dist_df).mark_bar().encode(
y=alt.Y("Question Group", sort=sorted_groups),
x=alt.X("Count", stack="normalize", axis=alt.Axis(format="%"), title="Share of SL Responses"),
color=alt.Color("Value:O",
title=None, # explicit removal of title like in plots.py
scale=alt.Scale(scheme="blues"), # Professional blue scale
legend=alt.Legend(orient="top", direction="horizontal")
),
order=alt.Order("Value", sort="ascending"), # Ensures 1 is Left, 5 is Right
tooltip=["Question Group", "Value", "Count"]
)
# Text layer for Total Count (using stats_df which already has totals)
# using same sort for Y
text_dist = alt.Chart(stats_df).mark_text(align='left', dx=5).encode(
y=alt.Y("Question Group", sort=sorted_groups),
x=alt.datum(1.0), # Position at 100%
text=alt.Text("Count")
)
chart_dist = (bars_dist + text_dist).properties(
title="Distribution of Straight-Lined Values",
width=800,
height=500
)
analysis_md = f"""
### Straight-Lining Analysis
*"Straight-lining" is defined here as selecting the same response value for all attributes within a multi-attribute question group.*
* **Total Respondents**: {total_respondents}
* **Respondents straight-lining ALL questions presented to them**: {all_straight_pct:.2f}% ({all_straight_count} respondents)
"""
return (mo.vstack([
mo.md(f"**⚠️ Potential straight-liners detected ⚠️**\n\n"),
mo.ui.table(outlier_df),
mo.md(analysis_md),
alt.vconcat(chart_pct, chart_dist).resolve_legend(color="independent")
]), outlier_df)
if __name__ == "__main__":
    # Ad-hoc smoke test: run the straight-liner check against a local export.
    from utils import JPMCSurvey
    RESULTS_FILE = "data/exports/OneDrive_2026-01-28/1-28-26 Afternoon/JPMC_Chase Brand Personality_Quant Round 1_January 28, 2026_Afternoon_Labels.csv"
    QSF_FILE = "data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf"
    S = JPMCSurvey(RESULTS_FILE, QSF_FILE)
    data = S.load_data()
    # print("Checking Green Blue:")
    # print(check_straight_liners(S.get_ss_green_blue(data)[0]))
    # print("Checking Orange Red:")
    # print(check_straight_liners(S.get_ss_orange_red(data)[0]))
    print("Checking Voice Scale 1-10:")
    # NOTE(review): check_straight_liners appears to return a
    # (renderable, DataFrame) tuple now, so this prints the tuple repr —
    # confirm whether only the first element should be printed.
    print(check_straight_liners(S.get_voice_scale_1_10(data)[0]))