Messenger

Abstraction for the message broker used in MessagingManager

It requires

  1. Queues: for queued jobs.

  2. PubSub: for immediate jobs.
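The difference between the two requirements can be illustrated with a minimal in-memory sketch. `InMemoryBroker` and all of its members are hypothetical names used only for illustration and are not part of Messenger; a real implementation would delegate to an actual message broker.

```kotlin
// Hypothetical in-memory broker; none of these names come from Messenger itself.
class InMemoryBroker {
    private val queues = mutableMapOf<String, ArrayDeque<String>>()
    private val subscribers = mutableMapOf<String, MutableList<(String) -> Unit>>()

    // Queue semantics: the message is stored until a consumer takes it.
    fun push(queueName: String, message: String) {
        queues.getOrPut(queueName) { ArrayDeque() }.addLast(message)
    }

    // Each queued message is handed to exactly one consumer, oldest first.
    fun pop(queueName: String): String? = queues[queueName]?.removeFirstOrNull()

    // PubSub semantics: register a listener for a channel.
    fun subscribe(channel: String, listener: (String) -> Unit) {
        subscribers.getOrPut(channel) { mutableListOf() }.add(listener)
    }

    // The message goes to every current subscriber and is not stored.
    // Returns how many listeners received it.
    fun publish(channel: String, message: String): Int {
        val listeners = subscribers[channel].orEmpty()
        listeners.forEach { it(message) }
        return listeners.size
    }
}
```

In this sketch a queued job waits until something pops it, while a published message that has no subscriber at that moment is simply dropped, which is why queues suit deferred jobs and PubSub suits immediate ones.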

Inheritors

Properties

abstract val apiQueue: String
abstract val graderQueue: String

Names of the queues required


How long to wait until attempting a reconnection

Functions

abstract suspend fun cleanReserved()

Removes all reserved jobs from the reserved queue

abstract fun close()
abstract fun decompressingStatus(assignmentConfigId: Long): String

Names of the statuses required

abstract suspend fun deleteReserved(jobName: String)

Deletes the job named jobName from graderReservedQueue

abstract suspend fun getDecompressing(assignmentConfigId: Long): Set<Long>

Retrieves the set of all submission ids under assignmentConfigId that are still undergoing decompression

abstract suspend fun getNextJob(): String?

Gets the next job from graderQueue
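How getNextJob, deleteReserved and cleanReserved fit together is not spelled out on this page. One common arrangement, shown here purely as an assumption, is a reserved-queue pattern: taking a job also records it as reserved, the reservation is deleted once the job has been handled, and leftover reservations are cleared in bulk. `ReservingQueue` and `reservedJobs` are hypothetical names, and the sketch is in-memory and non-suspending for brevity.

```kotlin
// Hypothetical in-memory stand-in for the grader queue and its reserved queue.
// Assumption: taking a job moves it into the reserved list.
class ReservingQueue {
    private val pending = ArrayDeque<String>()
    private val reserved = mutableListOf<String>()

    fun push(jobName: String) {
        pending.addLast(jobName)
    }

    // Take the oldest pending job and remember it as reserved (in flight).
    fun getNextJob(): String? {
        val job = pending.removeFirstOrNull() ?: return null
        reserved.add(job)
        return job
    }

    // Called once a job has been handled: forget its reservation.
    fun deleteReserved(jobName: String) {
        reserved.remove(jobName)
    }

    // Drop every leftover reservation; returns how many were dropped.
    fun cleanReserved(): Int {
        val count = reserved.size
        reserved.clear()
        return count
    }

    // Snapshot of the jobs currently reserved.
    fun reservedJobs(): List<String> = reserved.toList()
}
```

Under this assumption, a job that was taken but never acknowledged stays visible in the reserved list, so it can be detected and cleaned up later.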

abstract fun gradingStatus(assignmentConfigId: Long): String
abstract fun logFailedConnection(type: String, tr: Throwable)

Common logging for failing to connect to the messenger

open suspend fun <R> loopingTry(type: String, resetBlock: suspend () -> Unit = {}, block: suspend () -> R): R

Runs block, retrying every period set by waitUntilReconnect until the operation succeeds.
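The retry behaviour described above can be sketched roughly as follows. This is not the actual implementation: `retryUntilSuccess`, `waitMillis` (standing in for waitUntilReconnect, whose type is not shown on this page) and `demoRetry` are hypothetical, the moment at which resetBlock runs is an assumption, and the sketch depends on the kotlinx.coroutines library for `delay` and `runBlocking`.

```kotlin
import kotlin.coroutines.cancellation.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Sketch only: waitMillis stands in for waitUntilReconnect.
suspend fun <R> retryUntilSuccess(
    type: String,
    waitMillis: Long,
    resetBlock: suspend () -> Unit = {},
    block: suspend () -> R,
): R {
    while (true) {
        try {
            return block()
        } catch (e: CancellationException) {
            // Never swallow coroutine cancellation.
            throw e
        } catch (tr: Exception) {
            // Stand-in for logFailedConnection(type, tr).
            System.err.println("[$type] failed: ${tr.message}")
            // Assumption: reset state after each failure, before waiting.
            resetBlock()
            delay(waitMillis)
        }
    }
}

// Usage sketch: the block fails twice, then succeeds on the third attempt.
fun demoRetry(): String = runBlocking {
    var attempts = 0
    val result = retryUntilSuccess(type = "demo", waitMillis = 10) {
        attempts++
        if (attempts < 3) error("broker unavailable")
        "connected"
    }
    "$result after $attempts attempts"
}
```

Rethrowing CancellationException keeps the loop cancellable, which matters for a function that otherwise retries indefinitely.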

open suspend fun loopRecover(type: String, resetBlock: suspend () -> Unit, block: suspend () -> Unit)

Always runs block, and executes resetBlock when an exception occurs.

abstract fun makeGradingStatusString(stageName: String = "", details: String = ""): String

Generates the status string for the grading-in-progress state

abstract suspend fun publish(jobName: String, payload: Payload)

Publishes payload to jobName

abstract suspend fun push(queueName: String, jobMessage: JobMessage)

Pushes jobs as jobMessage to queueName for external system components to consume

abstract suspend fun queueUp()

Blocking call that consumes incoming jobs from Messenger.graderQueue

abstract suspend fun setGradingProgress(assignmentConfigId: Long, submissionId: Long, status: String?)

Updates the status string of submissionId under assignmentConfigId to status if it is non-null. If status is null, deletes the key submissionId instead, to indicate that this submission is no longer active
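The set-or-delete behaviour of the nullable status can be illustrated with a small in-memory sketch. `ProgressStore` and `activeSubmissions` are hypothetical names, and the nested map only mimics a keyed status store; the sketch is non-suspending for brevity.

```kotlin
// Hypothetical in-memory store: assignmentConfigId -> (submissionId -> status).
class ProgressStore {
    private val statuses = mutableMapOf<Long, MutableMap<Long, String>>()

    // Non-null status: insert or overwrite. Null status: remove the submission's key.
    fun setGradingProgress(assignmentConfigId: Long, submissionId: Long, status: String?) {
        val forAssignment = statuses.getOrPut(assignmentConfigId) { mutableMapOf() }
        if (status != null) {
            forAssignment[submissionId] = status
        } else {
            forAssignment.remove(submissionId)
        }
    }

    // Submissions that still have a status entry are treated as active.
    fun activeSubmissions(assignmentConfigId: Long): Set<Long> =
        statuses[assignmentConfigId]?.keys?.toSet().orEmpty()
}
```

With this shape, passing null needs no separate "delete" call: the absence of a key is itself the signal that the submission is no longer active.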

abstract suspend fun subscribe()

Blocking call that subscribes to the Job.subscriptions channels on Redis