Workshop 14: Timers and Cron Workflows
What we want to build
Create timer-based workflows and recurring scheduled tasks using Temporal's timer capabilities. This lesson covers `Workflow.sleep()`, timeout patterns, conditional waiting, and implementing cron-like recurring workflows with `continueAsNew`.
Expected Result
By the end of this lesson, you'll have:
- Workflows that use timers for delays and scheduling
- Understanding of different timer patterns (simple delays, timeouts, conditional waits)
- Implementation of cron-like recurring workflows
- Knowledge of timezone handling and scheduling best practices
Code Steps
Step 1: Create Basic Timer Workflow Interface
Open `class/workshop/lesson_14/workflow/TimerWorkflow.kt` and create the interfaces:
package com.temporal.bootcamp.lesson14.workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
/**
 * Workflow contract for the timer examples in this lesson.
 */
@WorkflowInterface
interface TimerWorkflow {
/**
 * Workflow entry point: processes [request] and returns the outcome as a [TimerResult].
 */
@WorkflowMethod
fun processWithTimer(request: TimerRequest): TimerResult
}
/**
 * Workflow contract for the recurring (cron-like) job example in this lesson.
 */
@WorkflowInterface
interface CronWorkflow {
/**
 * Workflow entry point: runs the job described by [config] and returns a [CronResult].
 */
@WorkflowMethod
fun runCronJob(config: CronConfig): CronResult
}
Step 2: Implement Simple Timer Patterns
Create the timer workflow implementation:
/**
 * Timer workflow implementation that dispatches on [TimerRequest.operation].
 *
 * All timestamps are read from the workflow clock (`Workflow.currentTimeMillis()`)
 * rather than `Instant.now()`, so the values are identical when the workflow
 * history is replayed on another worker.
 */
class TimerWorkflowImpl : TimerWorkflow {

    /**
     * Runs the timer pattern named by `request.operation`.
     *
     * @throws IllegalArgumentException if the operation name is not recognised.
     */
    override fun processWithTimer(request: TimerRequest): TimerResult {
        // Workflow time, not wall-clock time: replay-safe.
        val startTime = Instant.ofEpochMilli(Workflow.currentTimeMillis())
        return when (request.operation) {
            "simple_delay" -> processWithSimpleDelay(request, startTime)
            "timeout_pattern" -> processWithTimeout(request, startTime)
            "conditional_wait" -> processWithConditionalWait(request, startTime)
            // ... other patterns
            else -> throw IllegalArgumentException("Unknown operation: ${request.operation}")
        }
    }

    /**
     * Sleeps for `request.delaySeconds` using a durable workflow timer and
     * reports how long the workflow actually waited.
     */
    private fun processWithSimpleDelay(request: TimerRequest, startTime: Instant): TimerResult {
        val logger = Workflow.getLogger(this::class.java)
        logger.info("Sleeping for ${request.delaySeconds} seconds")

        // Simple delay using Workflow.sleep
        Workflow.sleep(Duration.ofSeconds(request.delaySeconds))
        logger.info("Timer completed, performing operation")

        // Measure on the workflow clock so the elapsed time matches the timer that fired.
        val finishedAt = Instant.ofEpochMilli(Workflow.currentTimeMillis())
        return TimerResult(
            requestId = request.requestId,
            success = true,
            result = "Simple delay completed after ${request.delaySeconds}s",
            actualDelay = Duration.between(startTime, finishedAt)
        )
    }
}
Key points:
- `Workflow.sleep()` is the primary timer mechanism
- Timers are durable and survive worker restarts
- Always measure actual delay for monitoring
Step 3: Implement Timeout Patterns
Add timeout handling with concurrent operations:
/**
 * Runs a simulated long operation concurrently and waits for it for at most
 * `request.timeoutSeconds` seconds (30 when the request does not specify one).
 *
 * Only a genuine timeout is reported as `timedOut`; any other failure of the
 * operation propagates to the caller instead of being mislabelled as a timeout.
 *
 * @param startTime moment the request started; expected to come from the workflow clock.
 */
