Source code for higgs_dna.scripts.samples.download_files

import argparse
import json
import os
import subprocess
import sys
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed


[docs]def is_directory_empty(directory): """ Check if a given directory is empty. Args: directory (str): Path to the directory. Returns: bool: True if the directory is empty, False otherwise. """ return not any(os.scandir(directory))
[docs]def handle_target_directory(target_dir, force_recreate): """ Manage the target directory based on its current state and user preferences. - If the directory exists and is empty, proceed with downloading. - If the directory exists and is not empty: - If `force_recreate` is True, delete its contents and proceed. - Otherwise, exit the script to prevent accidental data loss. - If the directory does not exist, create it. Args: target_dir (str): Path to the target directory. force_recreate (bool): Flag indicating whether to force recreate the directory. Raises: SystemExit: If unable to handle the directory based on the above conditions. """ if os.path.exists(target_dir): if is_directory_empty(target_dir): print(f"Directory '{target_dir}' exists and is empty. Proceeding with download.") else: if force_recreate: print(f"Directory '{target_dir}' exists and is not empty. Force recreating by deleting its contents.") try: # Iterate over all items in the directory and remove them for filename in os.listdir(target_dir): file_path = os.path.join(target_dir, filename) if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) # Remove file or symbolic link elif os.path.isdir(file_path): shutil.rmtree(file_path) # Remove directory and its contents except Exception as e: print(f"Failed to clear directory '{target_dir}': {e}") sys.exit(1) else: print(f"Directory '{target_dir}' exists and is not empty. Use --force-recreate to overwrite.") sys.exit(1) else: try: os.makedirs(target_dir, exist_ok=True) # Create the directory if it doesn't exist print(f"Created directory '{target_dir}'.") except Exception as e: print(f"Failed to create directory '{target_dir}': {e}") sys.exit(1)
[docs]def download_file(file_url, destination_folder, key): """ Download a single file using xrdcp. Args: file_url (str): URL of the file to download. destination_folder (str): Base directory where files are downloaded. key (str): The key associated with the file, used to create subdirectories. Returns: tuple: (file_url, success, message) - file_url (str): The URL of the file attempted to download. - success (bool): True if download was successful, False otherwise. - message (str): Success or error message. """ try: filename = os.path.basename(file_url) # Extract the filename from the URL destination_path = os.path.join(destination_folder, os.path.basename(file_url)) # Skip downloading if the file already exists if os.path.exists(destination_path): message = f"File already exists: {filename}. Skipping download." print(message) return (file_url, True, message) # Define the xrdcp command command = ['xrdcp', '-f', file_url, destination_path] # Execute the xrdcp command to copy the file subprocess.run(command, check=True) message = f"Downloaded: {filename} to '{destination_folder}'" print(message) return (file_url, True, message) except subprocess.CalledProcessError as e: message = f"Failed to download {file_url}. Error: {e}" print(message) return (file_url, False, message) except Exception as e: message = f"Unexpected error downloading {file_url}. Error: {e}" print(message) return (file_url, False, message)
[docs]def download_files_in_parallel(file_dict, target_dir, num_files, num_threads): """ Download files in parallel using multiple threads. Args: file_dict (dict): Dictionary where each key is a string representing a category (e.g., dataset name) and the value is a list of file URLs to download. target_dir (str): Base directory where files will be downloaded. Subdirectories for each key will be created within this directory. num_files (int or None): Number of files to download per key. If None, download all files. num_threads (int): Number of concurrent threads to use for downloading. Returns: None """ # Iterate over each key and its corresponding list of file URLs for key, files in file_dict.items(): # Determine the actual number of files to download for the current key actual_num_files = num_files if num_files is not None else len(files) if actual_num_files > len(files): print(f"Requested {actual_num_files} files for key '{key}', but only {len(files)} available. Proceeding to download all available files.") actual_num_files = len(files) # Slice the list of files based on the determined number files_to_download = files[:actual_num_files] print(f"\nStarting download of {actual_num_files} files for key '{key}' with {num_threads} threads.") # Use ThreadPoolExecutor to manage parallel downloads with ThreadPoolExecutor(max_workers=num_threads) as executor: # Submit download tasks for each file and map futures to their file URLs future_to_file = { executor.submit(download_file, file_url, target_dir, key): file_url for file_url in files_to_download } # Process each completed future as they finish for future in as_completed(future_to_file): file_url = future_to_file[future] try: _, success, message = future.result() if not success: print(f"Error downloading {file_url}: {message}") except Exception as exc: print(f"Generated an exception while downloading {file_url}: {exc}")
[docs]def main(): """ Main function to parse arguments and initiate the file download process. Workflow: 1. Parse command-line arguments. 2. Read and parse the JSON file containing file URLs. 3. Determine whether to process a specific key or all keys. 4. Handle the target directory based on its state and user flags. 5. Download the specified number of files using xrdcp, either sequentially or in parallel. """ # Initialize the argument parser with a description of the script parser = argparse.ArgumentParser(description="Download files from a JSON file using xrdcp") # Define required and optional command-line arguments parser.add_argument('--json-file', required=True, help='Path to the JSON file') parser.add_argument('--key', required=True, help='Key in the JSON file to process or "all" to process all keys') parser.add_argument('--num-files', type=int, default=None, help='Number of files to process per key') parser.add_argument('--target-dir', required=True, help='Target directory to copy files into') parser.add_argument('--force-recreate', '-f', action='store_true', help='Force recreate the target directory by deleting its contents if it exists') parser.add_argument('--num-threads', type=int, default=8, help='Number of parallel threads to use for downloading') # Parse the arguments provided by the user args = parser.parse_args() # Read and load the JSON file try: with open(args.json_file, 'r') as f: data = json.load(f) except Exception as e: print(f"Error reading JSON file: {e}") sys.exit(1) # Check if the user wants to process all keys in the JSON if args.key == 'all': keys = data.keys() # Retrieve all keys from the JSON if not keys: print("No keys found in the JSON file.") sys.exit(1) # Iterate over each key to process its associated files for key in keys: print(f"\nProcessing key '{key}':") # Define a subdirectory for the current key within the target directory sub_target_dir = os.path.join(args.target_dir, key) # Handle the subdirectory based on existing content and force flag handle_target_directory(sub_target_dir, args.force_recreate) file_list = data[key] # Retrieve the list of files for the current key # Validate that file_list is a list if not isinstance(file_list, list): print(f"Data for key '{key}' is not a list. Skipping this key.") continue # Download the files in parallel download_files_in_parallel({key: file_list}, sub_target_dir, args.num_files, args.num_threads) else: # If a specific key is provided, process only that key if args.key not in data: print(f"Key '{args.key}' not found in JSON file.") sys.exit(1) file_list = data[args.key] # Retrieve the list of files for the specified key # Validate that file_list is a list if not isinstance(file_list, list): print(f"Data for key '{args.key}' is not a list. Exiting.") sys.exit(1) # Handle the main target directory based on existing content and force flag handle_target_directory(args.target_dir, args.force_recreate) # Download the files in parallel download_files_in_parallel({args.key: file_list}, args.target_dir, args.num_files, args.num_threads)
if __name__ == '__main__': main()