Files
Interview-Analysis/02-B_WordClouds.py
2025-12-17 00:25:03 -08:00

623 lines
18 KiB
Python

import marimo
# marimo version string recorded in this notebook file.
__generated_with = "0.18.3"
# Notebook application object; each cell below is registered on it via @app.cell.
app = marimo.App(width="medium")
@app.cell
def _():
    # Notebook setup: shared imports, the Ollama host name, and the input /
    # working directories used by the cells that follow.
    import marimo as mo
    import pandas as pd
    import modin.pandas as mpd
    from tqdm import tqdm
    from pathlib import Path
    from datetime import datetime
    from utils import connect_qumo_ollama

    OLLAMA_LOCATION = 'localhost'
    # VM_NAME = 'ollama-lite'

    # initialize tqdm for pandas (enables DataFrame.progress_apply)
    tqdm.pandas()

    TAGUETTE_EXPORT_DIR = Path('./data/processing/02_taguette_export')
    WORKING_DIR = Path('./data/processing/02-b_WordClouds')

    # exist_ok=True makes directory creation idempotent and removes the
    # window between checking for the directory and creating it.
    WORKING_DIR.mkdir(parents=True, exist_ok=True)
    TAGUETTE_EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    return (
        OLLAMA_LOCATION,
        TAGUETTE_EXPORT_DIR,
        WORKING_DIR,
        connect_qumo_ollama,
        mo,
        pd,
    )
# Markdown instructions for exporting the highlights and codebook CSVs out of
# Taguette. The text is an f-string so the {TAGUETTE_EXPORT_DIR} placeholders
# render as the real export directory instead of the literal placeholder text.
@app.cell(hide_code=True)
def _(TAGUETTE_EXPORT_DIR, mo):
    mo.md(rf"""
# 1) Export Data out of Taguette
**Highlights**
1. Go to: https://taguette.qumo.io/project/1
2. Select 'Highlights' (left side) > 'See all highlights' > 'Export this view' (top right) > 'CSV'
3. Save to '{TAGUETTE_EXPORT_DIR}/all_tags.csv'
**Tags Codebook**
1. Select 'Project Info' (left side) > 'Export codebook' > 'CSV'
2. Save to '{TAGUETTE_EXPORT_DIR}/codebook.csv'
_NOTE: Sometimes you need to explicitly allow 'Unsafe Download' in the browser's download manager_
""")
    return
# Section heading for step 2 (importing the exported CSV files).
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
# 2) Import Data
""")
    return
@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the Taguette highlights export and number the rows 0..n-1 in file
    # order via the `_seq_id` column.
    _highlights_csv = TAGUETTE_EXPORT_DIR / 'all_tags.csv'
    all_tags_df = pd.read_csv(_highlights_csv)
    all_tags_df['_seq_id'] = range(len(all_tags_df))
    # all_tags_df
    return (all_tags_df,)
@app.cell
def _(all_tags_df):
    # Count how many highlight rows carry each tag; the result is kept only
    # for ad-hoc inspection (uncomment the last line to display it).
    _rows_per_tag = all_tags_df['tag'].value_counts()
    tag_counts = _rows_per_tag.reset_index()
    # tag_counts
    return
@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the Taguette codebook export and rename its `description` column
    # to `theme_description`.
    _codebook_csv = TAGUETTE_EXPORT_DIR / 'codebook.csv'
    codebook_df = pd.read_csv(_codebook_csv)
    codebook_df = codebook_df.rename(columns={'description': 'theme_description'})
    # codebook_df
    return
# Section heading for step 3 (choosing the tag to process).
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
# 3) Select Tag for processing
""")
    return
@app.cell(hide_code=True)
def _(all_tags_df, mo):
    # Dropdown listing every distinct tag found in the highlights export.
    # No default value is set, so nothing is selected until the user picks one.
    _tag_options = all_tags_df['tag'].unique().tolist()
    tag_select = mo.ui.dropdown(
        options=_tag_options,
        label="Select Tag to Process",
        # value="Chase as a brand",
        full_width=True,
    )
    tag_select
    return (tag_select,)
