move common ingest functions to utils

This commit is contained in:
2026-01-22 11:59:48 +01:00
parent 18ada6ca66
commit b8642e9de8
3 changed files with 136 additions and 126 deletions

View File

@@ -11,7 +11,9 @@ def _():
import pandas as pd import pandas as pd
import plotly as plt import plotly as plt
from pathlib import Path from pathlib import Path
return Path, mo, pd, pl
from utils import extract_qid_descr_map
return Path, extract_qid_descr_map, mo, pd
@app.cell @app.cell
@@ -31,35 +33,8 @@ def _(mo):
@app.cell @app.cell
def _(pd, results_file): def _(extract_qid_descr_map, results_file):
if '1_1-16-2026' in results_file.as_posix(): qid_descr_map = extract_qid_descr_map(results_file)
df_questions = pd.read_csv(results_file, nrows=1)
df_questions
qid_descr_map = df_questions.iloc[0].to_dict()
qid_descr_map
else:
# First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
# Second row which contains the question content
# Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
df_questions = pd.read_csv(results_file, nrows=1, skiprows=1)
def extract_qid(val):
if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
val = eval(val)
return val['ImportId']
# transpose df_questions
df_questions = df_questions.T.reset_index()
df_questions.columns = ['Description', 'export_metadata']
df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
df_questions = df_questions[['ImportID', 'Description']]
qid_descr_map = dict(zip(df_questions['ImportID'], df_questions['Description']))
qid_descr_map qid_descr_map
return (qid_descr_map,) return (qid_descr_map,)
@@ -92,7 +67,7 @@ def _(mo):
@app.cell @app.cell
def _(Path, pd, validate_df): def _(Path, pd, validate_df):
validate_record_csv = Path('./data/exports/validation_qid_descr_map.csv') validate_record_csv = Path('./validation_qid_descr_map.csv')
if not validate_record_csv.exists(): if not validate_record_csv.exists():
validate_df.to_csv(validate_record_csv, index=False) validate_df.to_csv(validate_record_csv, index=False)
@@ -160,14 +135,14 @@ def _(pd, validate_record_csv):
@app.cell @app.cell
def _(mo): def _(mo):
mo.md(r""" mo.md(r"""
## Process (Dismiss) Errors ## Inspect & Dismiss Errors
""") """)
return return
@app.cell @app.cell
def _(pd, validate_record_csv): def _(pd, validate_record_csv):
# Known issue with the 'OneDrive_1_1-16-2026' export, where each of the descriptions is prepended with a '<Qualtrics Editor Question name> - ' string. Drop that then, recompare # Known issue with the 'OneDrive_1_1-16-2026' export, where each of the descriptions is prepended with a '<Qualtrics Editor Question name> - ' string. Drop that, then recompare
_df = pd.read_csv(validate_record_csv) _df = pd.read_csv(validate_record_csv)
@@ -176,62 +151,6 @@ def _(pd, validate_record_csv):
validation_issues_fixed = validate_mappings(_df) validation_issues_fixed = validate_mappings(_df)
validation_issues_fixed validation_issues_fixed
return
@app.cell
def _():
return
@app.cell
def _(mo):
mo.md(r"""
# Process Data
""")
return
@app.cell
def _(pl, results_file):
df = pl.read_csv(results_file, has_header=True, skip_rows_after_header=1)
df
return
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""
# Answers Decoding
Pipeline to decode the ranking of voices. Currently saved as QID's, they need to be remapped back to their actual values so that the analysis can be performed. ie:
`GQIK26_G0_x8_RANK` -> Refers to question `Top 3 Traits_0_8_RANK - What are the important traits for the Chase AI virtual assistant?` and thus the #8 option
""")
return
@app.cell
def _(mo):
mo.md(r"""
## TODO:
Create a python function for each of the questions. ie `def QID63()`. Each function should return a Polars query, that can be added to an existing query.
Ideas:
- Map column name to include the Voice number (VID) (ie the questions that only have 1 voice). The VID is in this case often included in the question description
- `QID_x_GROUP` Contains the rankings of the values, stored in order. The following columns (ie `QID26_G0_x1_RANK`) are redundant and not necessary for us. The function should drop the unnecessary columns to clean up
<!-- - Translate the RANK values back to the actual VID, and create an aggregate column that contains a list of the VIDs in order. ie: [V34, V56, V81].
- Use the first line of the question description (see `qid_descr_map`) to get the `"DataExportTag"`, which is a property that can be found in the `.qsf` file to inspect the choice number and it's corresponding VID
- "`VOICE SEL. 8-3_0_5_RANK`" refers to `"DataExportTag": "VOICE SEL. 8-3"`, `Group 0` (not important for this), `Choice 5`, and the value in the cell refers to the Rank assigned to that voice
- QSF file example to retrieve the VID: `"SurveyElements" -> (Find item where "Payload"["DataExportTag"] == "VOICE SEL. 8-3") -> "Payload" -> "Choices" -> "5" -> "Display" -> (Extract 'Voice <xx>' from the HTML)` -->
""")
return
@app.cell
def _():
return return

View File

@@ -8,55 +8,72 @@ app = marimo.App(width="medium")
def _(): def _():
import marimo as mo import marimo as mo
import polars as pl import polars as pl
import sqlite3
from pathlib import Path from pathlib import Path
return Path, pl
from utils import extract_qid_descr_map, load_csv_with_qid_headers
return extract_qid_descr_map, load_csv_with_qid_headers, mo
@app.cell @app.cell
def _(): def _():
# RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv' RESULTS_FILE = 'data/exports/OneDrive_2026-01-21/Soft Launch Data/JPMC_Chase Brand Personality_Quant Round 1_January 21, 2026_Soft Launch_Labels.csv'
RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv' # RESULTS_FILE = 'data/exports/OneDrive_1_1-16-2026/JPMC_Chase Brand Personality_Quant Round 1_TestData_Labels.csv'
return (RESULTS_FILE,) return (RESULTS_FILE,)
@app.cell @app.cell
def _(Path, RESULTS_FILE, pl): def _(RESULTS_FILE, extract_qid_descr_map):
results_file = Path(RESULTS_FILE) qid_descr_map = extract_qid_descr_map(RESULTS_FILE)
df = pl.read_csv(results_file, skip_rows=0) qid_descr_map
df
return df, results_file
@app.cell
def _(df, pl, results_file):
colset = set(df.columns)
this_df_verify = pl.DataFrame({'column_names': [colset], 'results_file': results_file.as_posix()})
this_df_verify
return (this_df_verify,)
@app.cell
def _(Path, pl, this_df_verify):
verification_record = Path('./data/exports/verification.csv')
if verification_record.exists():
verify_df = pl.read_csv(verification_record)
verify_df = pl.concat([verify_df, this_df_verify], how='vertical')
# save verify_df
verify_df.write_csv(verification_record)
else:
verify_df = this_df_verify
# append this_df_verify to verify_df
verify_df
return return
@app.cell @app.cell
def _(): def _(RESULTS_FILE, load_csv_with_qid_headers):
df = load_csv_with_qid_headers(RESULTS_FILE)
df
return
@app.cell
def _(mo):
mo.md(r"""
# Data Cleanup
- Remove incomplete responses (progress < 100)
- Flag outliers based on duration (add column)
- Flag responses that give the same rating for everything (indicates lack of engagement)
""")
return
@app.cell
def _(mo):
mo.md(r"""
# Answers Decoding
Pipeline to decode the ranking of voices. Currently saved as QID's, they need to be remapped back to their actual values so that the analysis can be performed. ie:
`GQIK26_G0_x8_RANK` -> Refers to question `Top 3 Traits_0_8_RANK - What are the important traits for the Chase AI virtual assistant?` and thus the #8 option
""")
return
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""
## TODO:
Create a python function for each of the questions. ie `def QID63()`. Each function should return a Polars query, that can be added to an existing query.
Ideas:
- Map column name to include the Voice number (VID) (ie the questions that only have 1 voice). The VID is in this case often included in the question description
- `QID_x_GROUP` Contains the rankings of the values, stored in order. The following columns (ie `QID26_G0_x1_RANK`) are redundant and not necessary for us. The function should drop the unnecessary columns to clean up
<!-- - Translate the RANK values back to the actual VID, and create an aggregate column that contains a list of the VIDs in order. ie: [V34, V56, V81].
- Use the first line of the question description (see `qid_descr_map`) to get the `"DataExportTag"`, which is a property that can be found in the `.qsf` file to inspect the choice number and it's corresponding VID
- "`VOICE SEL. 8-3_0_5_RANK`" refers to `"DataExportTag": "VOICE SEL. 8-3"`, `Group 0` (not important for this), `Choice 5`, and the value in the cell refers to the Rank assigned to that voice
- QSF file example to retrieve the VID: `"SurveyElements" -> (Find item where "Payload"["DataExportTag"] == "VOICE SEL. 8-3") -> "Payload" -> "Choices" -> "5" -> "Display" -> (Extract 'Voice <xx>' from the HTML)` -->
""")
return return

