Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/scala/io/hydrosphere/serving/manager/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object App {
implicit val storageOps = StorageOps.default[F]
implicit val uuidGen = UUIDGenerator.default[F]()
implicit val dc = dockerClient

for {
rngF <- Resource.liftF(RNG.default[F])
cloudDriver =
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/io/hydrosphere/serving/manager/Core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ object Core {
servableRepo: ServableRepository[F],
appRepo: ApplicationRepository[F],
buildLogsRepo: BuildLogRepository[F],
monitoringRepo: MonitoringRepository[F]
): F[Core[F]] = {
monitoringRepo: MonitoringRepository[F],
cs: ContextShift[F]
): F[Core[F]] =
for {
buildLoggingService <- BuildLoggingService.make[F]()
core <- {
Expand Down Expand Up @@ -127,5 +128,4 @@ object Core {
}
}
} yield core
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.hydrosphere.serving.manager.api.http.controller.AkkaHttpControllerDsl
import io.hydrosphere.serving.manager.config.ApplicationConfig
import io.hydrosphere.serving.manager.util.AsyncUtil

import java.util.concurrent.Executors
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext

Expand All @@ -23,24 +24,26 @@ trait HttpServer[F[_]] {

object HttpServer extends AkkaHttpControllerDsl {

def akkaBased[F[_] : Async](
config: ApplicationConfig,
modelRoutes: Route,
applicationRoutes: Route,
hostSelectorRoutes: Route,
servableRoutes: Route,
sseRoutes: Route,
monitoringRoutes: Route,
externalModelRoutes: Route,
deploymentConfRoutes: Route,
def akkaBased[F[_]: Async](
config: ApplicationConfig,
modelRoutes: Route,
applicationRoutes: Route,
hostSelectorRoutes: Route,
servableRoutes: Route,
sseRoutes: Route,
monitoringRoutes: Route,
externalModelRoutes: Route,
deploymentConfRoutes: Route
)(implicit
as: ActorSystem,
am: ActorMaterializer,
ec: ExecutionContext
as: ActorSystem,
am: ActorMaterializer
): HttpServer[F] = {
val ex = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(ex)

val controllerRoutes: Route = pathPrefix("v2") {
handleExceptions(commonExceptionHandler) {
modelRoutes ~
modelRoutes ~
externalModelRoutes ~
applicationRoutes ~
hostSelectorRoutes ~
Expand All @@ -52,13 +55,17 @@ object HttpServer extends AkkaHttpControllerDsl {
}

val buildInfoRoute = pathPrefix("buildinfo") {
complete(HttpResponse(
status = StatusCodes.OK,
entity = HttpEntity(ContentTypes.`application/json`, BuildInfo.toJson)
))
complete(
HttpResponse(
status = StatusCodes.OK,
entity = HttpEntity(ContentTypes.`application/json`, BuildInfo.toJson)
)
)
}

val routes: Route = CorsDirectives.cors(CorsSettings.defaultSettings.withAllowedMethods(Seq(GET, POST, HEAD, OPTIONS, PUT, DELETE))) {
val routes: Route = CorsDirectives.cors(
CorsSettings.defaultSettings.withAllowedMethods(Seq(GET, POST, HEAD, OPTIONS, PUT, DELETE))
) {
pathPrefix("health") {
complete("OK")
} ~
Expand All @@ -67,9 +74,10 @@ object HttpServer extends AkkaHttpControllerDsl {
}
}
new HttpServer[F] {
override def start(): F[Http.ServerBinding] = AsyncUtil.futureAsync {
Http().bindAndHandle(routes, "0.0.0.0", config.port)
}
override def start(): F[Http.ServerBinding] =
AsyncUtil.futureAsync {
Http().bindAndHandle(routes, "0.0.0.0", config.port)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package io.hydrosphere.serving.manager.api.http.controller.events

import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import cats.effect.{ConcurrentEffect, ContextShift}
import streamz.converter._
Expand All @@ -15,12 +12,12 @@ import io.hydrosphere.serving.manager.api.http.controller.application.Applicatio
import io.hydrosphere.serving.manager.api.http.controller.servable.ServableView
import io.hydrosphere.serving.manager.discovery._
import io.hydrosphere.serving.manager.domain.application.ApplicationEvents
import io.hydrosphere.serving.manager.domain.deploy_config
import io.hydrosphere.serving.manager.domain.deploy_config.DeploymentConfigurationEvents
import io.hydrosphere.serving.manager.domain.model_version.{ModelVersion, ModelVersionEvents}
import io.hydrosphere.serving.manager.domain.monitoring.MetricSpecEvents
import io.hydrosphere.serving.manager.domain.servable.ServableEvents

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand All @@ -32,14 +29,12 @@ class SSEController[F[_]](
depSubscriber: DeploymentConfigurationEvents.Subscriber[F]
)(implicit
F: ConcurrentEffect[F],
cs: ContextShift[F],
ec: ExecutionContext,
actorSystem: ActorSystem
cs: ContextShift[F]
) extends AkkaHttpControllerDsl {
val ex = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(ex)

implicit val am: ActorMaterializer = ActorMaterializer.create(actorSystem)

def subscribe: Route =
def subscribe =
pathPrefix("events") {
get {
val id = UUID.randomUUID().toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ object K8sCloudInstanceEventAdapter {
desiredReplicas <- desiredReplicasOrError
currentReplicas <- currentReplicasOrError
} yield
if (currentReplicas == 0)
ReplicaSetIsFailed(name, "All pods aren't available")
else if (currentReplicas < desiredReplicas)
if (currentReplicas < desiredReplicas)
ReplicaSetIsOk(
name,
s"$currentReplicas from $desiredReplicas are available".some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import io.hydrosphere.serving.manager.domain.clouddriver.{
ServableEvent,
ServableNotReady,
ServableReady,
ServableStarting,
ServableState
}

sealed trait RsStatus
final case class RsIsOk(msg: Option[String] = None) extends RsStatus
final case class RsIsFailed(msg: String) extends RsStatus
case class RsInitialized() extends RsStatus

sealed trait SvcStatus
final case object SvcIsAvailable extends SvcStatus
final case class SvcIsUnavailable(msg: String) extends SvcStatus
case class SvcInitialized() extends SvcStatus

case class K8sServableState(rs: RsStatus, svc: SvcStatus) extends ServableState

Expand All @@ -25,10 +28,11 @@ case object K8sServableState {
implicit val rsOkShow: Show[RsIsOk] = Show.show(s => s.msg.map("Replicaset:" + _).getOrElse(""))

def default: K8sServableState =
new K8sServableState(RsIsFailed("unknown status"), SvcIsUnavailable("unknown status"))
new K8sServableState(RsInitialized(), SvcInitialized())

def toServableEvent(state: K8sServableState): ServableEvent =
state match {
case K8sServableState(_: RsInitialized, _: SvcInitialized) => ServableStarting
case K8sServableState(rs: RsIsFailed, svc: SvcIsUnavailable) =>
ServableNotReady(rs.show + svc.show)
case K8sServableState(_, s: SvcIsUnavailable) => ServableNotReady(s.show)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ class K8sServableStates[F[_]: Sync](ref: Ref[F, Map[String, K8sServableState]])
servableState =
state.getOrElse[K8sServableState](event.instanceName, K8sServableState.default)
newServableState = event match {
case ReplicaSetIsFailed(_, msg) =>
servableState.copy(rs = RsIsFailed(msg))
case ReplicaSetIsOk(_, msg) =>
servableState.copy(rs = RsIsOk(msg))
case ServiceIsAvailable(_) => servableState.copy(svc = SvcIsAvailable)
case ServiceIsUnavailable(_, message) =>
servableState.copy(svc = SvcIsUnavailable(message))
case _ => servableState
case ReplicaSetIsFailed(_, msg) => servableState.copy(rs = RsIsFailed(msg))
case ReplicaSetIsOk(_, msg) => servableState.copy(rs = RsIsOk(msg))
case ServiceIsAvailable(_) => servableState.copy(svc = SvcIsAvailable)
case ServiceIsUnavailable(_, message) => servableState.copy(svc = SvcIsUnavailable(message))
case _ => servableState
}
_ <- ref.update(_ + (event.instanceName -> newServableState))
servableEvent = K8sServableState.toServableEvent(newServableState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.hydrosphere.serving.manager.domain.model

import java.nio.file.Path
import cats.data.OptionT
import cats.effect.Clock
import cats.effect.{Clock, ContextShift}
import cats.implicits._
import cats.{Monad, MonadError}
import io.hydrosphere.serving.manager.api.http.controller.model._
Expand All @@ -20,6 +20,9 @@ import io.hydrosphere.serving.manager.util.DeferredResult
import io.hydrosphere.serving.manager.util.InstantClockSyntax._
import org.apache.logging.log4j.scala.Logging

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

trait ModelService[F[_]] {
def get(modelId: Long): F[Model]

Expand Down Expand Up @@ -48,9 +51,12 @@ object ModelService {
appRepo: ApplicationRepository[F],
servableRepo: ServableRepository[F],
fetcher: ModelFetcher[F],
modelVersionBuilder: ModelVersionBuilder[F]
modelVersionBuilder: ModelVersionBuilder[F],
cs: ContextShift[F]
): ModelService[F] =
new ModelService[F] with Logging {
val ex = Executors.newCachedThreadPool()
val ec = ExecutionContext.fromExecutor(ex)

def deleteModel(modelId: Long): F[Model] =
for {
Expand Down Expand Up @@ -84,7 +90,7 @@ object ModelService {
.leftMap(x => InvalidRequest(x.toList.mkString))
)
parentModel <- createIfNecessary(versionMetadata.modelName)
b <- modelVersionBuilder.build(parentModel, versionMetadata, modelPath)
b <- cs.evalOn(ec)(modelVersionBuilder.build(parentModel, versionMetadata, modelPath))
} yield b

def createIfNecessary(modelName: String): F[Model] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object ModelVersionBuilder {
_ <- handleBuild(init, modelFileStructure, handler).flatMap(deferred.complete).start
} yield DeferredResult(init, deferred)

def initialVersion(model: Model, metadata: ModelVersionMetadata) =
private def initialVersion(model: Model, metadata: ModelVersionMetadata) =
for {
version <- modelVersionService.getNextModelVersion(model.id)
image = imageRepository.getImage(metadata.modelName, version.toString)
Expand All @@ -70,7 +70,7 @@ object ModelVersionBuilder {
modelVersion <- modelVersionRepository.create(mv)
} yield mv.copy(id = modelVersion.id)

def buildImage(buildPath: Path, image: DockerImage, handler: ProgressHandler) =
private def buildImage(buildPath: Path, image: DockerImage, handler: ProgressHandler) =
for {
imageId <- dockerClient.build(
buildPath,
Expand All @@ -82,7 +82,7 @@ object ModelVersionBuilder {
res <- dockerClient.inspectImage(imageId)
} yield res.id().stripPrefix("sha256:")

def handleBuild(
private def handleBuild(
mv: ModelVersion.Internal,
modelFileStructure: ModelFileStructure,
handler: ProgressHandler
Expand Down Expand Up @@ -111,7 +111,7 @@ object ModelVersionBuilder {
}
}

def prepare(
private def prepare(
modelVersion: ModelVersion.Internal,
modelFileStructure: ModelFileStructure
): F[ModelFileStructure] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,10 @@ object ServableMonitoring extends Logging {

def updatedServable(servable: Servable, servEvent: ServableEvent): Servable =
servEvent match {
case ServableNotReady(message) =>
servable.copy(status = NotServing, message = message.some)
case ServableReady(message) =>
servable.copy(status = Serving, message = message)
case ServableStarting =>
servable.copy(status = Starting, message = None)
case _ => servable
case ServableNotReady(msg) => servable.copy(status = NotServing, message = msg.some)
case ServableReady(msg) => servable.copy(status = Serving, message = msg)
case ServableStarting => servable.copy(status = Starting, message = None)
case _ => servable
}

private def streamFinishMessage(msg: String): F[Unit] =
Expand Down