1007 lines
37 KiB
Python
1007 lines
37 KiB
Python
import polars as pl
|
|
from pathlib import Path
|
|
import pandas as pd
|
|
from typing import Union
|
|
import json
|
|
import re
|
|
import hashlib
|
|
import os
|
|
from plots import JPMCPlotsMixin
|
|
|
|
|
|
from pptx import Presentation
|
|
from pptx.enum.shapes import MSO_SHAPE_TYPE
|
|
|
|
|
|
def image_alt_text_generator(fpath, include_dataset_dirname=False) -> str:
    """Convert an image file path into its alt-text string.

    Args:
        fpath (str or Path): Path to the image file; must start with 'figures/'.
        include_dataset_dirname (bool): Whether to keep the dataset directory
            name in the alt text. Recommended False so images are not tied to
            a specific dataset export (otherwise alt-text matching breaks when
            a new dataset is exported).

    Returns:
        str: POSIX-style relative path used as the alt text.
    """
    path = fpath if isinstance(fpath, Path) else Path(fpath)
    parts = path.parts

    # Downstream alt-text matching relies on paths rooted at 'figures'.
    assert parts[0] == 'figures', "Image file path must start with 'figures'"

    # Drop 'figures/' only, or 'figures/<dataset_dir>/' as well.
    start = 1 if include_dataset_dirname else 2
    return Path('/'.join(parts[start:])).as_posix()
|
|
|
|
def pptx_replace_named_image(presentation_path, target_tag, new_image_path, save_path):
    """
    Finds and replaces specific images in a PowerPoint presentation while
    preserving their original position, size, and aspect ratio.

    This function performs a 'surgical' replacement: it records the coordinates
    of the existing image, removes it from the slide's XML, and inserts a
    new image into the exact same bounding box. It identifies the target
    image by searching for a specific string within the Shape Name
    (Selection Pane) or Alt Text.

    Args:
        presentation_path (str): The file path to the source .pptx file.
        target_tag (str): The unique identifier to look for (e.g., 'HERO_IMAGE').
            This is case-sensitive and checks both the shape name and alt text.
        new_image_path (str): The file path to the new image (PNG, JPG, etc.).
        save_path (str): The file path where the modified presentation will be saved.

    Returns:
        None: Saves the file directly to the provided save_path.

    Raises:
        FileNotFoundError: If the source presentation or new image is not found.
        PermissionError: If the save_path is currently open or locked.
    """
    prs = Presentation(presentation_path)

    # NOTE(review): every shape on every slide that matches target_tag is
    # replaced — not just the first match.
    for i, slide in enumerate(prs.slides):
        print(f"Processing Slide {i + 1}...")
        print(f"Total Shapes: {len(slide.shapes)} shapes")

        # Iterate over a list copy of shapes to safely modify the slide during iteration.
        for shape in list(slide.shapes):
            print(f"Checking shape: {shape.name} of type {shape.shape_type}...")

            shape_name = shape.name or ""
            alt_text = ""

            # More robust strategy: check for alt text in ANY valid element property.
            # This allows replacing Pictures, Placeholders, GraphicFrames, etc.
            try:
                # Common non-visual-property element names used by python-pptx:
                # nvPicPr (Picture), nvSpPr (Shape/Placeholder), nvGrpSpPr (Group),
                # nvGraphicFramePr (GraphicFrame), nvCxnSpPr (Connector)
                nvPr = None
                for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
                    if hasattr(shape._element, attr):
                        nvPr = getattr(shape._element, attr)
                        break

                # The 'descr' attribute of cNvPr holds the alt text.
                if nvPr is not None and hasattr(nvPr, 'cNvPr'):
                    alt_text = nvPr.cNvPr.get("descr", "")
            except Exception:
                # Best-effort: a shape with an unreadable element simply
                # keeps an empty alt_text and can still match by name.
                pass

            print(f"Alt Text for shape '{shape_name}': {alt_text}")

            if target_tag in shape_name or target_tag in alt_text:
                print(f"Found it! Replacing {shape_name}...")

                try:
                    # Record coordinates before removal so the replacement
                    # lands in the exact same bounding box.
                    left, top, width, height = shape.left, shape.top, shape.width, shape.height

                    # Remove old shape directly from the slide XML tree.
                    old_element = shape._element
                    old_element.getparent().remove(old_element)

                    # Add new image at the same spot.
                    slide.shapes.add_picture(str(new_image_path), left, top, width, height)
                except AttributeError:
                    # Some shapes (e.g. placeholders inheriting geometry from
                    # the layout) may have None dimensions.
                    print(f"Could not replace {shape_name} - might be missing dimensions.")

            else:
                print(f"Skipping shape '{shape_name}' with alt text '{alt_text}'")

    prs.save(save_path)
    print(f"Successfully saved to {save_path}")
