148 lines
4.9 KiB
Python
148 lines
4.9 KiB
Python
|
|
from pathlib import Path
|
|
import re
|
|
import pandas as pd
|
|
|
|
def load_srt(path: str | Path) -> str:
|
|
"""Load and parse an SRT file, returning clean transcript with speaker labels.
|
|
|
|
Args:
|
|
path: Path to the SRT file
|
|
|
|
Returns:
|
|
Clean transcript string with format "SPEAKER_XX: text" per line,
|
|
timestamps stripped, consecutive lines from same speaker merged.
|
|
"""
|
|
path = Path(path)
|
|
content = path.read_text(encoding='utf-8')
|
|
|
|
# Parse SRT blocks: sequence number, timestamp, speaker|text
|
|
# Pattern matches: number, timestamp line, content line(s)
|
|
blocks = re.split(r'\n\n+', content.strip())
|
|
|
|
turns = []
|
|
for block in blocks:
|
|
lines = block.strip().split('\n')
|
|
if len(lines) < 3:
|
|
continue
|
|
|
|
# Skip sequence number (line 0) and timestamp (line 1)
|
|
# Content is line 2 onwards
|
|
text_lines = lines[2:]
|
|
text = ' '.join(text_lines)
|
|
|
|
# Parse speaker|text format
|
|
if '|' in text:
|
|
speaker, utterance = text.split('|', 1)
|
|
speaker = speaker.strip()
|
|
utterance = utterance.strip()
|
|
else:
|
|
speaker = "UNKNOWN"
|
|
utterance = text.strip()
|
|
|
|
turns.append((speaker, utterance))
|
|
|
|
# Merge consecutive turns from same speaker
|
|
merged = []
|
|
for speaker, utterance in turns:
|
|
if merged and merged[-1][0] == speaker:
|
|
merged[-1] = (speaker, merged[-1][1] + ' ' + utterance)
|
|
else:
|
|
merged.append((speaker, utterance))
|
|
|
|
# Format as "SPEAKER_XX: text"
|
|
transcript_lines = [f"{speaker}: {utterance}" for speaker, utterance in merged]
|
|
return '\n\n'.join(transcript_lines)
|
|
|
|
|
|
|
|
def csv_to_markdown(csv_path:Path):
|
|
"""Convert transcript CSV to markdown, merging consecutive same-speaker turns."""
|
|
df = pd.read_csv(str(csv_path))
|
|
|
|
lines = ["# Interview Transcript"]
|
|
|
|
# Track previous speaker to detect when speaker changes
|
|
prev_speaker = None
|
|
# Accumulate text from consecutive turns by same speaker
|
|
merged_text = []
|
|
|
|
for _, row in df.iterrows():
|
|
speaker = row["Speaker"]
|
|
text = str(row["Transcript"]).strip()
|
|
|
|
if speaker == prev_speaker:
|
|
# Same speaker continues — append text to current block
|
|
merged_text.append(text)
|
|
else:
|
|
# New speaker detected — flush previous speaker's block
|
|
if prev_speaker is not None:
|
|
# Format: **Speaker**: text-part-1\n\ntext-part-2
|
|
# Use \n\n to ensure distinct paragraphs for readability
|
|
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
|
|
|
|
# Start new block for current speaker
|
|
prev_speaker = speaker
|
|
merged_text = [text]
|
|
|
|
# Flush final speaker's block
|
|
if prev_speaker is not None:
|
|
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
|
|
|
|
# Join all blocks with double newlines for clear separation
|
|
return "\n\n".join(lines)
|
|
|
|
|
|
def cpc_smb_to_markdown(cpc_path: Path) -> str:
|
|
"""Convert CPC text transcript to markdown, merging consecutive same-speaker turns."""
|
|
content = Path(cpc_path).read_text(encoding='utf-8')
|
|
|
|
lines = ["# Interview Transcript"]
|
|
prev_speaker = None
|
|
merged_text = []
|
|
|
|
# Regex to find speaker labels: Word followed by colon and space
|
|
speaker_pattern = re.compile(r'(?:^|\s)([A-Za-z0-9]+):\s')
|
|
|
|
for line in content.splitlines():
|
|
line = line.strip().replace('\n', ' ')
|
|
|
|
# Handle edge case: "CPC1, (She/ Her,) LOCATION: Hello." -> "CPC1: Hello."
|
|
match = re.match(r'^"?([A-Za-z0-9]+),\s*\(.*?\)\s*LOCATION:\s*(.*?)"?$', line)
|
|
if match:
|
|
line = f"{match.group(1)}: {match.group(2)}"
|
|
|
|
# Remove surrounding quotes
|
|
if line.startswith('"') and line.endswith('"'):
|
|
line = line[1:-1].strip()
|
|
|
|
if not line:
|
|
continue
|
|
|
|
parts = speaker_pattern.split(line)
|
|
|
|
# If no speaker found, skip line (assumed garbage like "Like", headers)
|
|
if len(parts) < 2:
|
|
continue
|
|
|
|
# parts[0] is text before the first speaker on this line
|
|
if parts[0].strip() and prev_speaker:
|
|
merged_text.append(parts[0].strip())
|
|
|
|
# Iterate over speaker-text pairs
|
|
for i in range(1, len(parts), 2):
|
|
speaker = parts[i]
|
|
text = parts[i+1].strip()
|
|
|
|
if speaker == prev_speaker:
|
|
merged_text.append(text)
|
|
else:
|
|
if prev_speaker is not None:
|
|
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
|
|
prev_speaker = speaker
|
|
merged_text = [text]
|
|
|
|
if prev_speaker is not None:
|
|
lines.append(f"**{prev_speaker}**: {'\n\n'.join(merged_text)}")
|
|
|
|
return "\n\n".join(lines) |