initial plots

This commit is contained in:
2026-01-22 20:48:59 +01:00
parent b8642e9de8
commit dbcade215b
3 changed files with 642 additions and 59 deletions

View File

@@ -10,29 +10,40 @@ def _():
import polars as pl
from pathlib import Path
from utils import extract_qid_descr_map, load_csv_with_qid_headers
return extract_qid_descr_map, load_csv_with_qid_headers, mo
from utils import JPMCSurvey
from plots import plot_average_scores_with_counts, plot_top3_ranking_distribution
return (
JPMCSurvey,
mo,
plot_average_scores_with_counts,
plot_top3_ranking_distribution,
)
@app.cell
def _():
RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv'
QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf'
# RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv'
return (RESULTS_FILE,)
return QSF_FILE, RESULTS_FILE
@app.cell
def _(RESULTS_FILE, extract_qid_descr_map):
qid_descr_map = extract_qid_descr_map(RESULTS_FILE)
qid_descr_map
return
def _(JPMCSurvey, QSF_FILE, RESULTS_FILE):
survey = JPMCSurvey(RESULTS_FILE, QSF_FILE)
survey.qid_descr_map
return (survey,)
@app.cell
def _(RESULTS_FILE, load_csv_with_qid_headers):
df = load_csv_with_qid_headers(RESULTS_FILE)
df
return
def _(survey):
data = survey.load_data()
df = data.collect()
df.select([q for q in df.columns if 'QID98' in q])
return (data,)
@app.cell
@@ -77,5 +88,122 @@ def _(mo):
return
@app.cell
def _(survey):
    # Look up the QSF survey element for QID36 and show its 'Payload' entry.
    cfg = survey._get_qsf_question_by_QID('QID36')['Payload']
    cfg
    return


@app.cell
def _(data, survey):
    # The getter returns a tuple; element 0 is the LazyFrame, materialised here.
    survey.get_demographics(data)[0].collect()
    return


@app.cell
def _(data, survey):
    # Top-8 traits subset (element 0 of the returned tuple), collected.
    survey.get_top_8_traits(data)[0].collect()
    return


@app.cell
def _(data, survey):
    # Top-3 traits subset (element 0 of the returned tuple), collected.
    survey.get_top_3_traits(data)[0].collect()
    return


@app.cell
def _(data, survey):
    # Character ranking subset (element 0 of the returned tuple), collected.
    survey.get_character_ranking(data)[0].collect()
    return


@app.cell
def _(data, survey):
    # 18-8-3 subset (element 0 of the returned tuple), collected.
    survey.get_18_8_3(data)[0].collect()
    return
@app.cell
def _(mo):
    # Section heading for the voice scale cells below.
    mo.md(r"""
    # Voice Scales 1-10
    """)
    return


@app.cell
def _(data, survey):
    # Collect the voice scale columns; `vscales` is returned for use by the plotting cell.
    vscales = survey.get_voice_scale_1_10(data)[0].collect()
    vscales
    return (vscales,)


@app.cell
def _(plot_average_scores_with_counts, vscales):
    # Bar chart of per-column averages with non-null counts, one bar per voice column.
    plot_average_scores_with_counts(vscales, x_label='Voice', width=1000)
    return
@app.cell
def _(mo):
    # Section heading for the SS Green/Blue cell below.
    mo.md(r"""
    # SS Green Blue
    """)
    return


@app.cell
def _(data, survey):
    # The getter returns (LazyFrame, choices map); only the first rows of the frame are printed.
    _lf, _choice_map = survey.get_ss_green_blue(data)
    print(_lf.collect().head())
    return
@app.cell
def _(mo):
    # Section heading for the top-3 voices cells below.
    mo.md(r"""
    # Top 3 Voices
    """)
    return


@app.cell
def _(data, survey):
    # Collect the top-3 voice ranking columns; `top3_voices` is returned for the cells below.
    top3_voices = survey.get_top_3_voices(data)[0].collect()
    top3_voices
    return (top3_voices,)


@app.cell
def _(top3_voices):
    # Print the first rows of the ranking frame.
    print(top3_voices.head())
    return


@app.cell
def _(plot_top3_ranking_distribution, top3_voices):
    # Stacked bar chart of rank 1/2/3 counts per voice column.
    plot_top3_ranking_distribution(top3_voices, x_label='Voice', width=1000)
    return
@app.cell
def _(mo):
    # Section heading for the SS Orange/Red cell below.
    mo.md(r"""
    # SS Orange / Red
    """)
    return
