Files
JPMC-quant/utils.py

661 lines
24 KiB
Python

import ast
import json
import re
from pathlib import Path
from typing import Union

import marimo as mo
import pandas as pd
import polars as pl

from plots import JPMCPlotsMixin
def extract_voice_label(html_str: str) -> Union[str, None]:
    """
    Extract a voice label from an HTML string and convert it to short format.

    Parameters:
        html_str (str): HTML string containing a voice label in format "Voice N".

    Returns:
        str | None: Voice label in format "VN" (e.g., "V14"), or None when no
        "Voice N" pattern is found (the original annotation claimed `str` but
        the no-match path returns None).

    Example:
        >>> extract_voice_label('<span style="...">Voice 14<br />...')
        'V14'
    """
    match = re.search(r'Voice (\d+)', html_str)
    return f"V{match.group(1)}" if match else None
def extract_qid(val):
    """Extract the 'ImportId' from a Qualtrics export-metadata value.

    Parameters:
        val: Either a dict, or a string representation of a dict such as
             '{"ImportId":"startDate","timeZone":"America/Denver"}'.

    Returns:
        The value stored under the 'ImportId' key.
    """
    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
        # ast.literal_eval instead of eval: parses dict literals (including
        # JSON-style double-quoted ones) without executing arbitrary code
        # read from the CSV.
        val = ast.literal_eval(val)
    return val['ImportId']
def combine_exclusive_columns(df: pl.DataFrame, id_col: str = "_recordId", target_col_name: str = "combined_value") -> pl.DataFrame:
    """Collapse all non-id columns into a single column via coalesce.

    Assumes the non-id columns are mutually exclusive (at most one populated
    per row); raises ValueError when any row has two or more populated values.
    """
    value_cols = [name for name in df.columns if name != id_col]
    # Horizontal count of populated cells per row; anything above 1 violates
    # the mutual-exclusivity assumption.
    populated_per_row = df.select(
        pl.sum_horizontal(pl.col(value_cols).is_not_null())
    ).to_series()
    if (populated_per_row > 1).any():
        raise ValueError("Invalid Data: Multiple columns populated for a single record row.")
    # Coalesce picks the first (and only) non-null value across the columns.
    return df.select(
        pl.col(id_col),
        pl.coalesce(value_cols).alias(target_col_name),
    )
def calculate_weighted_ranking_scores(df: Union[pl.LazyFrame, pl.DataFrame]) -> pl.DataFrame:
    """
    Calculate weighted scores for character or voice rankings.

    Points system: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Frame containing character/voice ranking columns plus '_recordId'.
        (LazyFrames are collected; the previous docstring said DataFrame only.)

    Returns
    -------
    pl.DataFrame
        DataFrame with columns 'Character' and 'Weighted Score', sorted by
        score descending.
    """
    if isinstance(df, pl.LazyFrame):
        df = df.collect()
    # All columns except the record id are ranking columns.
    ranking_cols = [c for c in df.columns if c != '_recordId']
    scores = []
    for col in ranking_cols:
        # One select per column instead of three filter().height passes:
        # a boolean sum counts matching rows (nulls compare to null and are
        # ignored by sum, matching the original filter counts).
        r1_count, r2_count, r3_count = df.select(
            (pl.col(col) == 1).sum(),
            (pl.col(col) == 2).sum(),
            (pl.col(col) == 3).sum(),
        ).row(0)
        weighted_score = (r1_count * 3) + (r2_count * 2) + (r3_count * 1)
        # Strip the wide-format prefixes to get a readable label.
        clean_name = col.replace('Character_Ranking_', '').replace('Top_3_Voices_ranking__', '').replace('_', ' ').strip()
        scores.append({
            'Character': clean_name,
            'Weighted Score': weighted_score
        })
    return pl.DataFrame(scores).sort('Weighted Score', descending=True)
