Skip to content

Commit e0dd042

Browse files
authored
Merge branch 'develop' into AN-146-batch-vm-cost
2 parents 3134ed3 + e917e52 commit e0dd042

File tree

9 files changed

+61
-223
lines changed

9 files changed

+61
-223
lines changed

.github/workflows/combine_scalasteward_prs.yml

Lines changed: 0 additions & 108 deletions
This file was deleted.

.scala-steward.conf

Lines changed: 0 additions & 77 deletions
This file was deleted.

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
3333
- Fixes the reference disk feature.
3434
- Fixes pulling Docker image metadata from private GCR repositories.
3535
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend
36-
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to save the logs into the mounted disk; at the end, this log file gets copied to the google cloud storage bucket with "task.log" as the name.
36+
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to stream the logs to Google Cloud Storage.
3737
- When "CLOUD_LOGGING" is used, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries.
3838
- Fixed subnet selection for networks that use custom subnet creation
3939

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -607,14 +607,39 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
607607
// not the `stderr` file contained memory retry error keys
608608
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)
609609

610+
val targetLogFile = batchAttributes.logsPolicy match {
611+
case GcpBatchLogsPolicy.CloudLogging => None
612+
case GcpBatchLogsPolicy.Path =>
613+
DefaultPathBuilder.build(
614+
gcpBatchLogPath.pathAsString.replace(
615+
gcpBatchLogPath.root.pathAsString,
616+
GcpBatchAttachedDisk.GcsMountPoint + "/"
617+
)
618+
) match {
619+
case Failure(exception) =>
620+
throw new RuntimeException(
621+
"Unable to use GcpBatchLogsPolicy.Path because the destination path could not be built, this is likely a programming error and a bug must be reported",
622+
exception
623+
)
624+
case Success(path) =>
625+
// remove trailing slash
626+
val bucket = workflowPaths.workflowRoot.root.pathWithoutScheme.replace("/", "")
627+
628+
log.info(s"Batch logs for workflow $workflowId will be streamed to GCS at: $gcpBatchLogPath")
629+
630+
Some(
631+
GcpBatchLogFile(gcsBucket = bucket, mountPath = GcpBatchAttachedDisk.GcsMountPoint, diskPath = path)
632+
)
633+
}
634+
}
635+
610636
CreateBatchJobParameters(
611637
jobDescriptor = jobDescriptor,
612638
runtimeAttributes = runtimeAttributes,
613639
dockerImage = jobDockerImage,
614640
cloudWorkflowRoot = workflowPaths.workflowRoot,
615641
cloudCallRoot = callRootPath,
616642
commandScriptContainerPath = cmdInput.containerPath,
617-
logGcsPath = gcpBatchLogPath,
618643
inputOutputParameters = inputOutputParameters,
619644
projectId = googleProject(jobDescriptor.workflowDescriptor),
620645
computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor),
@@ -632,7 +657,8 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
632657
checkpointingConfiguration,
633658
enableSshAccess = enableSshAccess,
634659
vpcNetworkAndSubnetworkProjectLabels = data.vpcNetworkAndSubnetworkProjectLabels,
635-
dockerhubCredentials = dockerhubCredentials
660+
dockerhubCredentials = dockerhubCredentials,
661+
targetLogFile = targetLogFile
636662
)
637663
case Some(other) =>
638664
throw new RuntimeException(s"Unexpected initialization data: $other")
@@ -838,16 +864,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
838864
contentType = plainTextContentType
839865
)
840866

841-
val logFileOutput = GcpBatchFileOutput(
842-
logFilename,
843-
logGcsPath,
844-
DefaultPathBuilder.get(logFilename),
845-
workingDisk,
846-
optional = true,
847-
secondary = false,
848-
contentType = plainTextContentType
849-
)
850-
851867
val memoryRetryRCFileOutput = GcpBatchFileOutput(
852868
memoryRetryRCFilename,
853869
memoryRetryRCGcsPath,
@@ -888,8 +904,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
888904
DetritusOutputParameters(
889905
monitoringScriptOutputParameter = monitoringOutput,
890906
rcFileOutputParameter = rcFileOutput,
891-
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput,
892-
logFileOutputParameter = logFileOutput
907+
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput
893908
)
894909
)
895910

@@ -907,10 +922,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
907922
runtimeAttributes = runtimeAttributes,
908923
batchAttributes = batchAttributes,
909924
projectId = batchAttributes.project,
910-
region = batchAttributes.location,
911-
logfile = createParameters.commandScriptContainerPath.sibling(
912-
batchParameters.detritusOutputParameters.logFileOutputParameter.name
913-
)
925+
region = batchAttributes.location
914926
)
915927