@app.cell
def _(WORKING_DIR, all_tags_df, mo, tag_select):
    # Per-tag setup: build the start button, derive the output directory and
    # file paths from the selected tag, and slice the highlights to that tag.
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))

    # The on_click handler returns True, so the button's value becomes truthy
    # after the first click; downstream cells gate on that value.
    start_processing_btn = mo.ui.button(
        label="Start Keyword Extraction",
        kind="warn",
        on_click=lambda val: True,
    )

    # File-system friendly form of the tag: spaces and '/' become '-'.
    tag_fname = tag_select.value.replace(" ", "-").replace('/', '-')

    SAVE_DIR = WORKING_DIR / tag_fname
    # exist_ok=True keeps re-running the cell for the same tag harmless.
    SAVE_DIR.mkdir(parents=True, exist_ok=True)

    KEYWORDS_FPATH = SAVE_DIR / f'keywords_per-highlight_{tag_fname}.xlsx'
    KEYWORD_FREQ_FPATH = SAVE_DIR / f'keyword_frequencies_{tag_fname}.xlsx'

    # Keep only the highlights carrying the selected tag; copy so later edits
    # never touch all_tags_df.
    tags_df = all_tags_df.loc[all_tags_df['tag'] == tag_select.value].copy()
    tags_df.head()
    return (
        KEYWORDS_FPATH,
        KEYWORD_FREQ_FPATH,
        SAVE_DIR,
        start_processing_btn,
        tag_fname,
        tags_df,
    )
# Section heading for step 4; appends a "skippable" hint when the keyword
# frequency workbook for the selected tag already exists on disk.
@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, mo):
    mo.md(rf"""
# 4) Keyword extraction {'(skippable, see 4b)' if KEYWORD_FREQ_FPATH.exists() else ''}
""")
    return
@app.cell
def _(OLLAMA_LOCATION, connect_qumo_ollama, mo):
    # Connect to the Ollama server and offer a dropdown of its models.
    # On any failure both `client` and `model_select` are set to None and the
    # error text becomes the cell output, so the problem is visible to the user.
    try:
        client, _models = connect_qumo_ollama(OLLAMA_LOCATION, print_models=False)
        model_select = mo.ui.dropdown(
            options=_models,
            value=_models[0],
            label="Select Ollama Model to use",
            searchable=True,
        )
        _output = model_select
    except Exception as e:
        # Broad on purpose: the helper's failure modes are not visible here
        # (an empty model list also lands here via the `_models[0]` lookup).
        _output = mo.md(f"Error connecting to Ollama server at `{OLLAMA_LOCATION}`: {e}")
        model_select = None
        client = None

    # Only the last expression of a cell is rendered, so the dropdown or the
    # error message is routed through a single output variable.
    _output
    return client, model_select
@app.cell
def _(mo, model_select, start_processing_btn, tag_select):
    # Display the "Start Keyword Extraction" button once a tag is selected
    # and an Ollama model is available. Each missing prerequisite gets its
    # own message so the user is told what actually needs fixing.
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))
    mo.stop(
        model_select is None,
        mo.md("No Ollama model available. Fix the connection above, or load existing keywords in 4b)."),
    )
    start_processing_btn
    return
@app.cell
def _(client, mo, model_select, pd, start_processing_btn, tags_df):
    # Run the Ollama keyword extraction over every highlight of the selected
    # tag and store the result in a new `keywords` column of `df`.
    from utils import ollama_keyword_extraction, worker_extraction

    # Without a client there is nothing to run. Stopping here shows the hint
    # as the cell output and avoids returning an undefined `df`.
    mo.stop(
        client is None,
        mo.md("Ollama client not available, See 4b) for loading data from xlsx."),
    )

    # Wait for start processing button
    mo.stop(not start_processing_btn.value, "Click button above to start processing")

    # Work on a copy: `tags_df` is defined by another cell and should not be
    # modified from here.
    df = tags_df.copy()

    # Run keyword extraction, one model call per row, with a tqdm progress bar.
    # NOTE(review): the shape of the helper's return value is not visible
    # here; the pd.Series wrapper is kept exactly as the original had it.
    df['keywords'] = df.progress_apply(
        lambda row: pd.Series(ollama_keyword_extraction(
            content=row['content'],
            tag=row['tag'],
            client=client,
            model=model_select.value,
        )),
        axis=1,
    )
    return (df,)
