Featured Post

Building Taskito: A Rust-Powered Task Queue for Python — No Broker Required

How I built an embedded task queue for Python using Rust, Tokio, and Diesel — with SQLite and Postgres backends, priority queues, retries, cron scheduling, and zero external broker dependencies.

6 min read
By Pratyush Sharma

Building Taskito: A Rust-Powered Task Queue for Python — No Broker Required

If you've used Celery, you know the drill: install Redis or RabbitMQ, configure a broker URL, run a separate worker process, set up monitoring, troubleshoot connection issues, and eventually debug serialization failures at 2 AM. For large-scale distributed systems, that infrastructure makes sense. For the rest of us — background email sending, periodic data cleanup, queued API calls — it's overkill.

I built Taskito to solve this. It's a task queue that embeds directly into your Python process. No broker, no extra services. pip install taskito, define your tasks, and run. SQLite handles local storage by default. When you outgrow a single machine, swap to PostgreSQL and spin up distributed workers — same API, no code changes.

Why not just use Celery?

Celery is battle-tested and I've used it in production. But three pain points kept coming up:

  1. Operational overhead — Even a simple background task needs a Redis instance, a Celery worker process, and monitoring for both. On small projects, the broker infrastructure costs more than the application itself.

  2. Local development friction — Running docker-compose up just to process a background job during development breaks flow. I wanted something that works with python app.py and nothing else.

  3. Startup latency — Celery workers take seconds to start, establish broker connections, and begin consuming. For CLI tools or short-lived scripts that queue tasks on exit, this is a dealbreaker.

Taskito sidesteps all of this by storing tasks in a local database and processing them in-process (or across workers when you need scale).

Architecture

The system has three layers:

Rendering Mermaid diagram...
Chart length: 259 charactersThis may take a moment

Python API — The surface users interact with. Decorators for defining tasks, methods for enqueueing, and configuration through plain Python objects.

Rust Task Engine — The core scheduler and executor, compiled to a native Python extension via PyO3. Handles priority queues, retry logic, task state transitions, and worker coordination.

Storage Layer — Diesel ORM provides a unified query interface over SQLite and PostgreSQL. Tasks, schedules, and execution logs all live in the database — no separate message queue needed.

Core features

Zero-config task definition

Define tasks with a decorator. Enqueue them by calling .enqueue(). That's it.

from taskito import Taskito, task

app = Taskito()  # SQLite by default, stores in ./taskito.db

@task(app)
def send_welcome_email(user_id: int, email: str):
    # your email logic here
    send_email(email, template="welcome")

# Enqueue — returns immediately, runs in background
send_welcome_email.enqueue(user_id=42, email="new@user.com")

# Start processing
app.run_worker()

No broker URL. No serializer config. No CELERY_IMPORTS list. The task function, its arguments, and metadata all get serialized to JSON and stored in SQLite.

Priority queues

Not all tasks are equal. A payment confirmation should jump ahead of a weekly digest email.

@task(app, priority=10)  # higher = more urgent
def process_payment(order_id: int):
    charge(order_id)

@task(app, priority=1)
def send_digest(user_id: int):
    compile_and_send_digest(user_id)

The Rust engine maintains a priority-ordered queue backed by the database. When a worker polls for the next task, it always gets the highest-priority pending item. Priorities are integers — you pick the scale.

Retry with exponential backoff

Transient failures happen. API timeouts, rate limits, temporary network blips. Taskito retries failed tasks automatically with configurable backoff:

@task(app, max_retries=5, retry_backoff=2.0)
def call_external_api(endpoint: str):
    response = requests.post(endpoint, timeout=10)
    response.raise_for_status()

If call_external_api fails, Taskito retries after 2s, 4s, 8s, 16s, 32s — exponential backoff with a base of retry_backoff seconds. After 5 failures, the task moves to a dead-letter state where you can inspect it and decide what to do.

The retry state machine runs entirely in Rust:

PENDING → RUNNING → SUCCESS
                  ↘ FAILED → RETRY → RUNNING → ...
                                    ↘ DEAD (max retries exceeded)

Cron-based periodic scheduling

Cron scheduling is built in — no separate beat process like Celery requires.

@task(app, cron="0 9 * * 1-5")  # 9 AM on weekdays
def daily_report():
    generate_and_email_report()

@task(app, cron="*/5 * * * *")  # every 5 minutes
def health_check():
    ping_services()

The Tokio runtime manages cron triggers. When a cron fires, it enqueues a new instance of the task through the same priority queue. This means cron tasks get the same retry logic, logging, and monitoring as manually enqueued tasks — no special cases.

