MessagingManager

object MessagingManager : Messenger, CoroutineScope

Contains methods for processing jobs from worker queues and subscribed channels

Properties

Link copied to clipboard
open override val apiQueue: String
Link copied to clipboard
Link copied to clipboard
open override val graderQueue: String

Names of the queues required

Link copied to clipboard
open override val graderReservedQueue: String
Link copied to clipboard
private val logger: Logger
Link copied to clipboard

Adjustable sleeping duration between polling in queueUp

Link copied to clipboard
open override val waitUntilReconnect: Duration

How long to wait until attempting a reconnection

Functions

Link copied to clipboard
open suspend override fun cleanReserved()

Cleans any reserved jobs from reserved queue

Link copied to clipboard
open override fun close()
Link copied to clipboard
open override fun decompressingStatus(assignmentConfigId: Long): String

Names of the statuses required

Link copied to clipboard
open suspend override fun deleteReserved(jobName: String)

Deletes job of jobName from graderReservedQueue

Link copied to clipboard
open suspend override fun getDecompressing(assignmentConfigId: Long): Set<Long>

Retrieve the set of all submission ids under assignmentConfigId that are still undergoing decompression

Link copied to clipboard
open suspend override fun getNextJob(): String?

Gets next job from graderQueue

Link copied to clipboard
open override fun gradingStatus(assignmentConfigId: Long): String
Link copied to clipboard
open override fun logFailedConnection(type: String, tr: Throwable)

Common logging for failing to connect to the messenger

Link copied to clipboard
open suspend override fun <R> loopingTry(type: String, resetBlock: suspend () -> Unit, block: suspend () -> R): R

Runs block, retrying every period set by waitUntilReconnect until the operation succeeds.

Link copied to clipboard
open suspend override fun loopRecover(type: String, resetBlock: suspend () -> Unit, block: suspend () -> Unit)

Always run block, and executes resetBlock when an exception occurs.

Link copied to clipboard
open override fun makeGradingStatusString(stageName: String, details: String): String

Generate the status string for the grading-in-progress state

Link copied to clipboard
private fun pop(reservedRawJobMessage: String?, jobParamWithRaw: (String) -> JobParam): JobRequest?

Pops and parse a queued job to a JobRequest

Link copied to clipboard
open suspend override fun publish(jobName: String, payload: Payload)

Publish payload to jobName

Link copied to clipboard
open suspend override fun push(queueName: String, jobMessage: JobMessage)

Push jobs as jobMessage to queueName for external system components to consume

Link copied to clipboard
suspend fun pushToApi(jobMessage: JobMessage)
Link copied to clipboard
suspend fun pushToGrader(jobMessage: JobMessage)
Link copied to clipboard
open suspend override fun queueUp()

Blocking queue to consume incoming jobs on Messenger.graderQueue

Link copied to clipboard
suspend fun runOnce(): JobRequest?

Step function to process one job from the Messenger.graderQueue

fun runOnce(channel: String?, message: String?)

Step function to process one provided message from the subscription channels

Link copied to clipboard
open suspend override fun setGradingProgress(assignmentConfigId: Long, submissionId: Long, status: String?)

Update the status string of submissionId under assignmentConfigId to status if it is not-null, or delete the key submissionId to nil if status is null, to indicate this submission is no longer active

Link copied to clipboard
open suspend override fun subscribe()

Blocking subscribe to Job.subscriptions channels on Redis