Kanject.Core.EtlTaskManager

Long-running batch jobs with checkpointing, resume, and dead-letter handling. Built around AWS Lambda's 15-minute execution limit: the task checkpoints after every batch, and the next invocation restarts from the last successfully saved cursor.
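The checkpoint-and-resume cycle can be illustrated without the library. The sketch below is a hypothetical, in-memory C# model; `CheckpointStore`, `BatchJob`, and `maxBatchesPerInvocation` are illustrative names, not part of Kanject.Core.EtlTaskManager. Each call to `RunInvocation` stands in for one Lambda invocation. It processes a bounded number of batches to mimic the time limit and saves the cursor after every batch, so the next call continues where the previous one stopped.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a durable checkpoint table.
public sealed class CheckpointStore
{
    private readonly Dictionary<string, int> _cursors = new();

    // Returns the saved cursor (index of the next unprocessed item), or null on the first run.
    public int? Get(string taskName) =>
        _cursors.TryGetValue(taskName, out var cursor) ? cursor : (int?)null;

    public void Save(string taskName, int cursor) => _cursors[taskName] = cursor;
}

public static class BatchJob
{
    // Simulates one invocation: processes at most maxBatchesPerInvocation batches,
    // checkpointing after each one. Returns true once every item has been processed.
    public static bool RunInvocation(
        string taskName,
        IReadOnlyList<int> items,
        int batchSize,
        int maxBatchesPerInvocation,
        CheckpointStore store,
        List<int> processed)
    {
        var cursor = store.Get(taskName) ?? 0;   // resume from the last good cursor

        for (var n = 0; n < maxBatchesPerInvocation && cursor < items.Count; n++)
        {
            var batch = items.Skip(cursor).Take(batchSize).ToList();
            processed.AddRange(batch);            // "do the work" for this batch
            cursor += batch.Count;
            store.Save(taskName, cursor);         // persist progress after every batch
        }

        return cursor >= items.Count;
    }
}
```

With ten items, a batch size of 3, and two batches per invocation, the first call stops at cursor 6 and the second call finishes the remaining four items, so each item is processed exactly once.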

Install

```bash
dotnet add package Kanject.Core.EtlTaskManager
```

Register

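```csharp
using Kanject.Core.EtlTaskManager.Extensions;

// `builder` is your host builder; `appSettings` is your application's own settings
// object (it must expose Stage and AwsRegion)
builder.Services.AddEtlTaskManager(options =>
{
    // DynamoDB table that stores checkpoints, e.g. "dev-etl-checkpoints" when Stage is "dev"
    options.CheckpointTable = $"{appSettings.Stage}-etl-checkpoints";
    options.AwsRegion       = appSettings.AwsRegion;
});
```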

Define a task

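```csharp
using System.Threading;
using System.Threading.Tasks;
using Kanject.Core.EtlTaskManager.Abstractions;
using Kanject.Core.EtlTaskManager.Abstractions.Attributes;

[EtlTask(name: "backfill-product-search-index")]
public class BackfillSearchIndex(
    IProductRepository products,
    ISearchIndex search) : IEtlTask
{
    public async Task RunAsync(IEtlContext ctx, CancellationToken ct)
    {
        // Resume from the last successful checkpoint (empty on the first run)
        var cursor = ctx.GetCheckpoint<string>() ?? string.Empty;

        // ScanFromAsync is your own repository method; it should yield non-empty
        // batches in a stable key order, starting strictly after `cursor`
        await foreach (var batch in products.ScanFromAsync(cursor, batchSize: 500, ct))
        {
            // A crash after this call but before the checkpoint below re-runs the
            // batch on resume, so indexing should be idempotent (upsert by Id)
            await search.IndexAsync(batch, ct);

            // Persist progress; if the Lambda dies, the next run picks up here
            await ctx.SaveCheckpointAsync(batch[^1].Id);
        }
    }
}
```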
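The task relies on a contract with `ScanFromAsync`: it should return non-empty batches in a stable key order, starting strictly after `cursor`. If the scan included the cursor item, the last item of every checkpointed batch would be processed twice. The hypothetical in-memory repository below makes that contract concrete; the `Product` record and `InMemoryProductRepository` are illustrative and not part of the package, and a real implementation would issue a paged query against its data store.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed record Product(string Id, string Name);

// Hypothetical repository: yields batches ordered by Id, strictly after `cursor`.
public sealed class InMemoryProductRepository
{
    private readonly List<Product> _products;

    public InMemoryProductRepository(IEnumerable<Product> products) =>
        _products = products.OrderBy(p => p.Id, StringComparer.Ordinal).ToList();

    public async IAsyncEnumerable<IReadOnlyList<Product>> ScanFromAsync(
        string cursor,
        int batchSize,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        // Exclusive cursor: an empty string means "start from the beginning"
        var remaining = _products
            .Where(p => string.CompareOrdinal(p.Id, cursor) > 0)
            .ToList();

        for (var i = 0; i < remaining.Count; i += batchSize)
        {
            ct.ThrowIfCancellationRequested();
            await Task.Yield(); // stand-in for a real paged query
            yield return remaining.Skip(i).Take(batchSize).ToList();
        }
    }
}
```

Because the cursor is exclusive and batches are never empty, `batch[^1].Id` in the task is always a valid checkpoint that the next scan skips past.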

Run it

```bash
# Start the task: runs as a Lambda and checkpoints after each batch
kanject etl run backfill-product-search-index --env dev

# Resume after a crash with the same command; the task reads its last checkpoint
kanject etl run backfill-product-search-index --env dev

# List in-flight and recently completed runs
kanject etl status --env dev
```
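Because the example task saves its checkpoint only after `IndexAsync` returns, a crash between those two calls means the resumed run re-reads from the previous cursor and processes that batch again. Processing is therefore at-least-once, and the write side should tolerate replays. The sketch below is hypothetical (`Doc`, `UpsertIndex`, and `ReplayDemo` are illustrative names, not package types); it shows that writes keyed by `Id` leave the index unchanged when a batch is replayed.

```csharp
using System.Collections.Generic;

public sealed record Doc(string Id, string Body);

// Hypothetical idempotent sink: writing the same document twice is a no-op.
public sealed class UpsertIndex
{
    private readonly Dictionary<string, Doc> _docs = new();

    public int Count => _docs.Count;

    public void Index(IEnumerable<Doc> batch)
    {
        foreach (var doc in batch)
            _docs[doc.Id] = doc;   // keyed by Id: a replay overwrites, never duplicates
    }
}

public static class ReplayDemo
{
    // Simulates: index batch 1 and checkpoint; index batch 2 and crash before
    // checkpointing; resume from the batch-1 checkpoint and index batch 2 again.
    public static UpsertIndex Run()
    {
        var index = new UpsertIndex();
        var batch1 = new[] { new Doc("a", "1"), new Doc("b", "2") };
        var batch2 = new[] { new Doc("c", "3"), new Doc("d", "4") };

        index.Index(batch1);   // checkpoint saved: cursor = "b"
        index.Index(batch2);   // crash here: checkpoint is still "b"
        index.Index(batch2);   // resumed run replays batch 2

        return index;
    }
}
```

After the replay the index still holds four documents; an append-only sink would hold six.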

What ships with it

  • IEtlContext.GetCheckpoint<T>() and SaveCheckpointAsync(state): typed checkpoints stored in DynamoDB
  • Automatic resume after Lambda timeout or transient failure
  • Per-task DLQ for unrecoverable batches
  • kanject etl status for in-flight and recent runs
  • Concurrency lock per task name: only one runner is active at a time
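Two of the features above, the per-task-name lock and the dead-letter path, can be modelled in a few lines. This is a hypothetical, single-process sketch. The lock is a `ConcurrentDictionary` claim rather than whatever durable mechanism the package uses across Lambda invocations. The retry count (`maxAttempts`), the `DeadLetters` list, and the choice to continue with the next batch after dead-lettering are assumptions, not documented package behaviour.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public sealed class TaskRunner
{
    // Hypothetical: task name -> id of the runner currently holding the lock
    private readonly ConcurrentDictionary<string, string> _locks = new();

    public List<(string Task, int BatchNo, string Error)> DeadLetters { get; } = new();

    // Returns false if another runner already holds the lock for this task name
    public bool TryAcquire(string taskName, string ownerId) => _locks.TryAdd(taskName, ownerId);

    // Releases only if ownerId is the current holder (requires .NET 5+)
    public bool Release(string taskName, string ownerId) =>
        _locks.TryRemove(new KeyValuePair<string, string>(taskName, ownerId));

    // Runs one batch with bounded retries; after the final failure the batch is
    // dead-lettered and false is returned so the caller can move on (an assumption)
    public bool ProcessBatch(string taskName, int batchNo, Action work, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                work();
                return true;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Treated as transient: loop around and retry
            }
            catch (Exception ex)
            {
                DeadLetters.Add((taskName, batchNo, ex.Message));
                return false;
            }
        }
    }
}
```

A second runner calling `TryAcquire` with the same task name gets `false` until the first runner releases the lock, which mirrors the "one runner active at a time" guarantee in spirit.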