Source code for src.test

from nussl.evaluation.si_sdr import ScaleInvariantSDR
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
from .. import algorithms, model
import inspect
import copy
import logging
import numpy as np
import yaml

class EvaluationRunner(object):
    """
    Class for efficiently evaluating an algorithm across multiple threads for
    every mixture in a dataset. If part of the algorithm must happen in a single
    thread, execution can be split into two steps by setting
    use_blocking_executor to True: the first step runs on a blocking executor,
    and the rest continues on a parallel thread:

    .. code-block:: python

        # run on a blocking executor (e.g. on one GPU), in self.blocking_func
        features = algorithm.extract_features()

        # run the rest on the main executor, using the result of
        # self.blocking_func; this is in self.run_func
        algorithm.run(features=features)
        algorithm.make_audio_signals()

    This class is built assuming that you are using nussl-style algorithms for
    source separation. If you're evaluating on other tasks (e.g. sound
    classification or music transcription), you can write new testers or edit
    this class to your liking. If you're editing the class, your new code likely
    belongs in main_func, run_func, or blocking_func.

    Args:
        testers (list): List of (TestClass, kwargs) pairs used to evaluate each
            separated mixture.
        algorithm_config (dict): Configuration for the separation algorithm, with
            a 'class' key naming the algorithm and an 'args' key of keyword
            arguments.
        dataset: Dataset whose files will be separated and evaluated.
        output_path (str): Directory where per-file results are written (a
            'results' subdirectory is created inside it).
        max_workers (int, optional): Number of parallel workers. Defaults to 1.
        use_blocking_executor (bool, optional): Whether to run part of the
            algorithm on a single blocking thread first. Defaults to False.
    """
    def __init__(self, testers, algorithm_config, dataset, output_path,
                 max_workers=1, use_blocking_executor=False):
        self.evaluation_executor = ThreadPoolExecutor(max_workers=max_workers)
        self.main_executor = ThreadPoolExecutor(max_workers=max_workers)
        self.dataset = dataset
        self.testers = testers
        self.algorithm_config = algorithm_config
        self.use_blocking_executor = use_blocking_executor
        self.output_path = os.path.join(output_path, 'results')
        os.makedirs(self.output_path, exist_ok=True)

        self.AlgorithmClass = getattr(algorithms, algorithm_config['class'])
        dummy_mixture = self.dataset.load_audio_files(self.dataset.files[0])[0]

        if self.use_blocking_executor:
            blocking_algorithm_config = copy.deepcopy(
                self.algorithm_config['args']
            )
            args = inspect.getfullargspec(self.AlgorithmClass)[0]
            # Run the blocking algorithm on the GPU and everything else on CPU.
            if 'use_cuda' in args:
                blocking_algorithm_config['use_cuda'] = True
                self.algorithm_config['args']['use_cuda'] = False
            if 'extra_modules' in args:
                blocking_algorithm_config['extra_modules'] = model.extras
            self.blocking_algorithm = self.AlgorithmClass(
                dummy_mixture, **blocking_algorithm_config
            )
            self.blocking_executor = ThreadPoolExecutor(max_workers=1)
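    # A minimal sketch of what algorithm_config might look like, for reference.
    # The class name and argument values below are hypothetical; any algorithm
    # exposed by the algorithms module, with its own keyword arguments, could be
    # used instead:
    #
    #     algorithm_config = {
    #         'class': 'DeepClustering',     # resolved via getattr(algorithms, ...)
    #         'args': {
    #             'model_path': 'path/to/model',  # hypothetical keyword arguments
    #             'num_sources': 2,
    #             'use_cuda': False,
    #         }
    #     }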
    def blocking_func(self, file_path):
        """
        Runs the single-threaded (blocking) part of the algorithm: loads the
        mixture, computes its spectrograms, and extracts features with the
        blocking algorithm.

        Args:
            file_path (str): Path to the mixture to load from the dataset.

        Returns:
            dict: Dictionary with a 'features' key, passed on to run_func.
        """
        mixture, _, _ = self.dataset.load_audio_files(file_path)
        self.blocking_algorithm.set_audio_signal(mixture)
        self.blocking_algorithm._compute_spectrograms()
        features = self.blocking_algorithm.extract_features()
        return {'features': features}
    def log_scores(self, scores):
        """
        Logs the mean of every metric for each source in a dictionary of scores.

        Args:
            scores (dict): Dictionary of evaluation scores, keyed by source name.
        """
        for key in scores:
            if key != 'permutation':
                logging_str = f"{key}: "
                for metric in scores[key]:
                    logging_str += f"{metric} => {np.mean(scores[key][metric])}, "
                logging.info(logging_str)
    def run_func(self, file_path, data=None):
        """
        Separates a single mixture and evaluates the estimates with every tester.
        The per-file scores are written to a YAML file in the output directory.

        Args:
            file_path (str): Path to the mixture to load from the dataset.
            data (dict, optional): Extra keyword arguments for the algorithm's
                run method (e.g. features computed in blocking_func).
                Defaults to None.
        """
        mixture, sources, labels = self.dataset.load_audio_files(file_path)
        classes = self.dataset.options['source_labels']
        labels = [classes[np.argmax(l)] for l in labels]

        args = inspect.getfullargspec(self.AlgorithmClass)[0]
        algorithm_args = copy.deepcopy(self.algorithm_config['args'])
        if 'extra_modules' in args:
            algorithm_args['extra_modules'] = model.extras
        algorithm = self.AlgorithmClass(mixture, **algorithm_args)

        if data:
            # Only pass along the keys that the algorithm's run method accepts.
            run_args = inspect.getfullargspec(algorithm.run).args
            data = {key: val for key, val in data.items() if key in run_args}
            algorithm.run(**data)
        else:
            algorithm.run()
        estimates = algorithm.make_audio_signals()

        tester_args = {
            'true_sources_list': sources,
            'estimated_sources_list': estimates,
            'source_labels': labels
        }

        all_scores = []
        try:
            for tester in self.testers:
                TestClass, kwargs = tester
                # Only pass the tester arguments that this TestClass accepts.
                args = {
                    k: v for k, v in tester_args.items()
                    if k in inspect.getfullargspec(TestClass).args
                }
                args.update(kwargs)
                evaluator = TestClass(**args)
                scores = evaluator.evaluate()
                self.log_scores(scores)
                all_scores.append(scores)

            path_to_yml = os.path.join(
                self.output_path,
                os.path.splitext(os.path.basename(file_path))[0] + '.yml'
            )
            logging.info(path_to_yml)
            with open(path_to_yml, 'w') as f:
                yaml.dump(all_scores, f)
        except Exception:
            logging.exception(f"Evaluation failed for {file_path}")
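    # A minimal sketch of the expected testers format, using the nussl-style
    # evaluator imported at the top of this module; the keyword arguments dict is
    # a hypothetical placeholder for evaluator-specific options:
    #
    #     testers = [
    #         (ScaleInvariantSDR, {}),
    #     ]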
    def main_func(self, file_path):
        """
        Processes a single file: optionally runs blocking_func on the blocking
        executor first, then submits run_func to the evaluation executor with
        the result.

        Args:
            file_path (str): Path to the mixture to load from the dataset.
        """
        data = None
        if self.use_blocking_executor:
            task = self.blocking_executor.submit(self.blocking_func, file_path)
            data = task.result()
        self.evaluation_executor.submit(self.run_func, file_path, data)
    def run(self):
        """
        Submits every file in the dataset to the main executor and waits for
        all separation and evaluation jobs to finish.
        """
        for file_path in self.dataset.files:
            self.main_executor.submit(self.main_func, file_path)
        self.main_executor.shutdown(wait=True)
        self.evaluation_executor.shutdown(wait=True)
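# A minimal usage sketch. The dataset construction and configuration values
# below are hypothetical placeholders; only EvaluationRunner and
# ScaleInvariantSDR come from this module and its imports:
#
#     dataset = ...  # any object exposing .files, .options, and
#                    # .load_audio_files(file_path)
#     algorithm_config = {'class': 'DeepClustering', 'args': {...}}
#     testers = [(ScaleInvariantSDR, {})]
#
#     runner = EvaluationRunner(
#         testers, algorithm_config, dataset,
#         output_path='output/experiment',
#         max_workers=4,
#         use_blocking_executor=True
#     )
#     runner.run()  # writes one YAML of scores per mixture to output/experiment/results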