@app.cell
def _(data, survey):
    # The getter returns (LazyFrame, choices map). Both names carry the
    # underscore prefix so they stay private to this cell, matching the
    # SS Green/Blue cell; the unprefixed `choice_map` was a notebook-level
    # name that this cell's bare `return` did not declare.
    _lf, _choice_map = survey.get_ss_orange_red(data)
    _d = _lf.collect()
    _d
    return
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    app.run()

212
plots.py Normal file
View File

@@ -0,0 +1,212 @@
"""Plotting functions for Voice Branding analysis."""
import plotly.graph_objects as go
import polars as pl
def plot_average_scores_with_counts(
    df: pl.DataFrame,
    title: str = "General Impression (1-10)<br>Per Voice with Number of Participants Who Rated It",
    x_label: str = "Stimuli",
    y_label: str = "Average General Impression Rating (1-10)",
    color: str = "#0077B6",
    height: int = 500,
    width: int = 1000,
) -> go.Figure:
    """
    Create a bar plot of each column's mean, annotated with its non-null count.

    Bars are ordered by mean, highest first. Columns whose mean is null
    (e.g. columns without any non-null value) are placed last. A frame
    without columns produces a figure with an empty bar trace instead of
    raising.

    Parameters
    ----------
    df : pl.DataFrame
        DataFrame containing numeric columns to analyze.
    title : str, optional
        Plot title.
    x_label : str, optional
        X-axis label.
    y_label : str, optional
        Y-axis label.
    color : str, optional
        Bar color (hex code or named color).
    height : int, optional
        Plot height in pixels.
    width : int, optional
        Plot width in pixels.

    Returns
    -------
    go.Figure
        Plotly figure object.
    """
    # One record per column: its mean and how many values are non-null.
    stats = [
        {
            'column': col,
            'average': df[col].mean(),
            'count': df[col].count(),
        }
        for col in df.columns
    ]

    if stats:
        # nulls_last keeps columns without a mean from leading the chart.
        stats_df = pl.DataFrame(stats).sort('average', descending=True, nulls_last=True)
    else:
        # An empty record list has no 'average' column to sort on, so build
        # the empty frame with an explicit schema.
        stats_df = pl.DataFrame(
            schema={'column': pl.Utf8, 'average': pl.Float64, 'count': pl.Int64}
        )

    # Keep only the part after the last '__' (e.g. "V14" from
    # "Voice_Scale_1_10__V14"); names without '__' are used unchanged.
    labels = [col.split('__')[-1] for col in stats_df['column']]

    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=labels,
        y=stats_df['average'].to_list(),
        text=stats_df['count'].to_list(),
        textposition='inside',
        textfont=dict(size=10, color='black'),
        marker_color=color,
        hovertemplate='<b>%{x}</b><br>Average: %{y:.2f}<br>Count: %{text}<extra></extra>',
    ))

    fig.update_layout(
        title=title,
        xaxis_title=x_label,
        yaxis_title=y_label,
        height=height,
        width=width,
        plot_bgcolor='white',
        xaxis=dict(
            showgrid=True,
            gridcolor='lightgray',
            tickangle=-45,
        ),
        yaxis=dict(
            range=[0, 10],
            showgrid=True,
            gridcolor='lightgray',
        ),
        font=dict(size=11),
    )
    return fig
