Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/library/module_utils/local_repo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
}
CLI_FILE_PATH = "/root/.config/pulp/cli.toml"
POST_TIMEOUT = 3600
TAR_POLL_VAL = 3
TAR_POLL_VAL = 25
FILE_POLL_VAL = 1
ISO_POLL_VAL = 15
FILE_URI = "/pulp/api/v3/content/file/files/"
Expand Down
175 changes: 175 additions & 0 deletions common/library/modules/parallel_file_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#!/usr/bin/python
# pylint: disable=import-error,no-name-in-module,line-too-long

"""
Ansible module for parallel copying of files.

Supports copying multiple source → destination pairs in parallel,
with logging, retries, and optional cleanup.
"""

import os
import shutil
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.local_repo.standard_logger import setup_standard_logger

# ============================================================
# Default Values
# ============================================================

# Tunable defaults for the module's argument spec.
DEFAULT_MAX_WORKERS = 4          # number of parallel copy threads
DEFAULT_RETRY_COUNT = 2          # copy attempts per file before marking it failed
DEFAULT_DELETE_EXISTING = True   # overwrite a pre-existing destination file
# Default log file path. The original value ended with a stray trailing '/'
# ("...parallel_file_copy.log/"), which denotes a directory, not a file —
# fixed so the standard logger receives a valid file path.
PARALLEL_FILE_COPY_LOG = '/opt/omnia/log/core/playbooks/parallel_file_copy.log'

# ============================================================
# Copy Worker Function
# ============================================================

def copy_single_file(src_file, dest_dir, retry_count, delete_existing, slogger, summary):
    """Copy one file into *dest_dir*, retrying on failure.

    Args:
        src_file (str): Path of the source file to copy.
        dest_dir (str): Destination directory; created if it does not exist.
        retry_count (int): Total number of copy attempts before giving up.
        delete_existing (bool): Remove a pre-existing destination file first.
        slogger: Logger exposing ``info``/``error`` (standard logging API).
        summary (dict): Shared result dict with "copied"/"skipped"/"failed"
            lists. ``list.append`` is atomic under CPython's GIL, so worker
            threads append without extra locking.

    Returns:
        None. The outcome is recorded in *summary*.
    """
    thread_name = threading.current_thread().name
    start_time = datetime.now()

    # A missing source is a skip, not a failure — matches directory handling
    # in execute_parallel_copy.
    if not os.path.isfile(src_file):
        slogger.info(f"NOT COPIED - Source file missing: {src_file}")
        summary["skipped"].append(src_file)
        return

    os.makedirs(dest_dir, exist_ok=True)
    dest_file = os.path.join(dest_dir, os.path.basename(src_file))

    for attempt in range(1, retry_count + 1):
        try:
            slogger.info(f"[{thread_name}] START {start_time} Copying {src_file} (Attempt {attempt})")

            if delete_existing and os.path.exists(dest_file):
                os.remove(dest_file)
                slogger.info(f"Deleted existing file: {dest_file}")

            # copy2 preserves file metadata (mtime, permission bits) as well
            # as contents.
            shutil.copy2(src_file, dest_file)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            slogger.info(f"[{thread_name}] SUCCESS {end_time} Copied {src_file} -> {dest_file} (Duration={duration:.2f}s)")

            summary["copied"].append(src_file)
            return

        # OSError covers remove/makedirs/copy I/O errors; shutil.Error covers
        # multi-error reports from shutil. Narrower than the original bare
        # `except Exception`, so programming errors are no longer swallowed.
        except (OSError, shutil.Error) as err:
            slogger.error(f"[{thread_name}] ERROR copying {src_file} (Attempt {attempt}) Reason: {err}")
            if attempt == retry_count:
                summary["failed"].append(src_file)

# ============================================================
# Main Parallel Copy Logic
# ============================================================

