From b8642e9de83d8d38ccfda6d1035d27cf13a5443e Mon Sep 17 00:00:00 2001 From: Luigi Maiorano Date: Thu, 22 Jan 2026 11:59:48 +0100 Subject: [PATCH] move common ingest functions to utils --- 00_qualtrics_validation.py | 99 ++++------------------------------- 01_ingest_qualtrics_export.py | 89 ++++++++++++++++++------------- utils.py | 74 ++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 126 deletions(-) create mode 100644 utils.py diff --git a/00_qualtrics_validation.py b/00_qualtrics_validation.py index ee64cc8..877b8d4 100644 --- a/00_qualtrics_validation.py +++ b/00_qualtrics_validation.py @@ -11,7 +11,9 @@ def _(): import pandas as pd import plotly as plt from pathlib import Path - return Path, mo, pd, pl + + from utils import extract_qid_descr_map + return Path, extract_qid_descr_map, mo, pd @app.cell @@ -31,35 +33,8 @@ def _(mo): @app.cell -def _(pd, results_file): - if '1_1-16-2026' in results_file.as_posix(): - df_questions = pd.read_csv(results_file, nrows=1) - df_questions - - qid_descr_map = df_questions.iloc[0].to_dict() - qid_descr_map - - else: - # First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 
18-8') - - # Second row which contains the question content - # Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}') - df_questions = pd.read_csv(results_file, nrows=1, skiprows=1) - - def extract_qid(val): - if isinstance(val, str) and val.startswith('{') and val.endswith('}'): - val = eval(val) - return val['ImportId'] - - # transpose df_questions - df_questions = df_questions.T.reset_index() - df_questions.columns = ['Description', 'export_metadata'] - df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid) - - df_questions = df_questions[['ImportID', 'Description']] - - qid_descr_map = dict(zip(df_questions['ImportID'], df_questions['Description'])) - +def _(extract_qid_descr_map, results_file): + qid_descr_map = extract_qid_descr_map(results_file) qid_descr_map return (qid_descr_map,) @@ -92,7 +67,7 @@ def _(mo): @app.cell def _(Path, pd, validate_df): - validate_record_csv = Path('./data/exports/validation_qid_descr_map.csv') + validate_record_csv = Path('./validation_qid_descr_map.csv') if not validate_record_csv.exists(): validate_df.to_csv(validate_record_csv, index=False) @@ -135,7 +110,7 @@ def validate_mappings(_df): 'Descriptions': descriptions.tolist(), 'SourceFiles': group['SourceFile'].tolist() }) - + # Check for new or missing ImportIDs source_files = group['SourceFile'].unique() if len(source_files) < len(_df['SourceFile'].unique()): @@ -160,14 +135,14 @@ def _(pd, validate_record_csv): @app.cell def _(mo): mo.md(r""" - ## Process (Dismiss) Errors + ## Inspect & Dismiss Errors """) return @app.cell def _(pd, validate_record_csv): - # Known issue with the 'OneDrive_1_1-16-2026' export, where each of the descriptions is prepended with a ' - ' string. Drop that then, recompare + # Known issue with the 'OneDrive_1_1-16-2026' export, where each of the descriptions is prepended with a ' - ' string. 
Drop that, then recompare _df = pd.read_csv(validate_record_csv) @@ -176,62 +151,6 @@ def _(pd, validate_record_csv): validation_issues_fixed = validate_mappings(_df) validation_issues_fixed - - return - - -@app.cell -def _(): - return - - -@app.cell -def _(mo): - mo.md(r""" - # Process Data - """) - return - - -@app.cell -def _(pl, results_file): - df = pl.read_csv(results_file, has_header=True, skip_rows_after_header=1) - df - return - - -@app.cell(hide_code=True) -def _(mo): - mo.md(r""" - # Answers Decoding - - Pipeline to decode the ranking of voices. Currently saved as QID's, they need to be remapped back to their actual values so that the analysis can be performed. ie: - - `GQIK26_G0_x8_RANK` -> Refers to question `Top 3 Traits_0_8_RANK - What are the important traits for the Chase AI virtual assistant?` and thus the #8 option - """) - return - - -@app.cell -def _(mo): - mo.md(r""" - ## TODO: - - Create a python function for each of the questions. ie `def QID63()`. Each function should return a Polars query, that can be added to an existing query. - - Ideas: - - Map column name to include the Voice number (VID) (ie the questions that only have 1 voice). The VID is in this case often included in the question description - - `QID_x_GROUP` Contains the rankings of the values, stored in order. The following columns (ie `QID26_G0_x1_RANK`) are redundant and not necessary for us. 
The function should drop the unnecessary columns to clean up - - """) - return - - -@app.cell -def _(): return diff --git a/01_ingest_qualtrics_export.py b/01_ingest_qualtrics_export.py index 6cd7a4e..33a9d36 100644 --- a/01_ingest_qualtrics_export.py +++ b/01_ingest_qualtrics_export.py @@ -8,55 +8,72 @@ app = marimo.App(width="medium") def _(): import marimo as mo import polars as pl - import sqlite3 from pathlib import Path - return Path, pl + + from utils import extract_qid_descr_map, load_csv_with_qid_headers + return extract_qid_descr_map, load_csv_with_qid_headers, mo @app.cell def _(): - # RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv' - RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv' + RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv' + # RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv' return (RESULTS_FILE,) @app.cell -def _(Path, RESULTS_FILE, pl): - results_file = Path(RESULTS_FILE) - df = pl.read_csv(results_file, skip_rows=0) - df - return df, results_file - - -@app.cell -def _(df, pl, results_file): - colset = set(df.columns) - this_df_verify = pl.DataFrame({'column_names': [colset], 'results_file': results_file.as_posix()}) - this_df_verify - return (this_df_verify,) - - -@app.cell -def _(Path, pl, this_df_verify): - verification_record = Path('./data/exports/verification.csv') - if verification_record.exists(): - verify_df = pl.read_csv(verification_record) - - verify_df = pl.concat([verify_df, this_df_verify], how='vertical') - - # save verify_df - verify_df.write_csv(verification_record) - - else: - verify_df = this_df_verify - - # append this_df_verify to verify_df - verify_df +def _(RESULTS_FILE, 
extract_qid_descr_map): + qid_descr_map = extract_qid_descr_map(RESULTS_FILE) + qid_descr_map return @app.cell -def _(): +def _(RESULTS_FILE, load_csv_with_qid_headers): + df = load_csv_with_qid_headers(RESULTS_FILE) + df + return + + +@app.cell +def _(mo): + mo.md(r""" + # Data Cleanup + + - Remove incomplete responses (progress < 100) + - Flag outliers based on duration (add column) + - Flag responses that give the same rating for everything (indicates lack of engagement) + """) + return + + +@app.cell +def _(mo): + mo.md(r""" + # Answers Decoding + + Pipeline to decode the ranking of voices. Currently saved as QID's, they need to be remapped back to their actual values so that the analysis can be performed. ie: + + `GQIK26_G0_x8_RANK` -> Refers to question `Top 3 Traits_0_8_RANK - What are the important traits for the Chase AI virtual assistant?` and thus the #8 option + """) + return + + +@app.cell(hide_code=True) +def _(mo): + mo.md(r""" + ## TODO: + + Create a python function for each of the questions. ie `def QID63()`. Each function should return a Polars query, that can be added to an existing query. + + Ideas: + - Map column name to include the Voice number (VID) (ie the questions that only have 1 voice). The VID is in this case often included in the question description + - `QID_x_GROUP` Contains the rankings of the values, stored in order. The following columns (ie `QID26_G0_x1_RANK`) are redundant and not necessary for us. 
The function should drop the unnecessary columns to clean up
+
+    """)
     return
 
 
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..54a9b54
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,92 @@
+import ast
+import json
+from pathlib import Path
+from typing import Union
+
+import pandas as pd
+import polars as pl
+
+
+def extract_qid(val):
+    """Extract the 'ImportId' from a Qualtrics export-metadata cell.
+
+    Parameters:
+        val: A mapping, or its string form such as
+            '{"ImportId":"startDate","timeZone":"America/Denver"}'.
+
+    Returns:
+        The value stored under the 'ImportId' key.
+
+    Raises:
+        ValueError, SyntaxError: A brace-delimited string is neither JSON
+            nor a Python literal.
+        KeyError, TypeError: The parsed value has no 'ImportId' entry.
+    """
+    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
+        # The cell comes straight from a CSV file: parse it as data instead
+        # of eval()-ing it, which would execute arbitrary code.
+        try:
+            val = json.loads(val)
+        except json.JSONDecodeError:
+            # Fall back to Python literal syntax (e.g. single-quoted keys).
+            val = ast.literal_eval(val)
+    return val['ImportId']
+
+
+def extract_qid_descr_map(results_file: Union[str, Path]) -> dict:
+    """Extract mapping of Qualtrics ImportID to Question Description.
+
+    Parameters:
+        results_file: Path to the Qualtrics CSV export.
+
+    Returns:
+        dict: Question description keyed by ImportID.
+    """
+    results_file = Path(results_file)
+
+    if '1_1-16-2026' in results_file.as_posix():
+        # This export is read with its first row as the header; the row
+        # below it supplies the values of the returned mapping.
+        df_questions = pd.read_csv(results_file, nrows=1)
+        return df_questions.iloc[0].to_dict()
+
+    # Row 1: Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
+    # Row 2: question content, used as the header of this read
+    # Row 3: Export Metadata (ie '{"ImportId":"startDate",...}')
+    df_questions = pd.read_csv(results_file, nrows=1, skiprows=1)
+
+    # Transpose so that each question becomes one row.
+    df_questions = df_questions.T.reset_index()
+    df_questions.columns = ['Description', 'export_metadata']
+    df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
+
+    return dict(zip(df_questions['ImportID'], df_questions['Description']))
+
+
+def load_csv_with_qid_headers(file_path: Union[str, Path]) -> pl.DataFrame:
+    """
+    Load CSV where column headers are in row 3 as dict strings with ImportId.
+
+    The 3rd row contains metadata like
+    '{"ImportId":"startDate","timeZone":"America/Denver"}'. This function
+    extracts the ImportId from each column and uses it as the column name.
+
+    Parameters:
+        file_path: Path to the CSV file to load.
+
+    Returns:
+        pl.DataFrame: Polars DataFrame with ImportId as column names.
+    """
+    file_path = Path(file_path)
+
+    # Read the 3rd row (index 2) which contains the metadata dictionaries.
+    # header=None keeps the raw values instead of using them as column names.
+    df_meta = pd.read_csv(file_path, nrows=1, skiprows=2, header=None)
+    new_columns = [extract_qid(val) for val in df_meta.iloc[0]]
+
+    # Skip the 3 header rows. has_header=False is required: by default polars
+    # would consume the first data row as a header, silently dropping it.
+    df = pl.read_csv(file_path, skip_rows=3, has_header=False)
+    df.columns = new_columns
+
+    return df