class JPMCSurvey(JPMCPlotsMixin):
"""Class to handle JPMorgan Chase survey data."""
def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
if isinstance(data_path, str):
data_path = Path(data_path)
if isinstance(qsf_path, str):
qsf_path = Path(qsf_path)
self.data_filepath = data_path
self.qsf_filepath = qsf_path
self.qid_descr_map = self._extract_qid_descr_map()
self.qsf:dict = self._load_qsf()
# get export directory name for saving figures ie if data_path='data/exports/OneDrive_2026-01-21/...' should be 'figures/OneDrive_2026-01-21'
self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
if not self.fig_save_dir.exists():
self.fig_save_dir.mkdir(parents=True, exist_ok=True)
self.data_filtered = None
self.plot_height = 500
self.plot_width = 1000
# Filter values
self.filter_age:list = None
self.filter_gender:list = None
self.filter_consumer:list = None
self.filter_ethnicity:list = None
self.filter_income:list = None
def _extract_qid_descr_map(self) -> dict:
"""Extract mapping of Qualtrics ImportID to Question Description from results file."""
if '1_1-16-2026' in self.data_filepath.as_posix():
df_questions = pd.read_csv(self.data_filepath, nrows=1)
df_questions
return df_questions.iloc[0].to_dict()
else:
# First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
# Second row which contains the question content
# Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
df_questions = pd.read_csv(self.data_filepath, nrows=2)
# transpose df_questions
df_questions = df_questions.T.reset_index()
df_questions.columns = ['QName', 'Description', 'export_metadata']
df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
df_questions = df_questions[['ImportID', 'QName', 'Description']]
# return dict as {ImportID: [QName, Description]}
return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict()
def _load_qsf(self) -> dict:
"""Load QSF file to extract question metadata if needed."""
with open(self.qsf_filepath, 'r', encoding='utf-8') as f:
qsf_data = json.load(f)
return qsf_data
def _get_qsf_question_by_QID(self, QID: str) -> dict:
"""Get question metadata from QSF using the Question ID."""
q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID]
if len(q_elem) == 0:
raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
if len(q_elem) > 1:
raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")
return q_elem[0]
def load_data(self) -> pl.LazyFrame:
"""
Load CSV where column headers are in row 3 as dict strings with ImportId.
The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
This function extracts the ImportId from each column and uses it as the column name.
Parameters:
file_path (Path): Path to the CSV file to load.
Returns:
pl.LazyFrame: Polars LazyFrame with ImportId as column names.
"""
if '1_1-16-2026' in self.data_filepath.as_posix():
raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")
# Read the 3rd row (index 2) which contains the metadata dictionaries
# Use header=None to get raw values instead of treating them as column names
df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)
# Extract ImportIds from each column value in this row
new_columns = [extract_qid(val) for val in df_meta.iloc[0]]
# Now read the actual data starting from row 4 (skip first 3 rows)
df = pl.read_csv(self.data_filepath, skip_rows=3)
# Rename columns with the extracted ImportIds
df.columns = new_columns
# Store unique values for filters (ignoring nulls) to detect "all selected" state
self.options_age = sorted(df['QID1'].drop_nulls().unique().to_list()) if 'QID1' in df.columns else []
self.options_gender = sorted(df['QID2'].drop_nulls().unique().to_list()) if 'QID2' in df.columns else []
self.options_consumer = sorted(df['Consumer'].drop_nulls().unique().to_list()) if 'Consumer' in df.columns else []
self.options_ethnicity = sorted(df['QID3'].drop_nulls().unique().to_list()) if 'QID3' in df.columns else []
self.options_income = sorted(df['QID15'].drop_nulls().unique().to_list()) if 'QID15' in df.columns else []
return df.lazy()
def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True, include_record_id=True) -> pl.LazyFrame:
"""Extract subset of data based on specific questions."""
if include_record_id and '_recordId' not in QIDs:
QIDs = ['_recordId'] + QIDs
if not rename_cols:
return q.select(QIDs)
rename_dict = {qid: self.qid_descr_map[qid]['QName'] for qid in QIDs if qid in self.qid_descr_map and qid != '_recordId'}
return q.select(QIDs).rename(rename_dict)
def filter_data(self, q: pl.LazyFrame, age:list=None, gender:list=None, consumer:list=None, ethnicity:list=None, income:list=None) -> pl.LazyFrame:
"""Filter data based on provided parameters
Possible parameters:
- age: list of age groups to include
- gender: list
- consumer: list
- ethnicity: list
- income: list
Also saves the result to self.data_filtered.
"""
# Apply filters
self.filter_age = age
if age is not None:
q = q.filter(pl.col('QID1').is_in(age))
self.filter_gender = gender
if gender is not None:
q = q.filter(pl.col('QID2').is_in(gender))
self.filter_consumer = consumer
if consumer is not None:
q = q.filter(pl.col('Consumer').is_in(consumer))
self.filter_ethnicity = ethnicity
if ethnicity is not None:
q = q.filter(pl.col('QID3').is_in(ethnicity))
self.filter_income = income
if income is not None:
q = q.filter(pl.col('QID15').is_in(income))
self.data_filtered = q
return self.data_filtered
def get_demographics(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the demographics.
Renames columns using qid_descr_map if provided.
"""
QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer']
return self._get_subset(q, QIDs), None
def get_top_8_traits(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 8 characteristics are most important for this Chase virtual assistant to have.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID25']
return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None
def get_top_3_traits(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 3 characteristics that the Chase virtual assistant should prioritize.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID26_0_GROUP']
return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None
def get_character_ranking(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the ranking of characteristics for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
# Requires QSF to map "Character Ranking_2" to the actual character
cfg = self._get_qsf_question_by_QID('QID27')['Payload']
QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k,v in cfg['RecodeValues'].items()}
QIDs_rename = {qid: f'Character_Ranking_{QIDs_map[qid].replace(" ", "_")}' for qid in QIDs_map}
return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None
def get_18_8_3(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the 18-8-3 feedback for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
rename_dict = {
'QID29': '18-8_Set-A',
'QID101': '18-8_Set-B',
'QID36_0_GROUP': '3_Ranked'
}
subset = self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict)
# Combine 18-8 Set A and Set B into single column
subset = subset.with_columns(
pl.coalesce(['18-8_Set-A', '18-8_Set-B']).alias('8_Combined')
)
# Change order of columns
subset = subset.select(['_recordId', '18-8_Set-A', '18-8_Set-B', '8_Combined', '3_Ranked'])
return subset, None
def get_voice_scale_1_10(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs_map = {}
for qid, val in self.qid_descr_map.items():
if 'Scale 1-10_1' in val['QName']:
# Convert "Voice 16 Scale 1-10_1" to "Scale_1_10__Voice_16"
QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
def get_ss_green_blue(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
"""Extract columns containing the SS Green/Blue ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
cfg = self._get_qsf_question_by_QID('QID35')['Payload']
QIDs_map = {}
choices_map = {}
for qid, val in self.qid_descr_map.items():
if 'SS Green-Blue' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
# ie: "V14 SS Green-Blue_1"
qname_parts = val['QName'].split()
voice = qname_parts[0]
trait_num = qname_parts[-1].split('_')[-1]
QIDs_map[qid] = f"SS_Green_Blue__{voice}__Choice_{trait_num}"
choices_map[f"SS_Green_Blue__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
def get_top_3_voices(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 3 voice choices for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs_map = {}
cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
choice_voice_map = {k: extract_voice_label(v['Display']) for k,v in cfg36['Choices'].items()}
for qid, val in self.qid_descr_map.items():
if 'Rank Top 3 Voices' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
voice_num = val['QName'].split('_')[-1]
# Validate that the DynamicChoices Locator is as expected
if cfg['DynamicChoices']['Locator'] != r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1":
raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")
# extract the voice from the QID36 config
voice = choice_voice_map[voice_num]
# Convert "Top 3 Voices_1" to "Top_3_Voices__V14"
QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
def get_ss_orange_red(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
"""Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
cfg = self._get_qsf_question_by_QID('QID40')['Payload']
QIDs_map = {}
choices_map = {}
for qid, val in self.qid_descr_map.items():
if 'SS Orange-Red' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
# ie: "V14 SS Orange-Red_1"
qname_parts = val['QName'].split()
voice = qname_parts[0]
trait_num = qname_parts[-1].split('_')[-1]
QIDs_map[qid] = f"SS_Orange_Red__{voice}__Choice_{trait_num}"
choices_map[f"SS_Orange_Red__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
def get_character_refine(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the character refine feedback for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID44', 'QID97', 'QID95', 'QID96']
return self._get_subset(q, QIDs, rename_cols=True), None
def process_speaking_style_data(
    df: Union[pl.LazyFrame, pl.DataFrame],
    trait_map: dict[str, str]
) -> pl.DataFrame:
    """
    Process speaking style columns from wide to long format and map trait descriptions.

    Parses columns with format: SS_{StyleGroup}__{Voice}__{ChoiceID}
    Example: SS_Orange_Red__V14__Choice_1

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Input dataframe containing '_recordId' and SS_* columns.
    trait_map : dict
        Dictionary mapping column names to trait descriptions.
        Keys should be full column names like "SS_Orange_Red__V14__Choice_1".
        Descriptions of the form "Left : Right" are split into anchors.

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Style_Group, Voice, Choice_ID, score (Int64),
        Description, Left_Anchor, Right_Anchor.
        When trait_map yields no parseable entries, the description/anchor
        columns are omitted; the score cast now applies on that path too
        (previously the early return skipped it, giving an inconsistent dtype).
    """
    # Normalize input to LazyFrame
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df
    # 1. Melt SS_ columns to long format
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^SS_.*$"),
        variable_name="full_col_name",
        value_name="score"
    )
    # 2. Split the packed column name into Style_Group / Voice / Choice_ID
    pattern = r"^(?P<Style_Group>SS_.+?)__(?P<Voice>.+?)__(?P<Choice_ID>Choice_\d+)$"
    processed = melted.with_columns(
        pl.col("full_col_name").str.extract_groups(pattern)
    ).unnest("full_col_name")
    # Cast score up-front so both return paths share the same schema.
    processed = processed.with_columns(
        pl.col("score").cast(pl.Int64, strict=False)
    )
    # 3. Build a (Style_Group, Choice_ID) -> Description/anchors lookup,
    #    deduplicated across voices (the mapping is per choice, not per voice).
    mapping_data = []
    seen = set()
    for col_name, desc in trait_map.items():
        match = re.match(pattern, col_name)
        if not match:
            continue
        groups = match.groupdict()
        key = (groups["Style_Group"], groups["Choice_ID"])
        if key in seen:
            continue
        # Parse description into anchors if possible ("Left : Right")
        parts = desc.split(':')
        mapping_data.append({
            "Style_Group": groups["Style_Group"],
            "Choice_ID": groups["Choice_ID"],
            "Description": desc,
            "Left_Anchor": parts[0].strip() if len(parts) > 0 else "",
            "Right_Anchor": parts[1].strip() if len(parts) > 1 else ""
        })
        seen.add(key)
    # No usable mapping entries: return the parsed long frame as-is.
    if not mapping_data:
        return processed.collect()
    # 4. Left-join descriptions onto the long frame
    result = processed.join(
        pl.LazyFrame(mapping_data),
        on=["Style_Group", "Choice_ID"],
        how="left"
    )
    return result.collect()
def process_voice_scale_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Reshape Voice Scale columns from wide to long format.

    Parses columns of the form Voice_Scale_1_10__V{N} (e.g. Voice_Scale_1_10__V14).

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Voice, Voice_Scale_Score
    """
    lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
    # Wide -> long: one row per (_recordId, voice column)
    long = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Voice_Scale_1_10__V.*$"),
        variable_name="full_col_name",
        value_name="Voice_Scale_Score"
    )
    # Rebuild the short voice label ("V14") from the column name.
    with_voice = long.with_columns(
        ("V" + pl.col("full_col_name").str.extract(r"V(\d+)", 1)).alias("Voice")
    )
    # Score stays Float64 (original data is f64).
    return with_voice.select([
        "_recordId",
        "Voice",
        pl.col("Voice_Scale_Score").cast(pl.Float64, strict=False)
    ]).collect()
def join_voice_and_style_data(
    processed_style_data: pl.DataFrame,
    processed_voice_data: pl.DataFrame
) -> pl.DataFrame:
    """
    Merge processed Speaking Style rows with Voice Scale 1-10 rows.

    Parameters
    ----------
    processed_style_data : pl.DataFrame
        Output of process_speaking_style_data.
    processed_voice_data : pl.DataFrame
        Output of process_voice_scale_data.

    Returns
    -------
    pl.DataFrame
        Inner join of the two frames on the (_recordId, Voice) key, keeping
        only respondent/voice pairs present in both.
    """
    join_keys = ["_recordId", "Voice"]
    return processed_style_data.join(processed_voice_data, on=join_keys, how="inner")
def process_voice_ranking_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Reshape Voice Ranking columns from wide to long format and convert ranks to points.

    Parses columns of the form Top_3_Voices_ranking__V{N}.
    Points: 1st place = 3, 2nd place = 2, 3rd place = 1, not ranked = 0.

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Voice, Ranking_Points
    """
    lf = df if isinstance(df, pl.LazyFrame) else df.lazy()
    # Wide -> long: one row per (_recordId, ranking column)
    long = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Top_3_Voices_ranking__V.*$"),
        variable_name="full_col_name",
        value_name="rank"
    )
    # Rebuild the short voice label ("V14") from the column name.
    long = long.with_columns(
        ("V" + pl.col("full_col_name").str.extract(r"V(\d+)", 1)).alias("Voice")
    )
    # Rank -> points (1→3, 2→2, 3→1); anything else (incl. null) scores 0.
    points = (
        pl.when(pl.col("rank") == 1).then(3)
        .when(pl.col("rank") == 2).then(2)
        .when(pl.col("rank") == 3).then(1)
        .otherwise(0)
        .alias("Ranking_Points")
    )
    return (
        long.with_columns(points)
        .select(["_recordId", "Voice", "Ranking_Points"])
        .collect()
    )