import marimo

__generated_with = "0.18.3"
app = marimo.App(width="medium")


@app.cell
def _():
    # Setup cell: imports, Ollama connection, working directories and the
    # model picker shared by the rest of the notebook.
    import marimo as mo
    import pandas as pd
    import modin.pandas as mpd
    from tqdm import tqdm
    from pathlib import Path
    from datetime import datetime

    from utils import connect_qumo_ollama

    OLLAMA_LOCATION= 'localhost'
    # VM_NAME = 'ollama-lite'

    # initialize tqdm for pandas
    tqdm.pandas()

    client, _models = connect_qumo_ollama(OLLAMA_LOCATION, print_models=False)

    TAGUETTE_EXPORT_DIR = Path('./data/processing/02_taguette_export')
    WORKING_DIR = Path('./data/processing/02_taguette_postprocess')

    # exist_ok avoids the separate exists() check (and its race window)
    WORKING_DIR.mkdir(parents=True, exist_ok=True)
    TAGUETTE_EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    model_select = mo.ui.dropdown(
        options=_models,
        # Guard against an empty model list instead of failing on _models[0].
        # NOTE(review): assumes _models is a plain sequence of model names -- confirm in utils.
        value=_models[0] if _models else None,
        label="Select Ollama Model to use",
        searchable=True,
    )
    model_select
    return TAGUETTE_EXPORT_DIR, WORKING_DIR, client, mo, model_select, pd


@app.cell(hide_code=True)
def _(TAGUETTE_EXPORT_DIR, mo):
    # rf-string so that {TAGUETTE_EXPORT_DIR} is rendered as the real export path
    mo.md(rf"""
    # 1) Export Data out of Taguette

    **Highlights**
    1. Go to: https://taguette.qumo.io/project/1
    2. Select 'Highlights' (left side) > 'See all highlights' > 'Export this view' (top right) > 'CSV'
    3. Save to '{TAGUETTE_EXPORT_DIR}/all_tags.csv'

    **Tags Codebook**
    1. Select 'Project Info' (left side) > 'Export codebook' > 'CSV'
    2. Save to '{TAGUETTE_EXPORT_DIR}/codebook.csv'

    _NOTE: Sometimes you need to explicitly allow 'Unsafe Download' in the browser's download manager_
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 2) Import Data
    """)
    return


@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the exported highlights and attach a running row number.
    all_tags_df = pd.read_csv(TAGUETTE_EXPORT_DIR / 'all_tags.csv')
    all_tags_df['_seq_id'] = range(len(all_tags_df))
    all_tags_df
    return (all_tags_df,)


@app.cell
def _(all_tags_df):
    # get count of rows per tag
    tag_counts = all_tags_df['tag'].value_counts().reset_index()
    tag_counts
    return


@app.cell
def _(TAGUETTE_EXPORT_DIR, pd):
    # Load the codebook; 'description' is renamed to 'theme_description'.
    codebook_df = pd.read_csv(TAGUETTE_EXPORT_DIR / 'codebook.csv').rename(
        columns={'description': 'theme_description'}
    )
    codebook_df
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 3) Select Tag for processing
    """)
    return


@app.cell
def _(all_tags_df, mo):
    _default_tag = "Chase as a brand"
    _tag_options = all_tags_df['tag'].unique().tolist()

    tag_select = mo.ui.dropdown(
        options=_tag_options,
        label="Select Tag to Process",
        # Only preselect the default when the export actually contains it
        value=_default_tag if _default_tag in _tag_options else None,
        full_width=True
    )
    tag_select
    return (tag_select,)


@app.cell
def _(all_tags_df, mo, tag_select):
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))

    # keep only the highlights that carry the selected tag
    df = all_tags_df.loc[all_tags_df['tag'] == tag_select.value].copy()
    df
    return (df,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 4) Keyword extraction
    """)
    return


@app.cell
def _(mo, tag_select):
    mo.stop(not tag_select.value, mo.md("Select tag to continue"))

    # mdf = mpd.from_pandas(df)

    # The button value becomes True after the first click
    start_processing_btn = mo.ui.button(
        label="Start Keyword Extraction",
        kind="warn",
        on_click=lambda val: True
    )
    start_processing_btn
    return (start_processing_btn,)


