import ast
import hashlib
import json
import os
import re
from pathlib import Path
from typing import Optional, Union

import pandas as pd
import polars as pl
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

from plots import JPMCPlotsMixin

# Attribute names under which a python-pptx shape element may expose its
# non-visual properties: Picture, Shape/Placeholder, Group, GraphicFrame, Connector.
_NON_VISUAL_PROP_ATTRS = ('nvPicPr', 'nvSpPr', 'nvGrpSpPr', 'nvGraphicFramePr', 'nvCxnSpPr')

# Image file extensions indexed by _build_image_hash_map.
_IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif'}

# Speaking-style column layout: SS_{StyleGroup}__{Voice}__{ChoiceID},
# e.g. "SS_Orange_Red__V14__Choice_1".
_SS_COLUMN_PATTERN = r"^(?P<Style_Group>SS_.+?)__(?P<Voice>.+?)__(?P<Choice_ID>Choice_\d+)$"


def image_alt_text_generator(fpath):
    """Convert an image file path to alt text.

    The first two path components ('figures' and the component after it) are
    dropped; the remainder is returned as a POSIX-style string.

    Raises:
        AssertionError: If the first path component is not 'figures'.
            (Kept as an assertion because update_ppt_alt_text catches it.)
    """
    if not isinstance(fpath, Path):
        fpath = Path(fpath)
    fparts = fpath.parts
    assert fparts[0] == 'figures', "Image file path must start with 'figures'"
    return Path('/'.join(fparts[2:])).as_posix()


def _find_cnvpr(shape):
    """Return the ``cNvPr`` XML element of *shape*, or None if it cannot be located.

    Only the first matching non-visual-properties attribute is inspected.
    """
    element = shape._element
    for attr in _NON_VISUAL_PROP_ATTRS:
        if hasattr(element, attr):
            nv_pr = getattr(element, attr)
            # Compare against None explicitly: the truth value of an lxml
            # element depends on whether it has children, not on existence.
            if nv_pr is not None and hasattr(nv_pr, 'cNvPr'):
                return nv_pr.cNvPr
            return None
    return None


def pptx_replace_named_image(presentation_path, target_tag, new_image_path, save_path):
    """
    Finds and replaces specific images in a PowerPoint presentation while preserving
    their original position and size.

    For every shape whose name or alt text contains ``target_tag``, the shape's
    left/top/width/height are recorded, the shape is removed from the slide's XML,
    and a new picture is added with those same coordinates.

    Args:
        presentation_path (str): The file path to the source .pptx file.
        target_tag (str): The identifier to look for (e.g., 'HERO_IMAGE').
            Matching is a case-sensitive substring test against both the shape
            name and the alt text.
        new_image_path (str): The file path to the new image (PNG, JPG, etc.).
        save_path (str): The file path where the modified presentation will be saved.

    Returns:
        None: Saves the file directly to the provided save_path.

    Raises:
        FileNotFoundError: If the source presentation or new image is not found.
        PermissionError: If the save_path is currently open or locked.
    """
    prs = Presentation(presentation_path)

    for slide_no, slide in enumerate(prs.slides, start=1):
        print(f"Processing Slide {slide_no}...")
        print(f"Total Shapes: {len(slide.shapes)} shapes")

        # Iterate over a list copy of shapes to safely modify the slide during iteration
        for shape in list(slide.shapes):
            print(f"Checking shape: {shape.name} of type {shape.shape_type}...")
            shape_name = shape.name or ""
            alt_text = ""

            # Alt text lookup is best-effort: a failure here must not abort the
            # run, but it is reported instead of being silently swallowed.
            try:
                c_nv_pr = _find_cnvpr(shape)
                if c_nv_pr is not None:
                    alt_text = c_nv_pr.get("descr", "")
            except Exception as e:
                print(f"Could not read alt text for shape '{shape_name}': {e}")

            print(f"Alt Text for shape '{shape_name}': {alt_text}")

            if target_tag not in shape_name and target_tag not in alt_text:
                print(f"Skipping shape '{shape_name}' with alt text '{alt_text}'")
                continue

            print(f"Found it! Replacing {shape_name}...")
            try:
                # Record coordinates
                left, top, width, height = shape.left, shape.top, shape.width, shape.height

                # Remove old shape
                old_element = shape._element
                old_element.getparent().remove(old_element)

                # Add new image at the same spot
                slide.shapes.add_picture(str(new_image_path), left, top, width, height)
            except AttributeError:
                print(f"Could not replace {shape_name} - might be missing dimensions.")

    prs.save(save_path)
    print(f"Successfully saved to {save_path}")