def plot_top3_ranking_distribution(
    df: pl.DataFrame,
    title: str = "Top 3 Rankings Distribution<br>Count of 1st, 2nd, and 3rd Place Votes per Voice",
    x_label: str = "Voices",
    y_label: str = "Number of Mentions in Top 3",
    height: int = 600,
    width: int = 1000,
) -> go.Figure:
    """
    Create a stacked bar chart showing how often each voice was ranked 1st, 2nd, or 3rd.

    The total height of a bar is the number of times the column holds a
    1, 2 or 3; the segments split that total by rank. Columns with no such
    value are left out. If no column has any vote, the figure contains
    empty traces instead of raising.

    Parameters
    ----------
    df : pl.DataFrame
        DataFrame containing ranking columns (values 1, 2, 3).
    title : str, optional
        Plot title.
    x_label : str, optional
        X-axis label.
    y_label : str, optional
        Y-axis label.
    height : int, optional
        Plot height in pixels.
    width : int, optional
        Plot width in pixels.

    Returns
    -------
    go.Figure
        Plotly figure object.
    """
    # (stats key, legend label, bar colour), listed in stacking order:
    # rank 1 sits at the base so first-choice volume is comparable across bars.
    rank_traces = (
        ('Rank 1', 'Rank 1 (1st Choice)', '#004C6D'),  # Dark Blue
        ('Rank 2', 'Rank 2 (2nd Choice)', '#008493'),  # Teal
        ('Rank 3', 'Rank 3 (3rd Choice)', '#5AAE95'),  # Sea Green
    )

    stats = []
    for col in df.columns:
        counts = {
            key: df.filter(pl.col(col) == rank).height
            for rank, (key, _, _) in enumerate(rank_traces, start=1)
        }
        total = sum(counts.values())
        # Skip columns that never appear in anyone's top 3.
        if total > 0:
            stats.append({'column': col, **counts, 'Total': total})

    if stats:
        # Most mentions first; ties broken by the number of first places.
        stats_df = pl.DataFrame(stats).sort(['Total', 'Rank 1'], descending=[True, True])
    else:
        # An empty record list has no columns to sort on, so build the empty
        # frame with an explicit schema.
        stats_df = pl.DataFrame(schema={
            'column': pl.Utf8,
            'Rank 1': pl.Int64,
            'Rank 2': pl.Int64,
            'Rank 3': pl.Int64,
            'Total': pl.Int64,
        })

    # Keep only the part after the last '__'; names without '__' are unchanged.
    labels = [col.split('__')[-1] for col in stats_df['column']]

    fig = go.Figure()
    for key, legend_name, bar_color in rank_traces:
        fig.add_trace(go.Bar(
            name=legend_name,
            x=labels,
            y=stats_df[key].to_list(),
            marker_color=bar_color,
            hovertemplate=f'<b>%{{x}}</b><br>{key}: %{{y}}<extra></extra>',
        ))

    fig.update_layout(
        barmode='stack',
        title=title,
        xaxis_title=x_label,
        yaxis_title=y_label,
        height=height,
        width=width,
        plot_bgcolor='white',
        xaxis=dict(
            showgrid=True,
            gridcolor='lightgray',
            tickangle=-45,
        ),
        yaxis=dict(
            showgrid=True,
            gridcolor='lightgray',
        ),
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1,
            traceorder="normal",
        ),
        font=dict(size=11),
    )
    return fig

339
utils.py
View File

@@ -2,6 +2,27 @@ import polars as pl
from pathlib import Path
import pandas as pd
from typing import Union
import json
import re
def extract_voice_label(html_str: str) -> Union[str, None]:
    """
    Extract voice label from HTML string and convert to short format.

    Parameters:
        html_str (str): HTML string containing voice label in format "Voice N"

    Returns:
        str | None: Voice label in format "VN" (e.g., "V14"), or None when
        the string contains no "Voice N" pattern.

    Example:
        >>> extract_voice_label('<span style="...">Voice 14<br />...')
        'V14'
        >>> extract_voice_label('<span>no label</span>') is None
        True
    """
    match = re.search(r'Voice (\d+)', html_str)
    if match is None:
        return None
    return f"V{match.group(1)}"
def extract_qid(val):
"""Extracts the 'ImportId' from a string representation of a dictionary."""
@@ -11,64 +32,286 @@ def extract_qid(val):
return val['ImportId']
def extract_qid_descr_map(results_file: Union[str, Path]) -> dict:
"""Extract mapping of Qualtrics ImportID to Question Description from results file."""
if isinstance(results_file, str):
results_file = Path(results_file)
if '1_1-16-2026' in results_file.as_posix():
df_questions = pd.read_csv(results_file, nrows=1)
df_questions
class JPMCSurvey:
"""Class to handle JPMorgan Chase survey data."""
return df_questions.iloc[0].to_dict()
def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
    """
    Store both file locations and eagerly load the question metadata.

    Parameters:
        data_path (str | Path): Location of the results CSV export.
        qsf_path (str | Path): Location of the survey's QSF file.
    """
    # Strings are promoted to Path; anything else is stored as given.
    self.data_filepath = Path(data_path) if isinstance(data_path, str) else data_path
    self.qsf_filepath = Path(qsf_path) if isinstance(qsf_path, str) else qsf_path

    # Both loaders read from the paths stored above.
    self.qid_descr_map = self._extract_qid_descr_map()
    self.qsf: dict = self._load_qsf()