private fun processWithTimeout(request: TimerRequest, startTime: Instant): TimerResult {
    val logger = Workflow.getLogger(this::class.java)
    val timeoutSeconds = request.timeoutSeconds ?: 30L

    // Simulate long-running operation
    val operationFuture = Async.procedure {
        logger.info("Starting long operation")
        Workflow.sleep(Duration.ofSeconds(request.delaySeconds))
        logger.info("Long operation completed")
    }

    // Wait for operation or timeout. Promise exposes get(timeout, unit), which
    // throws TimeoutException when the deadline passes first.
    val timedOut = try {
        operationFuture.get(timeoutSeconds, java.util.concurrent.TimeUnit.SECONDS)
        false
    } catch (e: java.util.concurrent.TimeoutException) {
        logger.warn("Operation timed out after ${timeoutSeconds}s")
        true
    }

    // Use the workflow clock (replay-safe) rather than the wall clock.
    val finishedAt = Instant.ofEpochMilli(Workflow.currentTimeMillis())
    return TimerResult(
        requestId = request.requestId,
        success = !timedOut,
        result = if (timedOut) "Operation timed out" else "Operation completed within timeout",
        actualDelay = Duration.between(startTime, finishedAt),
        timedOut = timedOut
    )
}
Key points:
- Use `Async.procedure` for concurrent operations
- `get(timeout)` provides timeout functionality
- Handle timeout exceptions gracefully
Step 4: Implement Conditional Waiting
Add conditional waiting with periodic checks:
/**
 * Waits up to `request.delaySeconds` seconds for a condition that is polled
 * every 2 seconds and becomes true on the 5th check.
 *
 * The polling loop runs in its own workflow thread; the `Workflow.await`
 * predicate only reads the flag. The predicate must stay free of side effects
 * and blocking calls (no counters, no `Workflow.sleep`) because the SDK may
 * evaluate it any number of times.
 *
 * @param startTime moment the request started; expected to come from the workflow clock.
 */
private fun processWithConditionalWait(request: TimerRequest, startTime: Instant): TimerResult {
    val logger = Workflow.getLogger(this::class.java)
    var conditionMet = false
    var iterations = 0

    // Periodic checks: simulate polling some condition every 2 seconds.
    // If the wait below times out first, this poller is abandoned when the
    // workflow method returns.
    Async.procedure {
        while (!conditionMet) {
            iterations++
            if (iterations >= 5) {
                conditionMet = true
                logger.info("Condition met after $iterations iterations")
            } else {
                // Sleep between checks
                Workflow.sleep(Duration.ofSeconds(2))
            }
        }
    }

    // Wait for the condition with a timeout; returns false if the timeout wins.
    val success = Workflow.await(Duration.ofSeconds(request.delaySeconds)) { conditionMet }

    // Use the workflow clock (replay-safe) rather than the wall clock.
    val finishedAt = Instant.ofEpochMilli(Workflow.currentTimeMillis())
    return TimerResult(
        requestId = request.requestId,
        success = success && conditionMet,
        result = if (conditionMet) "Condition met after $iterations checks" else "Condition not met, timed out",
        actualDelay = Duration.between(startTime, finishedAt),
        timedOut = !success
    )
}
Key points:
- `Workflow.await()` provides conditional waiting with timeout
- Combine with periodic checks for polling patterns
- Return `true` from the condition to exit early
Step 5: Implement Cron Workflow
Create a recurring workflow using `continueAsNew`:
/**
 * Cron-like workflow: runs one job execution, sleeps until the next scheduled
 * time, then continues as new so the history of a single run stays small.
 */
class CronWorkflowImpl : CronWorkflow {

