From 70c8f563d9a720ae3a58e18558134fe59bdbe84e Mon Sep 17 00:00:00 2001 From: Daniel Guderian <dguderian@km3net.de> Date: Thu, 21 Jan 2021 18:41:36 +0100 Subject: [PATCH] added option to produce submittable bash scripts to do the concat + shuffle --- orcasong/tools/make_data_split.py | 113 +++++++++++++++--- .../example_make_data_split_config.toml | 18 ++- tests/data/test_make_data_split_config.toml | 6 + tests/test_make_data_split.py | 65 +++++++++- 4 files changed, 179 insertions(+), 23 deletions(-) diff --git a/orcasong/tools/make_data_split.py b/orcasong/tools/make_data_split.py index acab2d9..4a16baf 100644 --- a/orcasong/tools/make_data_split.py +++ b/orcasong/tools/make_data_split.py @@ -12,6 +12,17 @@ import random import numpy as np +def get_parser(): + parser = argparse.ArgumentParser( + description="Create datasets based on the run_id's." + "Use the config to add input folder and set the ranges." + "Outputs a list in an txt file that can be used to " + "concatenate the files specfied") + parser.add_argument( + 'config', type=str, + help="See example config for detailed information") + + return parser def get_all_ip_group_keys(cfg): """ @@ -283,18 +294,79 @@ def make_dsplit_list_files(cfg): print("----------------------------------------------") -def get_parser(): - parser = argparse.ArgumentParser( - description="Create datasets based on the run_id's." - "Use the config to add input folder and set the ranges." - "Outputs a list in an txt file that can be used to " - "concatenate the files specfied") - parser.add_argument( - 'config', type=str, - help="See example config for detailed information") + +def make_concatenate_and_shuffle_scripts(cfg): + """ + Function that writes qsub .sh files which concatenates all files inside the list files. + + Parameters + ---------- + cfg : dict + Dict that contains all configuration options and additional information. + + """ + + dirpath = cfg['output_file_folder'] + + if not os.path.exists(dirpath + '/logs'): # check if /logs folder exists, if not create it. + os.makedirs(dirpath + '/logs') + if not os.path.exists(dirpath + '/job_scripts'): # check if /job_scripts folder exists, if not create it. + os.makedirs(dirpath + '/job_scripts') + if not os.path.exists(dirpath + '/data_split'): # check if /data_split folder exists, if not create it. + os.makedirs(dirpath + '/data_split') - return parser - + #not available atm... + #chunksize = '' if cfg['chunksize'] is None else ' --chunksize ' + str(cfg['chunksize']) + #complib = '' if cfg['complib'] is None else ' --complib ' + str(cfg['complib']) + #complevel = '' if cfg['complevel'] is None else ' --complevel ' + str(cfg['complevel']) + + # make qsub .sh file for concatenating + for listfile_fpath in cfg['output_lists']: + listfile_fname = os.path.basename(listfile_fpath) + listfile_fname_wout_ext = os.path.splitext(listfile_fname)[0] + conc_outputfile_fpath = cfg['output_file_folder'] + '/data_split/' + listfile_fname_wout_ext + '.h5' + + fpath_bash_script = dirpath + '/job_scripts/concatenate_h5_' + listfile_fname_wout_ext + '.sh' + + with open(fpath_bash_script, 'w') as f: + f.write('#!/usr/bin/env bash\n') + f.write('\n') + f.write('source ' + cfg['venv_path'] + 'activate' + '\n') + f.write('\n') + f.write('# Concatenate the files in the list\n') + + f.write('concatenate ' + listfile_fpath + ' --outfile ' + conc_outputfile_fpath) + # at the moment it is not possible to set the comp opts like this+ chunksize + complib + complevel + + + # make qsub .sh file for shuffling + if cfg['shuffle_delete']: + delete_concatenate = ' --delete' + else: + delete_concatenate = '' + + for listfile_fpath in cfg['output_lists']: + listfile_fname = os.path.basename(listfile_fpath) + listfile_fname_wout_ext = os.path.splitext(listfile_fname)[0] + + # This is the input for the shuffle tool! + conc_outputfile_fpath = cfg['output_file_folder'] + '/data_split/' + listfile_fname_wout_ext + '.h5' + + fpath_bash_script = dirpath + '/job_scripts/shuffle_h5_' + listfile_fname_wout_ext + '.sh' + + with open(fpath_bash_script, 'w') as f: + f.write('#!/usr/bin/env bash\n') + f.write('\n') + f.write('source ' + cfg['venv_path'] + 'activate' + '\n') + f.write('\n') + f.write('# Shuffle the h5 file \n') + + f.write('postproc ' + conc_outputfile_fpath + delete_concatenate) + #time python shuffle/shuffle_h5.py' + #+ delete_flag_shuffle_tool + #+ chunksize + complib + complevel + + def main(): """ Main function to make the data split. @@ -310,8 +382,15 @@ def main(): cfg = toml.load(config_file) cfg['toml_filename'] = config_file - ip_group_keys = get_all_ip_group_keys(cfg) + #set some defaults/Nones - at the moment setting of the com opts is not available! + #if 'chunksize' not in cfg: cfg['chunksize'] = None + #if 'complib' not in cfg: cfg['complib'] = None + #if 'complevel' not in cfg: cfg['complevel'] = None + #read out all the input groups + ip_group_keys = get_all_ip_group_keys(cfg) + + #and now loop over input groups extracting info n_evts_total = 0 for key in ip_group_keys: print('Collecting information from input group ' + key) @@ -322,18 +401,22 @@ def main(): n_evts_total += cfg[key]['n_evts'] cfg['n_evts_total'] = n_evts_total + #print the extracted statistics print_input_statistics(cfg, ip_group_keys) if cfg['print_only'] is True: from sys import exit exit() - + for key in ip_group_keys: add_fpaths_for_data_split_to_cfg(cfg, key) - + + #create the list files make_dsplit_list_files(cfg) - + #create bash scripts that can be submitted to do the concatenation and shuffle + if cfg['make_qsub_bash_files'] is True: + make_concatenate_and_shuffle_scripts(cfg) if __name__ == '__main__': main() diff --git a/orcasong/tools/make_data_split_configs/example_make_data_split_config.toml b/orcasong/tools/make_data_split_configs/example_make_data_split_config.toml index 55f88ae..e654dd6 100644 --- a/orcasong/tools/make_data_split_configs/example_make_data_split_config.toml +++ b/orcasong/tools/make_data_split_configs/example_make_data_split_config.toml @@ -21,8 +21,16 @@ # xyzc_tight_0_train_0.list, xyzc_tight_0_validate_0.list, ... # print_only : bool # If only informationa about the input_groups should be printed, and no .list files should be made. -# -# +# shuffle_delete : bool +# True = the input file that will be deleted after the shuffling is finished. +# Option for the shuffle_h5 tool. +# venv_path : str +# Path to a virtualenv, e.g. "/home/hpc/capn/mppi033h/.virtualenv/python_3_env/" +# make_qsub_bash_files : bool +# true = Makes the cluster submission bash files needed to actually +# concatenate the files in the list files. + + # Input Group Parameters # ---------------------- # dir : str @@ -48,6 +56,12 @@ output_file_name = "test_list" print_only = false # only print information of your input_groups, don't make any .list files +shuffle_delete = true + +venv_path = "/sps/km3net/users/guderian/NN_stuff/deep_learning_source/venv_song/bin/" + +make_qsub_bash_files = true + # --- Main options ---# diff --git a/tests/data/test_make_data_split_config.toml b/tests/data/test_make_data_split_config.toml index 8cba374..111ced8 100644 --- a/tests/data/test_make_data_split_config.toml +++ b/tests/data/test_make_data_split_config.toml @@ -48,6 +48,12 @@ output_file_name = "test_list" print_only = false # only print information of your input_groups, don't make any .list files +make_qsub_bash_files = true + +venv_path = "/sps/km3net/users/guderian/NN_stuff/deep_learning_source/venv_song/bin/" + +shuffle_delete = true + # --- Main options ---# diff --git a/tests/test_make_data_split.py b/tests/test_make_data_split.py index f6ee1f5..311f73f 100644 --- a/tests/test_make_data_split.py +++ b/tests/test_make_data_split.py @@ -17,8 +17,15 @@ config_file = os.path.join(test_data_dir, "test_make_data_split_config.toml") #the list files that will be created list_file_dir = os.path.join(test_data_dir, "data_split_test_output", "conc_list_files") list_output_val = os.path.join(list_file_dir, "test_list_validate_0.txt") -list_output_train = os.path.join(list_file_dir, "test_list_train_0.txt") - +list_output_train = os.path.join("data_split_test_output", "conc_list_files", "test_list_train_0.txt") +#the scripts outputs +scripts_output_dir = os.path.join(test_data_dir, "data_split_test_output", "job_scripts") +concatenate_bash_script_train = os.path.join(scripts_output_dir, "concatenate_h5_test_list_train_0.sh") +concatenate_bash_script_val = os.path.join(scripts_output_dir, "concatenate_h5_test_list_validate_0.sh") +shuffle_bash_script_train = os.path.join(scripts_output_dir, "shuffle_h5_test_list_train_0.sh") +shuffle_bash_script_val = os.path.join(scripts_output_dir, "shuffle_h5_test_list_validate_0.sh") +#and the files that will be created from these scripts +concatenate_file = os.path.join("data_split_test_output", "data_split", "test_list_train_0.h5") class TestMakeDataSplit(TestCase): @@ -33,18 +40,28 @@ class TestMakeDataSplit(TestCase): 'processed_data_muon/processed_graph_muon.h5\n','processed_data_neutrino/processed_graph_neutrino.h5\n'] cls.file_path_list_val = ['processed_data_neutrino/processed_graph_neutrino.h5','processed_data_neutrino/processed_graph_neutrino.h5\n'] cls.n_events_list = [18,3] + cls.contents_concatenate_script = ['concatenate ' + list_output_train + ' --outfile ' + concatenate_file] + cls.contents_shuffle_script = ['postproc ' + concatenate_file + ' --delete'] + #create list_file_dir if not os.path.exists(list_file_dir): os.makedirs(list_file_dir) - - - + @classmethod def tearDownClass(cls): #remove the lists created os.remove(list_output_val) os.remove(list_output_train) + os.remove(concatenate_bash_script_train) + os.remove(concatenate_bash_script_val) + os.remove(shuffle_bash_script_train) + os.remove(shuffle_bash_script_val) + os.removedirs(scripts_output_dir) os.removedirs(list_file_dir) + os.removedirs(os.path.join(test_data_dir, "data_split_test_output", "logs")) + os.removedirs(os.path.join(test_data_dir, "data_split_test_output", "data_split")) + + def test_read_keys_off_config(self): self.cfg = read_config(config_file) @@ -77,14 +94,50 @@ class TestMakeDataSplit(TestCase): mds.make_dsplit_list_files(self.cfg) #assert the single output lists + assert os.path.exists(list_output_val) == 1 with open(list_output_val) as f: for line in f: self.assertIn(line,self.file_path_list_val) f.close + + assert os.path.exists(list_output_train) == 1 with open(list_output_train) as f2: for line in f2: self.assertIn(line,self.file_path_list) + f2.close + + def test_make_concatenate_and_shuffle_scripts(self): + #main + #repeat first 4 steps + self.cfg = read_config(config_file) + self.ip_group_keys = mds.get_all_ip_group_keys(self.cfg) + self.cfg,self.n_evts_total = update_cfg(self.cfg) + + self.cfg['n_evts_total'] = self.n_evts_total + mds.print_input_statistics(self.cfg, self.ip_group_keys) + for key in self.ip_group_keys: + mds.add_fpaths_for_data_split_to_cfg(self.cfg, key) + mds.make_dsplit_list_files(self.cfg) + + #create the bash job scripts and test their content + mds.make_concatenate_and_shuffle_scripts(self.cfg) + + assert os.path.exists(concatenate_bash_script_train) == 1 + with open(concatenate_bash_script_train) as f: + for line in f: + pass #yay, awesome style! ^^ + last_line = line + self.assertIn(last_line,self.contents_concatenate_script) f.close + + assert os.path.exists(shuffle_bash_script_train) == 1 + with open(shuffle_bash_script_train) as f2: + for line in f2: + pass + last_line = line + self.assertIn(last_line,self.contents_shuffle_script) + f2.close + def update_cfg(cfg): @@ -108,4 +161,4 @@ def read_config(config_file): cfg = toml.load(config_file) cfg['toml_filename'] = config_file return cfg - \ No newline at end of file + -- GitLab