def _extract_qid_descr_map(self) -> dict:
    """
    Build the question lookup from the header rows of the results file.

    Returns:
        dict: For files whose path contains '1_1-16-2026', the first row
        below the header as ``{column name: value}``. Otherwise
        ``{ImportID: {'QName': ..., 'Description': ...}}``.
    """
    if '1_1-16-2026' in self.data_filepath.as_posix():
        first_row = pd.read_csv(self.data_filepath, nrows=1)
        return first_row.iloc[0].to_dict()

    # File row 1 (read as the header) holds the Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
    # File row 2 holds the question content
    # File row 3 holds the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
    header_rows = pd.read_csv(self.data_filepath, nrows=2)

    # After transposing, each survey column becomes one record.
    questions = header_rows.T.reset_index()
    questions.columns = ['QName', 'Description', 'export_metadata']
    questions['ImportID'] = questions['export_metadata'].apply(extract_qid)

    # Result shape: {ImportID: {'QName': ..., 'Description': ...}}
    by_import_id = questions.set_index('ImportID')[['QName', 'Description']]
    return by_import_id.T.to_dict()
def _load_qsf(self) -> dict:
"""Load QSF file to extract question metadata if needed."""
with open(self.qsf_filepath, 'r', encoding='utf-8') as f:
qsf_data = json.load(f)
return qsf_data
def _get_qsf_question_by_QID(self, QID: str) -> dict:
"""Get question metadata from QSF using the Question ID."""
q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID]
if len(q_elem) == 0:
raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
if len(q_elem) > 1:
raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")
return q_elem[0]
else:
# First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
# Second row which contains the question content
# Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
df_questions = pd.read_csv(results_file, nrows=1, skiprows=1)
def load_data(self) -> pl.LazyFrame:
    """
    Load CSV where column headers are in row 3 as dict strings with ImportId.

    The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
    The ImportId of each column is extracted and used as the column name.
    The file is read from ``self.data_filepath``.

    Returns:
        pl.LazyFrame: Polars LazyFrame with ImportId as column names,
        holding every row after the three header rows.

    Raises:
        NotImplementedError: If the file path contains '1_1-16-2026'.
    """
    if '1_1-16-2026' in self.data_filepath.as_posix():
        raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")

    # Read the 3rd row (index 2) which contains the metadata dictionaries.
    # header=None yields the raw cell values instead of treating them as column names.
    df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)

    # Extract ImportIds from each column value in this row
    new_columns = [extract_qid(val) for val in df_meta.iloc[0]]

    # Skip the three header rows and read everything after them as data.
    # has_header=False is required: with the default (True) polars would
    # consume the first response row as the header and silently drop it.
    df = pl.read_csv(self.data_filepath, skip_rows=3, has_header=False)

    # Rename columns with the extracted ImportIds
    df.columns = new_columns
    return df.lazy()
# transpose df_questions
df_questions = df_questions.T.reset_index()
df_questions.columns = ['Description', 'export_metadata']
df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
df_questions = df_questions[['ImportID', 'Description']]
return dict(zip(df_questions['ImportID'], df_questions['Description']))
def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True) -> pl.LazyFrame:
    """
    Select the ``QIDs`` columns from ``q``, optionally renaming them.

    When ``rename_cols`` is true, every selected column that has an entry in
    ``self.qid_descr_map`` is renamed to that entry's 'QName'; columns
    without an entry keep their name.
    """
    if rename_cols:
        qid_to_qname = {}
        for qid in QIDs:
            if qid in self.qid_descr_map:
                qid_to_qname[qid] = self.qid_descr_map[qid]['QName']
        return q.select(QIDs).rename(qid_to_qname)

    return q.select(QIDs)