def _calculate_file_sha1(file_path: Union[str, Path]) -> str:
    """Calculate SHA1 hash of a file, reading it in 64 KiB chunks."""
    sha1 = hashlib.sha1()
    with open(file_path, 'rb') as f:
        while chunk := f.read(65536):
            sha1.update(chunk)
    return sha1.hexdigest()


def _build_image_hash_map(root_dir: Union[str, Path]) -> dict:
    """
    Recursively walk the directory and build a map of SHA1 hashes to file paths.

    Only files with common image extensions are indexed. Paths are stored as
    ``Path(root_dir) / ...`` and are therefore relative whenever ``root_dir`` is.
    If two files share a hash, the one visited last wins.
    """
    hash_map = {}
    root = Path(root_dir)
    print(f"Building image hash map from {root}...")

    count = 0
    for dir_path, _dirs, files in os.walk(root):
        for file_name in files:
            file_path = Path(dir_path) / file_name
            if file_path.suffix.lower() not in _IMAGE_EXTENSIONS:
                continue
            try:
                hash_map[_calculate_file_sha1(file_path)] = file_path
                count += 1
            except Exception as e:
                print(f"Error hashing {file_path}: {e}")

    print(f"Indexed {count} images.")
    return hash_map


def _iter_picture_shapes(shapes):
    """
    Recursively iterate over shapes and yield those that are pictures
    (have an 'image' property), diving into groups.
    """
    for shape in shapes:
        # Check groups recursively
        if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
            yield from _iter_picture_shapes(shape.shapes)
            continue

        # hasattr() evaluates the property and only absorbs AttributeError.
        # NOTE(review): python-pptx appears to raise ValueError for pictures
        # without an embedded image (e.g. linked ones); such shapes are skipped
        # here instead of aborting the whole traversal.
        try:
            has_image = hasattr(shape, 'image')
        except ValueError:
            has_image = False

        if has_image:
            yield shape


def update_ppt_alt_text(ppt_path: Union[str, Path], image_source_dir: Union[str, Path], output_path: Union[str, Path] = None):
    """
    Updates the alt text of images in a PowerPoint presentation by matching
    their content (SHA1 hash) with images in a source directory.

    Args:
        ppt_path (str/Path): Path to the PowerPoint file.
        image_source_dir (str/Path): Directory containing source images to match against.
        output_path (str/Path, optional): Path to save the updated presentation.
            If None, overwrites the input file.

    Returns:
        None. The presentation is saved only when at least one alt text changed.
        If the presentation cannot be opened, the error is printed and the
        function returns without saving.
    """
    if output_path is None:
        output_path = ppt_path

    # 1. Build lookup map of {sha1: file_path} from the source directory
    image_hash_map = _build_image_hash_map(image_source_dir)

    # 2. Open Presentation
    try:
        prs = Presentation(ppt_path)
    except Exception as e:
        print(f"Error opening presentation {ppt_path}: {e}")
        return

    updates_count = 0
    slides = list(prs.slides)
    print(f"Processing {len(slides)} slides...")

    for slide_no, slide in enumerate(slides, start=1):
        # Use recursive iterator to find all pictures including those in groups/placeholders
        for shape in list(_iter_picture_shapes(slide.shapes)):
            try:
                # shape.image.sha1 returns the SHA1 hash of the image blob
                current_sha1 = shape.image.sha1
            except AttributeError:
                continue
            except Exception as e:
                print(f"Error processing shape on slide {slide_no}: {e}")
                continue

            original_path = image_hash_map.get(current_sha1)
            if original_path is None:
                continue

            try:
                # The generator expects a path starting with 'figures', so try
                # to make the path relative to the current working directory.
                pass_path = original_path
                try:
                    pass_path = original_path.relative_to(Path.cwd())
                except ValueError:
                    pass

                new_alt_text = image_alt_text_generator(pass_path)

                c_nv_pr = _find_cnvpr(shape)
                if c_nv_pr is None:
                    print(f"Could not find cNvPr for shape on slide {slide_no}")
                    continue

                # Only write (and count) when the alt text actually changes.
                existing_alt_text = c_nv_pr.get("descr", "")
                if existing_alt_text != new_alt_text:
                    print(f"Slide {slide_no}: Updating alt text for image matches '{pass_path}'")
                    print(f" Old: '{existing_alt_text}' -> New: '{new_alt_text}'")
                    c_nv_pr.set("descr", new_alt_text)
                    updates_count += 1
            except AssertionError as e:
                print(f"Skipping match for {original_path} due to generator error: {e}")
            except Exception as e:
                print(f"Error updating alt text for {original_path}: {e}")

    if updates_count > 0:
        prs.save(output_path)
        print(f"Saved updated presentation to {output_path} with {updates_count} updates.")
    else:
        print("No images matched or required updates.")


