update import to work with CPC and SMB

This commit is contained in:
2025-12-12 21:25:26 +01:00
parent ccc5154b93
commit c2a5c12794
3 changed files with 108 additions and 43 deletions

View File

@@ -9,7 +9,8 @@ def _():
import marimo as mo import marimo as mo
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
return Path, mo, pd from utils import csv_to_markdown, cpc_smb_to_markdown
return Path, cpc_smb_to_markdown, csv_to_markdown, mo
@app.cell @app.cell
@@ -34,49 +35,26 @@ def _(INPUT_DIR, mo):
return (file_dropdown,) return (file_dropdown,)
@app.function(hide_code=True) @app.cell
def csv_to_markdown(df): def _(Path, cpc_smb_to_markdown, csv_to_markdown):
"""Convert transcript DataFrame to markdown, merging consecutive same-speaker turns.""" def jpmc_transcript_to_md(filepath):
lines = ["# Interview Transcript"] fp = Path(filepath)
try:
# Track previous speaker to detect when speaker changes return csv_to_markdown(filepath)
prev_speaker = None except Exception as e:
# Accumulate text from consecutive turns by same speaker try:
merged_text = [] return cpc_smb_to_markdown(filepath)
except Exception as e2:
for _, row in df.iterrows(): raise ValueError(f"Failed to process file {filepath} with errors: {e}, {e2}")
speaker = row["Speaker"] return (jpmc_transcript_to_md,)
text = str(row["Transcript"]).strip()
if speaker == prev_speaker:
# Same speaker continues — append text to current block
merged_text.append(text)
else:
# New speaker detected — flush previous speaker's block
if prev_speaker is not None:
# Format: **Speaker**: text-part-1\n\ntext-part-2
# Use \n\n to ensure distinct paragraphs for readability
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
# Start new block for current speaker
prev_speaker = speaker
merged_text = [text]
# Flush final speaker's block
if prev_speaker is not None:
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
# Join all blocks with double newlines for clear separation
return "\n\n".join(lines)
@app.cell(hide_code=True) @app.cell(hide_code=True)
def _(file_dropdown, mo, pd): def _(file_dropdown, jpmc_transcript_to_md, mo):
# Preview # Preview
preview = mo.md("") preview = mo.md("")
if file_dropdown.value: if file_dropdown.value:
df = pd.read_csv(file_dropdown.value) md_content = jpmc_transcript_to_md(file_dropdown.value)
md_content = csv_to_markdown(df.head(10))
preview = mo.md(md_content) preview = mo.md(md_content)
preview preview
@@ -91,13 +69,12 @@ def _(mo):
@app.cell @app.cell
def _(OUTPUT_DIR, Path, convert_btn, file_dropdown, mo, pd): def _(OUTPUT_DIR, Path, convert_btn, file_dropdown, jpmc_transcript_to_md, mo):
result = mo.md("") result = mo.md("")
saved_md = None saved_md = None
if convert_btn.value and file_dropdown.value: if convert_btn.value and file_dropdown.value:
_df = pd.read_csv(file_dropdown.value) saved_md = jpmc_transcript_to_md(file_dropdown.value)
saved_md = csv_to_markdown(_df)
_out_path = OUTPUT_DIR / (Path(file_dropdown.value).stem + ".md") _out_path = OUTPUT_DIR / (Path(file_dropdown.value).stem + ".md")
_out_path.write_text(saved_md) _out_path.write_text(saved_md)
result = mo.callout(f"✅ Saved to `{_out_path}`", kind="success") result = mo.callout(f"✅ Saved to `{_out_path}`", kind="success")

View File

@@ -1,4 +1,4 @@
from .ollama_utils import connect_qumo_ollama from .ollama_utils import connect_qumo_ollama
from .data_utils import create_sentiment_matrix, extract_theme from .data_utils import create_sentiment_matrix, extract_theme
from .transcript_utils import load_srt from .transcript_utils import load_srt, csv_to_markdown, cpc_smb_to_markdown
from .sentiment_analysis import dummy_sentiment_analysis, ollama_sentiment_analysis from .sentiment_analysis import dummy_sentiment_analysis, ollama_sentiment_analysis

View File

@@ -1,6 +1,7 @@
from pathlib import Path from pathlib import Path
import re import re
import pandas as pd
def load_srt(path: str | Path) -> str: def load_srt(path: str | Path) -> str:
"""Load and parse an SRT file, returning clean transcript with speaker labels. """Load and parse an SRT file, returning clean transcript with speaker labels.
@@ -52,3 +53,90 @@ def load_srt(path: str | Path) -> str:
# Format as "SPEAKER_XX: text" # Format as "SPEAKER_XX: text"
transcript_lines = [f"{speaker}: {utterance}" for speaker, utterance in merged] transcript_lines = [f"{speaker}: {utterance}" for speaker, utterance in merged]
return '\n\n'.join(transcript_lines) return '\n\n'.join(transcript_lines)
def csv_to_markdown(csv_path:Path):
"""Convert transcript CSV to markdown, merging consecutive same-speaker turns."""
df = pd.read_csv(str(csv_path))
lines = ["# Interview Transcript"]
# Track previous speaker to detect when speaker changes
prev_speaker = None
# Accumulate text from consecutive turns by same speaker
merged_text = []
for _, row in df.iterrows():
speaker = row["Speaker"]
text = str(row["Transcript"]).strip()
if speaker == prev_speaker:
# Same speaker continues — append text to current block
merged_text.append(text)
else:
# New speaker detected — flush previous speaker's block
if prev_speaker is not None:
# Format: **Speaker**: text-part-1\n\ntext-part-2
# Use \n\n to ensure distinct paragraphs for readability
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
# Start new block for current speaker
prev_speaker = speaker
merged_text = [text]
# Flush final speaker's block
if prev_speaker is not None:
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
# Join all blocks with double newlines for clear separation
return "\n\n".join(lines)
def cpc_smb_to_markdown(cpc_path: Path) -> str:
"""Convert CPC text transcript to markdown, merging consecutive same-speaker turns."""
content = Path(cpc_path).read_text(encoding='utf-8')
lines = ["# Interview Transcript"]
prev_speaker = None
merged_text = []
# Regex to find speaker labels: Word followed by colon and space
speaker_pattern = re.compile(r'(?:^|\s)([A-Za-z0-9]+):\s')
for line in content.splitlines():
line = line.strip().replace('\n', ' ')
# Remove surrounding quotes
if line.startswith('"') and line.endswith('"'):
line = line[1:-1].strip()
if not line:
continue
parts = speaker_pattern.split(line)
# If no speaker found, skip line (assumed garbage like "Like", headers)
if len(parts) < 2:
continue
# parts[0] is text before the first speaker on this line
if parts[0].strip() and prev_speaker:
merged_text.append(parts[0].strip())
# Iterate over speaker-text pairs
for i in range(1, len(parts), 2):
speaker = parts[i]
text = parts[i+1].strip()
if speaker == prev_speaker:
merged_text.append(text)
else:
if prev_speaker is not None:
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
prev_speaker = speaker
merged_text = [text]
if prev_speaker is not None:
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
return "\n\n".join(lines)