# Marimo notebook: extract keywords from Taguette highlights with an Ollama
# model, aggregate keyword frequencies per tag, and render/save a word cloud.
import marimo

__generated_with = "0.18.3"
app = marimo.App(width="medium")


@app.cell
def _():
    # Shared imports, Ollama client, working directories and the model picker.
    import marimo as mo
    import pandas as pd
    import modin.pandas as mpd
    from tqdm import tqdm
    from pathlib import Path
    from datetime import datetime

    from utils import connect_qumo_ollama

    OLLAMA_LOCATION= 'localhost'
    # VM_NAME = 'ollama-lite'

    # initialize tqdm for pandas (enables DataFrame.progress_apply)
    tqdm.pandas()

    client, _models = connect_qumo_ollama(OLLAMA_LOCATION, print_models=False)

    TAGUETTE_EXPORT_DIR = Path('./data/processing/02_taguette_export')
    WORKING_DIR = Path('./data/processing/02-b_WordClouds')

    # exist_ok avoids the check-then-create race of `if not exists(): mkdir()`
    WORKING_DIR.mkdir(parents=True, exist_ok=True)
    TAGUETTE_EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    model_select = mo.ui.dropdown(
        options=_models,
        # Fall back to "no selection" instead of raising IndexError when the
        # server reports no models.
        value=_models[0] if _models else None,
        label="Select Ollama Model to use",
        searchable=True,
    )
    model_select
    return TAGUETTE_EXPORT_DIR, WORKING_DIR, client, mo, model_select, pd


@app.cell(hide_code=True)
def _(TAGUETTE_EXPORT_DIR, mo):
    # rf-string so the export directory is interpolated into the instructions
    # (a plain raw string rendered the literal text "{TAGUETTE_EXPORT_DIR}").
    mo.md(rf"""
    # 1) Export Data out of Taguette

    **Highlights**
    1. Go to: https://taguette.qumo.io/project/1
    2. Select 'Highlights' (left side) > 'See all hightlights' > 'Export this view' (top right) > 'CSV'
    3. Save to '{TAGUETTE_EXPORT_DIR}/all_tags.csv'

    **Tags Codebook**
    1. Select 'Project Info' (left side) > 'Export codebook' > 'CSV'
    2. Save to '{TAGUETTE_EXPORT_DIR}/codebook.csv'

    _NOTE: Sometimes you need to explicitly allow 'Unsafe Download' in the browser's download manager_
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 2) Import Data
    """)
    return


@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the exported highlights and add a stable sequential row id.
    all_tags_df = pd.read_csv(TAGUETTE_EXPORT_DIR / 'all_tags.csv')
    all_tags_df['_seq_id'] = range(len(all_tags_df))
    # all_tags_df
    return (all_tags_df,)


@app.cell
def _(all_tags_df):
    # get count of rows per tag
    tag_counts = all_tags_df['tag'].value_counts().reset_index()
    # tag_counts
    return


@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the codebook; 'description' is renamed to 'theme_description'.
    codebook_df = pd.read_csv(TAGUETTE_EXPORT_DIR / 'codebook.csv')
    codebook_df.rename(columns={'description': 'theme_description'}, inplace=True)
    # codebook_df
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 3) Select Tag for processing
    """)
    return


@app.cell(hide_code=True)
def _(all_tags_df, mo):
    tag_select = mo.ui.dropdown(
        options=all_tags_df['tag'].unique().tolist(),
        label="Select Tag to Process",
        # value="Chase as a brand",
        full_width=True,
    )
    tag_select
    return (tag_select,)


@app.cell
def _(WORKING_DIR, all_tags_df, mo, tag_select):
    # Derive per-tag output paths and the subset of highlights for the tag.
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))

    start_processing_btn = mo.ui.button(
        label="Start Keyword Extraction",
        kind="warn",
        on_click=lambda val: True
    )

    # File-system friendly version of the tag name.
    tag_fname = tag_select.value.replace(" ", "-").replace('/','-')

    SAVE_DIR = WORKING_DIR / tag_fname
    SAVE_DIR.mkdir(parents=True, exist_ok=True)

    KEYWORDS_FPATH = SAVE_DIR / f'keywords_per-highlight_{tag_fname}.xlsx'
    KEYWORD_FREQ_FPATH = SAVE_DIR / f'keyword_frequencies_{tag_fname}.xlsx'

    # keep only the highlights that carry the selected tag
    tags_df = all_tags_df.loc[all_tags_df['tag'] == tag_select.value].copy()
    tags_df
    return (
        KEYWORDS_FPATH,
        KEYWORD_FREQ_FPATH,
        SAVE_DIR,
        start_processing_btn,
        tag_fname,
        tags_df,
    )


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 4) Keyword extraction
    """)
    return