    /**
     * Executes the job described by [config] (or skips it when disabled) and
     * schedules the next run.
     *
     * A job failure is logged and reported in the result but does not stop the
     * schedule. Workflow cancellation is rethrown so a cancelled workflow is
     * not reported as a failed job and rescheduled.
     */
    override fun runCronJob(config: CronConfig): CronResult {
        val logger = Workflow.getLogger(this::class.java)
        // Workflow time, not wall-clock time: replay-safe.
        val executionTime = Instant.ofEpochMilli(Workflow.currentTimeMillis())
        logger.info("Executing cron job: ${config.jobId} at $executionTime")

        val (success, result) = when {
            // Skip execution but continue scheduling
            !config.enabled -> true to "Job skipped (disabled)"
            else -> try {
                // Execute the actual job
                val jobResult = executeJob(config)
                logger.info(jobResult)
                true to jobResult
            } catch (e: io.temporal.failure.CanceledFailure) {
                throw e
            } catch (e: Exception) {
                logger.error("Job failed: ${e.message}", e)
                false to "Job failed: ${e.message}"
            }
        }

        // Schedule the next run whether the job succeeded, failed or was skipped.
        val nextRun = scheduleNextRun(config, executionTime)

        return CronResult(
            jobId = config.jobId,
            executionTime = executionTime,
            success = success,
            nextRun = nextRun,
            result = result
        )
    }

    /**
     * Sleeps until the next run time and then continues as new with the same
     * [config]. Returns the next run time, or null when there is none.
     *
     * The remaining wait is measured from the current workflow time, not from
     * [executionTime], so the time spent running the job does not push the
     * next run past its schedule.
     */
    private fun scheduleNextRun(config: CronConfig, executionTime: Instant): Instant? {
        val logger = Workflow.getLogger(this::class.java)
        val nextRun = calculateNextRunTime(config, executionTime) ?: return null
        logger.info("Next run scheduled for: $nextRun")

        val now = Instant.ofEpochMilli(Workflow.currentTimeMillis())
        val sleepDuration = Duration.between(now, nextRun)
        // Comparison against ZERO instead of Duration.isPositive, which needs JDK 18+.
        if (sleepDuration > Duration.ZERO) {
            Workflow.sleep(sleepDuration)
        }

        // Use continueAsNew to prevent history bloat
        Workflow.continueAsNew(config)
        return nextRun
    }
}
Step 6: Implement Cron Time Calculation
Add cron expression parsing and next run calculation:
/**
 * Returns the first scheduled instant strictly after [currentTime] for the
 * supported cron expressions, evaluated in the `config.timezone` zone.
 *
 * Supported schedules (standard 5-field cron: minute hour day-of-month month day-of-week):
 * - `0 0 * * *`   : daily at midnight
 * - `*&#47;5 * * * *` : every 5 minutes, on minutes divisible by 5
 * - `0 *&#47;6 * * *` : every 6 hours, on hours divisible by 6
 * - anything else : top of the next hour
 *
 * The spellings `0 *&#47;5 * * *` and `0 0 *&#47;6 * *` are still accepted as aliases
 * for the 5-minute and 6-hour schedules so existing configurations keep working,
 * although they mean something else in standard cron.
 *
 * @throws java.time.DateTimeException if `config.timezone` is not a valid zone id.
 */
private fun calculateNextRunTime(config: CronConfig, currentTime: Instant): Instant? {
    val zoneId = ZoneId.of(config.timezone)
    val currentDateTime = currentTime.atZone(zoneId)
    val startOfMinute = currentDateTime.withSecond(0).withNano(0)
    val startOfHour = startOfMinute.withMinute(0)

    val nextRun = when (config.cronExpression) {
        // Daily at midnight; atStartOfDay picks the first valid time of the next day.
        "0 0 * * *" -> currentDateTime.toLocalDate().plusDays(1).atStartOfDay(zoneId)

        // Every 5 minutes: advance to the next minute divisible by 5.
        "*/5 * * * *", "0 */5 * * *" ->
            startOfMinute.plusMinutes((5 - startOfMinute.minute % 5).toLong())

        // Every 6 hours: advance to the next hour divisible by 6.
        "0 */6 * * *", "0 0 */6 * *" ->
            startOfHour.plusHours((6 - startOfHour.hour % 6).toLong())

        // Default: run every hour
        else -> startOfHour.plusHours(1)
    }
    return nextRun.toInstant()
}
/**
 * Simulates the work of the job identified by `config.jobId` with a workflow
 * timer and returns a human-readable summary of the outcome.
 */