@app.cell(hide_code=True)
def _(KEYWORDS_FPATH, KEYWORD_FREQ_FPATH, df, mo, pd, start_processing_btn):
    # Aggregate the extracted keywords into a frequency table and write both
    # the per-highlight keywords and the frequencies to Excel workbooks.
    from collections import Counter as _Counter

    mo.stop(not start_processing_btn.value, "Click button above to process first")

    # Build the export frame separately instead of adding a column to `df`,
    # which is defined by another cell.
    _export_df = df.assign(
        keywords_txt=df['keywords'].apply(lambda kws: ', '.join(kws))
    )

    all_keywords_list = df['keywords'].tolist()
    all_keywords_flat = [item for sublist in all_keywords_list for item in sublist]

    # Calculate frequencies per keyword; most_common() yields (keyword, count)
    # pairs ordered from most to least frequent.
    keyword_freq = _Counter(all_keywords_flat)
    freq_df = pd.DataFrame(
        keyword_freq.most_common(),
        columns=['keyword', 'frequency'],
    )

    # Save to Excel files
    _export_df[['id', 'tag', 'content', 'keywords_txt']].to_excel(
        KEYWORDS_FPATH,
        index=False,
    )
    freq_df.to_excel(
        KEYWORD_FREQ_FPATH,
        index=False,
    )

    mo.vstack([
        mo.md(f"Keywords per-highlight saved to: `{KEYWORDS_FPATH}`"),
        mo.md(f"Keyword frequencies saved to: `{KEYWORD_FREQ_FPATH}`"),
    ])
    return (freq_df,)
# Section heading for step 4b. KEYWORD_FREQ_FPATH.name already starts with
# "keyword_frequencies_", so the file name is shown as-is rather than with
# that prefix repeated in front of it.
@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, mo):
    mo.md(rf"""
# 4b) [optional] Load data from `{KEYWORD_FREQ_FPATH.name}`
""")
    return
@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, mo, start_processing_btn):
    # Offer a "load from xlsx" button, but only when the frequency workbook
    # for the selected tag exists; otherwise the cell renders nothing.

    # This no-op reference makes the cell depend on start_processing_btn.
    # Original author's note: "Triggers re-execution of this cell when keyword
    # extraction completes". NOTE(review): the dependency is on the button,
    # not on the extraction output; confirm the intended timing.
    if start_processing_btn is not None:
        pass

    load_existing_btn = (
        mo.ui.run_button(
            label=f"Load keywords from `{KEYWORD_FREQ_FPATH.name}`",
            kind='warn',
        )
        if KEYWORD_FREQ_FPATH.exists()
        else None
    )
    load_existing_btn
    return (load_existing_btn,)
@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, freq_df, load_existing_btn, pd):
    # Pick the frequency table used by the rest of the notebook: the workbook
    # on disk when the load button was clicked, else the freshly computed one.
    _reload_requested = load_existing_btn is not None and load_existing_btn.value

    if not _reload_requested:
        frequency_df = freq_df
    else:
        frequency_df = (
            pd.read_excel(KEYWORD_FREQ_FPATH, engine='openpyxl')
            # Drop nan rows if any
            .dropna(subset=['keyword', 'frequency'])
            .sort_values(by='frequency', ascending=False)
            .reset_index(drop=True)
        )
        print(f"Loaded `{KEYWORD_FREQ_FPATH}` successfully.")
    return (frequency_df,)
