generate_sge_command(resources, other_resources, job_variables, scheduler_script)

Generate an SGE command to submit a job to the scheduler.

Parameters:
  • resources (dict) –

    A dictionary of resources to request for the job.

  • other_resources (dict) –

    A dictionary of other resources to pass to the scheduler.

  • job_variables (str) –

    A string of environment variables to pass to the job.

  • scheduler_script (str) –

    The path to the script to run.

Returns:
  • str

    The SGE command to submit

Source code in ribbon/batch/queue_utils.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def generate_sge_command(resources, other_resources, job_variables, scheduler_script):
    """
    Build the `qsub` command line used to submit a job to SGE.

    Args:
        resources (dict): Resources to request; translated to SGE options by
            `parse_sge_resources`.
        other_resources (dict): Extra scheduler options appended verbatim after
            the translated resources. An empty-string value means the key is a
            bare flag.
        job_variables (str): Environment variables for the job, placed after `-v`.
        scheduler_script (str): The path to the script to run.

    Returns:
        str: The SGE command to submit
    """
    # Translate the known resources first, then tack on the pass-through options.
    option_text = parse_sge_resources(resources)

    extra_parts = []
    for flag, setting in other_resources.items():
        if setting == '':
            # Bare flag: nothing follows the option name.
            extra_parts.append(f" {flag}")
        elif flag.startswith('-l'):
            # '-l' resource requests use the name=value form.
            extra_parts.append(f" {flag}={setting}")
        else:
            extra_parts.append(f" {flag} {setting}")
    option_text = option_text + ''.join(extra_parts)

    submit_binary = 'qsub'
    return f"{submit_binary} -v {job_variables} {option_text} {scheduler_script}"

generate_slurm_command(resources, other_resources, job_variables, scheduler_script)

Generate a SLURM command to submit a job to the scheduler.

Parameters:
  • resources (dict) –

    A dictionary of resources to request for the job.

  • other_resources (dict) –

    A dictionary of other resources to pass to the scheduler.

  • job_variables (str) –

    A string of environment variables to pass to the job.

  • scheduler_script (str) –

    The path to the script to run.

Returns:
  • str

    The SLURM command to submit

Source code in ribbon/batch/queue_utils.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def generate_slurm_command(resources, other_resources, job_variables, scheduler_script):
    """
    Build the `sbatch` command line used to submit a job to SLURM.

    Args:
        resources (dict): Resources to request; translated to SLURM options by
            `parse_slurm_resources`.
        other_resources (dict): Extra scheduler options appended verbatim after
            the translated resources. An empty-string value means the key is a
            bare flag; anything else is emitted as `key=value`.
        job_variables (str): Environment variables for the job, passed via `--export=`.
        scheduler_script (str): The path to the script to run.

    Returns:
        str: The SLURM command to submit
    """
    # Translate the known resources first, then tack on the pass-through options.
    option_text = parse_slurm_resources(resources)

    for flag, setting in other_resources.items():
        # An empty string marks a bare flag; otherwise use the flag=value form.
        suffix = f" {flag}" if setting == '' else f" {flag}={setting}"
        option_text = option_text + suffix

    submit_binary = 'sbatch'
    return f"{submit_binary} --export={job_variables} {option_text} {scheduler_script}"

parse_sge_resources(resources, dependency_type=None)

Parse a dictionary of resources into a string of SGE options.

Parameters:
  • resources (dict) –

    A dictionary of resources to request for the job.

Returns:
  • str

    A string of SGE options

TODO: implement dependency handling

Source code in ribbon/batch/queue_utils.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def parse_sge_resources(resources, dependency_type=None):
    """
    Parse a dictionary of resources into a string of SGE options.

    The input dictionary is read only; it is never modified.

    Args:
        resources (dict): A dictionary of resources to request for the job.
            Recognized keys are 'time', 'mem', 'dependency', 'gpus',
            'job-name', 'output', 'queue' and 'node-name'. Any other key is
            skipped with a printed warning. The 'dependency' value may be a
            single job ID or a list/tuple of job IDs.
        dependency_type (str, optional): Currently unused by this function;
            kept so the signature stays compatible with existing callers.

    Returns:
        str: A string of SGE options, space-separated, in the iteration order
            of `resources`. Empty string if nothing was recognized.

    TODO: implement dependency handling
    """
    resource_mappings = {
        'time': '-l h_rt',
        'mem': '-l mem_free',
        'dependency': '-hold_jid',
        'gpus': '-l gpu',
        'job-name': '-N',
        'output': '-o',
        'queue': '-q',
        'node-name': '-l hostname',
        # Add other resource mappings as needed
    }

    resources_list = []
    for key, value in resources.items():
        if key not in resource_mappings:
            print(f"Warning: Unrecognized resource key: {key}. Skipping.")
            continue

        sge_option = resource_mappings[key]

        if key == 'dependency':
            # Join multiple job IDs locally instead of writing the joined
            # string back into the caller's dictionary.
            if isinstance(value, (list, tuple)):
                value = ','.join(str(job_id) for job_id in value)
            resources_list.append(f"{sge_option} {value}")
        elif sge_option.startswith('-l'):
            # '-l' resource requests use the name=value form.
            resources_list.append(f"{sge_option}={value}")
        else:
            resources_list.append(f"{sge_option} {value}")

    return ' '.join(resources_list)

