diff --git a/01_ingest_qualtrics_export.py b/01_ingest_qualtrics_export.py index 33a9d36..16074f7 100644 --- a/01_ingest_qualtrics_export.py +++ b/01_ingest_qualtrics_export.py @@ -10,29 +10,40 @@ def _(): import polars as pl from pathlib import Path - from utils import extract_qid_descr_map, load_csv_with_qid_headers - return extract_qid_descr_map, load_csv_with_qid_headers, mo + from utils import JPMCSurvey + from plots import plot_average_scores_with_counts, plot_top3_ranking_distribution + return ( + JPMCSurvey, + mo, + plot_average_scores_with_counts, + plot_top3_ranking_distribution, + ) @app.cell def _(): RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv' + QSF_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase_Brand_Personality_Quant_Round_1.qsf' # RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv' - return (RESULTS_FILE,) + return QSF_FILE, RESULTS_FILE @app.cell -def _(RESULTS_FILE, extract_qid_descr_map): - qid_descr_map = extract_qid_descr_map(RESULTS_FILE) - qid_descr_map - return +def _(JPMCSurvey, QSF_FILE, RESULTS_FILE): + survey = JPMCSurvey(RESULTS_FILE, QSF_FILE) + survey.qid_descr_map + return (survey,) @app.cell -def _(RESULTS_FILE, load_csv_with_qid_headers): - df = load_csv_with_qid_headers(RESULTS_FILE) - df - return +def _(survey): + data = survey.load_data() + df = data.collect() + + + df.select([q for q in df.columns if 'QID98' in q]) + + return (data,) @app.cell @@ -77,5 +88,122 @@ def _(mo): return +@app.cell +def _(survey): + cfg = survey._get_qsf_question_by_QID('QID36')['Payload'] + cfg + return + + +@app.cell +def _(data, survey): + survey.get_demographics(data)[0].collect() + return + + +@app.cell +def _(data, survey): + survey.get_top_8_traits(data)[0].collect() + return + + +@app.cell +def _(data, survey): + survey.get_top_3_traits(data)[0].collect() + return + + +@app.cell +def _(data, survey): + survey.get_character_ranking(data)[0].collect() + return + + +@app.cell +def _(data, survey): + survey.get_18_8_3(data)[0].collect() + return + + +@app.cell +def _(mo): + mo.md(r""" + # Voice Scales 1-10 + """) + return + + +@app.cell +def _(data, survey): + vscales = survey.get_voice_scale_1_10(data)[0].collect() + vscales + return (vscales,) + + +@app.cell +def _(plot_average_scores_with_counts, vscales): + plot_average_scores_with_counts(vscales, x_label='Voice', width=1000) + return + + +@app.cell +def _(mo): + mo.md(r""" + # SS Green Blue + """) + return + + +@app.cell +def _(data, survey): + _lf, _choice_map = survey.get_ss_green_blue(data) + print(_lf.collect().head()) + return + + +@app.cell +def _(mo): + mo.md(r""" + # Top 3 Voices + """) + return + + +@app.cell +def _(data, survey): + top3_voices = survey.get_top_3_voices(data)[0].collect() + top3_voices + return (top3_voices,) + + +@app.cell +def _(top3_voices): + + print(top3_voices.head()) + return + + +@app.cell +def _(plot_top3_ranking_distribution, top3_voices): + plot_top3_ranking_distribution(top3_voices, x_label='Voice', width=1000) + return + + +@app.cell +def _(mo): + mo.md(r""" + # SS Orange / Red + """) + return + + +@app.cell +def _(data, survey): + _lf, choice_map = survey.get_ss_orange_red(data) + _d = _lf.collect() + _d + return + + if __name__ == "__main__": app.run() diff --git a/plots.py b/plots.py new file mode 100644 index 0000000..431cf55 --- /dev/null +++ b/plots.py @@ -0,0 +1,212 @@ +"""Plotting functions for Voice Branding analysis.""" + +import plotly.graph_objects as go +import polars as pl + + +def plot_average_scores_with_counts( + df: pl.DataFrame, + title: str = "General Impression (1-10)
Per Voice with Number of Participants Who Rated It", + x_label: str = "Stimuli", + y_label: str = "Average General Impression Rating (1-10)", + color: str = "#0077B6", + height: int = 500, + width: int = 1000, +) -> go.Figure: + """ + Create a bar plot showing average scores and count of non-null values for each column. + + Parameters + ---------- + df : pl.DataFrame + DataFrame containing numeric columns to analyze. + title : str, optional + Plot title. + x_label : str, optional + X-axis label. + y_label : str, optional + Y-axis label. + color : str, optional + Bar color (hex code or named color). + height : int, optional + Plot height in pixels. + width : int, optional + Plot width in pixels. + + Returns + ------- + go.Figure + Plotly figure object. + """ + # Calculate average and count of non-null values for each column + stats = [] + for col in df.columns: + avg_score = df[col].mean() + non_null_count = df[col].drop_nulls().len() + stats.append({ + 'column': col, + 'average': avg_score, + 'count': non_null_count + }) + + # Sort by average score in descending order + stats_df = pl.DataFrame(stats).sort('average', descending=True) + + # Extract voice identifiers from column names (e.g., "V14" from "Voice_Scale_1_10__V14") + labels = [col.split('__')[-1] if '__' in col else col for col in stats_df['column']] + + # Create the plot + fig = go.Figure() + + fig.add_trace(go.Bar( + x=labels, + y=stats_df['average'], + text=stats_df['count'], + textposition='inside', + textfont=dict(size=10, color='black'), + marker_color=color, + hovertemplate='%{x}
Average: %{y:.2f}
Count: %{text}' + )) + + fig.update_layout( + title=title, + xaxis_title=x_label, + yaxis_title=y_label, + height=height, + width=width, + plot_bgcolor='white', + xaxis=dict( + showgrid=True, + gridcolor='lightgray', + tickangle=-45 + ), + yaxis=dict( + range=[0, 10], + showgrid=True, + gridcolor='lightgray' + ), + font=dict(size=11) + ) + + return fig + + +def plot_top3_ranking_distribution( + df: pl.DataFrame, + title: str = "Top 3 Rankings Distribution
Count of 1st, 2nd, and 3rd Place Votes per Voice", + x_label: str = "Voices", + y_label: str = "Number of Mentions in Top 3", + height: int = 600, + width: int = 1000, +) -> go.Figure: + """ + Create a stacked bar chart showing how often each voice was ranked 1st, 2nd, or 3rd. + + The total height of the bar represents the popularity (frequency of being in Top 3), + while the segments show the quality of those rankings. + + Parameters + ---------- + df : pl.DataFrame + DataFrame containing ranking columns (values 1, 2, 3). + title : str, optional + Plot title. + x_label : str, optional + X-axis label. + y_label : str, optional + Y-axis label. + height : int, optional + Plot height in pixels. + width : int, optional + Plot width in pixels. + + Returns + ------- + go.Figure + Plotly figure object. + """ + stats = [] + for col in df.columns: + # Count occurrences of each rank (1, 2, 3) + # We ensure we're just counting the specific integer values + rank1 = df.filter(pl.col(col) == 1).height + rank2 = df.filter(pl.col(col) == 2).height + rank3 = df.filter(pl.col(col) == 3).height + total = rank1 + rank2 + rank3 + + # Only include if it received at least one vote (optional, but keeps chart clean) + if total > 0: + stats.append({ + 'column': col, + 'Rank 1': rank1, + 'Rank 2': rank2, + 'Rank 3': rank3, + 'Total': total + }) + + # Sort by Total count descending (Most popular overall) + # Tie-break with Rank 1 count + stats_df = pl.DataFrame(stats).sort(['Total', 'Rank 1'], descending=[True, True]) + + # Extract voice identifiers from column names + labels = [col.split('__')[-1] if '__' in col else col for col in stats_df['column']] + + fig = go.Figure() + + # Add traces for Rank 1, 2, and 3. + # Stack order: Rank 1 at bottom (Base) -> Rank 2 -> Rank 3 + # This makes it easy to compare the "First Choice" volume across bars. + + fig.add_trace(go.Bar( + name='Rank 1 (1st Choice)', + x=labels, + y=stats_df['Rank 1'], + marker_color='#004C6D', # Dark Blue + hovertemplate='%{x}
Rank 1: %{y}' + )) + + fig.add_trace(go.Bar( + name='Rank 2 (2nd Choice)', + x=labels, + y=stats_df['Rank 2'], + marker_color='#008493', # Teal + hovertemplate='%{x}
Rank 2: %{y}' + )) + + fig.add_trace(go.Bar( + name='Rank 3 (3rd Choice)', + x=labels, + y=stats_df['Rank 3'], + marker_color='#5AAE95', # Sea Green + hovertemplate='%{x}
Rank 3: %{y}' + )) + + fig.update_layout( + barmode='stack', + title=title, + xaxis_title=x_label, + yaxis_title=y_label, + height=height, + width=width, + plot_bgcolor='white', + xaxis=dict( + showgrid=True, + gridcolor='lightgray', + tickangle=-45 + ), + yaxis=dict( + showgrid=True, + gridcolor='lightgray' + ), + legend=dict( + orientation="h", + yanchor="bottom", + y=1.02, + xanchor="right", + x=1, + traceorder="normal" + ), + font=dict(size=11) + ) + + return fig diff --git a/utils.py b/utils.py index 54a9b54..2fac20b 100644 --- a/utils.py +++ b/utils.py @@ -2,6 +2,27 @@ import polars as pl from pathlib import Path import pandas as pd from typing import Union +import json + +import re + +def extract_voice_label(html_str: str) -> str: + """ + Extract voice label from HTML string and convert to short format. + + Parameters: + html_str (str): HTML string containing voice label in format "Voice N" + + Returns: + str: Voice label in format "VN" (e.g., "V14") + + Example: + >>> extract_voice_label('Voice 14
...') + 'V14' + """ + match = re.search(r'Voice (\d+)', html_str) + return f"V{match.group(1)}" if match else None + def extract_qid(val): """Extracts the 'ImportId' from a string representation of a dictionary.""" @@ -11,64 +32,286 @@ def extract_qid(val): return val['ImportId'] -def extract_qid_descr_map(results_file: Union[str, Path]) -> dict: - """Extract mapping of Qualtrics ImportID to Question Description from results file.""" - if isinstance(results_file, str): - results_file = Path(results_file) - if '1_1-16-2026' in results_file.as_posix(): - df_questions = pd.read_csv(results_file, nrows=1) - df_questions + + +class JPMCSurvey: + """Class to handle JPMorgan Chase survey data.""" - return df_questions.iloc[0].to_dict() + def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]): + if isinstance(data_path, str): + data_path = Path(data_path) + + if isinstance(qsf_path, str): + qsf_path = Path(qsf_path) + + self.data_filepath = data_path + self.qsf_filepath = qsf_path + self.qid_descr_map = self._extract_qid_descr_map() + self.qsf:dict = self._load_qsf() + + + def _extract_qid_descr_map(self) -> dict: + """Extract mapping of Qualtrics ImportID to Question Description from results file.""" + + if '1_1-16-2026' in self.data_filepath.as_posix(): + df_questions = pd.read_csv(self.data_filepath, nrows=1) + df_questions + + return df_questions.iloc[0].to_dict() + + + else: + # First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8') + + # Second row which contains the question content + # Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}') + df_questions = pd.read_csv(self.data_filepath, nrows=2) + + + + # transpose df_questions + df_questions = df_questions.T.reset_index() + df_questions.columns = ['QName', 'Description', 'export_metadata'] + df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid) + + df_questions = df_questions[['ImportID', 'QName', 'Description']] + + # return dict as {ImportID: [QName, Description]} + return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict() + + def _load_qsf(self) -> dict: + """Load QSF file to extract question metadata if needed.""" + + with open(self.qsf_filepath, 'r', encoding='utf-8') as f: + qsf_data = json.load(f) + return qsf_data + + def _get_qsf_question_by_QID(self, QID: str) -> dict: + """Get question metadata from QSF using the Question ID.""" + + q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID] + + if len(q_elem) == 0: + raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.") + if len(q_elem) > 1: + raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}") + + return q_elem[0] - else: - # First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8') - - # Second row which contains the question content - # Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}') - df_questions = pd.read_csv(results_file, nrows=1, skiprows=1) - + def load_data(self) -> pl.LazyFrame: + """ + Load CSV where column headers are in row 3 as dict strings with ImportId. + The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'. + This function extracts the ImportId from each column and uses it as the column name. + + Parameters: + file_path (Path): Path to the CSV file to load. + + Returns: + pl.LazyFrame: Polars LazyFrame with ImportId as column names. + """ + if '1_1-16-2026' in self.data_filepath.as_posix(): + raise NotImplementedError("This method does not support the '1_1-16-2026' export format.") + + # Read the 3rd row (index 2) which contains the metadata dictionaries + # Use header=None to get raw values instead of treating them as column names + df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None) + + # Extract ImportIds from each column value in this row + new_columns = [extract_qid(val) for val in df_meta.iloc[0]] + + # Now read the actual data starting from row 4 (skip first 3 rows) + df = pl.read_csv(self.data_filepath, skip_rows=3) + + # Rename columns with the extracted ImportIds + df.columns = new_columns + + return df.lazy() - # transpose df_questions - df_questions = df_questions.T.reset_index() - df_questions.columns = ['Description', 'export_metadata'] - df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid) - - df_questions = df_questions[['ImportID', 'Description']] - - return dict(zip(df_questions['ImportID'], df_questions['Description'])) + def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True) -> pl.LazyFrame: + """Extract subset of data based on specific questions.""" + if not rename_cols: + return q.select(QIDs) + + rename_dict = {qid: self.qid_descr_map[qid]['QName'] for qid in QIDs if qid in self.qid_descr_map} + + return q.select(QIDs).rename(rename_dict) -def load_csv_with_qid_headers(file_path: Union[str, Path]) -> pl.DataFrame: - """ - Load CSV where column headers are in row 3 as dict strings with ImportId. - - The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'. - This function extracts the ImportId from each column and uses it as the column name. - - Parameters: - file_path (Path): Path to the CSV file to load. - - Returns: - pl.DataFrame: Polars DataFrame with ImportId as column names. - """ - if isinstance(file_path, str): - file_path = Path(file_path) + def get_demographics(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the demographics. + + Renames columns using qid_descr_map if provided. + """ + QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer'] + return self._get_subset(q, QIDs), None - # Read the 3rd row (index 2) which contains the metadata dictionaries - # Use header=None to get raw values instead of treating them as column names - df_meta = pd.read_csv(file_path, nrows=1, skiprows=2, header=None) + + def get_top_8_traits(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the top 8 characteristics are most important for this Chase virtual assistant to have. + + Returns subquery that can be chained with other polars queries. + """ + QIDs = ['QID25'] + return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None + + + + def get_top_3_traits(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the top 3 characteristics that the Chase virtual assistant should prioritize. + + Returns subquery that can be chained with other polars queries. + """ + QIDs = ['QID26_0_GROUP'] + return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None - # Extract ImportIds from each column value in this row - new_columns = [extract_qid(val) for val in df_meta.iloc[0]] - # Now read the actual data starting from row 4 (skip first 3 rows) - df = pl.read_csv(file_path, skip_rows=3) + def get_character_ranking(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the ranking of characteristics for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + + + # Requires QSF to map "Character Ranking_2" to the actual character + cfg = self._get_qsf_question_by_QID('QID27')['Payload'] + + + QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k,v in cfg['RecodeValues'].items()} + QIDs_rename = {qid: f'Character_Ranking_{QIDs_map[qid].replace(" ", "_")}' for qid in QIDs_map} + + return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None + - # Rename columns with the extracted ImportIds - df.columns = new_columns + def get_18_8_3(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the 18-8-3 feedback for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + QIDs = ['QID29', 'QID101', 'QID36_0_GROUP'] + + rename_dict = { + 'QID29': '18-8_Set-A', + 'QID101': '18-8_Set-B', + 'QID36_0_GROUP': '8-3_Ranked' + } + return self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict), None - return df \ No newline at end of file + + def get_voice_scale_1_10(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + + QIDs_map = {} + + for qid, val in self.qid_descr_map.items(): + if 'Scale 1-10_1' in val['QName']: + # Convert "Voice 16 Scale 1-10_1" to "Scale_1_10__Voice_16" + QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}" + + return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None + + + + def get_ss_green_blue(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the SS Green/Blue ratings for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + + cfg = self._get_qsf_question_by_QID('QID35')['Payload'] + + QIDs_map = {} + choices_map = {} + for qid, val in self.qid_descr_map.items(): + if 'SS Green-Blue' in val['QName']: + + cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload'] + + # ie: "V14 SS Green-Blue_1" + qname_parts = val['QName'].split() + voice = qname_parts[0] + trait_num = qname_parts[-1].split('_')[-1] + + QIDs_map[qid] = f"SS_Green_Blue__{voice}__Choice_{trait_num}" + + choices_map[f"SS_Green_Blue__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display'] + + return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map + + + def get_top_3_voices(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the top 3 voice choices for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + + QIDs_map = {} + + cfg36 = self._get_qsf_question_by_QID('QID36')['Payload'] + choice_voice_map = {k: extract_voice_label(v['Display']) for k,v in cfg36['Choices'].items()} + + + for qid, val in self.qid_descr_map.items(): + if 'Rank Top 3 Voices' in val['QName']: + + cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload'] + voice_num = val['QName'].split('_')[-1] + + # Validate that the DynamicChoices Locator is as expected + if cfg['DynamicChoices']['Locator'] != r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1": + raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}") + + # extract the voice from the QID36 config + voice = choice_voice_map[voice_num] + + # Convert "Top 3 Voices_1" to "Top_3_Voices__V14" + QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}" + + return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None + + + def get_ss_orange_red(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + + cfg = self._get_qsf_question_by_QID('QID40')['Payload'] + + QIDs_map = {} + choices_map = {} + for qid, val in self.qid_descr_map.items(): + if 'SS Orange-Red' in val['QName']: + + cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload'] + + # ie: "V14 SS Orange-Red_1" + qname_parts = val['QName'].split() + voice = qname_parts[0] + trait_num = qname_parts[-1].split('_')[-1] + + QIDs_map[qid] = f"SS_Orange_Red__{voice}__Choice_{trait_num}" + + choices_map[f"SS_Orange_Red__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display'] + + return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map + + + def get_character_refine(self, q: pl.LazyFrame) -> pl.LazyFrame: + """Extract columns containing the character refine feedback for the Chase virtual assistant. + + Returns subquery that can be chained with other polars queries. + """ + QIDs = ['QID29', 'QID101', 'QID36_0_GROUP'] + + rename_dict = { + 'QID29': '18-8_Set-A', + 'QID101': '18-8_Set-B', + 'QID36_0_GROUP': '8-3_Ranked' + } \ No newline at end of file