# Section heading for step 5 (word cloud generation).
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
# 5) Wordcloud generation
""")
    return
@app.cell(hide_code=True)
def _():
    # Imports needed for rendering the word cloud. `path`, `STOPWORDS` and
    # `ImageColorGenerator` are imported here but not returned by this cell.
    import warnings
    from os import path

    import matplotlib.pyplot as plt
    import numpy as np
    from PIL import Image, ImageDraw
    from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

    from utils import blue_color_func

    # Suppress every warning category process-wide from this point on.
    warnings.filterwarnings("ignore")
    return Image, ImageDraw, WordCloud, blue_color_func, np, plt
# Section heading for step 5.1 (choosing the minimum keyword frequency).
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
## 5.1) Select threshold frequency
""")
    return
@app.cell(hide_code=True)
def _(mo):
    # Numeric input (1 to 20, default 2) for the minimum frequency a keyword
    # needs in order to be kept.
    min_freq_select = mo.ui.number(
        start=1,
        stop=20,
        label="Threshold Minimum Keyword Frequency: ",
        value=2,
    )
    min_freq_select
    return (min_freq_select,)
# Instructions for step 5.2, including the lower-cased tag to use as the key
# in the ignore-word dict. The dropdown value is empty until a tag is chosen,
# so a placeholder is shown instead of calling .lower() on a missing value.
@app.cell(hide_code=True)
def _(mo, tag_select):
    mo.md(rf"""
## 5.2) Inspect Keyword Dataset
1. Check the threshold is set correctly. If not, adjust accordingly
2. Read all the keywords and verify they are good. If not
- Add explicit exclusions if necessary below
- OR Rerun the keyword extraction above
Add words to this dict that should be ignored in the WordCloud for specific tags.
Make sure to create the correct key that matches the active selected tag:
Active selected tag = '`{tag_select.value.lower() if tag_select.value else '(no tag selected)'}`'
""")
    return
@app.cell(hide_code=True)
def _(frequency_df, min_freq_select, mo):
    # Show the keywords that meet the frequency threshold in a selectable
    # table (50 rows per page).
    mo.stop('keyword' not in frequency_df.columns, "Waiting for keyword extraction to finish")

    MIN_FREQ = min_freq_select.value
    _meets_threshold = frequency_df['frequency'] >= MIN_FREQ
    _freq_df_filtered = frequency_df.loc[_meets_threshold].copy()

    table_selection = mo.ui.table(_freq_df_filtered, page_size=50)
    table_selection
    return MIN_FREQ, table_selection
@app.cell(hide_code=True)
def _(mo, table_selection):
    # The "remove selected keywords" button only exists while at least one
    # table row is selected; otherwise the cell renders nothing.
    _has_selection = len(table_selection.value) > 0
    remove_rows_btn = (
        mo.ui.run_button(label="Click to remove selected keywords and update xlsx")
        if _has_selection
        else None
    )
    remove_rows_btn
    return (remove_rows_btn,)
@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, frequency_df, mo, remove_rows_btn, table_selection):
    # When the remove button is clicked, drop the selected rows from
    # frequency_df (in place), rewrite the frequency workbook, and tell the
    # user to reload via step 4b.
    _s = None
    _clicked = remove_rows_btn is not None and remove_rows_btn.value

    if _clicked:
        # get selected rows
        selected_rows = table_selection.value
        if len(selected_rows) > 0:
            rows_to_drop = table_selection.value.index.tolist()
            # The same reload hint is shown whether or not the drop succeeds.
            _reload_hint = mo.callout(
                "GO BACK TO STEP 4b) and reload data to continue refining the dataset.",
                kind='warn',
            )
            try:
                frequency_df.drop(index=rows_to_drop, inplace=True, axis=0)
            except KeyError:
                # Labels are no longer present in frequency_df; nothing is
                # written, the user is just pointed back to the reload step.
                _s = _reload_hint
            else:
                # Save updated frequencies back to xlsx
                frequency_df.to_excel(KEYWORD_FREQ_FPATH, index=False)
                print(f"Updated keyword frequencies saved to: `{KEYWORD_FREQ_FPATH}`")
                _s = _reload_hint
    _s
    return