parse_slurm_resources(resources, dependency_type='afterok')

Parse a dictionary of resources into a string of SLURM options.

Parameters:
  • resources (dict) –

    A dictionary of resources to request for the job.

  • dependency_type (str, default: 'afterok' ) –

    The type of dependency to use (e.g. 'afterok', 'afterany', 'afternotok')

Returns:
  • str

    A string of SLURM options

Source code in ribbon/batch/queue_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def parse_slurm_resources(resources, dependency_type='afterok'):
    """
    Parse a dictionary of resources into a string of SLURM options.

    The input dictionary is read only; it is never modified, so the same
    dictionary can be parsed repeatedly with identical results.

    Args:
        resources (dict): A dictionary of resources to request for the job.
            Recognized keys are 'time', 'mem', 'dependency', 'gpus',
            'job-name', 'requeue', 'output', 'queue' and 'node-name'. Any
            other key is skipped with a printed warning. A value of `True`
            emits the option as a bare flag. The 'dependency' value may be a
            single job ID (string or integer) or a list/tuple of job IDs.
        dependency_type (str): The type of dependency to use (e.g. 'afterok', 'afterany', 'afternotok')

    Returns:
        str: A string of SLURM options, space-separated, in the iteration
            order of `resources`. Empty string if nothing was recognized.
    """
    resource_mappings = {
        'time': '--time',
        'mem': '--mem',
        'dependency': '--dependency',
        'gpus': '--gpus',
        'job-name': '--job-name',
        'requeue': '--requeue',
        'output': '--output',
        'queue': '--partition',
        'node-name': '--nodelist',
        # Add other resource mappings as needed
    }

    resources_list = []
    for key, value in resources.items():
        if key not in resource_mappings:
            print(f"Warning: Unrecognized resource key: {key}. Skipping.")
            continue

        slurm_option = resource_mappings[key]

        if key == 'dependency':
            # Build the '<type>:<id>[:<id>...]' value locally. Writing it back
            # into `resources` would prefix the type again on every call.
            if isinstance(value, (list, tuple)):
                value = ':'.join(str(job_id) for job_id in value)
            # f-string formatting also accepts a single integer job ID.
            value = f"{dependency_type}:{value}"

        if value is True:
            # Flags without values
            resources_list.append(f"{slurm_option}")
        else:
            resources_list.append(f"{slurm_option}={value}")

    return ' '.join(resources_list)

sge_check_job_status(job_ids)

Check if SGE jobs are still running or have completed.

Parameters:
  • job_ids (list) –

    A list of job IDs (as integers or strings)

Returns:
  • dict

    A dictionary with job IDs as keys and statuses as values ('not completed' or 'completed', or an 'Error: …' string if the status check itself raised an exception)

Source code in ribbon/batch/queue_utils.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def sge_check_job_status(job_ids):
    """
    Report, for each SGE job ID, whether `qstat` still knows about the job.

    Parameters:
        job_ids (list): A list of job IDs (as integers or strings)

    Returns:
        dict: A dictionary keyed by the job IDs exactly as given. Each value is
            'not completed' when `qstat -j <id>` exits with code 0,
            'completed' when it exits with any other code, or
            'Error: <message>' when running the check raised an exception.
    """
    statuses = {}
    for job in job_ids:
        try:
            # Only the exit code matters, so discard everything qstat prints.
            exit_code = subprocess.run(
                ['qstat', '-j', str(job)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            ).returncode
        except Exception as err:
            statuses[job] = f'Error: {err}'
        else:
            statuses[job] = 'not completed' if exit_code == 0 else 'completed'
    return statuses

slurm_check_job_status(job_ids)

Check if SLURM jobs are still running or have completed.

Parameters:
  • job_ids (list) –

    A list of job IDs (as integers or strings)

Returns:
  • dict

    A dictionary with job IDs as keys and statuses as values ('not completed' or 'completed', or an 'Error: …' string if the status check itself raised an exception)

Source code in ribbon/batch/queue_utils.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def slurm_check_job_status(job_ids):
    """
    Report, for each SLURM job ID, whether `squeue` still lists the job.

    Parameters:
        job_ids (list): A list of job IDs (as integers or strings)

    Returns:
        dict: A dictionary keyed by the job IDs exactly as given. Each value is
            'not completed' when `squeue -j <id> --noheader` prints any
            non-whitespace output, 'completed' when it prints nothing, or
            'Error: <message>' when running the check raised an exception.
    """
    statuses = {}
    for job in job_ids:
        try:
            # Capture stdout as text; stderr is discarded.
            listing = subprocess.run(
                ['squeue', '-j', str(job), '--noheader'],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
            ).stdout
            # Any remaining text means squeue printed a row for this job.
            still_listed = bool(listing.strip())
        except Exception as err:
            statuses[job] = f'Error: {err}'
        else:
            statuses[job] = 'not completed' if still_listed else 'completed'
    return statuses