def execute_parallel_copy(module, copy_pairs, max_workers, retry_count, delete_existing, slogger):
    """Fan out file copies across a thread pool.

    For every ``(src_dir, dest_dir)`` pair, one copy task is submitted per
    regular file found directly inside ``src_dir``.  Missing or empty source
    directories are recorded as skipped rather than raising.

    Returns:
        dict: ``{"copied": [...], "skipped": [...], "failed": [...]}``
    """
    summary = {"copied": [], "skipped": [], "failed": []}
    pending = []

    slogger.info("===== PARALLEL FILE COPY STARTED =====")
    slogger.info(f"Copy pairs received: {copy_pairs}")
    slogger.info(f"Max workers: {max_workers}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for src_dir, dest_dir in copy_pairs:

            if not os.path.isdir(src_dir):
                slogger.info(f"NOT COPIED - Source directory missing: {src_dir}")
                summary["skipped"].append(src_dir)
                continue

            # Only regular files directly inside src_dir; subdirectories are
            # not traversed.
            candidates = (os.path.join(src_dir, name) for name in os.listdir(src_dir))
            files = [path for path in candidates if os.path.isfile(path)]
            if not files:
                slogger.info(f"NOT COPIED - No files found in directory: {src_dir}")
                summary["skipped"].append(src_dir)
                continue

            # Surface progress to the Ansible controller while copies run.
            module.warn(f"Copy in progress for {src_dir} -> {dest_dir}. Please wait ...")

            slogger.info(f"Copying {len(files)} files from {src_dir} -> {dest_dir} ...")

            pending.extend(
                executor.submit(copy_single_file, path, dest_dir, retry_count, delete_existing, slogger, summary)
                for path in files
            )

        # Block until every submitted copy has completed; result() re-raises
        # anything a worker let escape.
        for task in as_completed(pending):
            task.result()

    slogger.info("===== PARALLEL FILE COPY FINISHED =====")
    return summary

# ============================================================
# Ansible Module Entry Point
# ============================================================

def main():
    """Main Ansible module execution entrypoint.

    Parses arguments, runs the parallel copy, and exits with per-file lists
    plus an ``overall_status`` of SUCCESS / PARTIAL / FAILURE.
    """
    module_args = dict(
        copy_pairs=dict(type="list", required=True),
        max_workers=dict(type="int", required=False, default=DEFAULT_MAX_WORKERS),
        retry_count=dict(type="int", required=False, default=DEFAULT_RETRY_COUNT),
        delete_existing=dict(type="bool", required=False, default=DEFAULT_DELETE_EXISTING),
        slog_file=dict(type="str", required=False, default=PARALLEL_FILE_COPY_LOG),
    )

    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    copy_pairs = module.params["copy_pairs"]
    max_workers = module.params["max_workers"]
    retry_count = module.params["retry_count"]
    delete_existing = module.params["delete_existing"]
    slog_file = module.params["slog_file"]

    slogger = setup_standard_logger(slog_file)

    # NOTE(review): "failed" is a reserved key in Ansible module results and a
    # non-empty list is truthy, which may mark the task failed even when
    # overall_status is PARTIAL — confirm consumers before renaming.
    result = dict(changed=False, copied=[], skipped=[], failed=[])

    # Honor check mode: supports_check_mode=True is declared above, so no
    # copies may be performed when Ansible runs with --check. Previously the
    # module copied files regardless of check mode.
    if module.check_mode:
        result["overall_status"] = "SUCCESS"
        module.exit_json(**result)

    try:
        summary = execute_parallel_copy(module, copy_pairs, max_workers, retry_count, delete_existing, slogger)

        result["copied"] = summary["copied"]
        result["skipped"] = summary["skipped"]
        result["failed"] = summary["failed"]
        if summary["copied"]:
            result["changed"] = True

        # SUCCESS when nothing failed; PARTIAL when some files copied and
        # some failed; FAILURE when failures occurred with no successes.
        overall_status = "SUCCESS"
        if summary["failed"] and summary["copied"]:
            overall_status = "PARTIAL"
        elif summary["failed"] and not summary["copied"]:
            overall_status = "FAILURE"

        result["overall_status"] = overall_status
        module.exit_json(**result)

    except Exception as err:
        slogger.error(f"Parallel copy execution failed: {err}")
        module.fail_json(msg=str(err), **result)

if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
echo "[INFO] Setting up shared CUDA directory..."
# Create and mount shared directory for compute nodes
mkdir -p /shared-cuda-toolkit
mount -t nfs {{ cloud_init_nfs_path }}/cuda/ /shared-cuda-toolkit
mount -t nfs {{ cloud_init_nfs_path }}/hpc_tools/cuda/ /shared-cuda-toolkit

if [ $? -ne 0 ]; then
echo "[ERROR] Failed to mount NFS cuda share. Exiting."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
echo "[INFO] Setting up shared CUDA directory..."
# Create and mount shared directory for compute nodes
mkdir -p /shared-cuda-toolkit
mount -t nfs {{ cloud_init_nfs_path }}/cuda/ /shared-cuda-toolkit
mount -t nfs {{ cloud_init_nfs_path }}/hpc_tools/cuda/ /shared-cuda-toolkit

if [ $? -ne 0 ]; then
echo "[ERROR] Failed to mount NFS cuda share. Exiting."
Expand Down Expand Up @@ -190,6 +190,18 @@
{{ lookup('template', 'templates/ldms/ldms_sampler.sh.j2') | indent(12) }}
{% endif %}

- path: /usr/local/bin/install_openmpi.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/install_openmpi.sh.j2') | indent(12) }}

- path: /usr/local/bin/install_ucx.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/install_ucx.sh.j2') | indent(12) }}

- path: /etc/hosts
append: true
content: |
Expand All @@ -207,6 +219,18 @@
permissions: '0644'
content: |
{{ lookup('template', 'templates/nodes/apptainer_mirror.conf.j2') | indent(12) }}

- path: /usr/local/bin/install_nvhpc_sdk.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/install_nvhpc_sdk.sh.j2') | indent(12) }}

- path: /usr/local/bin/configure_nvhpc_env.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/configure_nvhpc_env.sh.j2') | indent(12) }}

runcmd:
- /usr/local/bin/set-ssh.sh
Expand Down Expand Up @@ -299,71 +323,31 @@
{% endif %}

{% if hostvars['localhost']['ucx_support'] %}
# UCX build and install
- |
UCX_BIN={{ client_mount_path }}/benchmarks/ucx
mkdir -p {{ client_mount_path }}/compile/ucx
mkdir -p {{ client_mount_path }}/benchmarks/ucx
cd {{ client_mount_path }}/compile/ucx
wget --no-check-certificate https://{{ hostvars['localhost']['admin_nic_ip'] }}:2225/pulp/content/opt/omnia/offline_repo/cluster/x86_64/{{ hostvars['localhost']['cluster_os_type'] }}/{{ hostvars['localhost']['cluster_os_version'] }}/tarball/ucx/ucx.tar.gz -O ucx.tar.gz
tar xzf ucx.tar.gz
cd ucx-*
mkdir -p build
cd build
../contrib/configure-release --prefix={{ client_mount_path }}/benchmarks/ucx
make -j 8
make install
- echo "===== UCX Setup ====="
- echo "UCX support is enabled."
- /usr/local/bin/install_ucx.sh
# - echo "Build script available at"
# - echo " /usr/local/bin/install_ucx.sh"
# - echo "NFS must be mounted at {{ client_mount_path }} before running."
{% endif %}

{% if hostvars['localhost']['openmpi_support'] %}
# OpenMPI build and install with UCX + Slurm detection
- |
OPENMPI_INSTALL_PREFIX="{{ client_mount_path }}/benchmarks/openmpi"
OPENMPI_SRC="{{ client_mount_path }}/compile/openmpi"
mkdir -p $OPENMPI_SRC
mkdir -p $OPENMPI_INSTALL_PREFIX

cd $OPENMPI_SRC
wget --no-check-certificate https://{{ hostvars['localhost']['admin_nic_ip'] }}:2225/pulp/content/opt/omnia/offline_repo/cluster/x86_64/{{ hostvars['localhost']['cluster_os_type'] }}/{{ hostvars['localhost']['cluster_os_version'] }}/tarball/openmpi/openmpi.tar.gz -O openmpi.tar.gz

tar xzf openmpi.tar.gz
cd openmpi-*
mkdir -p build

# Check Slurm
if sinfo >/dev/null 2>&1; then
SLURM_FLAG="--with-slurm=yes --with-munge=/usr"
else
SLURM_FLAG="--with-slurm=no"
fi

# Check UCX
if [ -x "{{ client_mount_path }}/benchmarks/ucx/bin/ucx_info" ]; then
{{ client_mount_path }}/benchmarks/ucx/bin/ucx_info -v
if [ $? -eq 0 ]; then
UCX_FLAG="--with-ucx={{ client_mount_path }}/benchmarks/ucx"
else
echo "ucx_info failed, disabling UCX"
UCX_FLAG=""
fi
else
echo "ucx_info not found, disabling UCX"
UCX_FLAG=""
fi

cd build
../configure --prefix=$OPENMPI_INSTALL_PREFIX \
--enable-mpi1-compatibility \
--enable-prte-prefix-by-default \
$SLURM_FLAG $UCX_FLAG 2>&1 | tee config.out

make -j 8
make install
- echo "===== OpenMPI Setup ====="
- echo "OpenMPI support is enabled."
- /usr/local/bin/install_openmpi.sh
# - echo "Build script available at"
# - echo " /usr/local/bin/install_openmpi.sh"
# - echo "Run UCX installation first if UCX support is enabled."
# - echo "NFS must be mounted at {{ client_mount_path }} before running."
{% endif %}

{% if hostvars['localhost']['ldms_support'] %}
- echo " Starting LDMS setup " | tee -a /var/log/ldms-cloudinit.log

- /root/ldms_sampler.sh
{% endif %}

# nvidia sdk install
- /usr/local/bin/install_nvhpc_sdk.sh
- /usr/local/bin/configure_nvhpc_env.sh
- echo "Cloud-Init has completed successfully."
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
# Create mount point
mkdir -p /usr/local/cuda

cuda_nfs_share="{{ cloud_init_nfs_path }}/cuda"
cuda_nfs_share="{{ cloud_init_nfs_path }}/hpc_tools/cuda"

echo "[INFO] Mounting CUDA toolkit from NFS: $cuda_nfs_share"
mount -t nfs "$cuda_nfs_share" /usr/local/cuda
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
# Create mount point
mkdir -p /usr/local/cuda

cuda_nfs_share="{{ cloud_init_nfs_path }}/cuda"
cuda_nfs_share="{{ cloud_init_nfs_path }}/hpc_tools/cuda"

echo "[INFO] Mounting CUDA toolkit from NFS: $cuda_nfs_share"
mount -t nfs "$cuda_nfs_share" /usr/local/cuda
Expand Down Expand Up @@ -408,6 +408,24 @@
permissions: '0644'
content: |
{{ lookup('template', 'templates/nodes/apptainer_mirror.conf.j2') | indent(12) }}

- path: /usr/local/bin/configure_ucx_openmpi_env.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/configure_ucx_openmpi_env.sh.j2') | indent(12) }}

- path: /usr/local/bin/setup_nvhpc_sdk.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/setup_nvhpc_sdk.sh.j2') | indent(12) }}

- path: /usr/local/bin/export_nvhpc_env.sh
owner: root:root
permissions: '{{ file_mode_755 }}'
content: |
{{ lookup('template', 'templates/hpc_tools/export_nvhpc_env.sh.j2') | indent(12) }}

runcmd:
- /usr/local/bin/set-ssh.sh
Expand Down Expand Up @@ -455,11 +473,25 @@
- mkdir -p {{ client_mount_path }}
- echo "{{ cloud_init_slurm_nfs_path }} {{ client_mount_path }} nfs defaults,_netdev 0 0" >> /etc/fstab
- mount -a
- echo "One or more shared components (UCX / OpenMPI / LDMS) are enabled."
- echo "Shared NFS mount is available at: {{ client_mount_path }}"
- /usr/local/bin/configure_ucx_openmpi_env.sh
# - echo ""
# - echo "IMPORTANT:"
# - echo "1. Install UCX and/or OpenMPI on the LOGIN / COMPILER node first."
# - echo "2. Ensure they are installed under the shared mount:"
# - echo " {{ client_mount_path }}/hpc_tools/benchmarks/"
# - echo "3. On this node, run the environment setup script when ready:"
# - echo ""
# - echo "This step is intentionally NOT run automatically."
- echo "=================================================="
{% endif %}

{% if hostvars['localhost']['ldms_support'] %}
- echo " Starting LDMS setup " | tee -a /var/log/ldms-cloudinit.log

- /root/ldms_sampler.sh
{% endif %}
- /usr/local/bin/setup_nvhpc_sdk.sh
- /usr/local/bin/export_nvhpc_env.sh
- echo "Cloud-Init has completed successfully."
Loading