private fun executeJob(config: CronConfig): String {
    // Simulated duration (seconds) and summary for each known job.
    val (workSeconds, summary) = when (config.jobId) {
        "daily-report" -> 5L to "Daily report generated successfully"
        "data-cleanup" -> 10L to "Data cleanup completed, 1000 records processed"
        else -> 3L to "Generic job completed"
    }
    Workflow.sleep(Duration.ofSeconds(workSeconds))
    return summary
}
How to Run
1. Start Timer Workflow
// A workflow stub is bound to a single workflow execution, so each request
// below gets its own stub (and its own workflow ID).
fun newTimerWorkflowStub(): TimerWorkflow = workflowClient.newWorkflowStub(
    TimerWorkflow::class.java,
    WorkflowOptions.newBuilder()
        .setTaskQueue("timer-queue")
        .setWorkflowId("timer-${System.currentTimeMillis()}")
        .build()
)

// Simple delay
val timerWorkflow = newTimerWorkflowStub()
val request = TimerRequest(
    requestId = "REQ-001",
    delaySeconds = 10,
    operation = "simple_delay"
)
val result = timerWorkflow.processWithTimer(request)
// Output: Simple delay completed after 10s

// Timeout pattern (fresh stub: this is a separate workflow execution)
val timeoutWorkflow = newTimerWorkflowStub()
val timeoutRequest = TimerRequest(
    requestId = "REQ-002",
    delaySeconds = 30,
    operation = "timeout_pattern",
    timeoutSeconds = 15
)
val timeoutResult = timeoutWorkflow.processWithTimer(timeoutRequest)
// Output: Operation timed out (because 30s > 15s timeout)
2. Start Cron Workflow
val cronWorkflow = workflowClient.newWorkflowStub(
    CronWorkflow::class.java,
    WorkflowOptions.newBuilder()
        .setTaskQueue("cron-queue")
        .setWorkflowId("daily-report-cron")
        .build()
)
val cronConfig = CronConfig(
    jobId = "daily-report",
    cronExpression = "0 0 * * *", // Daily at midnight
    timezone = "America/New_York",
    enabled = true
)
// This starts an infinite recurring workflow. Start it asynchronously:
// calling cronWorkflow.runCronJob(cronConfig) directly would block the caller
// waiting for a result that a recurring workflow never produces.
io.temporal.client.WorkflowClient.start(cronWorkflow::runCronJob, cronConfig)
3. Schedule Future Task
// Stub for the scheduled-task workflow, with a unique workflow ID per start.
val scheduledOptions = WorkflowOptions.newBuilder()
    .setTaskQueue("scheduled-queue")
    .setWorkflowId("scheduled-${System.currentTimeMillis()}")
    .build()
val scheduledWorkflow = workflowClient.newWorkflowStub(ScheduledTaskWorkflow::class.java, scheduledOptions)

// Email reminder due 24 hours from now
val reminderTime = Instant.now().plus(Duration.ofHours(24))
val task = ScheduledTask(
    taskId = "reminder-001",
    taskType = TaskType.EMAIL_REMINDER,
    scheduledTime = reminderTime,
    parameters = mapOf("recipient" to "user@example.com")
)

val taskResult = scheduledWorkflow.runScheduledTask(task)
4. Expected Output
Timer workflow: Sleeping for 10 seconds
Timer workflow: Timer completed, performing operation
Result: Simple delay completed after 10s
Cron workflow: Executing cron job: daily-report at 2024-01-15T00:00:00Z
Cron workflow: Daily report generated successfully
Cron workflow: Next run scheduled for: 2024-01-16T00:00:00Z
What You've Learned
- ✅ How to use `Workflow.sleep()` for durable delays
- ✅ Implementing timeout patterns with `Async` and timeouts
- ✅ Conditional waiting with `Workflow.await()`
- ✅ Building recurring workflows with `continueAsNew`
- ✅ Handling cron expressions and timezone calculations
- ✅ Creating scheduled task workflows
- ✅ Best practices for timer-based workflows
Timer workflows enable powerful scheduling and time-based automation patterns!