@app.cell
def _(mo, start_processing_btn, tag_select):
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))
    start_processing_btn
    return


@app.cell
def _(client, mo, model_select, pd, start_processing_btn, tags_df):
    from utils import ollama_keyword_extraction, worker_extraction

    # Wait for start processing button
    mo.stop(not start_processing_btn.value, "Click button above to start processing")

    # Work on a copy so the frame owned by the tag-selection cell is not
    # mutated from this cell.
    df = tags_df.copy()

    # Run keyword extraction (one model call per highlight row).
    # NOTE(review): the result is wrapped in pd.Series before being assigned to
    # a single column; this only works if ollama_keyword_extraction returns a
    # single-entry mapping/sequence - confirm against utils.
    df['keywords'] = df.progress_apply(
        lambda row: pd.Series(ollama_keyword_extraction(
            content=row['content'],
            tag=row['tag'],
            client=client,
            model=model_select.value
        )),
        axis=1
    )
    return (df,)


@app.cell
def _(KEYWORDS_FPATH, KEYWORD_FREQ_FPATH, df, mo, pd, start_processing_btn):
    # Aggregate keyword frequencies and persist both result tables to Excel.
    mo.stop(not start_processing_btn.value, "Click button above to process first")

    # assign() returns a new frame, leaving `df` (owned by another cell) intact.
    _per_highlight = df.assign(
        keywords_txt=df['keywords'].apply(lambda kws: ', '.join(kws))
    )

    all_keywords_list = df['keywords'].tolist()
    all_keywords_flat = [item for sublist in all_keywords_list for item in sublist]

    # Calculate frequencies per keyword
    keyword_freq = {}
    for kw in all_keywords_flat:
        keyword_freq[kw] = keyword_freq.get(kw, 0) + 1

    freq_df = pd.DataFrame.from_dict(keyword_freq, orient='index', columns=['frequency'])
    freq_df.index.name = 'keyword'
    freq_df.reset_index(inplace=True)
    freq_df.sort_values(by='frequency', ascending=False, inplace=True)

    # Save to Excel files
    _per_highlight[['id', 'tag', 'content', 'keywords_txt']].to_excel(
        KEYWORDS_FPATH,
        index=False
    )
    freq_df.to_excel(
        KEYWORD_FREQ_FPATH,
        index=False
    )

    mo.vstack([
        mo.md(f"Keywords per-highlight saved to: `{KEYWORDS_FPATH}`"),
        mo.md(f"Keyword frequencies saved to: `{KEYWORD_FREQ_FPATH}`")
    ])
    return (freq_df,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 4b) [optional] Load data from `keyword_frequencies_*.xlsx`
    """)
    return


@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, mo):
    # The button only exists when a previously saved frequency file is present.
    load_existing_btn = None
    if KEYWORD_FREQ_FPATH.exists():
        load_existing_btn = mo.ui.run_button(label=f"Load keywords from `{KEYWORD_FREQ_FPATH.name}`")

    load_existing_btn
    return (load_existing_btn,)


@app.cell(hide_code=True)
def _(KEYWORD_FREQ_FPATH, freq_df, load_existing_btn, pd):
    # NOTE(review): this cell references freq_df, so it presumably cannot run
    # until the extraction cell has produced it - confirm whether loading
    # without a prior extraction is expected to work.
    # load_existing_btn is None when no saved file exists; guard before .value.
    if load_existing_btn is not None and load_existing_btn.value:
        _fdf = pd.read_excel(KEYWORD_FREQ_FPATH, engine='openpyxl')
        # Drop nan rows if any
        _fdf.dropna(subset=['keyword', 'frequency'], inplace=True)
        _fdf.sort_values(by='frequency', ascending=False, inplace=True)
        _fdf.reset_index(drop=True, inplace=True)
        print(f"Loaded `{KEYWORD_FREQ_FPATH}` successfully.")

        frequency_df = _fdf
    else:
        frequency_df = freq_df
    return (frequency_df,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 5) Wordcloud generation
    """)
    return