Task chaining and parallel execution

Complex workflows often require tasks to run in sequence or in parallel. Taskito supports both:

from taskito import chain, group

# Sequential: download → process → upload
workflow = chain(
    download_data.s(url="https://data.example.com/dump.csv"),
    process_data.s(),
    upload_results.s(destination="s3://bucket/results/"),
)
workflow.enqueue()

# Parallel: run all three concurrently, then aggregate
batch = group(
    fetch_from_api_a.s(),
    fetch_from_api_b.s(),
    fetch_from_api_c.s(),
)
batch.enqueue()

Chains pass results from one task to the next. Groups run concurrently and complete when all members finish. Under the hood, the Rust engine manages task dependencies as a DAG — no polling or sleep loops.

Progress tracking

Long-running tasks can report progress back to the application:

@task(app)
def process_large_file(file_path: str):
    lines = open(file_path).readlines()
    for i, line in enumerate(lines):
        process(line)
        if i % 1000 == 0:
            task.update_progress(current=i, total=len(lines))

Progress updates are batched and written to the database. The monitoring dashboard (or your own code) can poll task progress without interfering with execution.

SQLite in production — WAL mode is the key

The most common question I get is: "SQLite for a task queue? Doesn't it lock on writes?"

Yes — in the default journal mode, SQLite allows only one writer at a time and blocks readers during writes. For a task queue with multiple workers, this is a non-starter.

WAL (Write-Ahead Logging) mode changes the game. Readers never block writers, and writers never block readers. Multiple workers can poll for tasks concurrently while the engine writes status updates.

# Taskito enables WAL mode automatically
app = Taskito(database_url="sqlite:///taskito.db")
# Under the hood: PRAGMA journal_mode=WAL

For single-machine deployments with moderate throughput (hundreds of tasks per second), WAL-mode SQLite handles the load comfortably. When you need more, switch to PostgreSQL:

# Same API, different backend
app = Taskito(database_url="postgresql://user:pass@host/taskito")

Diesel's query abstraction means the engine runs the same SQL logic on both backends. The only PostgreSQL-specific code is connection pooling configuration.

Serialization trade-offs

Tasks need to be serialized to store in the database and deserialized when a worker picks them up. I initially used JSON everywhere because it's human-readable — you can SELECT * FROM tasks and actually read the arguments.

But for tasks with large payloads (bulk data imports, ML feature vectors), JSON serialization became a bottleneck. I benchmarked three options:

Format1MB payload serializeDeserializeHuman-readable
JSON12ms18msYes
MessagePack3ms4msNo
Pickle2ms2msNo

Taskito defaults to JSON for debuggability. You can switch per-task:

@task(app, serializer="msgpack")
def process_bulk_data(records: list[dict]):
    # large payloads benefit from MessagePack
    ...

I intentionally excluded Pickle as a default option despite its speed. Deserializing untrusted Pickle data is a remote code execution vector — not something a task queue should encourage.

The monitoring dashboard

Every task queue needs observability. Taskito includes a lightweight web dashboard that shows:

  • Queue depth — pending, running, and failed task counts
  • Throughput — tasks processed per minute over time
  • Task details — individual task status, arguments, retries, and error tracebacks
  • Worker status — active workers and their current assignments
app = Taskito(dashboard=True, dashboard_port=8089)

The dashboard is a single-page app served by the Rust engine itself (using axum). No Node.js build step, no separate frontend deploy. It reads directly from the task database, so there's zero additional infrastructure.

What I learned

SQLite's WAL mode is essential for concurrent access. Without it, worker contention makes the queue unusable. With it, SQLite becomes a surprisingly capable backend for moderate-scale task processing.

Task serialization format matters more than you'd expect. JSON is the right default for debuggability, but offering MessagePack for hot paths prevents serialization from becoming the bottleneck on bulk workloads.

A built-in dashboard saves more time than it costs to build. I initially planned to rely on CLI-only monitoring (taskito status). After spending too long SELECT-ing from the database during development, I built the dashboard. It paid for itself immediately.

Cron and queue should be one system, not two. Celery separates beat (cron) from workers, which means two processes, two failure modes, and two things to monitor. Embedding the cron scheduler in the same runtime simplifies deployment and eliminates the "beat is down but workers are fine" failure mode.

Try it

pip install taskito

The source is on GitHub. If you're tired of maintaining a broker just to send emails in the background, give it a shot.

Published on March 14, 2026

Related Posts

Enjoyed this post?

Subscribe to get notified when I publish new content about web development and technology.