-
Notifications
You must be signed in to change notification settings - Fork 36
/
audio_ds.py
143 lines (118 loc) · 5.7 KB
/
audio_ds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import logging
import os
from collections import defaultdict
from pathlib import Path
import librosa
import numpy as np
from python_speech_features import fbank
from tqdm import tqdm
from deepspeaker.constants import SAMPLE_RATE, NUM_FBANKS
from deepspeaker.utils import find_files, ensures_dir
logger = logging.getLogger(__name__)
# See https://github.com/jameslyons/python_speech_features/pull/76/commits/9ab32879b1fb31a38c1a70392fd21370b8fdc30f
def calculate_nfft(samplerate, winlen):
    """Return the smallest power of two that covers one analysis window.

    An FFT shorter than the window would discard samples, whereas a longer
    one merely zero-pads the buffer, which does not alter the frequency
    content.

    :param samplerate: Sample rate of the signal, in Hz.
    :param winlen: Length of the analysis window, in seconds.
    :return: The FFT size as an int; 1 when the window holds at most one
        sample.
    """
    samples_per_window = samplerate * winlen
    size = 1
    # Keep doubling until the FFT buffer can hold the whole window.
    while size < samples_per_window:
        size <<= 1
    return size
def read_mfcc(audio, sample_rate, win_length):
    """Trim the quiet edges of ``audio`` and compute its filter-bank features.

    The trim keeps the span between the first and the last sample whose
    absolute amplitude is strictly above the 95th percentile of the signal.

    :param audio: Array of audio samples (already decoded, not a filename).
    :param sample_rate: Sample rate of ``audio``, in Hz.
    :param win_length: Analysis window length in samples; it is divided by
        ``sample_rate`` to get the window length in seconds for
        ``calculate_nfft``.
    :return: The value returned by ``mfcc_fbank`` for the trimmed signal.
    """
    energy = np.abs(audio)
    silence_threshold = np.percentile(energy, 95)
    offsets = np.where(energy > silence_threshold)[0]
    # TODO: could use trim_silence() here or a better VAD.
    if offsets.size:
        audio_voice_only = audio[offsets[0]:offsets[-1]]
    else:
        # No sample is strictly above the threshold. This happens for
        # constant-magnitude input (e.g. digital silence) or when more than 5%
        # of the samples sit at the peak value (clipping). Indexing
        # ``offsets[0]`` would raise IndexError, so keep the signal untrimmed.
        audio_voice_only = audio
    nfft = calculate_nfft(sample_rate, win_length / sample_rate)  # winlen in seconds
    return mfcc_fbank(audio_voice_only, sample_rate, nfft)
def extract_speaker_and_utterance_ids(filename: str):  # LIBRI.
    """Split a LibriSpeech-style path into ``(speaker, utterance)`` ids.

    Example: ``'audio/dev-other/116/288045/116-288045-0000.flac'`` gives
    ``('116', '288045-0000')``.

    :param filename: Path whose last three components are
        ``<speaker>/<chapter>/<speaker>-<rest>.<ext>``. Anything accepted by
        ``pathlib.Path`` works.
    :return: Tuple ``(speaker, utterance)`` where ``utterance`` is the file
        name with the leading ``<speaker>-`` and the extension removed.
    :raises AssertionError: If the speaker prefix of the file name differs
        from the speaker directory name.
    :raises ValueError: If the path has fewer than three components.
    """
    speaker, _, basename = Path(filename).parts[-3:]
    # Everything after the first dash, minus the file extension.
    utterance = os.path.splitext(basename.split('-', 1)[-1])[0]
    assert basename.split('-')[0] == speaker, (
        f'Speaker directory {speaker!r} does not match file name {basename!r}.')
    return speaker, utterance