def extract_voice_label(html_str: str) -> Optional[str]:
    """
    Extract voice label from HTML string and convert to short format.

    Parameters:
        html_str (str): HTML string containing voice label in format "Voice N"

    Returns:
        str or None: Voice label in format "VN" (e.g., "V14"), or None when
        the string contains no "Voice N" label.

    Example:
        >>> extract_voice_label('<span>Voice 14</span>')
        'V14'
    """
    match = re.search(r'Voice (\d+)', html_str)
    return f"V{match.group(1)}" if match else None


def extract_qid(val):
    """Extracts the 'ImportId' from a string representation of a dictionary.

    Strings wrapped in braces are parsed as JSON first and, failing that, as a
    Python literal. The value is never evaluated as code, since it comes
    straight from a data file. Any other input is indexed with 'ImportId' as-is.
    """
    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
        try:
            val = json.loads(val)
        except json.JSONDecodeError:
            # Tolerate Python-style dict text (e.g. single-quoted keys).
            val = ast.literal_eval(val)
    return val['ImportId']


def combine_exclusive_columns(df: pl.DataFrame, id_col: str = "_recordId", target_col_name: str = "combined_value") -> pl.DataFrame:
    """
    Combines all columns except id_col into a single column.

    Raises ValueError if more than one column is populated in a single row.
    """
    merge_cols = [c for c in df.columns if c != id_col]

    # Validate: count non-nulls horizontally
    row_counts = df.select(
        pl.sum_horizontal(pl.col(merge_cols).is_not_null())
    ).to_series()

    if (row_counts > 1).any():
        raise ValueError("Invalid Data: Multiple columns populated for a single record row.")

    # Merge columns using coalesce
    return df.select([
        pl.col(id_col),
        pl.coalesce(merge_cols).alias(target_col_name)
    ])


def calculate_weighted_ranking_scores(df: Union[pl.LazyFrame, pl.DataFrame]) -> pl.DataFrame:
    """
    Calculate weighted scores for character or voice rankings.

    Points system: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt.

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Frame containing character/voice ranking columns. Every column except
        '_recordId' is treated as a ranking column.

    Returns
    -------
    pl.DataFrame
        DataFrame with columns 'Character' and 'Weighted Score', sorted by
        score in descending order. Empty (but with both columns) when the
        input has no ranking columns.
    """
    if isinstance(df, pl.LazyFrame):
        df = df.collect()

    rank_points = {1: 3, 2: 2, 3: 1}
    scores = []

    for col in (c for c in df.columns if c != '_recordId'):
        weighted_score = sum(
            df.filter(pl.col(col) == rank).height * points
            for rank, points in rank_points.items()
        )

        # Strip the known column prefixes and turn underscores into spaces.
        clean_name = col.replace('Character_Ranking_', '').replace('Top_3_Voices_ranking__', '').replace('_', ' ').strip()

        scores.append({
            'Character': clean_name,
            'Weighted Score': weighted_score
        })

    if not scores:
        # Without any rows polars cannot infer the columns that sort() needs.
        return pl.DataFrame(schema={'Character': pl.Utf8, 'Weighted Score': pl.Int64})

    return pl.DataFrame(scores).sort('Weighted Score', descending=True)


