import json
import math
import os

import numpy as np
from torch.utils.data import Dataset

from text import text_to_sequence
from utils.tools import pad_1D, pad_2D


class Dataset(Dataset):
    """Training/validation dataset: pairs phoneme sequences with precomputed
    mel-spectrogram, pitch, energy, duration, and (optionally) speaker-embedding
    features loaded from the preprocessed directory."""

    def __init__(
        self, filename, preprocess_config, model_config, train_config, sort=False, drop_last=False
    ):
        self.dataset_name = preprocess_config["dataset"]
        self.preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        self.batch_size = train_config["optimizer"]["batch_size"]
        self.load_spker_embed = model_config["multi_speaker"] \
            and preprocess_config["preprocessing"]["speaker_embedder"] != 'none'

        self.basename, self.speaker, self.text, self.raw_text = self.process_meta(
            filename
        )
        with open(os.path.join(self.preprocessed_path, "speakers.json")) as f:
            self.speaker_map = json.load(f)
        self.sort = sort
        self.drop_last = drop_last

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        speaker_id = self.speaker_map[speaker]
        raw_text = self.raw_text[idx]
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))
        mel_path = os.path.join(
            self.preprocessed_path,
            "mel",
            "{}-mel-{}.npy".format(speaker, basename),
        )
        mel = np.load(mel_path)
        pitch_path = os.path.join(
            self.preprocessed_path,
            "pitch",
            "{}-pitch-{}.npy".format(speaker, basename),
        )
        pitch = np.load(pitch_path)  # Phoneme Level
        ref_pitch_path = os.path.join(
            self.preprocessed_path,
            "pitch_frame",
            "{}-pitch-{}.npy".format(speaker, basename),
        )
        ref_pitch = np.load(ref_pitch_path)  # Frame Level
        energy_path = os.path.join(
            self.preprocessed_path,
            "energy",
            "{}-energy-{}.npy".format(speaker, basename),
        )
        energy = np.load(energy_path)  # Phoneme Level
        ref_energy_path = os.path.join(
            self.preprocessed_path,
            "energy_frame",
            "{}-energy-{}.npy".format(speaker, basename),
        )
        ref_energy = np.load(ref_energy_path)  # Frame Level
        duration_path = os.path.join(
            self.preprocessed_path,
            "duration",
            "{}-duration-{}.npy".format(speaker, basename),
        )
        duration = np.load(duration_path)
        spker_embed = np.load(os.path.join(
            self.preprocessed_path,
            "spker_embed",
            "{}-spker_embed.npy".format(speaker),
        )) if self.load_spker_embed else None

        sample = {
            "id": basename,
            "speaker": speaker_id,
            "text": phone,
            "raw_text": raw_text,
            "mel": mel,
            "pitch": pitch,
            "energy": energy,
            "duration": duration,
            "spker_embed": spker_embed,
            "ref_pitch": ref_pitch,
            "ref_energy": ref_energy,
        }

        return sample

    def process_meta(self, filename):
        with open(
            os.path.join(self.preprocessed_path, filename), "r", encoding="utf-8"
        ) as f:
            name = []
            speaker = []
            text = []
            raw_text = []
            for line in f.readlines():
                n, s, t, r = line.strip("\n").split("|")
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
            return name, speaker, text, raw_text

    def reprocess(self, data, idxs):
        # Gather the selected samples and pad every variable-length field
        # to the longest item in the batch.
        ids = [data[idx]["id"] for idx in idxs]
        speakers = [data[idx]["speaker"] for idx in idxs]
        texts = [data[idx]["text"] for idx in idxs]
        raw_texts = [data[idx]["raw_text"] for idx in idxs]
        mels = [data[idx]["mel"] for idx in idxs]
        pitches = [data[idx]["pitch"] for idx in idxs]
        ref_pitches = [data[idx]["ref_pitch"] for idx in idxs]
        energies = [data[idx]["energy"] for idx in idxs]
        ref_energies = [data[idx]["ref_energy"] for idx in idxs]
        durations = [data[idx]["duration"] for idx in idxs]
        spker_embeds = np.concatenate(np.array([data[idx]["spker_embed"] for idx in idxs]), axis=0) \
            if self.load_spker_embed else None

        text_lens = np.array([text.shape[0] for text in texts])
        mel_lens = np.array([mel.shape[0] for mel in mels])

        speakers = np.array(speakers)
        texts = pad_1D(texts)
        mels = pad_2D(mels)
        pitches = pad_1D(pitches)
        ref_pitches = pad_1D(ref_pitches)
        energies = pad_1D(energies)
        ref_energies = pad_1D(ref_energies)
        durations = pad_1D(durations)

        return (
            ids,
            raw_texts,
            speakers,
            texts,
            text_lens,
            max(text_lens),
            mels,
            mel_lens,
            max(mel_lens),
            pitches,
            energies,
            durations,
            spker_embeds,
            ref_pitches,
            ref_energies,
        )

    def collate_fn(self, data):
        # Split the incoming samples (optionally sorted by text length) into
        # batches of self.batch_size; the remainder forms one final, smaller
        # batch unless drop_last is set.
        data_size = len(data)

        if self.sort:
            len_arr = np.array([d["text"].shape[0] for d in data])
            idx_arr = np.argsort(-len_arr)
        else:
            idx_arr = np.arange(data_size)

        tail = idx_arr[len(idx_arr) - (len(idx_arr) % self.batch_size):]
        idx_arr = idx_arr[: len(idx_arr) - (len(idx_arr) % self.batch_size)]
        idx_arr = idx_arr.reshape((-1, self.batch_size)).tolist()
        if not self.drop_last and len(tail) > 0:
            idx_arr += [tail.tolist()]

        output = list()
        for idx in idx_arr:
            output.append(self.reprocess(data, idx))

        return output


class TextDataset(Dataset):
    """Dataset driven by a metadata file path: loads phoneme sequences plus
    reference mel, frame-level pitch, and frame-level energy (no durations),
    along with optional speaker embeddings."""

    def __init__(self, filepath, preprocess_config, model_config):
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        self.preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        self.load_spker_embed = model_config["multi_speaker"] \
            and preprocess_config["preprocessing"]["speaker_embedder"] != 'none'

        self.basename, self.speaker, self.text, self.raw_text = self.process_meta(
            filepath
        )
        with open(
            os.path.join(
                preprocess_config["path"]["preprocessed_path"], "speakers.json"
            )
        ) as f:
            self.speaker_map = json.load(f)

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        speaker_id = self.speaker_map[speaker]
        raw_text = self.raw_text[idx]
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))
        mel_path = os.path.join(
            self.preprocessed_path,
            "mel",
            "{}-mel-{}.npy".format(speaker, basename),
        )
        mel = np.load(mel_path)
        ref_pitch_path = os.path.join(
            self.preprocessed_path,
            "pitch_frame",
            "{}-pitch-{}.npy".format(speaker, basename),
        )
        ref_pitch = np.load(ref_pitch_path)  # Frame Level
        ref_energy_path = os.path.join(
            self.preprocessed_path,
            "energy_frame",
            "{}-energy-{}.npy".format(speaker, basename),
        )
        ref_energy = np.load(ref_energy_path)  # Frame Level
        spker_embed = np.load(os.path.join(
            self.preprocessed_path,
            "spker_embed",
            "{}-spker_embed.npy".format(speaker),
        )) if self.load_spker_embed else None

        return (basename, speaker_id, phone, raw_text, mel, spker_embed, ref_pitch, ref_energy)

    def process_meta(self, filename):
        with open(filename, "r", encoding="utf-8") as f:
            name = []
            speaker = []
            text = []
            raw_text = []
            for line in f.readlines():
                n, s, t, r = line.strip("\n").split("|")
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
            return name, speaker, text, raw_text

    def collate_fn(self, data):
        ids = [d[0] for d in data]
        speakers = np.array([d[1] for d in data])
        texts = [d[2] for d in data]
        raw_texts = [d[3] for d in data]
        mels = [d[4] for d in data]
        spker_embeds = np.concatenate(np.array([d[5] for d in data]), axis=0) \
            if self.load_spker_embed else None
        ref_pitches = [d[6] for d in data]
        ref_energies = [d[7] for d in data]
        text_lens = np.array([text.shape[0] for text in texts])
        mel_lens = np.array([mel.shape[0] for mel in mels])

        texts = pad_1D(texts)
        mels = pad_2D(mels)
        ref_pitches = pad_1D(ref_pitches)
        ref_energies = pad_1D(ref_energies)

        return (
            ids,
            raw_texts,
            speakers,
            texts,
            text_lens,
            max(text_lens),
            mels,
            mel_lens,
            max(mel_lens),
            spker_embeds,
            ref_pitches,
            ref_energies,
        )
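

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module).
# It shows one way to wire Dataset into a torch DataLoader.  The YAML config
# paths, the metadata filename "train.txt", and group_size = 4 are assumptions
# for this example only; the real configs simply need the keys accessed in
# __init__ above.  Because collate_fn returns a *list* of batches (each of
# size train_config["optimizer"]["batch_size"]), the DataLoader batch_size is
# a multiple of that value and the loop below iterates over the returned list.
if __name__ == "__main__":
    import yaml
    from torch.utils.data import DataLoader

    preprocess_config = yaml.load(
        open("./config/preprocess.yaml", "r"), Loader=yaml.FullLoader
    )
    model_config = yaml.load(open("./config/model.yaml", "r"), Loader=yaml.FullLoader)
    train_config = yaml.load(open("./config/train.yaml", "r"), Loader=yaml.FullLoader)

    train_dataset = Dataset(
        "train.txt", preprocess_config, model_config, train_config,
        sort=True, drop_last=True,
    )
    group_size = 4  # how many real batches to draw per DataLoader batch
    loader = DataLoader(
        train_dataset,
        batch_size=group_size * train_dataset.batch_size,
        shuffle=True,
        collate_fn=train_dataset.collate_fn,
    )
    for group in loader:
        for batch in group:
            # Each batch is the tuple returned by reprocess(); inspect a few fields.
            ids, raw_texts, speakers, texts, text_lens, max_text_len = batch[:6]
            print(len(ids), texts.shape, max_text_len)
        break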