@app.cell(hide_code=True)
def _():
    # Import all necessary libraries
    import numpy as np
    from os import path
    from PIL import Image, ImageDraw

    from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
    import matplotlib.pyplot as plt
    from utils import blue_color_func

    import warnings
    warnings.filterwarnings("ignore")
    return Image, ImageDraw, WordCloud, blue_color_func, np, plt


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 5.1) Select threshold frequency
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    min_freq_select = mo.ui.number(start=1, stop=20, label="Threshold Minimum Keyword Frequency: ", value=2)
    min_freq_select
    return (min_freq_select,)


@app.cell(hide_code=True)
def _(frequency_df, min_freq_select, mo):
    # Keep only keywords that occur at least MIN_FREQ times.
    mo.stop('keyword' not in frequency_df.columns, "Waiting for keyword extraction to finish")

    MIN_FREQ = min_freq_select.value

    # The mask must be built from the same frame that is being filtered:
    # frequency_df may have been loaded from Excel with a fresh index, so a
    # mask taken from another frame would be misaligned.
    freq_df_filtered = frequency_df.loc[
        frequency_df['frequency'] >= MIN_FREQ
    ].reset_index(drop=True)

    keyword_freq_filtered = freq_df_filtered.set_index('keyword')['frequency'].to_dict()

    table_selection = mo.ui.table(freq_df_filtered, page_size=50)
    table_selection
    return (keyword_freq_filtered,)


@app.cell(hide_code=True)
def _(mo, tag_select):
    # Empty string when no tag is selected, instead of failing on None.lower().
    _active_tag = (tag_select.value or "").lower()
    mo.md(rf"""
    ## 5.2) Inspect Keyword Dataset

    1. Check the threshold is set correctly. If not, adjust accordingly
    2. Read all the keywords and verify they are good. If not
        - Add explicit exclusions if necessary below
        - OR Rerun the keyword extraction above

    Add words to this dict that should be ignored in the WordCloud for specific tags.
    Make sure to create the correct key that matches the active selected tag:

    Active selected tag = '`{_active_tag}`'
    """)
    return


@app.cell
def _():
    # Per-tag exclusion lists; keys are lower-cased tag names.
    IGNORE_WORDS = {
        'chase as a brand': [
            "brand",
            "banking experience",
            "banking",
            "chase",
            "jpmorgan",
            "youthful",
            "customer service",
            "customer service focused",
            "great brand",
        ],
        'why customer chase': [
            "customer service",
            "customer loyalty",
            "chase",
            "chase customer",
            "banking experience",
        ],
        'chase as a person (personification)': [
            "CPC1"
        ]
        # <active-selected-tag>: [list, of, words, to, ignore]
    }
    return (IGNORE_WORDS,)


@app.cell(hide_code=True)
def _(mo):
    buffer = -100  # Adjust this to increase/decrease space between logo and words
    canvas_size = (1200, 800)

    logo_switch = mo.ui.switch(label="Include Chase Logo", value=False)
    return buffer, canvas_size, logo_switch


@app.cell(hide_code=True)
def _(logo_switch, mo):
    run_wordcloud_btn = mo.ui.run_button(label="Generate WordCloud")

    mo.vstack([
        mo.md("## 5.4) Generate WordCloud with/without Logo"),
        mo.md("""Use these buttons to iteratively (re)generate the WordCloud until it looks nice. Placement and color of words is randomized, size is proportional to frequency.
        When satisfied with the result, click 'Save WordCloud to File' to save the image."""),
        mo.md('---'),
        mo.hstack([logo_switch, run_wordcloud_btn], align='center', justify='space-around')]
    )
    return (run_wordcloud_btn,)