class JPMCSurvey(JPMCPlotsMixin):
    """Class to handle JPMorgan Chase survey data."""

    # Filter name -> data column used by load_data() and filter_data().
    _FILTER_COLUMNS = {
        'age': 'QID1',
        'gender': 'QID2',
        'consumer': 'Consumer',
        'ethnicity': 'QID3',
        'income': 'QID15',
    }

    def __init__(self, data_path: Union[str, Path], qsf_path: Union[str, Path]):
        """Load question metadata and the QSF definition, and prepare the figure directory.

        Parameters:
            data_path: Path to the exported survey CSV.
            qsf_path: Path to the Qualtrics QSF (JSON) file.
        """
        self.data_filepath = Path(data_path)
        self.qsf_filepath = Path(qsf_path)

        self.qid_descr_map = self._extract_qid_descr_map()
        self.qsf: dict = self._load_qsf()

        # Get export directory name for saving figures, i.e. if
        # data_path='data/exports/OneDrive_2026-01-21/...' the directory should
        # be 'figures/OneDrive_2026-01-21'. This takes the third path component,
        # so data_path is expected to have at least three.
        self.fig_save_dir = Path('figures') / self.data_filepath.parts[2]
        self.fig_save_dir.mkdir(parents=True, exist_ok=True)

        self.data_filtered = None
        self.plot_height = 500
        self.plot_width = 1000

        # Filter values (None means "no filter applied")
        self.filter_age: Optional[list] = None
        self.filter_gender: Optional[list] = None
        self.filter_consumer: Optional[list] = None
        self.filter_ethnicity: Optional[list] = None
        self.filter_income: Optional[list] = None

    def _extract_qid_descr_map(self) -> dict:
        """Extract mapping of Qualtrics ImportID to Question Description from results file.

        For the '1_1-16-2026' export the first data row is returned as a flat
        ``{column: value}`` dict. Otherwise the result is
        ``{ImportID: {'QName': ..., 'Description': ...}}``.
        """
        if '1_1-16-2026' in self.data_filepath.as_posix():
            df_questions = pd.read_csv(self.data_filepath, nrows=1)
            return df_questions.iloc[0].to_dict()

        # First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
        # Second row which contains the question content
        # Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
        df_questions = pd.read_csv(self.data_filepath, nrows=2)

        # Transpose so that each question becomes one row.
        df_questions = df_questions.T.reset_index()
        df_questions.columns = ['QName', 'Description', 'export_metadata']
        df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
        df_questions = df_questions[['ImportID', 'QName', 'Description']]

        # return dict as {ImportID: {'QName': ..., 'Description': ...}}
        return df_questions.set_index('ImportID')[['QName', 'Description']].T.to_dict()

    def _load_qsf(self) -> dict:
        """Load QSF file to extract question metadata if needed."""
        with open(self.qsf_filepath, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _get_qsf_question_by_QID(self, QID: str) -> dict:
        """Get question metadata from QSF using the Question ID.

        Raises:
            ValueError: If zero or more than one SurveyElement has
                'PrimaryAttribute' equal to QID.
        """
        q_elem = [elem for elem in self.qsf['SurveyElements'] if elem['PrimaryAttribute'] == QID]

        if not q_elem:
            raise ValueError(f"SurveyElement with 'PrimaryAttribute': '{QID}' not found in QSF.")
        if len(q_elem) > 1:
            raise ValueError(f"Multiple SurveyElements with 'PrimaryAttribute': '{QID}' found in QSF: \n{q_elem}")

        return q_elem[0]

    def load_data(self) -> pl.LazyFrame:
        """
        Load CSV where column headers are in row 3 as dict strings with ImportId.

        The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
        This function extracts the ImportId from each column and uses it as the column name.
        It also stores the sorted unique non-null values of each filter column
        on ``self.options_*`` (an empty list when the column is absent).

        Returns:
            pl.LazyFrame: Polars LazyFrame with ImportId as column names.

        Raises:
            NotImplementedError: For the '1_1-16-2026' export format.
        """
        if '1_1-16-2026' in self.data_filepath.as_posix():
            raise NotImplementedError("This method does not support the '1_1-16-2026' export format.")

        # Read the 3rd row (index 2) which contains the metadata dictionaries
        # Use header=None to get raw values instead of treating them as column names
        df_meta = pd.read_csv(self.data_filepath, nrows=1, skiprows=2, header=None)

        # Extract ImportIds from each column value in this row
        new_columns = [extract_qid(val) for val in df_meta.iloc[0]]

        # Now read the actual data starting from row 4 (skip first 3 rows)
        # NOTE(review): with polars' default has_header=True the first line
        # after the skipped ones is parsed as the header, so the first data
        # record may be dropped here. Confirm against a real export before
        # changing this.
        df = pl.read_csv(self.data_filepath, skip_rows=3)

        # Rename columns with the extracted ImportIds
        df.columns = new_columns

        # Store unique values for filters (ignoring nulls) to detect "all selected" state
        for filter_name, column in self._FILTER_COLUMNS.items():
            if column in df.columns:
                options = sorted(df[column].drop_nulls().unique().to_list())
            else:
                options = []
            setattr(self, f'options_{filter_name}', options)

        return df.lazy()

    def _get_subset(self, q: pl.LazyFrame, QIDs, rename_cols=True, include_record_id=True) -> pl.LazyFrame:
        """Extract subset of data based on specific questions.

        When ``rename_cols`` is true, selected columns are renamed to their
        QName from ``qid_descr_map`` ('_recordId' is never renamed).
        """
        if include_record_id and '_recordId' not in QIDs:
            QIDs = ['_recordId'] + QIDs

        if not rename_cols:
            return q.select(QIDs)

        rename_dict = {
            qid: self.qid_descr_map[qid]['QName']
            for qid in QIDs
            if qid in self.qid_descr_map and qid != '_recordId'
        }
        return q.select(QIDs).rename(rename_dict)

    def filter_data(self, q: pl.LazyFrame, age: list = None, gender: list = None, consumer: list = None, ethnicity: list = None, income: list = None) -> pl.LazyFrame:
        """Filter data based on provided parameters

        Possible parameters:
        - age: list of age groups to include
        - gender: list
        - consumer: list
        - ethnicity: list
        - income: list

        Each value is also stored on ``self.filter_<name>`` (including None).
        Also saves the result to self.data_filtered.
        """
        selections = {
            'age': age,
            'gender': gender,
            'consumer': consumer,
            'ethnicity': ethnicity,
            'income': income,
        }

        for filter_name, allowed in selections.items():
            setattr(self, f'filter_{filter_name}', allowed)
            if allowed is not None:
                q = q.filter(pl.col(self._FILTER_COLUMNS[filter_name]).is_in(allowed))

        self.data_filtered = q
        return self.data_filtered

    def get_demographics(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the demographics.

        Columns are renamed using qid_descr_map.
        Returns a ``(subquery, None)`` tuple.
        """
        QIDs = ['QID1', 'QID2', 'QID3', 'QID4', 'QID13', 'QID14', 'QID15', 'QID16', 'QID17', 'Consumer']
        return self._get_subset(q, QIDs), None

    def get_top_8_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the top 8 characteristics are most important for this Chase virtual assistant to have.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        QIDs = ['QID25']
        return self._get_subset(q, QIDs, rename_cols=False).rename({'QID25': 'Top_8_Traits'}), None

    def get_top_3_traits(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the top 3 characteristics that the Chase virtual assistant should prioritize.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        QIDs = ['QID26_0_GROUP']
        return self._get_subset(q, QIDs, rename_cols=False).rename({'QID26_0_GROUP': 'Top_3_Traits'}), None

    def get_character_ranking(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the ranking of characteristics for the Chase virtual assistant.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        # Requires QSF to map "Character Ranking_2" to the actual character
        cfg = self._get_qsf_question_by_QID('QID27')['Payload']

        QIDs_map = {f'QID27_{v}': cfg['VariableNaming'][k] for k, v in cfg['RecodeValues'].items()}
        QIDs_rename = {
            qid: f'Character_Ranking_{name.replace(" ", "_")}'
            for qid, name in QIDs_map.items()
        }

        return self._get_subset(q, list(QIDs_rename.keys()), rename_cols=False).rename(QIDs_rename), None

    def get_18_8_3(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the 18-8-3 feedback for the Chase virtual assistant.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        QIDs = ['QID29', 'QID101', 'QID36_0_GROUP']
        rename_dict = {
            'QID29': '18-8_Set-A',
            'QID101': '18-8_Set-B',
            'QID36_0_GROUP': '3_Ranked'
        }

        subset = self._get_subset(q, QIDs, rename_cols=False).rename(rename_dict)

        # Combine 18-8 Set A and Set B into single column
        subset = subset.with_columns(
            pl.coalesce(['18-8_Set-A', '18-8_Set-B']).alias('8_Combined')
        )

        # Change order of columns
        subset = subset.select(['_recordId', '18-8_Set-A', '18-8_Set-B', '8_Combined', '3_Ranked'])

        return subset, None

    def get_voice_scale_1_10(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the Voice Scale 1-10 ratings for the Chase virtual assistant.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        QIDs_map = {}

        for qid, val in self.qid_descr_map.items():
            if 'Scale 1-10_1' in val['QName']:
                # Convert "Voice 16 Scale 1-10_1" to "Voice_Scale_1_10__V16"
                QIDs_map[qid] = f"Voice_Scale_1_10__V{val['QName'].split()[1]}"

        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None

    def _get_speaking_style(self, q: pl.LazyFrame, anchor_qid: str, qname_marker: str, column_prefix: str) -> tuple[pl.LazyFrame, dict]:
        """Shared implementation of the speaking-style (SS) extractors.

        Selects every question whose QName contains ``qname_marker``, renames
        it to ``{column_prefix}__{voice}__Choice_{n}`` and maps each new column
        name to the choice's display text from the QSF.
        """
        # Fail fast if the anchor question (or its payload) is missing from
        # the QSF; the value itself is not needed.
        self._get_qsf_question_by_QID(anchor_qid)['Payload']

        QIDs_map = {}
        choices_map = {}

        for qid, val in self.qid_descr_map.items():
            if qname_marker not in val['QName']:
                continue

            cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']

            # ie: "V14 SS Green-Blue_1" -> voice "V14", trait number "1"
            qname_parts = val['QName'].split()
            voice = qname_parts[0]
            trait_num = qname_parts[-1].split('_')[-1]

            new_name = f"{column_prefix}__{voice}__Choice_{trait_num}"
            QIDs_map[qid] = new_name
            choices_map[new_name] = cfg['Choices'][trait_num]['Display']

        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), choices_map

    def get_ss_green_blue(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, dict]:
        """Extract columns containing the SS Green/Blue ratings for the Chase virtual assistant.

        Returns a ``(subquery, choices_map)`` tuple, where choices_map maps each
        renamed column to its choice display text. The subquery can be chained
        with other polars queries.
        """
        return self._get_speaking_style(q, 'QID35', 'SS Green-Blue', 'SS_Green_Blue')

    def get_top_3_voices(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the top 3 voice choices for the Chase virtual assistant.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.

        Raises:
            ValueError: If a ranking question's DynamicChoices locator does not
                point at the selected choices of QID36.
        """
        expected_locator = r"q://QID36/ChoiceGroup/SelectedChoicesInGroup/1"
        QIDs_map = {}

        cfg36 = self._get_qsf_question_by_QID('QID36')['Payload']
        choice_voice_map = {k: extract_voice_label(v['Display']) for k, v in cfg36['Choices'].items()}

        for qid, val in self.qid_descr_map.items():
            if 'Rank Top 3 Voices' not in val['QName']:
                continue

            cfg = self._get_qsf_question_by_QID(qid.split('_')[0])['Payload']
            voice_num = val['QName'].split('_')[-1]

            # Validate that the DynamicChoices Locator is as expected
            if cfg['DynamicChoices']['Locator'] != expected_locator:
                raise ValueError(f"Unexpected DynamicChoices Locator for QID '{qid}': {cfg['DynamicChoices']['Locator']}")

            # extract the voice from the QID36 config
            voice = choice_voice_map[voice_num]

            # Convert "Top 3 Voices_1" to "Top_3_Voices_ranking__V14"
            QIDs_map[qid] = f"Top_3_Voices_ranking__{voice}"

        return self._get_subset(q, list(QIDs_map.keys()), rename_cols=False).rename(QIDs_map), None

    def get_ss_orange_red(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, dict]:
        """Extract columns containing the SS Orange/Red ratings for the Chase virtual assistant.

        Returns a ``(subquery, choices_map)`` tuple, where choices_map maps each
        renamed column to its choice display text. The subquery can be chained
        with other polars queries.
        """
        return self._get_speaking_style(q, 'QID40', 'SS Orange-Red', 'SS_Orange_Red')

    def get_character_refine(self, q: pl.LazyFrame) -> tuple[pl.LazyFrame, None]:
        """Extract columns containing the character refine feedback for the Chase virtual assistant.

        Returns a ``(subquery, None)`` tuple; the subquery can be chained with other polars queries.
        """
        QIDs = ['QID44', 'QID97', 'QID95', 'QID96']
        return self._get_subset(q, QIDs, rename_cols=True), None


def process_speaking_style_data(
    df: Union[pl.LazyFrame, pl.DataFrame],
    trait_map: dict[str, str]
) -> pl.DataFrame:
    """
    Process speaking style columns from wide to long format and map trait descriptions.

    Parses columns with format: SS_{StyleGroup}__{Voice}__{ChoiceID}
    Example: SS_Orange_Red__V14__Choice_1

    Parameters
    ----------
    df : pl.LazyFrame or pl.DataFrame
        Input dataframe containing SS_* columns.
    trait_map : dict
        Dictionary mapping column names to trait descriptions.
        Keys should be full column names like "SS_Orange_Red__V14__Choice_1".

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns:
        _recordId, Style_Group, Voice, Choice_ID, score, Description,
        Left_Anchor, Right_Anchor.
        When no key of ``trait_map`` matches the column format, the
        description/anchor columns are absent and ``score`` is not cast.
    """
    # Normalize input to LazyFrame
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    # 1. Melt SS_ columns
    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col("^SS_.*$"),
        variable_name="full_col_name",
        value_name="score"
    )

    # 2. Extract components from column name
    # Regex captures: Style_Group (e.g. SS_Orange_Red), Voice (e.g. V14), Choice_ID (e.g. Choice_1)
    pattern = _SS_COLUMN_PATTERN

    processed = melted.with_columns(
        pl.col("full_col_name").str.extract_groups(pattern)
    ).unnest("full_col_name")

    # 3. Create Mapping Lookup from the provided dictionary
    # We map (Style_Group, Choice_ID) -> Description; the first description
    # seen for a key wins, regardless of voice.
    mapping_data = []
    seen = set()

    for col_name, desc in trait_map.items():
        match = re.match(pattern, col_name)
        if not match:
            continue

        groups = match.groupdict()
        key = (groups["Style_Group"], groups["Choice_ID"])
        if key in seen:
            continue

        # Parse description into anchors if possible (Left : Right)
        parts = desc.split(':')
        left_anchor = parts[0].strip() if len(parts) > 0 else ""
        right_anchor = parts[1].strip() if len(parts) > 1 else ""

        mapping_data.append({
            "Style_Group": groups["Style_Group"],
            "Choice_ID": groups["Choice_ID"],
            "Description": desc,
            "Left_Anchor": left_anchor,
            "Right_Anchor": right_anchor
        })
        seen.add(key)

    if not mapping_data:
        return processed.collect()

    mapping_lf = pl.LazyFrame(mapping_data)

    # 4. Join Data with Mapping
    result = processed.join(
        mapping_lf,
        on=["Style_Group", "Choice_ID"],
        how="left"
    )

    # 5. Cast score to Int (values that cannot be cast become null)
    result = result.with_columns(
        pl.col("score").cast(pl.Int64, strict=False)
    )

    return result.collect()


def _melt_voice_columns(
    df: Union[pl.LazyFrame, pl.DataFrame],
    column_regex: str,
    value_name: str
) -> pl.LazyFrame:
    """Melt the columns matching *column_regex* and add a 'Voice' column ("V" + digits)."""
    lf = df.lazy() if isinstance(df, pl.DataFrame) else df

    melted = lf.melt(
        id_vars=["_recordId"],
        value_vars=pl.col(column_regex),
        variable_name="full_col_name",
        value_name=value_name
    )

    # Extract Voice
    return melted.with_columns(
        pl.col("full_col_name").str.extract(r"V(\d+)", 1).alias("Voice_Num")
    ).with_columns(
        ("V" + pl.col("Voice_Num")).alias("Voice")
    )


def process_voice_scale_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Process Voice Scale columns from wide to long format.

    Parses columns with format: Voice_Scale_1_10__V{Voice}
    Example: Voice_Scale_1_10__V14

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns: _recordId, Voice, Voice_Scale_Score
    """
    processed = _melt_voice_columns(df, "^Voice_Scale_1_10__V.*$", "Voice_Scale_Score")

    # Keep Score as Float (values that cannot be cast become null)
    result = processed.select([
        "_recordId",
        "Voice",
        pl.col("Voice_Scale_Score").cast(pl.Float64, strict=False)
    ])

    return result.collect()


def join_voice_and_style_data(
    processed_style_data: pl.DataFrame,
    processed_voice_data: pl.DataFrame
) -> pl.DataFrame:
    """
    Joins processed Speaking Style data with Voice Scale 1-10 data.

    Parameters
    ----------
    processed_style_data : pl.DataFrame
        Result of process_speaking_style_data
    processed_voice_data : pl.DataFrame
        Result of process_voice_scale_data

    Returns
    -------
    pl.DataFrame
        Merged dataframe with columns from both, inner-joined on _recordId and Voice.
    """
    return processed_style_data.join(
        processed_voice_data,
        on=["_recordId", "Voice"],
        how="inner"
    )


def process_voice_ranking_data(
    df: Union[pl.LazyFrame, pl.DataFrame]
) -> pl.DataFrame:
    """
    Process Voice Ranking columns from wide to long format and convert ranks to points.

    Parses columns with format: Top_3_Voices_ranking__V{Voice}
    Converts ranks to points: 1st place = 3 pts, 2nd place = 2 pts, 3rd place = 1 pt

    Returns
    -------
    pl.DataFrame
        Long-format dataframe with columns: _recordId, Voice, Ranking_Points
    """
    processed = _melt_voice_columns(df, "^Top_3_Voices_ranking__V.*$", "rank")

    # Convert rank to points: 1st=3, 2nd=2, 3rd=1, anything else (incl. null)=0
    result = processed.with_columns(
        pl.when(pl.col("rank") == 1).then(3)
        .when(pl.col("rank") == 2).then(2)
        .when(pl.col("rank") == 3).then(1)
        .otherwise(0)
        .alias("Ranking_Points")
    ).select([
        "_recordId",
        "Voice",
        "Ranking_Points"
    ])

    return result.collect()