74
utils.py Normal file
View File

@@ -0,0 +1,74 @@
import polars as pl
from pathlib import Path
import pandas as pd
from typing import Union
def extract_qid(val):
"""Extracts the 'ImportId' from a string representation of a dictionary."""
if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
val = eval(val)
return val['ImportId']
def extract_qid_descr_map(results_file: Union[str, Path]) -> dict:
"""Extract mapping of Qualtrics ImportID to Question Description from results file."""
if isinstance(results_file, str):
results_file = Path(results_file)
if '1_1-16-2026' in results_file.as_posix():
df_questions = pd.read_csv(results_file, nrows=1)
df_questions
return df_questions.iloc[0].to_dict()
else:
# First row contains Qualtrics Editor question names (ie 'B_VOICE SEL. 18-8')
# Second row which contains the question content
# Third row contains the Export Metadata (ie '{"ImportId":"startDate","timeZone":"America/Denver"}')
df_questions = pd.read_csv(results_file, nrows=1, skiprows=1)
# transpose df_questions
df_questions = df_questions.T.reset_index()
df_questions.columns = ['Description', 'export_metadata']
df_questions['ImportID'] = df_questions['export_metadata'].apply(extract_qid)
df_questions = df_questions[['ImportID', 'Description']]
return dict(zip(df_questions['ImportID'], df_questions['Description']))
def load_csv_with_qid_headers(file_path: Union[str, Path]) -> pl.DataFrame:
"""
Load CSV where column headers are in row 3 as dict strings with ImportId.
The 3rd row contains metadata like '{"ImportId":"startDate","timeZone":"America/Denver"}'.
This function extracts the ImportId from each column and uses it as the column name.
Parameters:
file_path (Path): Path to the CSV file to load.
Returns:
pl.DataFrame: Polars DataFrame with ImportId as column names.
"""
if isinstance(file_path, str):
file_path = Path(file_path)
# Read the 3rd row (index 2) which contains the metadata dictionaries
# Use header=None to get raw values instead of treating them as column names
df_meta = pd.read_csv(file_path, nrows=1, skiprows=2, header=None)
# Extract ImportIds from each column value in this row
new_columns = [extract_qid(val) for val in df_meta.iloc[0]]
# Now read the actual data starting from row 4 (skip first 3 rows)
df = pl.read_csv(file_path, skip_rows=3)
# Rename columns with the extracted ImportIds
df.columns = new_columns
return df