def load_csv_with_qid_headers(file_path: Union[str, Path]) -> pl.DataFrame:
"""
Load CSV where column headers are in row 3 as dict strings with ImportId.
The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
This function extracts the ImportId from each column and uses it as the column name.
Parameters:
file_path (Path): Path to the CSV file to load.
Returns:
pl.DataFrame: Polars DataFrame with ImportId as column names.
"""
if isinstance(file_path, str):
file_path = Path(file_path)
def get_demographics(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract columns containing the demographics.

    The selected columns are renamed to their 'QName' from
    ``self.qid_descr_map`` where an entry exists.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.
    """
    QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer']
    return self._get_subset(q, QIDs), None
# Read the 3rd row (index 2) which contains the metadata dictionaries
# Use header=None to get raw values instead of treating them as column names
df_meta = pd.read_csv(file_path, nrows=1, skiprows=2, header=None)
def get_top_8_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the column containing the top 8 characteristics that are most important for this Chase virtual assistant to have.

    Column 'QID25' is selected and renamed to 'Top_8_Traits'.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.
    """
    rename_dict = {'QID25': 'Top_8_Traits'}
    subset = self._get_subset(q, list(rename_dict), rename_cols=False)
    return subset.rename(rename_dict), None
def get_top_3_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the column containing the top 3 characteristics that the Chase virtual assistant should prioritize.

    Column 'QID26_0_GROUP' is selected and renamed to 'Top_3_Traits'.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.
    """
    rename_dict = {'QID26_0_GROUP': 'Top_3_Traits'}
    subset = self._get_subset(q, list(rename_dict), rename_cols=False)
    return subset.rename(rename_dict), None
# Extract ImportIds from each column value in this row
new_columns = [extract_qid(val) for val in df_meta.iloc[0]]
# Now read the actual data starting from row 4 (skip first 3 rows)
df = pl.read_csv(file_path, skip_rows=3)
def get_character_ranking(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract columns containing the ranking of characteristics for the Chase virtual assistant.

    Column names are derived from the QSF payload of QID27: for each entry
    of 'RecodeValues', column ``QID27_<recode value>`` is renamed to
    ``Character_Ranking_<VariableNaming entry>`` with spaces replaced by
    underscores.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.
    """
    # Requires QSF to map "Character Ranking_2" to the actual character
    cfg = self._get_qsf_question_by_QID('QID27')['Payload']

    QIDs_rename = {}
    for choice_key, recode_value in cfg['RecodeValues'].items():
        character = cfg['VariableNaming'][choice_key].replace(" ", "_")
        QIDs_rename[f'QID27_{recode_value}'] = f'Character_Ranking_{character}'

    subset = self._get_subset(q, list(QIDs_rename), rename_cols=False)
    return subset.rename(QIDs_rename), None
# Rename columns with the extracted ImportIds
df.columns = new_columns
def get_18_8_3(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract columns containing the 18-8-3 feedback for the Chase virtual assistant.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame with
        columns '18-8_Set-A', '18-8_Set-B' and '8-3_Ranked' that can be
        chained with other polars queries; the second is always None.
    """
    # The keys double as the column selection, so the two cannot drift apart.
    rename_dict = {
        'QID29': '18-8_Set-A',
        'QID101': '18-8_Set-B',
        'QID36_0_GROUP': '8-3_Ranked',
    }
    subset = self._get_subset(q, list(rename_dict), rename_cols=False)
    return subset.rename(rename_dict), None
return df
def get_voice_scale_1_10(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant.

    Every column whose 'QName' contains 'Scale 1-10_1' is selected and
    renamed to ``Voice_Scale_1_10__V<N>``, where ``<N>`` is the second
    whitespace-separated token of the QName.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.
    """
    QIDs_map = {}
    for qid, val in self.qid_descr_map.items():
        qname = val['QName']
        if 'Scale 1-10_1' in qname:
            # Convert "Voice 16 Scale 1-10_1" to "Voice_Scale_1_10__V16"
            QIDs_map[qid] = f"Voice_Scale_1_10__V{qname.split()[1]}"

    subset = self._get_subset(q, list(QIDs_map), rename_cols=False)
    return subset.rename(QIDs_map), None
def get_ss_green_blue(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, dict]:
    """Extract columns containing the SS Green/Blue ratings for the Chase virtual assistant.

    Every column whose 'QName' contains 'SS Green-Blue' is selected and
    renamed to ``SS_Green_Blue__<voice>__Choice_<n>``.

    Returns:
        tuple: ``(subquery, choices_map)``. ``subquery`` is a LazyFrame that
        can be chained with other polars queries. ``choices_map`` maps each
        new column name to the 'Display' text of the matching choice in the
        question's QSF payload.

    Raises:
        ValueError: If QID35, or the question behind a matching column, is
            missing from (or duplicated in) the QSF.
    """
    # QID35 is resolved eagerly, as before, so a QSF without it still fails
    # here. Payloads are cached per question so the QSF is scanned once per
    # question instead of once per column.
    payloads = {'QID35': self._get_qsf_question_by_QID('QID35')['Payload']}

    QIDs_map = {}
    choices_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'SS Green-Blue' not in val['QName']:
            continue

        base_qid = qid.split('_')[0]
        if base_qid not in payloads:
            payloads[base_qid] = self._get_qsf_question_by_QID(base_qid)['Payload']
        cfg = payloads[base_qid]

        # ie: "V14 SS Green-Blue_1"
        qname_parts = val['QName'].split()
        voice = qname_parts[0]
        trait_num = qname_parts[-1].split('_')[-1]

        new_name = f"SS_Green_Blue__{voice}__Choice_{trait_num}"
        QIDs_map[qid] = new_name
        choices_map[new_name] = cfg['Choices'][trait_num]['Display']

    subset = self._get_subset(q, list(QIDs_map), rename_cols=False)
    return subset.rename(QIDs_map), choices_map
def get_top_3_voices(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract columns containing the top 3 voice choices for the Chase virtual assistant.

    Every column whose 'QName' contains 'Rank Top 3 Voices' is selected and
    renamed to ``Top_3_Voices_ranking__<voice>``. The voice label is found
    by looking up the QName's trailing ``_<n>`` in the 'Choices' of QID36.

    Returns:
        tuple: ``(subquery, None)``. The first element is a LazyFrame that
        can be chained with other polars queries; the second is always None.

    Raises:
        ValueError: If a matching question's DynamicChoices Locator does not
            point at QID36's selected choices, or a QSF lookup fails.
        KeyError: If the trailing choice number is not a choice of QID36.
    """
    cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
    choice_voice_map = {k: extract_voice_label(v['Display']) for k, v in cfg36['Choices'].items()}
    expected_locator = r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1"

    # Each question is looked up and validated once, not once per column.
    validated_qids = set()
    QIDs_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'Rank Top 3 Voices' not in val['QName']:
            continue

        base_qid = qid.split('_')[0]
        if base_qid not in validated_qids:
            cfg = self._get_qsf_question_by_QID(base_qid)['Payload']
            # Validate that the DynamicChoices Locator is as expected
            if cfg['DynamicChoices']['Locator'] != expected_locator:
                raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")
            validated_qids.add(base_qid)

        # The suffix after the last '_' is the choice key in the QID36 config.
        voice_num = val['QName'].split('_')[-1]
        voice = choice_voice_map[voice_num]

        # Convert e.g. "... Rank Top 3 Voices_1" to "Top_3_Voices_ranking__V14"
        QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"

    subset = self._get_subset(q, list(QIDs_map), rename_cols=False)
    return subset.rename(QIDs_map), None
def get_ss_orange_red(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, dict]:
    """Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant.

    Every column whose 'QName' contains 'SS Orange-Red' is selected and
    renamed to ``SS_Orange_Red__<voice>__Choice_<n>``.

    Returns:
        tuple: ``(subquery, choices_map)``. ``subquery`` is a LazyFrame that
        can be chained with other polars queries. ``choices_map`` maps each
        new column name to the 'Display' text of the matching choice in the
        question's QSF payload.

    Raises:
        ValueError: If QID40, or the question behind a matching column, is
            missing from (or duplicated in) the QSF.
    """
    # QID40 is resolved eagerly, as before, so a QSF without it still fails
    # here. Payloads are cached per question so the QSF is scanned once per
    # question instead of once per column.
    payloads = {'QID40': self._get_qsf_question_by_QID('QID40')['Payload']}

    QIDs_map = {}
    choices_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'SS Orange-Red' not in val['QName']:
            continue

        base_qid = qid.split('_')[0]
        if base_qid not in payloads:
            payloads[base_qid] = self._get_qsf_question_by_QID(base_qid)['Payload']
        cfg = payloads[base_qid]

        # ie: "V14 SS Orange-Red_1"
        qname_parts = val['QName'].split()
        voice = qname_parts[0]
        trait_num = qname_parts[-1].split('_')[-1]

        new_name = f"SS_Orange_Red__{voice}__Choice_{trait_num}"
        QIDs_map[qid] = new_name
        choices_map[new_name] = cfg['Choices'][trait_num]['Display']

    subset = self._get_subset(q, list(QIDs_map), rename_cols=False)
    return subset.rename(QIDs_map), choices_map
def get_character_refine(self, q: pl.LazyFrame) -> pl.LazyFrame:
"""Extract columns containing the character refine feedback for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
rename_dict = {
'QID29': '18-8_Set-A',
'QID101': '18-8_Set-B',
'QID36_0_GROUP': '8-3_Ranked'
}