Files
JPMC-quant/utils.py
2026-01-29 20:39:16 +01:00

971 lines
36 KiB
Python

import polars as pl
from pathlib import Path
import pandas as pd
from typing import Union
import json
import re
import hashlib
import os
from plots import JPMCPlotsMixin
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
def image_alt_text_generator(fpath):
"""convert image file path to alt text
"""
if not isinstance(fpath, Path):
fpath = Path(fpath)
fparts = fpath.parts
assert fparts[0] == 'figures', "Image file path must start with 'figures'"
return Path('/'.join(fparts[2:])).as_posix()
def pptx_replace_named_image(presentation_path, target_tag, new_image_path, save_path):
"""
Finds and replaces specific images in a PowerPoint presentation while
preserving their original position, size, and aspect ratio.
This function performs a 'surgical' replacement: it records the coordinates
of the existing image, removes it from the slide's XML, and inserts a
new image into the exact same bounding box. It identifies the target
image by searching for a specific string within the Shape Name
(Selection Pane) or Alt Text.
Args:
presentation_path (str): The file path to the source .pptx file.
target_tag (str): The unique identifier to look for (e.g., 'HERO_IMAGE').
This is case-sensitive and checks both the shape name and alt text.
new_image_path (str): The file path to the new image (PNG, JPG, etc.).
save_path (str): The file path where the modified presentation will be saved.
Returns:
None: Saves the file directly to the provided save_path.
Raises:
FileNotFoundError: If the source presentation or new image is not found.
PermissionError: If the save_path is currently open or locked.
"""
prs = Presentation(presentation_path)
for i, slide in enumerate(prs.slides):
# Iterate over a list copy of shapes to safely modify the slide during iteration
print(f"Processing Slide {i + 1}...")
print(f"Total Shapes: {len(slide.shapes)} shapes")
for shape in list(slide.shapes):
print(f"Checking shape: {shape.name} of type {shape.shape_type}...")
shape_name = shape.name or ""
alt_text = ""
# More robust strategy: Check for alt text in ANY valid element property
# This allows replacing Pictures, Placeholders, GraphicFrames, etc.
try:
# Check for common property names used by python-pptx elements to store non-visual props
# nvPicPr (Picture), nvSpPr (Shape/Placeholder), nvGrpSpPr (Group),
# nvGraphicFramePr (GraphicFrame), nvCxnSpPr (Connector)
nvPr = None
for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
if hasattr(shape._element, attr):
nvPr = getattr(shape._element, attr)
break
if nvPr is not None and hasattr(nvPr, 'cNvPr'):
alt_text = nvPr.cNvPr.get("descr", "")
except Exception:
pass
print(f"Alt Text for shape '{shape_name}': {alt_text}")
if target_tag in shape_name or target_tag in alt_text:
print(f"Found it! Replacing {shape_name}...")
try:
# Record coordinates
left, top, width, height = shape.left, shape.top, shape.width, shape.height
# Remove old shape
old_element = shape._element
old_element.getparent().remove(old_element)
# Add new image at the same spot
slide.shapes.add_picture(str(new_image_path), left, top, width, height)
except AttributeError:
print(f"Could not replace {shape_name} - might be missing dimensions.")
else:
print(f"Skipping shape '{shape_name}' with alt text '{alt_text}'")
prs.save(save_path)
print(f"Successfully saved to {save_path}")
def _calculate_file_sha1(file_path: Union[str, Path]) -> str:
"""Calculate SHA1 hash of a file."""
sha1 = hashlib.sha1()
with open(file_path, 'rb') as f:
while True:
data = f.read(65536)
if not data:
break
sha1.update(data)
return sha1.hexdigest()
def _build_image_hash_map(root_dir: Union[str, Path]) -> dict:
"""
Recursively walk the directory and build a map of SHA1 hashes to file paths.
Only includes common image extensions.
"""
hash_map = {}
valid_extensions = {'.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif'}
root = Path(root_dir)
print(f"Building image hash map from {root}...")
count = 0
for root_path, dirs, files in os.walk(root):
for file in files:
file_path = Path(root_path) / file
if file_path.suffix.lower() in valid_extensions:
try:
file_sha1 = _calculate_file_sha1(file_path)
# We store the absolute path for reference, but we might just need the path relative to project for alt text
hash_map[file_sha1] = file_path
count += 1
except Exception as e:
print(f"Error hashing {file_path}: {e}")
print(f"Indexed {count} images.")
return hash_map
def _iter_picture_shapes(shapes):
"""
Recursively iterate over shapes and yield those that are pictures
(have an 'image' property), diving into groups.
"""
for shape in shapes:
# Check groups recursively
if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
yield from _iter_picture_shapes(shape.shapes)
continue
# Check if shape has image property (Pictures, Placeholders with images)
if hasattr(shape, 'image'):
yield shape
def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str, Path], output_path: Union[str, Path] = None):
"""
Updates the alt text of images in a PowerPoint presentation by matching
their content (SHA1 hash) with images in a source directory.
Args:
ppt_path (str/Path): Path to the PowerPoint file.
image_source_dir (str/Path): Directory containing source images to match against.
output_path (str/Path, optional): Path to save the updated presentation.
If None, overwrites the input file.
"""
if output_path is None:
output_path = ppt_path
# 1. Build lookup map of {sha1: file_path} from the source directory
image_hash_map = _build_image_hash_map(image_source_dir)
# 2. Open Presentation
try:
prs = Presentation(ppt_path)
except Exception as e:
print(f"Error opening presentation {ppt_path}: {e}")
return
updates_count = 0
slides = list(prs.slides)
total_slides = len(slides)
print(f"Processing {total_slides} slides...")
for i, slide in enumerate(slides):
# Use recursive iterator to find all pictures including those in groups/placeholders
picture_shapes = list(_iter_picture_shapes(slide.shapes))
for shape in picture_shapes:
try:
# shape.image.sha1 returns the SHA1 hash of the image blob
current_sha1 = shape.image.sha1
if current_sha1 in image_hash_map:
original_path = image_hash_map[current_sha1]
# Generate Alt Text
try:
# Prepare path for generator.
# Try to relativize to CWD if capable
pass_path = original_path
try:
pass_path = original_path.relative_to(Path.cwd())
except ValueError:
pass
new_alt_text = image_alt_text_generator(pass_path)
# Check existing alt text to avoid redundant updates/log them
# Accessing alt text via cNvPr
# Note: Different shape types might store non-visual props differently
# Picture: nvPicPr.cNvPr
# GraphicFrame: nvGraphicFramePr.cNvPr
# Group: nvGrpSpPr.cNvPr
# Shape/Placeholder: nvSpPr.cNvPr
nvPr = None
for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
if hasattr(shape._element, attr):
nvPr = getattr(shape._element, attr)
break
if nvPr and hasattr(nvPr, 'cNvPr'):
cNvPr = nvPr.cNvPr
existing_alt_text = cNvPr.get("descr", "")
if existing_alt_text != new_alt_text:
print(f"Slide {i+1}: Updating alt text for image matches '{pass_path}'")
print(f" Old: '{existing_alt_text}' -> New: '{new_alt_text}'")
cNvPr.set("descr", new_alt_text)
updates_count += 1
else:
print(f"Could not find cNvPr for shape on slide {i+1}")
except AssertionError as e:
print(f"Skipping match for {original_path} due to generator error: {e}")
except Exception as e:
print(f"Error updating alt text for {original_path}: {e}")
except AttributeError:
continue
except Exception as e:
print(f"Error processing shape on slide {i+1}: {e}")
if updates_count > 0:
prs.save(output_path)
print(f"Saved updated presentation to {output_path} with {updates_count} updates.")
else:
print("No images matched or required updates.")
def extract_voice_label(html_str: str) -> str:
"""
Extract voice label from HTML string and convert to short format.
Parameters:
html_str (str): HTML string containing voice label in format "Voice N"
Returns:
str: Voice label in format "VN" (e.g., "V14")
Example:
>>> extract_voice_label('<span style="...">Voice 14<br />...')
'V14'
"""
match = re.search(r'Voice (\d+)', html_str)
return f"V{match.group(1)}" if match else None
def extract_qid(val):
"""Extracts the 'ImportId' from a string representation of a dictionary."""
if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
val = eval(val)
return val['ImportId']
def combine_exclusive_columns(df: pl.DataFrame, id_col: str = "_recordId", target_col_name: str = "combined_value") -> pl.DataFrame:
"""
Combines all columns except id_col into a single column.
Raises ValueError if more than one column is populated in a single row.
"""
merge_cols = [c for c in df.columns if c != id_col]
# Validate: count non-nulls horizontally
row_counts = df.select(
pl.sum_horizontal(pl.col(merge_cols).is_not_null())
).to_series()
if (row_counts > 1).any():
raise ValueError("Invalid Data: Multiple columns populated for a single record row.")
# Merge columns using coalesce
return df.select([
pl.col(id_col),
pl.coalesce(merge_cols).alias(target_col_name)
])
def calculate_weighted_ranking_scores(df: pl.LazyFrame) -> pl.DataFrame:
"""
Calculate weighted scores for character or voice rankings.
Points system: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.
Parameters
----------
df : pl.DataFrame
DataFrame containing character/ voice ranking columns.
Returns
-------
pl.DataFrame
DataFrame with columns 'Character' and 'Weighted Score', sorted by score.
"""
if isinstance(df, pl.LazyFrame):
df = df.collect()
scores = []
# Identify ranking columns (assume all columns except _recordId)
ranking_cols = [c for c in df.columns if c != '_recordId']
for col in ranking_cols:
# Calculate score:
# (Count of Rank 1 * 3) + (Count of Rank 2 * 2) + (Count of Rank 3 * 1)
r1_count = df.filter(pl.col(col) == 1).height
r2_count = df.filter(pl.col(col) == 2).height
r3_count = df.filter(pl.col(col) == 3).height
weighted_score = (r1_count * 3) + (r2_count * 2) + (r3_count * 1)
# Clean name
clean_name = col.replace('Character_Ranking_', '').replace('Top_3_Voices_ranking__', '').replace('_', ' ').strip()
scores.append({
'Character': clean_name,
'Weighted Score': weighted_score
})
return pl.DataFrame(scores).sort('Weighted Score', descending=True)
def normalize_row_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
"""
Normalizes values in the specified columns row-wise (Standardization: (x - mean) / std).
Ignores null values (NaNs). Only applied if there are at least 2 non-null values in the row.
"""
# Using list evaluation for row-wise stats
# We create a temporary list column containing values from all target columns
df_norm = df.with_columns(
pl.concat_list(target_cols)
.list.eval(
# Apply standardization: (x - mean) / std
# std(ddof=1) is the sample standard deviation
(pl.element() - pl.element().mean()) / pl.element().std(ddof=1)
)
.alias("_normalized_values")
)
# Unpack the list back to original columns
# list.get(i) retrieves the i-th element which corresponds to target_cols[i]
return df_norm.with_columns([
pl.col("_normalized_values").list.get(i).alias(target_cols[i])
for i in range(len(target_cols))
]).drop("_normalized_values")
def normalize_global_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
"""
Normalizes values in the specified columns globally (Standardization: (x - global_mean) / global_std).
Computes a single mean and standard deviation across ALL values in the target_cols and applies it.
Ignores null values (NaNs).
"""
# Ensure eager for scalar extraction
was_lazy = isinstance(df, pl.LazyFrame)
if was_lazy:
df = df.collect()
if len(target_cols) == 0:
return df.lazy() if was_lazy else df
# Calculate global stats efficiently by stacking all columns
stats = df.select(target_cols).melt().select([
pl.col("value").mean().alias("mean"),
pl.col("value").std().alias("std")
])
global_mean = stats["mean"][0]
global_std = stats["std"][0]
if global_std is None or global_std == 0:
return df.lazy() if was_lazy else df
res = df.with_columns([
((pl.col(col) - global_mean) / global_std).alias(col)
for col in target_cols
])
return res.lazy() if was_lazy else res
class JPMCSurvey(JPMCPlotsMixin):
"""Class to handle JPMorgan Chase survey data."""
def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
if isinstance(data_path, str):
data_path = Path(data_path)
if isinstance(qsf_path, str):
qsf_path = Path(qsf_path)
self.data_filepath = data_path
self.qsf_filepath = qsf_path
self.qid_descr_map = self._extract_qid_descr_map()
self.qsf:dict = self._load_qsf()
# get export directory name for saving figures ie if data_path='data/exports/OneDrive_2026-01-21/...' should be 'figures/OneDrive_2026-01-21'
self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
if not self.fig_save_dir.exists():
self.fig_save_dir.mkdir(parents=True, exist_ok=True)
self.data_filtered = None
self.plot_height = 500
self.plot_width = 1000
# Filter values
self.filter_age:list = None
self.filter_gender:list = None
self.filter_consumer:list = None
self.filter_ethnicity:list = None
self.filter_income:list = None
def _extract_qid_descr_map(self) -> dict:
"""Extract mapping of Qualtrics ImportID to Question Description from results file."""
if '1_1-16-2026' in self.data_filepath.as_posix():
df_questions = pd.read_csv(self.data_filepath, nrows=1)
df_questions
return df_questions.iloc[0].to_dict()
else:
# First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
# Second row which contains the question content
# Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
df_questions = pd.read_csv(self.data_filepath, nrows=2)
# transpose df_questions
df_questions = df_questions.T.reset_index()
df_questions.columns = ['QName', 'Description', 'export_metadata']
df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
df_questions = df_questions[['ImportID', 'QName', 'Description']]
# return dict as {ImportID: [QName, Description]}
return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict()
def _load_qsf(self) -> dict:
"""Load QSF file to extract question metadata if needed."""
with open(self.qsf_filepath, 'r', encoding='utf-8') as f:
qsf_data = json.load(f)
return qsf_data
def _get_qsf_question_by_QID(self, QID: str) -> dict:
"""Get question metadata from QSF using the Question ID."""
q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID]
if len(q_elem) == 0:
raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
if len(q_elem) > 1:
raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")
return q_elem[0]
def load_data(self) -> pl.LazyFrame:
"""
Load CSV where column headers are in row 3 as dict strings with ImportId.
The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
This function extracts the ImportId from each column and uses it as the column name.
Parameters:
file_path (Path): Path to the CSV file to load.
Returns:
pl.LazyFrame: Polars LazyFrame with ImportId as column names.
"""
if '1_1-16-2026' in self.data_filepath.as_posix():
raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")
# Read the 3rd row (index 2) which contains the metadata dictionaries
# Use header=None to get raw values instead of treating them as column names
df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)
# Extract ImportIds from each column value in this row
new_columns = [extract_qid(val) for val in df_meta.iloc[0]]
# Now read the actual data starting from row 4 (skip first 3 rows)
df = pl.read_csv(self.data_filepath, skip_rows=3)
# Rename columns with the extracted ImportIds
df.columns = new_columns
# Store unique values for filters (ignoring nulls) to detect "all selected" state
self.options_age = sorted(df['QID1'].drop_nulls().unique().to_list()) if 'QID1' in df.columns else []
self.options_gender = sorted(df['QID2'].drop_nulls().unique().to_list()) if 'QID2' in df.columns else []
self.options_consumer = sorted(df['Consumer'].drop_nulls().unique().to_list()) if 'Consumer' in df.columns else []
self.options_ethnicity = sorted(df['QID3'].drop_nulls().unique().to_list()) if 'QID3' in df.columns else []
self.options_income = sorted(df['QID15'].drop_nulls().unique().to_list()) if 'QID15' in df.columns else []
return df.lazy()
def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True, include_record_id=True) -> pl.LazyFrame:
"""Extract subset of data based on specific questions."""
if include_record_id and '_recordId' not in QIDs:
QIDs = ['_recordId'] + QIDs
if not rename_cols:
return q.select(QIDs)
rename_dict = {qid: self.qid_descr_map[qid]['QName'] for qid in QIDs if qid in self.qid_descr_map and qid != '_recordId'}
return q.select(QIDs).rename(rename_dict)
def filter_data(self, q: pl.LazyFrame, age:list=None, gender:list=None, consumer:list=None, ethnicity:list=None, income:list=None) -> pl.LazyFrame:
"""Filter data based on provided parameters
Possible parameters:
- age: list of age groups to include
- gender: list
- consumer: list
- ethnicity: list
- income: list
Also saves the result to self.data_filtered.
"""
# Apply filters
self.filter_age = age
if age is not None:
q = q.filter(pl.col('QID1').is_in(age))
self.filter_gender = gender
if gender is not None:
q = q.filter(pl.col('QID2').is_in(gender))
self.filter_consumer = consumer
if consumer is not None:
q = q.filter(pl.col('Consumer').is_in(consumer))
self.filter_ethnicity = ethnicity
if ethnicity is not None:
q = q.filter(pl.col('QID3').is_in(ethnicity))
self.filter_income = income
if income is not None:
q = q.filter(pl.col('QID15').is_in(income))
self.data_filtered = q
return self.data_filtered
def get_demographics(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the demographics.
Renames columns using qid_descr_map if provided.
"""
QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer']
return self._get_subset(q, QIDs), None
def get_top_8_traits(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 8 characteristics are most important for this Chase virtual assistant to have.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID25']
return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None
def get_top_3_traits(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 3 characteristics that the Chase virtual assistant should prioritize.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID26_0_GROUP']
return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None
def get_character_ranking(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the ranking of characteristics for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
# Requires QSF to map "Character Ranking_2" to the actual character
cfg = self._get_qsf_question_by_QID('QID27')['Payload']
QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k,v in cfg['RecodeValues'].items()}
QIDs_rename = {qid: f'Character_Ranking_{QIDs_map[qid].replace(" ", "_")}' for qid in QIDs_map}
return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None
def get_18_8_3(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the 18-8-3 feedback for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
rename_dict = {
'QID29': '18-8_Set-A',
'QID101': '18-8_Set-B',
'QID36_0_GROUP': '3_Ranked'
}
subset = self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict)
# Combine 18-8 Set A and Set B into single column
subset = subset.with_columns(
pl.coalesce(['18-8_Set-A', '18-8_Set-B']).alias('8_Combined')
)
# Change order of columns
subset = subset.select(['_recordId', '18-8_Set-A', '18-8_Set-B', '8_Combined', '3_Ranked'])
return subset, None
def get_voice_scale_1_10(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs_map = {}
for qid, val in self.qid_descr_map.items():
if 'Scale 1-10_1' in val['QName']:
# Convert "Voice 16 Scale 1-10_1" to "Scale_1_10__Voice_16"
QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
def get_ss_green_blue(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
"""Extract columns containing the SS Green/Blue ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
cfg = self._get_qsf_question_by_QID('QID35')['Payload']
QIDs_map = {}
choices_map = {}
for qid, val in self.qid_descr_map.items():
if 'SS Green-Blue' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
# ie: "V14 SS Green-Blue_1"
qname_parts = val['QName'].split()
voice = qname_parts[0]
trait_num = qname_parts[-1].split('_')[-1]
QIDs_map[qid] = f"SS_Green_Blue__{voice}__Choice_{trait_num}"
choices_map[f"SS_Green_Blue__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
def get_top_3_voices(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the top 3 voice choices for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs_map = {}
cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
choice_voice_map = {k: extract_voice_label(v['Display']) for k,v in cfg36['Choices'].items()}
for qid, val in self.qid_descr_map.items():
if 'Rank Top 3 Voices' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
voice_num = val['QName'].split('_')[-1]
# Validate that the DynamicChoices Locator is as expected
if cfg['DynamicChoices']['Locator'] != r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1":
raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")
# extract the voice from the QID36 config
voice = choice_voice_map[voice_num]
# Convert "Top 3 Voices_1" to "Top_3_Voices__V14"
QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
def get_ss_orange_red(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
"""Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
cfg = self._get_qsf_question_by_QID('QID40')['Payload']
QIDs_map = {}
choices_map = {}
for qid, val in self.qid_descr_map.items():
if 'SS Orange-Red' in val['QName']:
cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
# ie: "V14 SS Orange-Red_1"
qname_parts = val['QName'].split()
voice = qname_parts[0]
trait_num = qname_parts[-1].split('_')[-1]
QIDs_map[qid] = f"SS_Orange_Red__{voice}__Choice_{trait_num}"
choices_map[f"SS_Orange_Red__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']
return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
def get_character_refine(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
"""Extract columns containing the character refine feedback for the Chase virtual assistant.
Returns subquery that can be chained with other polars queries.
"""
QIDs = ['QID44', 'QID97', 'QID95', 'QID96']
return self._get_subset(q, QIDs, rename_cols=True), None
def process_speaking_style_data(
df: Union[pl.LazyFrame, pl.DataFrame],
trait_map: dict[str, str]
) -> pl.DataFrame:
"""
Process speaking style columns from wide to long format and map trait descriptions.
Parses columns with format: SS_{StyleGroup}__{Voice}__{ChoiceID}
Example: SS_Orange_Red__V14__Choice_1
Parameters
----------
df : pl.LazyFrame or pl.DataFrame
Input dataframe containing SS_* columns.
trait_map : dict
Dictionary mapping column names to trait descriptions.
Keys should be full column names like "SS_Orange_Red__V14__Choice_1".
Returns
-------
pl.DataFrame
Long-format dataframe with columns:
_recordId, Voice, Style_Group, Choice_ID, Description, Score, Left_Anchor, Right_Anchor
"""
# Normalize input to LazyFrame
lf = df.lazy() if isinstance(df, pl.DataFrame) else df
# 1. Melt SS_ columns
melted = lf.melt(
id_vars=["_recordId"],
value_vars=pl.col("^SS_.*$"),
variable_name="full_col_name",
value_name="score"
)
# 2. Extract components from column name
# Regex captures: Style_Group (e.g. SS_Orange_Red), Voice (e.g. V14), Choice_ID (e.g. Choice_1)
pattern = r"^(?P<Style_Group>SS_.+?)__(?P<Voice>.+?)__(?P<Choice_ID>Choice_\d+)$"
processed = melted.with_columns(
pl.col("full_col_name").str.extract_groups(pattern)
).unnest("full_col_name")
# 3. Create Mapping Lookup from the provided dictionary
# We map (Style_Group, Choice_ID) -> Description
mapping_data = []
seen = set()
for col_name, desc in trait_map.items():
match = re.match(pattern, col_name)
if match:
groups = match.groupdict()
key = (groups["Style_Group"], groups["Choice_ID"])
if key not in seen:
# Parse description into anchors if possible (Left : Right)
parts = desc.split(':')
left_anchor = parts[0].strip() if len(parts) > 0 else ""
right_anchor = parts[1].strip() if len(parts) > 1 else ""
mapping_data.append({
"Style_Group": groups["Style_Group"],
"Choice_ID": groups["Choice_ID"],
"Description": desc,
"Left_Anchor": left_anchor,
"Right_Anchor": right_anchor
})
seen.add(key)
if not mapping_data:
return processed.collect()
mapping_lf = pl.LazyFrame(mapping_data)
# 4. Join Data with Mapping
result = processed.join(
mapping_lf,
on=["Style_Group", "Choice_ID"],
how="left"
)
# 5. Cast score to Int
result = result.with_columns(
pl.col("score").cast(pl.Int64, strict=False)
)
return result.collect()
def process_voice_scale_data(
df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
"""
Process Voice Scale columns from wide to long format.
Parses columns with format: Voice_Scale_1_10__V{Voice}
Example: Voice_Scale_1_10__V14
Returns
-------
pl.DataFrame
Long-format dataframe with columns:
_recordId, Voice, Voice_Scale_Score
"""
lf = df.lazy() if isinstance(df, pl.DataFrame) else df
# Melt
melted = lf.melt(
id_vars=["_recordId"],
value_vars=pl.col("^Voice_Scale_1_10__V.*$"),
variable_name="full_col_name",
value_name="Voice_Scale_Score"
)
# Extract Voice
processed = melted.with_columns(
pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
).with_columns(
("V" + pl.col("Voice_Num")).alias("Voice")
)
# Keep Score as Float (original data is f64)
result = processed.select([
"_recordId",
"Voice",
pl.col("Voice_Scale_Score").cast(pl.Float64, strict=False)
])
return result.collect()
def join_voice_and_style_data(
processed_style_data: pl.DataFrame,
processed_voice_data: pl.DataFrame
) -> pl.DataFrame:
"""
Joins processed Speaking Style data with Voice Scale 1-10 data.
Parameters
----------
processed_style_data : pl.DataFrame
Result of process_speaking_style_data
processed_voice_data : pl.DataFrame
Result of process_voice_scale_data
Returns
-------
pl.DataFrame
Merged dataframe with columns from both, joined on _recordId and Voice.
"""
return processed_style_data.join(
processed_voice_data,
on=["_recordId", "Voice"],
how="inner"
)
def process_voice_ranking_data(
df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
"""
Process Voice Ranking columns from wide to long format and convert ranks to points.
Parses columns with format: Top_3_Voices_ranking__V{Voice}
Converts ranks to points: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt
Returns
-------
pl.DataFrame
Long-format dataframe with columns:
_recordId, Voice, Ranking_Points
"""
lf = df.lazy() if isinstance(df, pl.DataFrame) else df
# Melt
melted = lf.melt(
id_vars=["_recordId"],
value_vars=pl.col("^Top_3_Voices_ranking__V.*$"),
variable_name="full_col_name",
value_name="rank"
)
# Extract Voice
processed = melted.with_columns(
pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
).with_columns(
("V" + pl.col("Voice_Num")).alias("Voice")
)
# Convert rank to points: 1st=3, 2nd=2, 3rd=1, null=0 (not ranked)
# Rank values are 1, 2, 3 for position in top 3
result = processed.with_columns(
pl.when(pl.col("rank") == 1).then(3)
.when(pl.col("rank") == 2).then(2)
.when(pl.col("rank") == 3).then(1)
.otherwise(0)
.alias("Ranking_Points")
).select([
"_recordId",
"Voice",
"Ranking_Points"
])
return result.collect()