|
|
|
|
|
|
def _calculate_file_sha1(file_path: Union[str, Path]) -> str:
|
|
"""Calculate SHA1 hash of a file."""
|
|
sha1 = hashlib.sha1()
|
|
with open(file_path, 'rb') as f:
|
|
while True:
|
|
data = f.read(65536)
|
|
if not data:
|
|
break
|
|
sha1.update(data)
|
|
return sha1.hexdigest()
|
|
|
|
|
|
def _build_image_hash_map(root_dir: Union[str, Path]) -> dict:
    """
    Recursively walk *root_dir* and return {sha1_hex: file_path} for every
    image file found. Only common raster image extensions are included.
    Files that fail to hash are reported and skipped.
    """
    image_suffixes = {'.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif'}

    root = Path(root_dir)
    print(f"Building image hash map from {root}...")

    sha1_to_path = {}
    count = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            file_path = Path(dirpath) / name
            if file_path.suffix.lower() not in image_suffixes:
                continue
            try:
                # Absolute/original path is stored; callers may relativize it
                # before generating alt text.
                sha1_to_path[_calculate_file_sha1(file_path)] = file_path
                count += 1
            except Exception as e:
                print(f"Error hashing {file_path}: {e}")

    print(f"Indexed {count} images.")
    return sha1_to_path
|
|
|
|
|
|
def _iter_picture_shapes(shapes):
    """
    Yield every picture-like shape in *shapes*, descending into groups.

    A shape counts as a picture when it exposes an 'image' attribute
    (Pictures, and Placeholders filled with an image). Group shapes are
    never yielded themselves — only their picture members.
    """
    for current in shapes:
        if current.shape_type == MSO_SHAPE_TYPE.GROUP:
            # Recurse into the group's member shapes.
            yield from _iter_picture_shapes(current.shapes)
        elif hasattr(current, 'image'):
            yield current
|
|
|
|
|
|
def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str, Path], output_path: Union[str, Path] = None):
    """
    Updates the alt text of images in a PowerPoint presentation by matching
    their content (SHA1 hash) with images in a source directory.

    Args:
        ppt_path (str/Path): Path to the PowerPoint file.
        image_source_dir (str/Path): Directory containing source images to match against.
        output_path (str/Path, optional): Path to save the updated presentation.
            If None, overwrites the input file.
    """
    if output_path is None:
        output_path = ppt_path

    # 1. Build lookup map of {sha1: file_path} from the source directory
    image_hash_map = _build_image_hash_map(image_source_dir)

    # 2. Open Presentation
    try:
        prs = Presentation(ppt_path)
    except Exception as e:
        print(f"Error opening presentation {ppt_path}: {e}")
        return

    updates_count = 0
    slides = list(prs.slides)
    total_slides = len(slides)

    print(f"Processing {total_slides} slides...")

    for i, slide in enumerate(slides):
        # Use recursive iterator to find all pictures including those in groups/placeholders
        picture_shapes = list(_iter_picture_shapes(slide.shapes))

        for shape in picture_shapes:
            try:
                # shape.image.sha1 returns the SHA1 hash of the image blob,
                # which is compared against the on-disk image hashes.
                current_sha1 = shape.image.sha1

                if current_sha1 in image_hash_map:
                    original_path = image_hash_map[current_sha1]

                    # Generate Alt Text
                    try:
                        # Prepare path for generator: try to relativize to CWD
                        # (image_alt_text_generator requires a path starting
                        # with 'figures/'); fall back to the stored path.
                        pass_path = original_path
                        try:
                            pass_path = original_path.relative_to(Path.cwd())
                        except ValueError:
                            pass

                        new_alt_text = image_alt_text_generator(pass_path)

                        # Check existing alt text to avoid redundant updates.
                        # Alt text lives on the cNvPr element; different shape
                        # types store non-visual props under different parents:
                        #   Picture: nvPicPr.cNvPr
                        #   GraphicFrame: nvGraphicFramePr.cNvPr
                        #   Group: nvGrpSpPr.cNvPr
                        #   Shape/Placeholder: nvSpPr.cNvPr
                        nvPr = None
                        for attr in ['nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr']:
                            if hasattr(shape._element, attr):
                                nvPr = getattr(shape._element, attr)
                                break

                        if nvPr and hasattr(nvPr, 'cNvPr'):
                            cNvPr = nvPr.cNvPr
                            existing_alt_text = cNvPr.get("descr", "")

                            # Only write (and count) genuinely changed alt text.
                            if existing_alt_text != new_alt_text:
                                print(f"Slide {i+1}: Updating alt text for image matches '{pass_path}'")
                                print(f"  Old: '{existing_alt_text}' -> New: '{new_alt_text}'")
                                cNvPr.set("descr", new_alt_text)
                                updates_count += 1
                        else:
                            print(f"Could not find cNvPr for shape on slide {i+1}")

                    # image_alt_text_generator asserts the path starts with
                    # 'figures' — treat that as a skippable mismatch.
                    except AssertionError as e:
                        print(f"Skipping match for {original_path} due to generator error: {e}")
                    except Exception as e:
                        print(f"Error updating alt text for {original_path}: {e}")

            except AttributeError:
                # Shape without a usable .image — not a real picture; skip.
                continue
            except Exception as e:
                print(f"Error processing shape on slide {i+1}: {e}")

    # Only touch the output file when something actually changed.
    if updates_count > 0:
        prs.save(output_path)
        print(f"Saved updated presentation to {output_path} with {updates_count} updates.")
    else:
        print("No images matched or required updates.")