@app.cell(hide_code=True)
def _():
    # Per-tag keyword exclusions for the word cloud.
    # Keys are lower-cased tag names; values list the keywords to drop.
    # Template for a new entry:
    #   <active-selected-tag>: [list, of, words, to, ignore]
    IGNORE_WORDS = {
        'chase as a brand': [
            "brand",
            "banking experience",
            "banking",
            "chase",
            "jpmorgan",
            "youthful",
            "customer service",
            "customer service focused",
            "great brand",
        ],
        'why customer chase': [
            "customer service",
            "customer loyalty",
            "chase",
            "chase customer",
            "banking experience",
        ],
        'chase as a person (personification)': [
            "CPC1",
        ],
    }
    return (IGNORE_WORDS,)
@app.cell(hide_code=True)
def _(mo):
    # Layout constants and UI controls for the word cloud.
    canvas_size = (1200, 800)
    # Added to the logo's protected radius; adjust to increase/decrease the
    # space between logo and words.
    buffer = -100

    logo_switch = mo.ui.switch(label="Include Chase Logo", value=False)
    n_words = mo.ui.slider(
        start=10,
        stop=200,
        step=1,
        value=40,
        debounce=True,
        show_value=True,
        label="Max number of words in WordCloud",
    )
    return buffer, canvas_size, logo_switch, n_words
@app.cell(hide_code=True)
def _(logo_switch, mo, n_words):
    # Control panel for step 5.4: heading, usage notes, and a row holding the
    # logo switch, the word-count slider and the generate button.
    run_wordcloud_btn = mo.ui.run_button(label="Generate WordCloud")

    _controls = mo.hstack(
        [logo_switch, n_words, run_wordcloud_btn],
        align='center',
        justify='space-around',
    )
    mo.vstack([
        mo.md("## 5.4) Generate WordCloud with/without Logo"),
        mo.md("""Use these buttons to iteratively (re)generate the WordCloud until it looks nice.
Placement and color of words is randomized, size is proportional to frequency.
When satisfied with the result, click 'Save WordCloud to File' to save the image."""),
        mo.md('---'),
        _controls,
    ])
    return (run_wordcloud_btn,)