class Audio:
    """On-disk cache of per-utterance features, indexed by speaker.

    Features are stored as ``<speaker>_<utterance>.npy`` files inside
    ``<cache_dir>/audio-fbanks``. After construction,
    ``speakers_to_utterances[speaker_id][utterance_id]`` holds the path of
    the cached file for that utterance.
    """

    def __init__(self, cache_dir: str, audio_dir: str = None, sample_rate: int = SAMPLE_RATE, ext='flac'):
        """Open the cache, optionally filling it from ``audio_dir`` first.

        :param cache_dir: Parent directory of the ``audio-fbanks`` cache folder.
        :param audio_dir: If given, directory scanned for audio files whose
            features are added to the cache (``~`` is expanded).
        :param sample_rate: Sample rate, in Hz, used when loading audio.
        :param ext: Extension of the audio files to look for in ``audio_dir``.
        """
        self.ext = ext
        self.cache_dir = os.path.join(cache_dir, 'audio-fbanks')
        ensures_dir(self.cache_dir)
        if audio_dir is not None:
            self.build_cache(os.path.expanduser(audio_dir), sample_rate)
        self.speakers_to_utterances = defaultdict(dict)
        for cache_file in find_files(self.cache_dir, ext='npy'):
            # /path/to/speaker_utterance.npy
            speaker_id, utterance_id = Path(cache_file).stem.split('_')
            self.speakers_to_utterances[speaker_id][utterance_id] = cache_file

    @property
    def speaker_ids(self):
        """Sorted list of the speaker ids present in the cache."""
        return sorted(self.speakers_to_utterances)

    @staticmethod
    def trim_silence(audio, threshold):
        """Removes silence at the beginning and end of a sample.

        :param audio: Array of audio samples.
        :param threshold: RMS value a frame must exceed to count as non-silent.
        :return: Tuple ``(audio_trim, left_blank, right_blank)``; all three are
            empty slices of ``audio`` when no frame exceeds ``threshold``.
        """
        # ``y`` must be passed by keyword: librosa >= 0.10 rejects positional
        # arguments here, and older releases accept the keyword as well.
        energy = librosa.feature.rms(y=audio)
        frames = np.nonzero(np.array(energy > threshold))
        # np.nonzero yields (row, column) index arrays; row 1 of the converted
        # result holds the sample positions of the selected frames.
        indices = librosa.core.frames_to_samples(frames)[1]

        # Note: indices can be an empty array, if the whole audio was silence.
        audio_trim = audio[0:0]
        left_blank = audio[0:0]
        right_blank = audio[0:0]
        if indices.size:
            audio_trim = audio[indices[0]:indices[-1]]
            left_blank = audio[:indices[0]]  # slice before.
            right_blank = audio[indices[-1]:]  # slice after.
        return audio_trim, left_blank, right_blank

    @staticmethod
    def read(filename, sample_rate=SAMPLE_RATE):
        """Load ``filename`` as mono float32 audio resampled to ``sample_rate``.

        :param filename: Path of the audio file to load.
        :param sample_rate: Target sample rate, in Hz.
        :return: The audio samples returned by ``librosa.load``.
        """
        audio, sr = librosa.load(
            filename, sr=sample_rate, mono=True, dtype=np.float32)
        assert sr == sample_rate
        return audio

    def build_cache(self, audio_dir, sample_rate):
        """Compute and cache the features of every audio file in ``audio_dir``.

        :param audio_dir: Directory searched for files with extension ``self.ext``.
        :param sample_rate: Sample rate, in Hz, used when loading the audio.
        :raises AssertionError: If no matching file is found.
        """
        logger.info(f'audio_dir: {audio_dir}.')
        logger.info(f'sample_rate: {sample_rate:,} hz.')
        audio_files = find_files(audio_dir, ext=self.ext)
        audio_files_count = len(audio_files)
        assert audio_files_count != 0, f'Could not find any {self.ext} files in {audio_dir}.'
        logger.info(f'Found {audio_files_count:,} files in {audio_dir}.')
        with tqdm(audio_files) as bar:
            for audio_filename in bar:
                bar.set_description(audio_filename)
                self.cache_audio_file(audio_filename, sample_rate)

    def cache_audio_file(self, input_filename, sample_rate):
        """Compute the features of one audio file and save them in the cache.

        Nothing is done when the cache file already exists. A librosa
        ``ParameterError`` is logged and the file is skipped.

        :param input_filename: Path of the audio file; its speaker and
            utterance ids are derived from the path.
        :param sample_rate: Sample rate, in Hz, used when loading the audio.
        """
        sp, utt = extract_speaker_and_utterance_ids(input_filename)
        cache_filename = os.path.join(self.cache_dir, f'{sp}_{utt}.npy')
        if os.path.isfile(cache_filename):
            return
        try:
            # read_mfcc() takes decoded samples and a window length in samples,
            # not a filename, so load the audio first.
            audio = self.read(input_filename, sample_rate)
            # mfcc_fbank() calls fbank() without ``winlen``, so fbank's default
            # window of 0.025 s applies; size the FFT for that window.
            win_length = int(np.ceil(0.025 * sample_rate))
            mfcc = read_mfcc(audio, sample_rate, win_length)
            np.save(cache_filename, mfcc)
        except librosa.util.exceptions.ParameterError as e:
            logger.error(e)
def pad_mfcc(mfcc, max_length):  # num_frames, nfilt=64.
    """Append all-zero frames to ``mfcc`` until it has ``max_length`` rows.

    :param mfcc: 2D array with one row per frame.
    :param max_length: Minimum number of rows wanted.
    :return: ``mfcc`` itself when it already has at least ``max_length`` rows,
        otherwise a new array with zero rows stacked underneath.
    """
    missing_frames = max_length - len(mfcc)
    if missing_frames <= 0:
        return mfcc
    filler = np.zeros((missing_frames, mfcc.shape[1]))
    return np.vstack([mfcc, filler])
def mfcc_fbank(signal: np.array, sample_rate: int, nfft):  # 1D signal array.
    """Compute per-frame normalized filter-bank features of ``signal``.

    :param signal: 1D array of audio samples.
    :param sample_rate: Sample rate of ``signal``, in Hz.
    :param nfft: FFT size forwarded to ``fbank``.
    :return: float32 array built from the normalized frames. No delta
        features are stacked, so the result is presumably
        (num_frames, NUM_FBANKS) -- TODO confirm against ``fbank``.
    """
    # fbank also returns per-frame energies, which are not used here.
    banks, _ = fbank(signal, samplerate=sample_rate, nfilt=NUM_FBANKS, nfft=nfft)
    normalized = normalize_frames(banks)
    # Float32 precision is enough here.
    return np.array(normalized, dtype=np.float32)
def normalize_frames(m, epsilon=1e-12):
    """Standardize each frame to zero mean and unit standard deviation.

    :param m: Iterable of frames (e.g. the rows of a 2D array).
    :param epsilon: Lower bound on the divisor, so that constant frames do not
        cause a division by zero.
    :return: List with one normalized frame per input frame.
    """
    normalized = []
    for frame in m:
        centered = frame - np.mean(frame)
        scale = max(np.std(frame), epsilon)
        normalized.append(centered / scale)
    return normalized