916928
drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
3333
lazy val gcpBatchLogPath: Path = gcpBatchCallPaths.batchLogPath
3434
lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename
3535
lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC
36-
37-
lazy val logFilename: String = "task.log"
38-
lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename)
39-
4036
lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes
4137

4238
lazy val defaultLabels: Labels = {

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ object GcpBatchRequestFactory {
4141
case class DetritusOutputParameters(
4242
monitoringScriptOutputParameter: Option[GcpBatchFileOutput],
4343
rcFileOutputParameter: GcpBatchFileOutput,
44-
memoryRetryRCFileOutputParameter: GcpBatchFileOutput,
45-
logFileOutputParameter: GcpBatchFileOutput
44+
memoryRetryRCFileOutputParameter: GcpBatchFileOutput
4645
) {
4746
def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter ::
48-
logFileOutputParameter ::
4947
rcFileOutputParameter ::
5048
monitoringScriptOutputParameter.toList
5149
}
@@ -67,13 +65,21 @@ object GcpBatchRequestFactory {
6765

6866
case class CreateBatchDockerKeyAndToken(key: String, encryptedToken: String)
6967

68+
/**
69+
* Defines the values used for streaming the job logs to GCS.
70+
*
71+
* @param gcsBucket the Cloud Storage bucket where the log file should be streamed to.
72+
* @param mountPath the path where the Cloud Storage bucket will be mounted to.
73+
* @param diskPath the path in the mounted disk where the log file should be written to.
74+
*/
75+
case class GcpBatchLogFile(gcsBucket: String, mountPath: String, diskPath: Path)
76+
7077
case class CreateBatchJobParameters(jobDescriptor: BackendJobDescriptor,
7178
runtimeAttributes: GcpBatchRuntimeAttributes,
7279
dockerImage: String,
7380
cloudWorkflowRoot: Path,
7481
cloudCallRoot: Path,
7582
commandScriptContainerPath: Path,
76-
logGcsPath: Path,
7783
inputOutputParameters: InputOutputParameters,
7884
projectId: String,
7985
computeServiceAccount: String,
@@ -91,7 +97,8 @@ object GcpBatchRequestFactory {
9197
checkpointingConfiguration: CheckpointingConfiguration,
9298
enableSshAccess: Boolean,
9399
vpcNetworkAndSubnetworkProjectLabels: Option[VpcAndSubnetworkProjectLabelValues],
94-
dockerhubCredentials: (String, String)
100+
dockerhubCredentials: (String, String),
101+
targetLogFile: Option[GcpBatchLogFile]
95102
) {
96103
def outputParameters = inputOutputParameters.fileOutputParameters
97104
}

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import com.google.cloud.batch.v1.{
1111
ComputeResource,
1212
CreateJobRequest,
1313
DeleteJobRequest,
14+
GCS,
1415
GetJobRequest,
1516
Job,
1617
JobName,
@@ -24,7 +25,7 @@ import com.google.cloud.batch.v1.{
2425
import com.google.protobuf.Duration
2526
import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
2627
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
27-
import cromwell.backend.google.batch.models.{GcpBatchLogsPolicy, GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
28+
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
2829
import cromwell.backend.google.batch.runnable._
2930
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
3031
import cromwell.core.labels.{Label, Labels}
@@ -228,7 +229,12 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
228229
val networkInterface = createNetwork(data = data)
229230
val networkPolicy = createNetworkPolicy(networkInterface.build())
230231
val allDisks = toDisks(allDisksToBeMounted)
231-
val allVolumes = toVolumes(allDisksToBeMounted)
232+
val allVolumes = toVolumes(allDisksToBeMounted) ::: createParameters.targetLogFile.map { targetLogFile =>
233+
Volume.newBuilder
234+
.setGcs(GCS.newBuilder().setRemotePath(targetLogFile.gcsBucket))
235+
.setMountPath(targetLogFile.mountPath)
236+
.build()
237+
}.toList
232238

233239
val containerSetup: List[Runnable] = containerSetupRunnables(allVolumes)
234240
val localization: List[Runnable] = localizeRunnables(createParameters, allVolumes)
@@ -266,13 +272,14 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
266272
val locationPolicy = LocationPolicy.newBuilder.addAllAllowedLocations(zones.asJava).build
267273
val allocationPolicy =
268274
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)
269-
val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match {
270-
case GcpBatchLogsPolicy.CloudLogging =>
271-
LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
272-
case GcpBatchLogsPolicy.Path =>
275+
276+
val logsPolicy = data.createParameters.targetLogFile match {
277+
case None => LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
278+
279+
case Some(targetLogFile) =>
273280
LogsPolicy.newBuilder
274281
.setDestination(Destination.PATH)
275-
.setLogsPath(data.gcpBatchParameters.logfile.toString)
282+
.setLogsPath(targetLogFile.diskPath.pathAsString)
276283
.build
277284
}
278285

0 commit comments

Comments
 (0)