import json
import math
import os

import numpy as np
from torch.utils.data import Dataset

from text import text_to_sequence
from utils.tools import pad_1D, pad_2D, pad_3D


class Dataset(Dataset):
    """Training/validation dataset that loads preprocessed features from disk:
    phoneme ID sequences, mel-spectrograms, durations, phones-per-word counts
    (word boundaries), alignment priors, and optional speaker embeddings."""

    def __init__(
        self, filename, preprocess_config, model_config, train_config, sort=False, drop_last=False
    ):
        self.dataset_name = preprocess_config["dataset"]
        self.preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        self.batch_size = train_config["optimizer"]["batch_size"]
        # Speaker embeddings are only loaded for multi-speaker models that use
        # an external speaker embedder.
        self.load_spker_embed = model_config["multi_speaker"] \
            and preprocess_config["preprocessing"]["speaker_embedder"] != 'none'

        self.basename, self.speaker, self.text, self.raw_text = self.process_meta(
            filename
        )
        with open(os.path.join(self.preprocessed_path, "speakers.json")) as f:
            self.speaker_map = json.load(f)
        self.sort = sort
        self.drop_last = drop_last

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        speaker_id = self.speaker_map[speaker]
        raw_text = self.raw_text[idx]
        # Convert the phoneme string into a sequence of symbol IDs.
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))
        mel_path = os.path.join(
            self.preprocessed_path,
            "mel",
            "{}-mel-{}.npy".format(speaker, basename),
        )
        mel = np.load(mel_path)
        duration_path = os.path.join(
            self.preprocessed_path,
            "duration",
            "{}-duration-{}.npy".format(speaker, basename),
        )
        duration = np.load(duration_path)
        phones_per_word_path = os.path.join(
            self.preprocessed_path,
            "phones_per_word",
            "{}-phones_per_word-{}.npy".format(speaker, basename),
        )
        phones_per_word = np.load(phones_per_word_path)
        # Optional pre-computed speaker embedding (one per speaker).
        spker_embed = np.load(os.path.join(
            self.preprocessed_path,
            "spker_embed",
            "{}-spker_embed.npy".format(speaker),
        )) if self.load_spker_embed else None
        # Pre-computed prior over the text-to-mel alignment.
        attn_prior_path = os.path.join(
            self.preprocessed_path,
            "attn_prior",
            "{}-attn_prior-{}.npy".format(speaker, basename),
        )
        attn_prior = np.load(attn_prior_path)

        sample = {
            "id": basename,
            "speaker": speaker_id,
            "text": phone,
            "raw_text": raw_text,
            "mel": mel,
            "duration": duration,
            "word_boundary": phones_per_word,
            "spker_embed": spker_embed,
            "attn_prior": attn_prior,
        }

        return sample

    def process_meta(self, filename):
        # Each metadata line has the form "basename|speaker|phoneme text|raw text".
        with open(
            os.path.join(self.preprocessed_path, filename), "r", encoding="utf-8"
        ) as f:
            name = []
            speaker = []
            text = []
            raw_text = []
            for line in f.readlines():
                n, s, t, r = line.strip("\n").split("|")
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
            return name, speaker, text, raw_text

    def reprocess(self, data, idxs):
        # Gather the samples selected for one sub-batch, pad variable-length
        # items, and record per-sample lengths.
        ids = [data[idx]["id"] for idx in idxs]
        speakers = [data[idx]["speaker"] for idx in idxs]
        texts = [data[idx]["text"] for idx in idxs]
        raw_texts = [data[idx]["raw_text"] for idx in idxs]
        mels = [data[idx]["mel"] for idx in idxs]
        durations = [data[idx]["duration"] for idx in idxs]
        word_boundaries = [data[idx]["word_boundary"] for idx in idxs]
        spker_embeds = np.concatenate(np.array([data[idx]["spker_embed"] for idx in idxs]), axis=0) \
            if self.load_spker_embed else None
        attn_priors = [data[idx]["attn_prior"] for idx in idxs]

        text_w_lens = np.array([word_boundary.shape[0] for word_boundary in word_boundaries])
        text_lens = np.array([text.shape[0] for text in texts])
        mel_lens = np.array([mel.shape[0] for mel in mels])

        speakers = np.array(speakers)
        texts = pad_1D(texts)
        mels = pad_2D(mels)
        durations = pad_1D(durations)
        word_boundaries = pad_1D(word_boundaries)
        attn_priors = pad_3D(attn_priors, len(idxs), max(text_lens), max(mel_lens))

        return (
            ids,
            raw_texts,
            speakers,
            texts,
            text_lens,
            max(text_lens),
            word_boundaries,
            text_w_lens,
            max(text_w_lens),
            spker_embeds,
            attn_priors,
            mels,
            mel_lens,
            max(mel_lens),
            durations,
        )

    def collate_fn(self, data):
        data_size = len(data)

        if self.sort:
            # Sort samples by text length (descending) so that each sub-batch
            # groups sequences of similar length and padding is minimized.
            len_arr = np.array([d["text"].shape[0] for d in data])
            idx_arr = np.argsort(-len_arr)
        else:
            idx_arr = np.arange(data_size)

        # Leftover samples that do not fill a complete batch.
        tail = idx_arr[len(idx_arr) - (len(idx_arr) % self.batch_size):]
        idx_arr = idx_arr[: len(idx_arr) - (len(idx_arr) % self.batch_size)]
        idx_arr = idx_arr.reshape((-1, self.batch_size)).tolist()
        if not self.drop_last and len(tail) > 0:
            idx_arr += [tail.tolist()]

        output = list()
        for idx in idx_arr:
            output.append(self.reprocess(data, idx))

        return output


class TextDataset(Dataset):
    """Text-only dataset used at synthesis time; no mel, duration, or
    alignment targets are loaded."""

    def __init__(self, filepath, preprocess_config, model_config):
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        self.preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        self.load_spker_embed = model_config["multi_speaker"] \
            and preprocess_config["preprocessing"]["speaker_embedder"] != 'none'

        self.basename, self.speaker, self.text, self.raw_text = self.process_meta(
            filepath
        )
        with open(
            os.path.join(
                preprocess_config["path"]["preprocessed_path"], "speakers.json"
            )
        ) as f:
            self.speaker_map = json.load(f)

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        speaker_id = self.speaker_map[speaker]
        raw_text = self.raw_text[idx]
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))
        phones_per_word_path = os.path.join(
            self.preprocessed_path,
            "phones_per_word",
            "{}-phones_per_word-{}.npy".format(speaker, basename),
        )
        phones_per_word = np.load(phones_per_word_path)
        spker_embed = np.load(os.path.join(
            self.preprocessed_path,
            "spker_embed",
            "{}-spker_embed.npy".format(speaker),
        )) if self.load_spker_embed else None

        return (basename, speaker_id, phone, raw_text, phones_per_word, spker_embed)

    def process_meta(self, filename):
        with open(filename, "r", encoding="utf-8") as f:
            name = []
            speaker = []
            text = []
            raw_text = []
            for line in f.readlines():
                n, s, t, r = line.strip("\n").split("|")
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
            return name, speaker, text, raw_text

    def collate_fn(self, data):
        ids = [d[0] for d in data]
        speakers = np.array([d[1] for d in data])
        texts = [d[2] for d in data]
        raw_texts = [d[3] for d in data]
        text_lens = np.array([text.shape[0] for text in texts])
        word_boundaries = [d[4] for d in data]
        text_w_lens = np.array([word_boundary.shape[0] for word_boundary in word_boundaries])
        spker_embeds = np.concatenate(np.array([d[5] for d in data]), axis=0) \
            if self.load_spker_embed else None

        texts = pad_1D(texts)
        word_boundaries = pad_1D(word_boundaries)

        return (
            ids,
            raw_texts,
            speakers,
            texts,
            text_lens,
            max(text_lens),
            word_boundaries,
            text_w_lens,
            max(text_w_lens),
            spker_embeds
        )
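

# ---------------------------------------------------------------------------
# Usage sketch (not part of the module's public API): one way to wrap the
# Dataset above in a torch DataLoader. The YAML config paths, "train.txt"
# filename, and group_size value are illustrative assumptions, not values
# taken from this repository. Because collate_fn itself splits the loaded
# group into sub-batches of train_config["optimizer"]["batch_size"], the
# DataLoader yields a *list* of batches per iteration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import yaml
    from torch.utils.data import DataLoader

    # Hypothetical config locations; adjust to the dataset actually being used.
    preprocess_config = yaml.load(open("config/LJSpeech/preprocess.yaml", "r"), Loader=yaml.FullLoader)
    model_config = yaml.load(open("config/LJSpeech/model.yaml", "r"), Loader=yaml.FullLoader)
    train_config = yaml.load(open("config/LJSpeech/train.yaml", "r"), Loader=yaml.FullLoader)

    dataset = Dataset(
        "train.txt", preprocess_config, model_config, train_config, sort=True, drop_last=True
    )

    # Load group_size * batch_size samples per DataLoader step so that sorting
    # by text length inside collate_fn yields length-homogeneous sub-batches.
    group_size = 4  # assumed value
    loader = DataLoader(
        dataset,
        batch_size=group_size * train_config["optimizer"]["batch_size"],
        shuffle=True,
        collate_fn=dataset.collate_fn,
    )

    for batches in loader:
        for batch in batches:
            # Each batch is the 15-tuple returned by reprocess().
            ids, raw_texts, speakers, texts, text_lens, max_text_len, *rest = batch
            print(ids, texts.shape)
        break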