|
|
|
|
|
|
def extract_voice_label(html_str: str) -> Union[str, None]:
    """
    Extract voice label from HTML string and convert to short format.

    Parameters:
        html_str (str): HTML string containing voice label in format "Voice N"

    Returns:
        str or None: Voice label in format "VN" (e.g., "V14"), or None when
        no "Voice N" token is present. (Fix: the original annotation claimed
        a plain str return, but the function returns None on no match.)

    Example:
        >>> extract_voice_label('<span style="...">Voice 14<br />...')
        'V14'
    """
    match = re.search(r'Voice (\d+)', html_str)
    return f"V{match.group(1)}" if match else None
|
|
|
|
|
|
def extract_qid(val):
    """Extract the 'ImportId' from a string representation of a dictionary.

    The Qualtrics export metadata row holds dict strings such as
    '{"ImportId":"startDate","timeZone":"America/Denver"}'. The original
    implementation parsed these with eval(), which executes arbitrary code
    from file contents; this version parses safely with json.loads and
    falls back to ast.literal_eval for Python-repr dicts (single quotes).

    Args:
        val: Either a dict already, or a '{...}' string to be parsed.

    Returns:
        The value stored under the 'ImportId' key.
    """
    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
        try:
            val = json.loads(val)
        except json.JSONDecodeError:
            # Fall back for dict reprs that are not strict JSON.
            import ast
            val = ast.literal_eval(val)
    return val['ImportId']
|
|
|
|
|
|
def combine_exclusive_columns(df: pl.DataFrame, id_col: str = "_recordId", target_col_name: str = "combined_value") -> pl.DataFrame:
    """
    Collapse every column other than *id_col* into a single column.

    The merged columns are assumed to be mutually exclusive: each row may
    carry at most one non-null value across them.

    Raises:
        ValueError: If any row has more than one populated column.
    """
    value_cols = [name for name in df.columns if name != id_col]

    # Guard: count populated cells per row across the merge candidates.
    populated_per_row = df.select(
        pl.sum_horizontal(pl.col(value_cols).is_not_null())
    ).to_series()
    if (populated_per_row > 1).any():
        raise ValueError("Invalid Data: Multiple columns populated for a single record row.")

    # coalesce takes the first non-null value left-to-right in each row —
    # with exclusivity verified, that is the row's single value.
    return df.select([
        pl.col(id_col),
        pl.coalesce(value_cols).alias(target_col_name),
    ])
|
|
|
|
|
|
|
|
def calculate_weighted_ranking_scores(df: pl.LazyFrame) -> pl.DataFrame:
    """
    Calculate weighted scores for character or voice rankings.

    Points system: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.

    Parameters
    ----------
    df : pl.DataFrame or pl.LazyFrame
        Frame containing character/voice ranking columns (values 1-3) plus
        a '_recordId' column, which is ignored.

    Returns
    -------
    pl.DataFrame
        Columns 'Character' and 'Weighted Score', sorted descending by score.
    """
    if isinstance(df, pl.LazyFrame):
        df = df.collect()

    rows = []
    # Every column except the record id is treated as a ranking column.
    for ranking_col in (c for c in df.columns if c != '_recordId'):
        # Sum of (votes at rank r) * (points for rank r).
        points = sum(
            weight * df.filter(pl.col(ranking_col) == rank).height
            for rank, weight in ((1, 3), (2, 2), (3, 1))
        )

        # Strip the known column-name prefixes down to a display label.
        label = (
            ranking_col
            .replace('Character_Ranking_', '')
            .replace('Top_3_Voices_ranking__', '')
            .replace('_', ' ')
            .strip()
        )
        rows.append({'Character': label, 'Weighted Score': points})

    return pl.DataFrame(rows).sort('Weighted Score', descending=True)
|
|
|
|
|
|
def normalize_row_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
    """
    Min-max normalize the target columns row-wise onto a 0-10 scale.

    Formula: ((x - row_min) / (row_max - row_min)) * 10

    Nulls are preserved as nulls. When all non-null values in a row are
    equal (max == min), those values become 5.0 (the scale midpoint).

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe.
    target_cols : list[str]
        Column names to normalize.

    Returns
    -------
    pl.DataFrame
        DataFrame with target columns normalized row-wise.
    """
    # Row-wise min/max over the target columns, null-aware, as floats.
    as_float = [pl.col(name).cast(pl.Float64) for name in target_cols]
    lo = pl.min_horizontal(as_float)
    hi = pl.max_horizontal(as_float)
    spread = hi - lo

    scaled = [
        pl.when(spread == 0)
        # Degenerate row: keep nulls null, map every other value to 5.0.
        .then(pl.when(pl.col(name).is_null()).then(None).otherwise(5.0))
        .otherwise(((pl.col(name).cast(pl.Float64) - lo) / spread) * 10)
        .alias(name)
        for name in target_cols
    ]
    return df.with_columns(scaled)