@app.cell(hide_code=True)
def _(
    IGNORE_WORDS,
    Image,
    ImageDraw,
    MIN_FREQ,
    WordCloud,
    blue_color_func,
    buffer,
    canvas_size,
    frequency_df,
    logo_switch,
    mo,
    n_words,
    np,
    plt,
    run_wordcloud_btn,
    tag_select,
):
    # Render the word cloud for the selected tag, optionally leaving a
    # circular gap in the centre and pasting the logo into it, then show the
    # image and a "save" button.

    # No-op reference to run_wordcloud_btn so the cell depends on the
    # "Generate WordCloud" button.
    if run_wordcloud_btn.value:
        pass

    freq_df_filtered = frequency_df.loc[frequency_df['frequency'] >= MIN_FREQ].copy()
    keyword_freq_filtered = freq_df_filtered.set_index('keyword')['frequency'].to_dict()

    # remove specific keywords depending on selected tag
    for _ignored in IGNORE_WORDS.get(tag_select.value.lower()) or []:
        keyword_freq_filtered.pop(_ignored, None)

    # Stop with a readable hint rather than handing an empty mapping to the
    # word cloud generator.
    mo.stop(
        not keyword_freq_filtered,
        mo.md("No keywords left to plot: lower the frequency threshold or trim the ignore list."),
    )

    # Options shared by both variants; the branches below add the rest.
    _wc_kwargs = dict(
        background_color='white',
        width=canvas_size[0],
        height=canvas_size[1],
        max_words=n_words.value,
        color_func=blue_color_func,
    )

    logo = None
    if logo_switch.value:
        # 1. Load the logo; the context manager closes the file once the
        #    RGBA copy has been made.
        logo_path = "./assets/JP-Morgan-Chase-Symbol.png"
        with Image.open(logo_path) as _logo_file:
            logo = _logo_file.convert("RGBA")

        # 2. Shrink the logo to at most 600 px wide, keeping its aspect ratio.
        target_width = 600
        if logo.width > target_width:
            ratio = target_width / logo.width
            new_height = int(logo.height * ratio)
            logo = logo.resize((target_width, new_height), Image.Resampling.LANCZOS)

        # 3. Create the mask (0 = draw here, 255 = don't draw here),
        #    starting with 0 everywhere.
        mask_image = Image.new("L", canvas_size, 0)
        draw = ImageDraw.Draw(mask_image)

        # 4. Draw a protected circular area in the center.
        center = (canvas_size[0] // 2, canvas_size[1] // 2)
        # Radius: half of the logo's larger dimension plus buffer, clamped at
        # zero so a small logo with a negative buffer cannot yield an
        # inverted bounding box.
        radius = max(0, (max(logo.size) // 2) + buffer)
        draw.ellipse(
            (center[0] - radius, center[1] - radius, center[0] + radius, center[1] + radius),
            fill=255,
        )
        chase_mask = np.array(mask_image)

        _wc_kwargs.update(
            max_font_size=100,
            mask=chase_mask,
            contour_width=0,
            contour_color='steelblue',
        )
    else:
        _wc_kwargs.update(max_font_size=150)

    # Generate the WordCloud
    wordcloud = WordCloud(**_wc_kwargs).generate_from_frequencies(keyword_freq_filtered)

    # Convert WordCloud to Image to composite the logo
    wc_image = wordcloud.to_image()
    if logo is not None:
        # Center the logo and paste it using its own alpha channel as mask so
        # transparency is kept.
        logo_pos = (
            (canvas_size[0] - logo.width) // 2,
            (canvas_size[1] - logo.height) // 2,
        )
        wc_image.paste(logo, logo_pos, logo)

    # Display the generated image
    fig = plt.figure(figsize=(7, 7))
    plt.imshow(wc_image, interpolation='bilinear')
    plt.axis("off")
    plt.show()

    # The on_click handler returns True, so the value turns truthy once the
    # button has been clicked; the save cell gates on that value.
    save_wordcloud_btn = mo.ui.button(
        label="Save WordCloud to File",
        kind="warn",
        on_click=lambda val: True,
    )
    save_wordcloud_btn
    return save_wordcloud_btn, wc_image
@app.cell(hide_code=True)
def _(SAVE_DIR, mo, save_wordcloud_btn, tag_fname, wc_image):
    # Save the current word cloud as a PNG in SAVE_DIR. The first image is
    # written as `wordcloud_<tag>.png`; later ones get an increasing numeric
    # suffix (`_1`, `_2`, ...) so earlier images are not overwritten.

    # Wait for the save button
    mo.stop(not save_wordcloud_btn.value, "Click button above to save wordcloud image")

    _base = f'wordcloud_{tag_fname}'
    _numbered_prefix = f'{_base}_'

    # Match file stems in Python instead of embedding the tag in a glob
    # pattern: tag text may contain glob metacharacters such as '[' or '*',
    # and a tag ending in '_<digits>' must not be mistaken for a counter.
    _name_taken = False
    existing_numbers = []
    for _png in SAVE_DIR.glob('*.png'):
        _stem = _png.stem
        if _stem == _base:
            _name_taken = True
        elif _stem.startswith(_numbered_prefix):
            _suffix = _stem[len(_numbered_prefix):]
            if _suffix.isascii() and _suffix.isdigit():
                _name_taken = True
                existing_numbers.append(int(_suffix))

    if _name_taken:
        next_number = max(existing_numbers, default=0) + 1
        fpath = SAVE_DIR / f'{_base}_{next_number}.png'
    else:
        fpath = SAVE_DIR / f'{_base}.png'

    wc_image.save(fpath)
    mo.md(f"Wordcloud saved to: {fpath}")
    return
# Run the notebook's cells when this file is executed directly as a script.
if __name__ == "__main__":
    app.run()