"""Utilities and a survey wrapper for processing JPMC Qualtrics voice-survey CSV exports."""

import ast
import json
import re
from pathlib import Path
from typing import Union

import pandas as pd
import polars as pl


def extract_voice_label(html_str: str) -> Union[str, None]:
    """
    Extract a voice label from an HTML string and convert it to short format.

    Parameters:
        html_str (str): HTML string containing a voice label in the format "Voice N".

    Returns:
        str | None: Voice label in the format "VN" (e.g. "V14"), or None when
        no "Voice N" pattern is found.

    Example:
        >>> extract_voice_label('<span>Voice 14</span>')
        'V14'
    """
    match = re.search(r'Voice (\d+)', html_str)
    return f"V{match.group(1)}" if match else None


def extract_qid(val):
    """Extract the 'ImportId' from a string representation of a dictionary.

    Qualtrics export-metadata cells look like
    '{"ImportId":"startDate","timeZone":"America/Denver"}'. When ``val`` is
    such a string it is parsed first; otherwise it is indexed directly
    (e.g. when it is already a dict).

    Uses ``ast.literal_eval`` rather than ``eval`` so malformed or hostile
    metadata cannot execute arbitrary code; it accepts the same literal
    dict strings the previous ``eval`` call did.
    """
    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
        val = ast.literal_eval(val)
    return val['ImportId']


def combine_exclusive_columns(
    df: pl.DataFrame,
    id_col: str = "_recordId",
    target_col_name: str = "combined_value",
) -> pl.DataFrame:
    """
    Combine all columns except ``id_col`` into a single column.

    The non-id columns are assumed to be mutually exclusive (at most one
    populated per row); the merged value is taken via ``coalesce``.

    Raises:
        ValueError: If more than one column is populated in a single row.
    """
    merge_cols = [c for c in df.columns if c != id_col]

    # Validate exclusivity: count non-null cells horizontally per row.
    row_counts = df.select(
        pl.sum_horizontal(pl.col(merge_cols).is_not_null())
    ).to_series()

    if (row_counts > 1).any():
        raise ValueError("Invalid Data: Multiple columns populated for a single record row.")

    # Merge columns using coalesce (first non-null wins; at most one exists).
    return df.select([
        pl.col(id_col),
        pl.coalesce(merge_cols).alias(target_col_name)
    ])


def calculate_weighted_ranking_scores(df: pl.DataFrame) -> pl.DataFrame:
    """
    Calculate weighted scores for character or voice rankings.

    Points system: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.

    Parameters
    ----------
    df : pl.DataFrame
        DataFrame containing character/voice ranking columns (every column
        except ``_recordId`` is treated as a ranking column).

    Returns
    -------
    pl.DataFrame
        DataFrame with columns 'Character' and 'Weighted Score', sorted by
        score descending.
    """
    scores = []

    # Identify ranking columns (assume all columns except _recordId).
    ranking_cols = [c for c in df.columns if c != '_recordId']

    for col in ranking_cols:
        # Score = (count of rank 1 * 3) + (count of rank 2 * 2) + (count of rank 3 * 1)
        r1_count = df.filter(pl.col(col) == 1).height
        r2_count = df.filter(pl.col(col) == 2).height
        r3_count = df.filter(pl.col(col) == 3).height
        weighted_score = (r1_count * 3) + (r2_count * 2) + (r3_count * 1)

        # Strip the known ranking-column prefixes to get a display name.
        clean_name = (
            col.replace('Character_Ranking_', '')
               .replace('Top_3_Voices_ranking__', '')
               .replace('_', ' ')
               .strip()
        )

        scores.append({
            'Character': clean_name,
            'Weighted Score': weighted_score
        })

    return pl.DataFrame(scores).sort('Weighted Score', descending=True)