|
|
|
|
|
|
def normalize_global_values(df: pl.DataFrame, target_cols: list[str]) -> pl.DataFrame:
    """
    Min-max normalize the target columns onto a 0-10 scale using the single
    global min/max taken across ALL target columns together.

    Formula: ((x - global_min) / (global_max - global_min)) * 10

    Null values are ignored for the stats and left untouched. Accepts a
    DataFrame or LazyFrame; the return matches the input's laziness. The
    frame is returned unchanged when there are no target columns, no data,
    or zero range (all values equal).
    """
    # Work eagerly so min/max can be pulled out as Python scalars.
    was_lazy = isinstance(df, pl.LazyFrame)
    if was_lazy:
        df = df.collect()

    if len(target_cols) == 0:
        return df.lazy() if was_lazy else df

    # Stack all target columns into one 'value' column for global stats;
    # cast to Float64 so mixed numeric dtypes combine cleanly.
    stats = df.select([pl.col(c).cast(pl.Float64) for c in target_cols]).melt().select([
        pl.col("value").min().alias("min"),
        pl.col("value").max().alias("max"),
    ])
    global_min = stats["min"][0]
    global_max = stats["max"][0]

    # Degenerate cases: no non-null data, or every value identical.
    if global_min is None or global_max is None or global_max == global_min:
        return df.lazy() if was_lazy else df

    span = global_max - global_min
    scaled = df.with_columns([
        (((pl.col(name).cast(pl.Float64) - global_min) / span) * 10).alias(name)
        for name in target_cols
    ])
    return scaled.lazy() if was_lazy else scaled
|
|
|
|
|
|
class JPMCSurvey(JPMCPlotsMixin):
    """Class to handle JPMorgan Chase survey data."""

    def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
        """Load survey metadata from a results CSV and its QSF definition.

        Args:
            data_path (str or Path): Path to the Qualtrics results CSV export,
                e.g. 'data/exports/OneDrive_2026-01-21/...'.
            qsf_path (str or Path): Path to the survey's QSF (JSON) file.
        """
        # Path() is a no-op on existing Path objects, so convert unconditionally
        # instead of isinstance-checking each argument.
        self.data_filepath = Path(data_path)
        self.qsf_filepath = Path(qsf_path)
        self.qid_descr_map = self._extract_qid_descr_map()
        self.qsf: dict = self._load_qsf()

        # Mirror the export directory name under figures/, e.g.
        # data_path='data/exports/OneDrive_2026-01-21/...' -> 'figures/OneDrive_2026-01-21'.
        # NOTE(review): assumes data_path has at least 3 components — confirm.
        self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
        # exist_ok=True already tolerates an existing directory, so the
        # previous separate exists() check was redundant.
        self.fig_save_dir.mkdir(parents=True, exist_ok=True)

        self.data_filtered = None
        self.plot_height = 500
        self.plot_width = 1000

        # Currently-active filter values; None means "no filter applied".
        self.filter_age: list = None
        self.filter_gender: list = None
        self.filter_consumer: list = None
        self.filter_ethnicity: list = None
        self.filter_income: list = None
|
|
|
|
|
|
|
|
def _extract_qid_descr_map(self) -> dict:
    """Extract a mapping of Qualtrics ImportId to question info from the results file.

    Two export layouts are supported:
    - The '1_1-16-2026' export: the first data row already maps column name
      to description, so it is returned directly as {column: description}.
    - Later exports: row 1 holds the Qualtrics Editor question names (e.g.
      'B_VOICE SEL. 18-8'), row 2 the question content, and row 3 the export
      metadata (e.g. '{"ImportId":"startDate","timeZone":"America/Denver"}').
      These are combined into {ImportId: {'QName': ..., 'Description': ...}}.

    Returns:
        dict: One of the two shapes described above, depending on the export.
    """
    if '1_1-16-2026' in self.data_filepath.as_posix():
        df_questions = pd.read_csv(self.data_filepath, nrows=1)
        # (Fix: removed a dead, no-op bare `df_questions` expression here.)
        return df_questions.iloc[0].to_dict()

    # Header row: Qualtrics Editor question names; next two rows: question
    # content and export metadata.
    df_questions = pd.read_csv(self.data_filepath, nrows=2)

    # Transpose so each question becomes a row of [QName, Description, metadata].
    df_questions = df_questions.T.reset_index()
    df_questions.columns = ['QName', 'Description', 'export_metadata']
    df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)

    df_questions = df_questions[['ImportID', 'QName', 'Description']]

    # Return dict as {ImportID: {'QName': ..., 'Description': ...}}.
    return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict()
