import argparse
import os
import re
import sys
import time
from datetime import datetime

import boto3
from botocore.compat import total_seconds
from botocore.config import Config


# Each job type maps to the AWS Batch job definition it runs and the queue it
# is submitted to.
job_type_info = {
    'ci-cpu': {
        'job_definition': 'd2l-ci-cpu-builder:2',
        'job_queue': 'D2L-CI-CPU'
    },
    'ci-cpu-push': {
        'job_definition': 'd2l-ci-cpu-builder-push:7',
        'job_queue': 'D2L-CI-CPU'
    },
    'ci-cpu-release': {
        'job_definition': 'd2l-ci-cpu-builder-release:1',
        'job_queue': 'D2L-CI-CPU'
    },
    'ci-gpu-torch': {
        'job_definition': 'd2l-ci-zh-gpu-torch:1',
        'job_queue': 'D2L-CI-GPU'
    },
    'ci-gpu-tf': {
        'job_definition': 'd2l-ci-zh-gpu-tf:1',
        'job_queue': 'D2L-CI-GPU'
    },
    'ci-gpu-mxnet': {
        'job_definition': 'd2l-ci-zh-gpu-mxnet:1',
        'job_queue': 'D2L-CI-GPU'
    },
    'ci-gpu-paddle': {
        'job_definition': 'd2l-ci-zh-gpu-paddle:1',
        'job_queue': 'D2L-CI-GPU'
    }
}

# Create push and release job types for GPUs with the same definitions
for job_type in list(job_type_info.keys()):
    if job_type.startswith('ci-gpu'):
        job_type_info[job_type + '-push'] = job_type_info[job_type]
        job_type_info[job_type + '-release'] = job_type_info[job_type]
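
# Note: the push/release variants alias the same dict objects as their base
# GPU job types, so all three share one job definition and queue.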

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('--profile', help='profile name of the aws account.', type=str,
                    default=None)
parser.add_argument('--region', help='default region when creating new connections', type=str,
                    default='us-west-2')
parser.add_argument('--name', help='name of the job', type=str, default='d2l-ci')
parser.add_argument('--job-type', help='type of job to submit.', type=str,
                    choices=job_type_info.keys(), default='ci-cpu')
parser.add_argument('--source-ref',
                    help='ref in the d2l-zh main github repo, e.g. master, refs/pull/500/head',
                    type=str, default='master')
parser.add_argument('--work-dir',
                    help='working directory inside the repo, e.g. scripts/preprocess',
                    type=str, default='.')
parser.add_argument('--saved-output',
                    help='output to be saved, relative to the working directory; '
                         'it can be either a single file or a directory',
                    type=str, default='None')
parser.add_argument('--save-path',
                    help='s3 path where files are saved.',
                    type=str, default='batch/temp/{}'.format(datetime.now().isoformat()))
parser.add_argument('--command', help='command to run', type=str,
                    default='git rev-parse HEAD | tee stdout.log')
parser.add_argument('--remote',
                    help='git repo address, e.g. https://github.com/d2l-ai/d2l-zh',
                    type=str, default='https://github.com/d2l-ai/d2l-zh')
parser.add_argument('--safe-to-use-script',
                    help='whether the script changes from the actor are safe. We assume '
                         'they are safe if the actor has write permission to our repo',
                    action='store_true')
parser.add_argument('--original-repo', help='name of the repo', type=str, default='d2l-zh')
parser.add_argument('--wait',
                    help='block and wait until the job completes; non-zero exit code if the job fails.',
                    action='store_true')
parser.add_argument('--timeout', help='job timeout in seconds', default=7200, type=int)
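
# AWS Batch job parameters are a string-to-string map, which is why defaults
# such as 'None' above (and the 'True'/'False' flag values below) are strings.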

args = parser.parse_args()

session = boto3.Session(profile_name=args.profile, region_name=args.region)
# Retry throttled AWS API calls up to 20 times before giving up.
config = Config(retries=dict(max_attempts=20))
batch, cloudwatch = [session.client(service_name=sn, config=config) for sn in ['batch', 'logs']]
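# `batch` submits and tracks the job; `cloudwatch` reads the job's log stream
# from the /aws/batch/job log group.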


def printLogs(logGroupName, logStreamName, startTime):
    """Print CloudWatch log events newer than startTime and return the
    timestamp of the last event printed."""
    kwargs = {'logGroupName': logGroupName,
              'logStreamName': logStreamName,
              'startTime': startTime,
              'startFromHead': True}

    lastTimestamp = startTime - 1
    while True:
        logEvents = cloudwatch.get_log_events(**kwargs)

        for event in logEvents['events']:
            lastTimestamp = event['timestamp']
            timestamp = datetime.utcfromtimestamp(lastTimestamp / 1000.0).isoformat()
            print('[{}] {}'.format((timestamp + '.000')[:23] + 'Z', event['message']))

        # get_log_events pages forward with nextForwardToken and returns the
        # same token again once the end of the stream is reached.
        nextToken = logEvents['nextForwardToken']
        if nextToken and kwargs.get('nextToken') != nextToken:
            kwargs['nextToken'] = nextToken
        else:
            break
    return lastTimestamp


def nowInMillis():
    """Return the current UTC time in milliseconds since the epoch,
    i.e. int(time.time()) * 1000."""
    endTime = int(total_seconds(datetime.utcnow() - datetime(1970, 1, 1))) * 1000
    return endTime


def main():
    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']
    logGroupName = '/aws/batch/job'

    jobName = re.sub(r'[^A-Za-z0-9_\-]', '', args.name)[:128]  # Enforce AWS Batch jobName rules
    jobType = args.job_type
    jobQueue = job_type_info[jobType]['job_queue']
    jobDefinition = job_type_info[jobType]['job_definition']
    wait = args.wait

    safe_to_use_script = 'False'
    if args.safe_to_use_script:
        safe_to_use_script = 'True'

    parameters = {
        'SOURCE_REF': args.source_ref,
        'WORK_DIR': args.work_dir,
        'SAVED_OUTPUT': args.saved_output,
        'SAVE_PATH': args.save_path,
        # Wrap the command in double quotes so Batch treats it as a single argument
        'COMMAND': f'"{args.command}"',
        'REMOTE': args.remote,
        'SAFE_TO_USE_SCRIPT': safe_to_use_script,
        'ORIGINAL_REPO': args.original_repo
    }
    kwargs = dict(
        jobName=jobName,
        jobQueue=jobQueue,
        jobDefinition=jobDefinition,
        parameters=parameters,
    )
    if args.timeout is not None:
        kwargs['timeout'] = {'attemptDurationSeconds': args.timeout}
    submitJobResponse = batch.submit_job(**kwargs)

    jobId = submitJobResponse['jobId']

    # Export Batch_JobID to the GitHub Actions environment
    with open(os.environ['GITHUB_ENV'], 'a') as f:
        f.write(f'Batch_JobID={jobId}\n')
    os.environ['batch_jobid'] = jobId
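    # Mutating os.environ only affects this process and its children; later
    # workflow steps read Batch_JobID from the GITHUB_ENV file instead.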

    print('Submitted job [{} - {}] to the job queue [{}]'.format(jobName, jobId, jobQueue))

    spinner = 0
    running = False
    status_set = set()

    # ... (`while wait:` polling loop elided in this excerpt; each iteration
    # calls batch.describe_jobs(jobs=[jobId]) to refresh `describeJobsResponse`
    # and `status`) ...
            print('=' * 80)
            print('Job [{} - {}] {}'.format(jobName, jobId, status))
            sys.exit(status == 'FAILED')  # non-zero exit code (1) when the job failed

        elif status == 'RUNNING':
            logStreamName = describeJobsResponse['jobs'][0]['container']['logStreamName']
            if not running:
                running = True
                # ... (first-RUNNING announcement lines elided in this excerpt) ...
            startTime = printLogs(logGroupName, logStreamName, startTime) + 1
        elif status not in status_set:
            status_set.add(status)
            # '\r' rewrites the same terminal line to animate the spinner.
            print('\rJob [%s - %s] is %-9s... %s' % (jobName, jobId, status, spin[spinner % len(spin)]))
            sys.stdout.flush()
            spinner += 1


if __name__ == '__main__':
    main()
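
# Example invocation (script name and argument values are illustrative):
#
#   python submit-job.py --job-type ci-gpu-torch --name d2l-ci-gpu \
#       --source-ref refs/pull/500/head --command 'ls | tee stdout.log' --wait
#
# This would submit to the D2L-CI-GPU queue with the d2l-ci-zh-gpu-torch:1 job
# definition, stream the job's CloudWatch logs, and exit non-zero on failure.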