@app.cell
def _(
    WORKING_DIR,
    client,
    df,
    mo,
    model_select,
    pd,
    start_processing_btn,
    tag_select,
):
    from utils import ollama_keyword_extraction, worker_extraction

    # Wait for start processing button
    mo.stop(not start_processing_btn.value, "Click button above to start processing")

    # marimo does not track in-place mutation of objects defined in other
    # cells, so the results go into a new variable that downstream cells
    # depend on explicitly (instead of adding columns to `df`).
    keywords_df = df.copy()

    # Run keyword extraction
    # NOTE(review): the extractor result is wrapped in pd.Series before being
    # assigned to a single column; this only works if the row-wise apply
    # collapses to one column -- confirm the return shape of
    # ollama_keyword_extraction.
    keywords_df['keywords'] = keywords_df.progress_apply(
        lambda row: pd.Series(ollama_keyword_extraction(
            content=row['content'],
            tag=row['tag'],
            client=client,
            model=model_select.value
        )),
        axis=1
    )

    keywords_df['keywords_txt'] = keywords_df['keywords'].progress_apply(lambda kws: ', '.join(kws))

    keywords_df[['id', 'tag', 'content', 'keywords_txt']].to_csv(
        WORKING_DIR / f'keywords_{tag_select.value.replace(" ", "-")}.csv',
        index=False
    )
    return (keywords_df,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # 5) Wordcloud generation
    """)
    return


@app.cell
def _():
    # Start with loading all necessary libraries
    import numpy as np
    from os import path
    from PIL import Image
    from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
    import matplotlib.pyplot as plt

    import warnings
    warnings.filterwarnings("ignore")
    return Image, WordCloud, np, plt


@app.cell
def _(keywords_df):
    from collections import Counter

    # Keywords seen fewer than MIN_FREQ times are left out of the wordcloud
    MIN_FREQ = 2

    # Count every keyword across all rows
    keyword_freq = Counter(
        kw for kws in keywords_df['keywords'] for kw in kws
    )

    keyword_freq_filtered = {
        kw: freq for kw, freq in keyword_freq.items() if freq >= MIN_FREQ
    }

    # Keywords ordered by descending frequency; only the keyword is kept
    sorted_keywords_list = [
        kw for kw, freq in keyword_freq.most_common() if freq >= MIN_FREQ
    ]
    sorted_keywords_list
    return (keyword_freq_filtered,)


@app.cell
def _(plt):
    import random
    import matplotlib.colors as mcolors

    def blue_color_func(word, font_size, position, orientation, random_state=None, **kwargs):
        """Return a hex colour drawn from the darker part of matplotlib's 'Blues' colormap.

        The signature follows the ``color_func`` keyword of ``WordCloud``;
        only ``random_state`` is used here. When it is given, it supplies the
        random draw, otherwise the ``random`` module is used.
        """
        rng = random_state if random_state is not None else random

        # Sample from the darker end of the 'Blues' colormap (0.4 to 1.0):
        # 0.0 is white/light, 1.0 is dark blue
        min_val, max_val = 0.4, 1.0
        color_val = rng.uniform(min_val, max_val)

        # Get color from matplotlib colormap
        rgba = plt.cm.Blues(color_val)
        return mcolors.to_hex(rgba)
    return (blue_color_func,)


@app.cell
def _(Image, np):
    # Load the logo and build a mask array: pixels equal to 0 become 255,
    # every other pixel becomes 1. The mask is currently not passed to
    # WordCloud (see the commented-out `mask=` option in the wordcloud cell).
    with Image.open("./data/assets/Chase-National-Bank-Logo.png") as _img:
        chase_mask = np.array(_img)

    # NOTE(review): presumably the image decodes to a 2-D array -- confirm
    # before enabling the mask.
    transformed_chase_mask = np.where(chase_mask == 0, 255, 1).astype(np.int32)
    return


@app.cell
def _(WordCloud, blue_color_func, keyword_freq_filtered, mo, plt):
    # WordCloud cannot render an empty frequency table
    mo.stop(
        not keyword_freq_filtered,
        mo.md("No keyword reaches the minimum frequency, nothing to plot"),
    )

    wordcloud = WordCloud(
        background_color='white',
        width=800,
        max_font_size=60,
        max_words=20,
        # colormap='Blues',
        # relative_scaling=0.5, # Use rank in sorted frequency list instead of pure frequency
        color_func=blue_color_func,
        # mask=chase_mask
        # random_state=42
    ).generate_from_frequencies(keyword_freq_filtered)

    # Display the generated image:
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    plt.show()

    # The button value becomes True after the first click
    save_wordcloud_btn = mo.ui.button(
        label="Save_wordcloud_button",
        kind="warn",
        on_click=lambda val: True
    )
    save_wordcloud_btn
    return save_wordcloud_btn, wordcloud


@app.cell
def _(WORKING_DIR, mo, save_wordcloud_btn, tag_select, wordcloud):
    import re

    # Wait for the save button
    mo.stop(not save_wordcloud_btn.value, "Click button above to save wordcloud image")

    _stem = f'wordcloud_{tag_select.value.replace(" ", "-")}'

    # Match only this tag's files: '<stem>.png' or '<stem>_<number>.png'.
    # The stem is escaped so that characters in the tag are taken literally
    # and files of other tags sharing the same prefix are ignored.
    _name_re = re.compile(rf'{re.escape(_stem)}(?:_(\d+))?\.png')
    _hits = [
        m for m in (_name_re.fullmatch(p.name) for p in WORKING_DIR.iterdir()) if m
    ]

    if _hits:
        # Something was saved before: continue after the highest suffix,
        # starting at 1 when only the unnumbered file exists
        _next_number = max((int(m.group(1)) for m in _hits if m.group(1)), default=0) + 1
        fpath = WORKING_DIR / f'{_stem}_{_next_number}.png'
    else:
        fpath = WORKING_DIR / f'{_stem}.png'

    wordcloud.to_file(fpath)
    mo.md(f"Wordcloud saved to: {fpath}")
    return


if __name__ == "__main__":
    app.run()