|
|
|
|
def _load_qsf(self) -> dict:
    """Load and return the QSF (JSON) survey definition from self.qsf_filepath."""
    with open(self.qsf_filepath, 'r', encoding='utf-8') as qsf_file:
        return json.load(qsf_file)
|
|
|
|
def _get_qsf_question_by_QID(self, QID: str) -> dict:
    """Return the unique QSF SurveyElement whose PrimaryAttribute equals *QID*.

    Raises:
        ValueError: If no element matches, or if more than one matches.
    """
    q_elem = [
        element for element in self.qsf['SurveyElements']
        if element['PrimaryAttribute'] == QID
    ]

    if not q_elem:
        raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
    if len(q_elem) > 1:
        raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")

    return q_elem[0]
|
|
|
|
|
|
def load_data(self) -> pl.LazyFrame:
    """
    Load the results CSV, using the Qualtrics ImportIds as column names.

    The 3rd row of the export contains per-column metadata like
    '{"ImportId":"startDate","timeZone":"America/Denver"}'; the ImportId is
    extracted from each and used as the column name. As a side effect,
    populates self.options_age/gender/consumer/ethnicity/income with the
    sorted unique non-null values of the demographic columns, used later to
    detect the "all selected" filter state.

    Raises:
        NotImplementedError: For the legacy '1_1-16-2026' export layout.

    Returns:
        pl.LazyFrame: Polars LazyFrame with ImportId as column names.
    """
    if '1_1-16-2026' in self.data_filepath.as_posix():
        raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")

    # Read the 3rd row (index 2) which contains the metadata dictionaries.
    # header=None yields raw values instead of treating them as column names.
    df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)

    # Extract ImportIds from each column value in this row.
    new_columns = [extract_qid(val) for val in df_meta.iloc[0]]

    # Now read the actual data starting from row 4 (skip the 3 header rows).
    df = pl.read_csv(self.data_filepath, skip_rows=3)

    # Rename columns with the extracted ImportIds (positional assignment —
    # relies on pandas and polars seeing the same column order).
    df.columns = new_columns

    # Store unique values for filters (ignoring nulls) to detect "all selected" state.
    self.options_age = sorted(df['QID1'].drop_nulls().unique().to_list()) if 'QID1' in df.columns else []
    self.options_gender = sorted(df['QID2'].drop_nulls().unique().to_list()) if 'QID2' in df.columns else []
    self.options_consumer = sorted(df['Consumer'].drop_nulls().unique().to_list()) if 'Consumer' in df.columns else []
    self.options_ethnicity = sorted(df['QID3'].drop_nulls().unique().to_list()) if 'QID3' in df.columns else []
    self.options_income = sorted(df['QID15'].drop_nulls().unique().to_list()) if 'QID15' in df.columns else []

    return df.lazy()
|
|
|
|
def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True, include_record_id=True) -> pl.LazyFrame:
    """Select a subset of question columns from *q*.

    Args:
        q: Source LazyFrame.
        QIDs: Column (ImportId) names to keep.
        rename_cols: When True, rename each QID to its editor QName via
            qid_descr_map (unknown QIDs and '_recordId' keep their names).
        include_record_id: When True, prepend '_recordId' if absent.
    """
    if include_record_id and '_recordId' not in QIDs:
        QIDs = ['_recordId'] + QIDs

    subset = q.select(QIDs)
    if not rename_cols:
        return subset

    # Only rename QIDs we actually have a QName for; never touch the id column.
    qname_by_qid = {
        qid: self.qid_descr_map[qid]['QName']
        for qid in QIDs
        if qid in self.qid_descr_map and qid != '_recordId'
    }
    return subset.rename(qname_by_qid)
|
|
|
|
def filter_data(self, q: pl.LazyFrame, age:list=None, gender:list=None, consumer:list=None, ethnicity:list=None, income:list=None) -> pl.LazyFrame:
    """Filter data by any combination of demographic value lists.

    Each parameter is a list of values to keep for its column, or None to
    leave that dimension unfiltered. The provided values are recorded on
    the matching self.filter_* attribute (including None, which clears the
    recorded filter). The filtered frame is stored in self.data_filtered
    and returned.
    """
    # (attribute to record on, source column, requested values)
    filter_specs = [
        ('filter_age', 'QID1', age),
        ('filter_gender', 'QID2', gender),
        ('filter_consumer', 'Consumer', consumer),
        ('filter_ethnicity', 'QID3', ethnicity),
        ('filter_income', 'QID15', income),
    ]
    for attr_name, column, values in filter_specs:
        setattr(self, attr_name, values)
        if values is not None:
            q = q.filter(pl.col(column).is_in(values))

    self.data_filtered = q
    return self.data_filtered