@app.cell(hide_code=True)
def _(
    IGNORE_WORDS,
    Image,
    ImageDraw,
    WordCloud,
    blue_color_func,
    buffer,
    canvas_size,
    keyword_freq_filtered,
    logo_switch,
    mo,
    np,
    plt,
    run_wordcloud_btn,
    tag_select,
):
    # Referencing the run button makes this cell re-run on every click; the
    # value itself is not needed.
    _ = run_wordcloud_btn.value

    # Drop tag-specific ignored keywords. A filtered copy is built instead of
    # deleting keys in place, so the dict owned by the threshold cell keeps
    # its contents.
    _ignored = set(IGNORE_WORDS.get(tag_select.value.lower(), []))
    _frequencies = {
        _kw: _freq
        for _kw, _freq in keyword_freq_filtered.items()
        if _kw not in _ignored
    }

    # WordCloud raises ValueError when given no words; stop with a hint instead.
    mo.stop(
        not _frequencies,
        mo.md("No keywords left to plot. Lower the frequency threshold or relax the ignore list."),
    )

    # Settings shared by the with-logo and without-logo variants.
    _common_kwargs = dict(
        background_color='white',
        width=canvas_size[0],
        height=canvas_size[1],
        max_words=20,
        color_func=blue_color_func,
    )

    if logo_switch.value:
        # 1. Load the logo
        # Make sure this path points to your uploaded file
        logo_path = "./data/assets/JP-Morgan-Chase-Symbol.png"
        logo = Image.open(logo_path).convert("RGBA")

        # 2. Downscale the logo if it is wider than the target width
        target_width = 600
        if logo.width > target_width:
            ratio = target_width / logo.width
            new_height = int(logo.height * ratio)
            # Image.Resampling.LANCZOS gives high-quality downsampling.
            # If you get an error, try Image.LANCZOS or Image.ANTIALIAS
            logo = logo.resize((target_width, new_height), Image.Resampling.LANCZOS)

        # 3. Create the mask (0 = draw here, 255 = don't draw here)
        mask_image = Image.new("L", canvas_size, 0)
        draw = ImageDraw.Draw(mask_image)

        # 4. Draw a protected circular area in the center
        center = (canvas_size[0] // 2, canvas_size[1] // 2)

        # Radius: half of the logo's larger dimension plus the buffer
        radius = (max(logo.size) // 2) + buffer

        # White circle (255) which the WordCloud will avoid
        draw.ellipse(
            (center[0] - radius, center[1] - radius, center[0] + radius, center[1] + radius),
            fill=255
        )

        chase_mask = np.array(mask_image)

        # Generate the WordCloud around the circular mask
        wordcloud = WordCloud(
            max_font_size=100,
            mask=chase_mask,
            contour_width=0,
            contour_color='steelblue',
            **_common_kwargs,
        ).generate_from_frequencies(_frequencies)

    else:
        # Generate the WordCloud on the full canvas
        wordcloud = WordCloud(
            max_font_size=150,
            **_common_kwargs,
        ).generate_from_frequencies(_frequencies)

    # Convert WordCloud to Image to composite the logo
    wc_image = wordcloud.to_image()

    if logo_switch.value:
        # Calculate position to center the logo
        logo_pos = (
            (canvas_size[0] - logo.width) // 2,
            (canvas_size[1] - logo.height) // 2
        )

        # Paste logo (using alpha channel as mask to keep transparency)
        wc_image.paste(logo, logo_pos, logo)

    # Display the generated image
    fig = plt.figure(figsize=(7,7))
    plt.imshow(wc_image, interpolation='bilinear')
    plt.axis("off")
    plt.show()

    save_wordcloud_btn = mo.ui.button(
        label="Save WordCloud to File",
        kind="warn",
        on_click=lambda val: True
    )
    save_wordcloud_btn
    return save_wordcloud_btn, wc_image


@app.cell(hide_code=True)
def _(SAVE_DIR, mo, save_wordcloud_btn, tag_fname, wc_image):
    # Wait for the save button
    mo.stop(not save_wordcloud_btn.value, "Click button above to save wordcloud image")

    # The first save uses 'wordcloud_<tag>.png'; later saves append an
    # increasing '_<n>' so earlier images are never overwritten.
    _base = f'wordcloud_{tag_fname}'
    fpath = SAVE_DIR / f'{_base}.png'

    # Match on the exact stem prefix rather than splitting on '_' (which
    # misreads tags that themselves contain '_<digits>') and rather than
    # embedding the tag in a glob pattern (which would treat characters such
    # as '[' in a tag as wildcards).
    _base_taken = False
    _numbers = []
    for _existing in SAVE_DIR.glob('wordcloud_*.png'):
        if not _existing.stem.startswith(_base):
            continue
        _rest = _existing.stem[len(_base):]
        if not _rest:
            _base_taken = True
        elif _rest[0] == '_' and _rest[1:].isdecimal():
            _numbers.append(int(_rest[1:]))

    if _base_taken or _numbers:
        fpath = SAVE_DIR / f'{_base}_{max(_numbers, default=0) + 1}.png'

    wc_image.save(fpath)
    mo.md(f"Wordcloud saved to: {fpath}")
    return


if __name__ == "__main__":
    app.run()