import click
from transformers import AutoTokenizer, RobertaModel
import numpy as np
from scipy.spatial.distance import cdist
from data import Data, from_db, connect, data_dir
from tqdm import tqdm
import torch
from pathlib import Path


@click.command(name="word:max-sequence")
def max_sequence():
    """calculate the maximum token length given the story titles"""
    db = connect()
    longest = db.sql("""
        select title from stories
        order by length(title) desc
        limit 5000
    """).df()
    db.close()

    tokenizer = AutoTokenizer.from_pretrained("roberta-base")
    tokens = tokenizer(longest['title'].to_list())
    print(f"{max(len(x) for x in tokens['input_ids'])}")


@click.command(name="word:train")
def train():
    """TODO"""
    table = from_db(Data.Titles)
    n_classes = 10


@click.command(name="word:embed")
@click.option('-c', '--chunks', type=int, default=5000, show_default=True)
@click.option('--embedding_dir', help="path to save embeddings as np array",
              type=Path, default=Path(data_dir() / 'embeddings'), show_default=True)
@click.option('--token_dir', help="path to save tokens as np array",
              type=Path, default=Path(data_dir() / 'tokens'), show_default=True)
@click.option('--device', help="device to process data on",
              type=str, default="cuda:0", show_default=True)
def embed(chunks, embedding_dir, token_dir, device):
    """given titles, generate tokens and word embeddings and save them to disk"""
    # init tokenizer and model
    device = torch.device(device)
    tokenizer = AutoTokenizer.from_pretrained("roberta-base")
    model = RobertaModel.from_pretrained("roberta-base")
    model.to(device)

    # make sure the output directories exist
    embedding_dir.mkdir(parents=True, exist_ok=True)
    token_dir.mkdir(parents=True, exist_ok=True)

    # load data
    db = connect()
    table = db.sql("""
        select title from stories
        order by id desc
    """).df()
    db.close()

    # normalize text: strip accents and drop non-ascii characters
    table['title'] = (table['title']
                      .str.normalize('NFKD')
                      .str.encode('ascii', errors='ignore')
                      .str.decode('utf-8'))

    # generate embeddings from the list of titles, one chunk at a time
    chunks = np.array_split(table['title'].to_numpy(), chunks)
    chunk_iter = tqdm(chunks, 'embedding')
    for i, chunk in enumerate(chunk_iter):
        # create tokens, padding to max width
        tokens = tokenizer(chunk.tolist(),
                           add_special_tokens=True,
                           truncation=True,
                           padding="max_length",
                           max_length=92,
                           return_attention_mask=True,
                           return_tensors="pt")
        tokens = tokens.to(device)

        with torch.no_grad():
            outputs = model(**tokens)

        # to disk
        hidden = outputs.last_hidden_state.to(torch.device('cpu')).detach().numpy()
        np.save(embedding_dir / f"embedding_{i}.npy", hidden)

        tokens = tokens.to(torch.device('cpu'))
        np.save(token_dir / f"token_{i}.npy", tokens['input_ids'].numpy())


@click.command(name="word:distance")
def distance():
    """TODO: measure distance between sequence embeddings"""
    # `classes` is still a placeholder: load the sequence/class embeddings
    # before computing pairwise distances.
    distances = cdist(classes, classes, 'euclidean')
    np.fill_diagonal(distances, np.inf)

    # index of the closest pair
    min_index = np.argmin(distances)
    closest = np.unravel_index(min_index, distances.shape)

    # path = data_dir() / 'embeddings'
    # chunks = [x for x in path.iterdir() if x.match('*.npy')]
    # chunks = sorted(chunks, key=lambda x: int(x.stem.split('_')[1]))
    #
    # data = None
    # for i, f in enumerate(tqdm(chunks)):
    #     loaded = np.load(f)
    #     if data is None:
    #         data = loaded
    #     else:
    #         data = np.concatenate([data, loaded])
    #     if i > 20:
    #         break
    #
    # data.shape
    #
    # np.save(data_dir() / 'embeddings.npy', data)
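
# Example invocations (a sketch, not part of the module): these assume the
# commands above are registered on a click group in the project's CLI entry
# point; the `cli.py` name below is only illustrative.
#
#   python cli.py word:max-sequence
#   python cli.py word:embed --chunks 5000 --device cuda:0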