class JPMCSurvey:
    """Class to handle JPMorgan Chase survey data.

    Wraps a Qualtrics CSV results export plus its QSF survey-definition file
    and exposes typed subsets of the data as polars LazyFrames.
    """

    def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
        if isinstance(data_path, str):
            data_path = Path(data_path)
        if isinstance(qsf_path, str):
            qsf_path = Path(qsf_path)
        self.data_filepath = data_path
        self.qsf_filepath = qsf_path
        self.qid_descr_map = self._extract_qid_descr_map()
        self.qsf: dict = self._load_qsf()
        # Derive figure directory from the export directory name, e.g.
        # data_path='data/exports/OneDrive_2026-01-21/...' -> 'figures/OneDrive_2026-01-21'.
        # NOTE(review): parts[2] assumes exactly that 'data/exports/<dir>/...' layout.
        self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
        # exist_ok makes a separate exists() check unnecessary (and race-free).
        self.fig_save_dir.mkdir(parents=True, exist_ok=True)

    def _extract_qid_descr_map(self) -> dict:
        """Extract mapping of Qualtrics ImportID to question description from the results file."""
        if '1_1-16-2026' in self.data_filepath.as_posix():
            # Legacy export format: single header row of descriptions keyed by column.
            df_questions = pd.read_csv(self.data_filepath, nrows=1)
            return df_questions.iloc[0].to_dict()
        else:
            # Row 1: Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
            # Row 2: the question content
            # Row 3: export metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
            df_questions = pd.read_csv(self.data_filepath, nrows=2)
            # Transpose so each question becomes a row.
            df_questions = df_questions.T.reset_index()
            df_questions.columns = ['QName', 'Description', 'export_metadata']
            df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
            df_questions = df_questions[['ImportID', 'QName', 'Description']]
            # Return dict as {ImportID: {'QName': ..., 'Description': ...}}.
            return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict()

    def _load_qsf(self) -> dict:
        """Load the QSF file to extract question metadata if needed."""
        with open(self.qsf_filepath, 'r', encoding='utf-8') as f:
            qsf_data = json.load(f)
        return qsf_data

    def _get_qsf_question_by_QID(self, QID: str) -> dict:
        """Get question metadata from the QSF using the Question ID.

        Raises:
            ValueError: If zero or multiple SurveyElements match ``QID``.
        """
        q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID]
        if len(q_elem) == 0:
            raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
        if len(q_elem) > 1:
            raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")
        return q_elem[0]

    def load_data(self) -> pl.LazyFrame:
        """
        Load the CSV, using the ImportId metadata row for column names.

        The 3rd row of the export contains metadata like
        '{"ImportId":"startDate","timeZone":"America/Denver"}'. This method
        extracts the ImportId from each column and uses it as the column name.

        Returns:
            pl.LazyFrame: Polars LazyFrame with ImportId as column names.

        Raises:
            NotImplementedError: For the legacy '1_1-16-2026' export format.
        """
        if '1_1-16-2026' in self.data_filepath.as_posix():
            raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")

        # Read the 3rd row (index 2) which contains the metadata dictionaries.
        # header=None gives raw values instead of treating them as column names.
        df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)

        # Extract ImportIds from each column value in this row.
        new_columns = [extract_qid(val) for val in df_meta.iloc[0]]

        # Read the actual data starting from row 4 (skip the 3 header rows).
        df = pl.read_csv(self.data_filepath, skip_rows=3)

        # Rename columns with the extracted ImportIds.
        df.columns = new_columns

        return df.lazy()

    def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True, include_record_id=True) -> pl.LazyFrame:
        """Extract a subset of data based on specific question IDs.

        When ``rename_cols`` is True, columns are renamed to their QName from
        ``qid_descr_map`` (``_recordId`` is never renamed).
        """
        if include_record_id and '_recordId' not in QIDs:
            QIDs = ['_recordId'] + QIDs
        if not rename_cols:
            return q.select(QIDs)
        rename_dict = {
            qid: self.qid_descr_map[qid]['QName']
            for qid in QIDs
            if qid in self.qid_descr_map and qid != '_recordId'
        }
        return q.select(QIDs).rename(rename_dict)

    def filter_data(self, q: pl.LazyFrame, age: list = None, gender: list = None,
                    consumer: list = None, ethnicity: list = None,
                    income: list = None) -> pl.LazyFrame:
        """Filter data based on the provided demographic parameters.

        Each parameter is a list of allowed values for its question column
        (age->QID1, gender->QID2, consumer->Consumer, ethnicity->QID3,
        income->QID15); None skips that filter.

        Returns the filtered polars LazyFrame.
        """
        if age is not None:
            q = q.filter(pl.col('QID1').is_in(age))
        if gender is not None:
            q = q.filter(pl.col('QID2').is_in(gender))
        if consumer is not None:
            q = q.filter(pl.col('Consumer').is_in(consumer))
        if ethnicity is not None:
            q = q.filter(pl.col('QID3').is_in(ethnicity))
        if income is not None:
            q = q.filter(pl.col('QID15').is_in(income))
        return q

    def get_demographics(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the demographics.

        Renames columns using qid_descr_map.
        """
        QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15',
                'QID16', 'QID17', 'Consumer']
        return self._get_subset(q, QIDs), None

    def get_top_8_traits(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the top 8 characteristics that are most
        important for this Chase virtual assistant to have.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs = ['QID25']
        return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None

    def get_top_3_traits(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the top 3 characteristics that the Chase
        virtual assistant should prioritize.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs = ['QID26_0_GROUP']
        return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None

    def get_character_ranking(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the ranking of characteristics for the
        Chase virtual assistant.

        Returns a subquery that can be chained with other polars queries.
        """
        # Requires the QSF to map "Character Ranking_2" to the actual character.
        cfg = self._get_qsf_question_by_QID('QID27')['Payload']
        QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k, v in cfg['RecodeValues'].items()}
        QIDs_rename = {qid: f'Character_Ranking_{QIDs_map[qid].replace(" ", "_")}' for qid in QIDs_map}
        return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None

    def get_18_8_3(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the 18-8-3 feedback for the Chase
        virtual assistant.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
        rename_dict = {
            'QID29': '18-8_Set-A',
            'QID101': '18-8_Set-B',
            'QID36_0_GROUP': '3_Ranked'
        }
        subset = self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict)
        # Combine 18-8 Set A and Set B into a single column (sets are exclusive).
        subset = subset.with_columns(
            pl.coalesce(['18-8_Set-A', '18-8_Set-B']).alias('8_Combined')
        )
        # Change order of columns.
        subset = subset.select(['_recordId', '18-8_Set-A', '18-8_Set-B', '8_Combined', '3_Ranked'])
        return subset, None

    def get_voice_scale_1_10(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the Voice Scale 1-10 ratings for the
        Chase virtual assistant.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs_map = {}
        for qid, val in self.qid_descr_map.items():
            if 'Scale 1-10_1' in val['QName']:
                # Convert "Voice 16 Scale 1-10_1" to "Voice_Scale_1_10__V16".
                QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"
        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None

    def get_ss_green_blue(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, dict]':
        """Extract columns containing the SS Green/Blue ratings for the Chase
        virtual assistant.

        Returns a subquery that can be chained with other polars queries, plus
        a dict mapping the renamed columns to their choice display text.
        """
        # Validates that QID35 exists in the QSF (raises otherwise).
        cfg = self._get_qsf_question_by_QID('QID35')['Payload']
        QIDs_map = {}
        choices_map = {}
        for qid, val in self.qid_descr_map.items():
            if 'SS Green-Blue' in val['QName']:
                cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
                # ie: "V14 SS Green-Blue_1"
                qname_parts = val['QName'].split()
                voice = qname_parts[0]
                trait_num = qname_parts[-1].split('_')[-1]
                QIDs_map[qid] = f"SS_Green_Blue__{voice}__Choice_{trait_num}"
                choices_map[f"SS_Green_Blue__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map

    def get_top_3_voices(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the top 3 voice choices for the Chase
        virtual assistant.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs_map = {}
        cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
        choice_voice_map = {k: extract_voice_label(v['Display']) for k, v in cfg36['Choices'].items()}
        for qid, val in self.qid_descr_map.items():
            if 'Rank Top 3 Voices' in val['QName']:
                cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
                voice_num = val['QName'].split('_')[-1]
                # Validate that the DynamicChoices Locator is as expected.
                if cfg['DynamicChoices']['Locator'] != r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1":
                    raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")
                # Extract the voice from the QID36 config.
                voice = choice_voice_map[voice_num]
                # Convert "Top 3 Voices_1" to "Top_3_Voices_ranking__V14".
                QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"
        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None

    def get_ss_orange_red(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, dict]':
        """Extract columns containing the SS Orange/Red ratings for the Chase
        virtual assistant.

        Returns a subquery that can be chained with other polars queries, plus
        a dict mapping the renamed columns to their choice display text.
        """
        # Validates that QID40 exists in the QSF (raises otherwise).
        cfg = self._get_qsf_question_by_QID('QID40')['Payload']
        QIDs_map = {}
        choices_map = {}
        for qid, val in self.qid_descr_map.items():
            if 'SS Orange-Red' in val['QName']:
                cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
                # ie: "V14 SS Orange-Red_1"
                qname_parts = val['QName'].split()
                voice = qname_parts[0]
                trait_num = qname_parts[-1].split('_')[-1]
                QIDs_map[qid] = f"SS_Orange_Red__{voice}__Choice_{trait_num}"
                choices_map[f"SS_Orange_Red__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map

    def get_character_refine(self, q: pl.LazyFrame) -> 'tuple[pl.LazyFrame, None]':
        """Extract columns containing the character refine feedback for the
        Chase virtual assistant.

        Returns a subquery that can be chained with other polars queries.
        """
        QIDs = ['QID44', 'QID97', 'QID95', 'QID96']
        return self._get_subset(q, QIDs, rename_cols=True), None


def process_speaking_style_data(
    df: Union[pl.LazyFrame, pl.DataFrame],
    trait_map: dict[str, str]
) -> pl.DataFrame:
    """
    Process speaking style columns from wide to long format and map trait descriptions.

    Parses columns with format: SS_{StyleGroup}__{Voice}__{ChoiceID}
    Example: SS_Orange_Red__V14__Choice_1

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Input dataframe containing SS_* columns.
    trait_map : dict
        Dictionary mapping column names to trait descriptions.
        Keys should be full column names like "SS_Orange_Red__V14__Choice_1".

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Style_Group, Voice, Choice_ID, score,
        Description, Left_Anchor, Right_Anchor
        (the mapping columns are absent when trait_map yields no entries).
    """
    # Normalize input to LazyFrame.
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    # 1. Melt SS_ columns.
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^SS_.*$"),
        variable_name="full_col_name",
        value_name="score"
    )

    # 2. Extract components from the column name.
    # Named groups: Style_Group (e.g. SS_Orange_Red), Voice (e.g. V14),
    # Choice_ID (e.g. Choice_1).
    pattern = r"^(?P<Style_Group>SS_.+?)__(?P<Voice>.+?)__(?P<Choice_ID>Choice_\d+)$"
    processed = melted.with_columns(
        pl.col("full_col_name").str.extract_groups(pattern)
    ).unnest("full_col_name")

    # 3. Create a mapping lookup from the provided dictionary:
    #    (Style_Group, Choice_ID) -> Description.
    mapping_data = []
    seen = set()
    for col_name, desc in trait_map.items():
        match = re.match(pattern, col_name)
        if match:
            groups = match.groupdict()
            key = (groups["Style_Group"], groups["Choice_ID"])
            if key not in seen:
                # Parse description into anchors if possible ("Left : Right").
                parts = desc.split(':')
                left_anchor = parts[0].strip() if len(parts) > 0 else ""
                right_anchor = parts[1].strip() if len(parts) > 1 else ""
                mapping_data.append({
                    "Style_Group": groups["Style_Group"],
                    "Choice_ID": groups["Choice_ID"],
                    "Description": desc,
                    "Left_Anchor": left_anchor,
                    "Right_Anchor": right_anchor
                })
                seen.add(key)

    if not mapping_data:
        return processed.collect()

    mapping_lf = pl.LazyFrame(mapping_data)

    # 4. Join data with the mapping.
    result = processed.join(
        mapping_lf,
        on=["Style_Group", "Choice_ID"],
        how="left"
    )

    # 5. Cast score to Int (non-numeric values become null via strict=False).
    result = result.with_columns(
        pl.col("score").cast(pl.Int64, strict=False)
    )

    return result.collect()


def process_voice_scale_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Process Voice Scale columns from wide to long format.

    Parses columns with format: Voice_Scale_1_10__V{Voice}
    Example: Voice_Scale_1_10__V14

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns: _recordId, Voice, Voice_Scale_Score
    """
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    # Melt.
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Voice_Scale_1_10__V.*$"),
        variable_name="full_col_name",
        value_name="Voice_Scale_Score"
    )

    # Extract the voice number and rebuild the "V{n}" label.
    processed = melted.with_columns(
        pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
    ).with_columns(
        ("V" + pl.col("Voice_Num")).alias("Voice")
    )

    # Keep score as Float (original data is f64).
    result = processed.select([
        "_recordId",
        "Voice",
        pl.col("Voice_Scale_Score").cast(pl.Float64, strict=False)
    ])

    return result.collect()


def join_voice_and_style_data(
    processed_style_data: pl.DataFrame,
    processed_voice_data: pl.DataFrame
) -> pl.DataFrame:
    """
    Join processed Speaking Style data with Voice Scale 1-10 data.

    Parameters
    ----------
    processed_style_data : pl.DataFrame
        Result of process_speaking_style_data
    processed_voice_data : pl.DataFrame
        Result of process_voice_scale_data

    Returns
    -------
    pl.DataFrame
        Merged dataframe with columns from both, joined on _recordId and Voice.
    """
    return processed_style_data.join(
        processed_voice_data,
        on=["_recordId", "Voice"],
        how="inner"
    )


def process_voice_ranking_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Process Voice Ranking columns from wide to long format and convert ranks to points.

    Parses columns with format: Top_3_Voices_ranking__V{Voice}
    Converts ranks to points: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns: _recordId, Voice, Ranking_Points
    """
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    # Melt.
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Top_3_Voices_ranking__V.*$"),
        variable_name="full_col_name",
        value_name="rank"
    )

    # Extract the voice number and rebuild the "V{n}" label.
    processed = melted.with_columns(
        pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
    ).with_columns(
        ("V" + pl.col("Voice_Num")).alias("Voice")
    )

    # Convert rank to points: 1st=3, 2nd=2, 3rd=1, null/other=0 (not ranked).
    result = processed.with_columns(
        pl.when(pl.col("rank") == 1).then(3)
        .when(pl.col("rank") == 2).then(2)
        .when(pl.col("rank") == 3).then(1)
        .otherwise(0)
        .alias("Ranking_Points")
    ).select([
        "_recordId",
        "Voice",
        "Ranking_Points"
    ])

    return result.collect()