|
|
|
|
def get_demographics(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the demographic columns, renamed via qid_descr_map.

    Returns:
        tuple: (subquery, None). The None slot keeps the signature parallel
        with the getters that also return a choices map.
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID7', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer']
    return self._get_subset(q, QIDs), None
|
|
|
|
|
|
def get_top_8_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the 'top 8 most important characteristics' column (QID25).

    Returns:
        tuple: (subquery with the column renamed to 'Top_8_Traits', None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs = ['QID25']
    return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None
|
|
|
|
|
|
|
|
def get_top_3_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the 'top 3 characteristics to prioritize' column (QID26_0_GROUP).

    Returns:
        tuple: (subquery with the column renamed to 'Top_3_Traits', None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs = ['QID26_0_GROUP']
    return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None
|
|
|
|
|
|
def get_character_ranking(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the characteristic-ranking columns (QID27).

    The QSF payload is needed to map each recoded column suffix (e.g.
    "Character Ranking_2") back to the characteristic's variable name,
    producing columns like 'Character_Ranking_<Name>'.

    Returns:
        tuple: (subquery with renamed ranking columns, None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    cfg = self._get_qsf_question_by_QID('QID27')['Payload']

    # RecodeValues maps choice id -> exported column suffix; VariableNaming
    # maps choice id -> human-readable characteristic name.
    QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k, v in cfg['RecodeValues'].items()}
    QIDs_rename = {qid: f'Character_Ranking_{QIDs_map[qid].replace(" ", "_")}' for qid in QIDs_map}

    return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None
|
|
|
|
|
|
def get_18_8_3(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the 18-8-3 voice narrowing columns.

    Respondents saw either Set A (QID29) or Set B (QID101) of the 18->8
    selection, plus a top-3 ranking (QID36_0_GROUP); the two sets are
    coalesced into a single '8_Combined' column.

    Returns:
        tuple: (subquery with columns _recordId, 18-8_Set-A, 18-8_Set-B,
        8_Combined, 3_Ranked; None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
    rename_dict = {
        'QID29': '18-8_Set-A',
        'QID101': '18-8_Set-B',
        'QID36_0_GROUP': '3_Ranked'
    }

    subset = self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict)

    # Each respondent answered only one of Set A / Set B, so coalescing
    # yields that respondent's single 18->8 answer.
    subset = subset.with_columns(
        pl.coalesce(['18-8_Set-A', '18-8_Set-B']).alias('8_Combined')
    )
    # Fix a deterministic, readable column order.
    subset = subset.select(['_recordId', '18-8_Set-A', '18-8_Set-B', '8_Combined', '3_Ranked'])

    return subset, None
|
|
|
|
|
|
def get_voice_scale_1_10(self, q: pl.LazyFrame, drop_cols=('Voice_Scale_1_10__V46',)) -> tuple[pl.LazyFrame, None]:
    """Extract the Voice Scale 1-10 rating columns.

    Drops V46 by default: it was improperly configured in the survey and
    did not show up for respondents.

    Args:
        q: Source LazyFrame.
        drop_cols: Renamed columns to exclude from the result.
            (Fix: the default is now an immutable tuple — the original used
            a mutable list default, a classic Python pitfall.)

    Returns:
        tuple: (subquery with columns renamed 'Voice_Scale_1_10__V<n>', None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'Scale 1-10_1' in val['QName']:
            # Convert "Voice 16 Scale 1-10_1" to "Voice_Scale_1_10__V16".
            QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"

    # Drop excluded columns by reverse lookup of their QID (first match,
    # mirroring the original behavior).
    for col in drop_cols:
        for qid, renamed in list(QIDs_map.items()):
            if renamed == col:
                del QIDs_map[qid]
                break

    return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
|
|
|
|
|
|
|
|
def get_ss_green_blue(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
    """Extract the SS Green-Blue speaking-style rating columns.

    Returns a (subquery, choices_map) pair: the subquery's columns are
    renamed to 'SS_Green_Blue__<Voice>__Choice_<n>', and choices_map maps
    each of those names to the choice's display text from the QSF.

    NOTE(review): the declared return type is Union[...] but a 2-tuple is
    actually returned — same pattern as the sibling getters.
    """
    # NOTE(review): this initial lookup is overwritten inside the loop below;
    # it appears to act only as an early existence check for QID35 (it raises
    # if QID35 is missing from the QSF) — confirm before removing.
    cfg = self._get_qsf_question_by_QID('QID35')['Payload']

    QIDs_map = {}
    choices_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'SS Green-Blue' in val['QName']:

            # Look up the parent question's payload (e.g. 'QID35_3' -> 'QID35').
            cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']

            # QName format: "V14 SS Green-Blue_1" -> voice 'V14', trait '1'.
            qname_parts = val['QName'].split()
            voice = qname_parts[0]
            trait_num = qname_parts[-1].split('_')[-1]

            QIDs_map[qid] = f"SS_Green_Blue__{voice}__Choice_{trait_num}"

            # Map the renamed column to its display text from the QSF choices.
            choices_map[f"SS_Green_Blue__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']

    return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
|
|
|
|
|
|
def get_top_3_voices(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, None]:
    """Extract the 'rank top 3 voices' columns.

    The ranking questions are dynamic: they reuse the choices the respondent
    selected in QID36, so each ranking column's numeric suffix is mapped
    back to a voice label via QID36's choice list.

    Raises:
        ValueError: If a ranking question's DynamicChoices locator does not
            point at QID36's selected choices (i.e. the survey wiring changed
            and the voice mapping would be wrong).

    NOTE(review): the declared return type is Union[...] but a 2-tuple is
    actually returned — same pattern as the sibling getters.
    """
    QIDs_map = {}

    # Map QID36 choice id -> short voice label (e.g. '3' -> 'V14').
    cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
    choice_voice_map = {k: extract_voice_label(v['Display']) for k,v in cfg36['Choices'].items()}

    for qid, val in self.qid_descr_map.items():
        if 'Rank Top 3 Voices' in val['QName']:

            cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
            # QName suffix is the QID36 choice number this column refers to.
            voice_num = val['QName'].split('_')[-1]

            # Validate that the DynamicChoices Locator is as expected —
            # otherwise choice_voice_map would not apply to this column.
            if cfg['DynamicChoices']['Locator'] != r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1":
                raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")

            # Resolve the choice number to its voice label via QID36.
            voice = choice_voice_map[voice_num]

            # Convert "Top 3 Voices_1" to "Top_3_Voices_ranking__V14".
            QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"

    return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None
|
|
|
|
|
|
def get_ss_orange_red(self, q: pl.LazyFrame) -> Union[pl.LazyFrame, dict]:
    """Extract the SS Orange-Red speaking-style rating columns.

    Returns a (subquery, choices_map) pair: the subquery's columns are
    renamed to 'SS_Orange_Red__<Voice>__Choice_<n>', and choices_map maps
    each of those names to the choice's display text from the QSF.

    NOTE(review): the declared return type is Union[...] but a 2-tuple is
    actually returned — same pattern as the sibling getters.
    """
    # NOTE(review): this initial lookup is overwritten inside the loop below;
    # it appears to act only as an early existence check for QID40 (it raises
    # if QID40 is missing from the QSF) — confirm before removing.
    cfg = self._get_qsf_question_by_QID('QID40')['Payload']

    QIDs_map = {}
    choices_map = {}
    for qid, val in self.qid_descr_map.items():
        if 'SS Orange-Red' in val['QName']:

            # Look up the parent question's payload (e.g. 'QID40_3' -> 'QID40').
            cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']

            # QName format: "V14 SS Orange-Red_1" -> voice 'V14', trait '1'.
            qname_parts = val['QName'].split()
            voice = qname_parts[0]
            trait_num = qname_parts[-1].split('_')[-1]

            QIDs_map[qid] = f"SS_Orange_Red__{voice}__Choice_{trait_num}"

            # Map the renamed column to its display text from the QSF choices.
            choices_map[f"SS_Orange_Red__{voice}__Choice_{trait_num}"] = cfg['Choices'][trait_num]['Display']

    return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map
|
|
|
|
|
|
def get_character_refine(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
    """Extract the character-refinement feedback columns.

    Returns:
        tuple: (subquery with columns renamed to their editor QNames, None).
        (Fix: the original annotation Union[pl.LazyFrame, None] was wrong —
        a 2-tuple is always returned.)
    """
    QIDs = ['QID44', 'QID97', 'QID95', 'QID96']
    return self._get_subset(q, QIDs, rename_cols=True), None
|
|
|
|
|
|
def process_speaking_style_data(
    df: Union[pl.LazyFrame, pl.DataFrame],
    trait_map: dict[str, str]
) -> pl.DataFrame:
    """
    Process speaking style columns from wide to long format and map trait descriptions.

    Parses columns with format: SS_{StyleGroup}__{Voice}__{ChoiceID}
    Example: SS_Orange_Red__V14__Choice_1

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Input dataframe containing SS_* columns and a '_recordId' column.
    trait_map : dict
        Dictionary mapping column names to trait descriptions.
        Keys should be full column names like "SS_Orange_Red__V14__Choice_1".
        Descriptions formatted "Left : Right" are split into anchor columns.

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Voice, Style_Group, Choice_ID, Description, Score, Left_Anchor, Right_Anchor
        (If trait_map yields no parseable entries, the melted/extracted frame
        is returned without the Description/anchor columns.)
    """
    # Normalize input to LazyFrame
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    # 1. Melt SS_ columns (regex column selector keeps only SS_* as values)
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^SS_.*$"),
        variable_name="full_col_name",
        value_name="score"
    )

    # 2. Extract components from column name.
    # Regex captures: Style_Group (e.g. SS_Orange_Red), Voice (e.g. V14), Choice_ID (e.g. Choice_1)
    pattern = r"^(?P<Style_Group>SS_.+?)__(?P<Voice>.+?)__(?P<Choice_ID>Choice_\d+)$"

    # extract_groups yields a struct; unnest turns the named captures into columns.
    processed = melted.with_columns(
        pl.col("full_col_name").str.extract_groups(pattern)
    ).unnest("full_col_name")

    # 3. Create mapping lookup from the provided dictionary.
    # We map (Style_Group, Choice_ID) -> Description; the same trait appears
    # once per voice in trait_map, so `seen` deduplicates by that key.
    mapping_data = []
    seen = set()

    for col_name, desc in trait_map.items():
        match = re.match(pattern, col_name)
        if match:
            groups = match.groupdict()
            key = (groups["Style_Group"], groups["Choice_ID"])

            if key not in seen:
                # Parse description into anchors if possible (Left : Right)
                parts = desc.split(':')
                left_anchor = parts[0].strip() if len(parts) > 0 else ""
                right_anchor = parts[1].strip() if len(parts) > 1 else ""

                mapping_data.append({
                    "Style_Group": groups["Style_Group"],
                    "Choice_ID": groups["Choice_ID"],
                    "Description": desc,
                    "Left_Anchor": left_anchor,
                    "Right_Anchor": right_anchor
                })
                seen.add(key)

    # No usable mapping entries: return the long data without descriptions.
    if not mapping_data:
        return processed.collect()

    mapping_lf = pl.LazyFrame(mapping_data)

    # 4. Join data with mapping (left join keeps unmapped rows with nulls).
    result = processed.join(
        mapping_lf,
        on=["Style_Group", "Choice_ID"],
        how="left"
    )

    # 5. Cast score to Int (strict=False turns unparseable values into nulls).
    result = result.with_columns(
        pl.col("score").cast(pl.Int64, strict=False)
    )

    return result.collect()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_voice_scale_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Reshape Voice Scale columns from wide to long format.

    Parses columns with format: Voice_Scale_1_10__V{Voice}
    Example: Voice_Scale_1_10__V14

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Voice, Voice_Scale_Score
    """
    frame = df.lazy() if isinstance(df, pl.DataFrame) else df

    long_form = frame.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Voice_Scale_1_10__V.*$"),
        variable_name="full_col_name",
        value_name="Voice_Scale_Score"
    )

    # Rebuild the short voice label (e.g. 'V14') from the column name.
    with_voice = long_form.with_columns(
        pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
    ).with_columns(
        ("V" + pl.col("Voice_Num")).alias("Voice")
    )

    # Scores stay float (source data is f64); strict=False nulls bad cells.
    return with_voice.select([
        "_recordId",
        "Voice",
        pl.col("Voice_Scale_Score").cast(pl.Float64, strict=False)
    ]).collect()
|
|
|
|
def join_voice_and_style_data(
    processed_style_data: pl.DataFrame,
    processed_voice_data: pl.DataFrame
) -> pl.DataFrame:
    """
    Inner-join speaking-style rows with Voice Scale 1-10 rows.

    Parameters
    ----------
    processed_style_data : pl.DataFrame
        Output of process_speaking_style_data.
    processed_voice_data : pl.DataFrame
        Output of process_voice_scale_data.

    Returns
    -------
    pl.DataFrame
        Rows matched on (_recordId, Voice), carrying columns from both inputs.
    """
    join_keys = ["_recordId", "Voice"]
    return processed_style_data.join(processed_voice_data, on=join_keys, how="inner")
|
|
|
|
def process_voice_ranking_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Reshape Voice Ranking columns to long format and convert ranks to points.

    Parses columns with format: Top_3_Voices_ranking__V{Voice}
    Rank-to-points conversion: 1st = 3 pts, 2nd = 2 pts, 3rd = 1 pt,
    anything else (including unranked/null) = 0 pts.

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Voice, Ranking_Points
    """
    frame = df.lazy() if isinstance(df, pl.DataFrame) else df

    long_form = frame.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^Top_3_Voices_ranking__V.*$"),
        variable_name="full_col_name",
        value_name="rank"
    )

    # Rebuild the short voice label (e.g. 'V14') from the column name.
    with_voice = long_form.with_columns(
        pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
    ).with_columns(
        ("V" + pl.col("Voice_Num")).alias("Voice")
    )

    # Rank -> points; the otherwise(0) branch covers nulls (voice not ranked).
    points = (
        pl.when(pl.col("rank") == 1).then(3)
        .when(pl.col("rank") == 2).then(2)
        .when(pl.col("rank") == 3).then(1)
        .otherwise(0)
        .alias("Ranking_Points")
    )

    return with_voice.with_columns(points).select([
        "_recordId",
        "Voice",
        "Ranking_Points"
    ]).collect()
|