From c2a5c127949f3fdd6e8f87dd35bb5610a74818d2 Mon Sep 17 00:00:00 2001 From: Luigi Maiorano Date: Fri, 12 Dec 2025 21:25:26 +0100 Subject: [PATCH] update import to work with CPC and SMB --- 01_Taguette-Pre-Process.py | 59 ++++++++----------------- utils/__init__.py | 2 +- utils/transcript_utils.py | 90 +++++++++++++++++++++++++++++++++++++- 3 files changed, 108 insertions(+), 43 deletions(-) diff --git a/01_Taguette-Pre-Process.py b/01_Taguette-Pre-Process.py index 7d3c2f0..ba4dcdb 100644 --- a/01_Taguette-Pre-Process.py +++ b/01_Taguette-Pre-Process.py @@ -9,7 +9,8 @@ def _(): import marimo as mo import pandas as pd from pathlib import Path - return Path, mo, pd + from utils import csv_to_markdown, cpc_smb_to_markdown + return Path, cpc_smb_to_markdown, csv_to_markdown, mo @app.cell @@ -34,49 +35,26 @@ def _(INPUT_DIR, mo): return (file_dropdown,) -@app.function(hide_code=True) -def csv_to_markdown(df): - """Convert transcript DataFrame to markdown, merging consecutive same-speaker turns.""" - lines = ["# Interview Transcript"] - - # Track previous speaker to detect when speaker changes - prev_speaker = None - # Accumulate text from consecutive turns by same speaker - merged_text = [] - - for _, row in df.iterrows(): - speaker = row["Speaker"] - text = str(row["Transcript"]).strip() - - if speaker == prev_speaker: - # Same speaker continues — append text to current block - merged_text.append(text) - else: - # New speaker detected — flush previous speaker's block - if prev_speaker is not None: - # Format: **Speaker**: text-part-1\n\ntext-part-2 - # Use \n\n to ensure distinct paragraphs for readability - lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") - - # Start new block for current speaker - prev_speaker = speaker - merged_text = [text] - - # Flush final speaker's block - if prev_speaker is not None: - lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") - - # Join all blocks with double newlines for clear separation - return "\n\n".join(lines) +@app.cell +def _(Path, cpc_smb_to_markdown, csv_to_markdown): + def jpmc_transcript_to_md(filepath): + fp = Path(filepath) + try: + return csv_to_markdown(filepath) + except Exception as e: + try: + return cpc_smb_to_markdown(filepath) + except Exception as e2: + raise ValueError(f"Failed to process file {filepath} with errors: {e}, {e2}") + return (jpmc_transcript_to_md,) @app.cell(hide_code=True) -def _(file_dropdown, mo, pd): +def _(file_dropdown, jpmc_transcript_to_md, mo): # Preview preview = mo.md("") if file_dropdown.value: - df = pd.read_csv(file_dropdown.value) - md_content = csv_to_markdown(df.head(10)) + md_content = jpmc_transcript_to_md(file_dropdown.value) preview = mo.md(md_content) preview @@ -91,13 +69,12 @@ def _(mo): @app.cell -def _(OUTPUT_DIR, Path, convert_btn, file_dropdown, mo, pd): +def _(OUTPUT_DIR, Path, convert_btn, file_dropdown, jpmc_transcript_to_md, mo): result = mo.md("") saved_md = None if convert_btn.value and file_dropdown.value: - _df = pd.read_csv(file_dropdown.value) - saved_md = csv_to_markdown(_df) + saved_md = jpmc_transcript_to_md(file_dropdown.value) _out_path = OUTPUT_DIR / (Path(file_dropdown.value).stem + ".md") _out_path.write_text(saved_md) result = mo.callout(f"✅ Saved to `{_out_path}`", kind="success") diff --git a/utils/__init__.py b/utils/__init__.py index c5846e1..cd88752 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -1,4 +1,4 @@ from .ollama_utils import connect_qumo_ollama from .data_utils import create_sentiment_matrix, extract_theme -from .transcript_utils import load_srt +from .transcript_utils import load_srt, csv_to_markdown, cpc_smb_to_markdown from .sentiment_analysis import dummy_sentiment_analysis, ollama_sentiment_analysis diff --git a/utils/transcript_utils.py b/utils/transcript_utils.py index 0a118f9..3ac8bf7 100644 --- a/utils/transcript_utils.py +++ b/utils/transcript_utils.py @@ -1,6 +1,7 @@ from pathlib import Path import re +import pandas as pd def load_srt(path: str | Path) -> str: """Load and parse an SRT file, returning clean transcript with speaker labels. @@ -51,4 +52,91 @@ def load_srt(path: str | Path) -> str: # Format as "SPEAKER_XX: text" transcript_lines = [f"{speaker}: {utterance}" for speaker, utterance in merged] - return '\n\n'.join(transcript_lines) \ No newline at end of file + return '\n\n'.join(transcript_lines) + + + +def csv_to_markdown(csv_path:Path): + """Convert transcript CSV to markdown, merging consecutive same-speaker turns.""" + df = pd.read_csv(str(csv_path)) + + lines = ["# Interview Transcript"] + + # Track previous speaker to detect when speaker changes + prev_speaker = None + # Accumulate text from consecutive turns by same speaker + merged_text = [] + + for _, row in df.iterrows(): + speaker = row["Speaker"] + text = str(row["Transcript"]).strip() + + if speaker == prev_speaker: + # Same speaker continues — append text to current block + merged_text.append(text) + else: + # New speaker detected — flush previous speaker's block + if prev_speaker is not None: + # Format: **Speaker**: text-part-1\n\ntext-part-2 + # Use \n\n to ensure distinct paragraphs for readability + lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") + + # Start new block for current speaker + prev_speaker = speaker + merged_text = [text] + + # Flush final speaker's block + if prev_speaker is not None: + lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") + + # Join all blocks with double newlines for clear separation + return "\n\n".join(lines) + + +def cpc_smb_to_markdown(cpc_path: Path) -> str: + """Convert CPC text transcript to markdown, merging consecutive same-speaker turns.""" + content = Path(cpc_path).read_text(encoding='utf-8') + + lines = ["# Interview Transcript"] + prev_speaker = None + merged_text = [] + + # Regex to find speaker labels: Word followed by colon and space + speaker_pattern = re.compile(r'(?:^|\s)([A-Za-z0-9]+):\s') + + for line in content.splitlines(): + line = line.strip().replace('\n', ' ') + # Remove surrounding quotes + if line.startswith('"') and line.endswith('"'): + line = line[1:-1].strip() + + if not line: + continue + + parts = speaker_pattern.split(line) + + # If no speaker found, skip line (assumed garbage like "Like", headers) + if len(parts) < 2: + continue + + # parts[0] is text before the first speaker on this line + if parts[0].strip() and prev_speaker: + merged_text.append(parts[0].strip()) + + # Iterate over speaker-text pairs + for i in range(1, len(parts), 2): + speaker = parts[i] + text = parts[i+1].strip() + + if speaker == prev_speaker: + merged_text.append(text) + else: + if prev_speaker is not None: + lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") + prev_speaker = speaker + merged_text = [text] + + if prev_speaker is not None: + lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}") + + return "\n\n".join(lines) \ No newline at end of file