Wire reference

Native bincode over TCP on the queue port (default 10100). Frames use the same celeriant_wire::network::wire_header::WireHeader as Celeriant. Body types are defined in celeriant_queue_proto::requests and celeriant_queue_proto::responses.

No HTTP. No JSON. No JWT. The wire is the same shape Celeriant uses for its own client traffic.

Verbs

| ID | Verb | Purpose |
|----|------|---------|
| 1 | CreateQueue | Register a queue with config + DLQ. Idempotent on (queue_key, dlq, config). |
| 2 | DeleteQueue | Best-effort delete. Removes the in-memory projection and zeroes gauges. |
| 3 | Produce | Append messages to a queue. Per-message (client_id, client_seq) for idempotency. |
| 4 | Consume | Lease N messages with an optional long-poll wait_ms. |
| 5 | Ack | Terminate-by-success. Batched per-request into one durable AckBatch event. |
| 6 | Nack | Return-to-Available. Optional delay_ms delays redelivery. |
| 7 | Extend | Bump the lease deadline by another visibility_timeout_ms. Same lease_id, no delivery_count bump. |
| 8 | Stats | Queue state snapshot: tail, trim, in-flight, parked, blocked, ack-hole ranges, range assignments. |
| 9 | TrimQueue | Advance trim_cursor to keep_from_version. Fold clamps to lowest live lease / blocked / nack-delay. |
| 10 | SnapshotNow | Force-write a Snapshot control event. Useful for tests. Production relies on the 60s timer. |
| 11 | AssignRange | Assign a ring range (lo, hi) to a (group, consumer). Idempotent on exact bounds. Partial overlap is InvalidConfig. |
| 12 | Unblock | Clear a head-of-line Block for a specific version. |

Response IDs

A response's message type is 100 + the verb ID (for example, Produce is verb 3, so its response is 103). Every error response uses 199.
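
The numbering rule above can be sketched as a small lookup. This is a minimal illustration, not the types exported by celeriant_queue_proto: the Verb enum, the ERROR_RESPONSE_ID constant, both helper functions, and the u16 width are assumptions made for the example. Only the numeric values come from this page.

```rust
/// Verb IDs copied from the Verbs table. The enum is a hypothetical mirror,
/// not the request type defined in celeriant_queue_proto::requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum Verb {
    CreateQueue = 1,
    DeleteQueue = 2,
    Produce = 3,
    Consume = 4,
    Ack = 5,
    Nack = 6,
    Extend = 7,
    Stats = 8,
    TrimQueue = 9,
    SnapshotNow = 10,
    AssignRange = 11,
    Unblock = 12,
}

/// Message type shared by all error responses (integer width is assumed).
pub const ERROR_RESPONSE_ID: u16 = 199;

/// Success response message type for a verb: 100 + verb ID.
pub fn response_id(verb: Verb) -> u16 {
    100 + verb as u16
}

/// True when `msg_type` is a plausible reply to `verb`:
/// either its success response or the shared error type.
pub fn is_reply_to(verb: Verb, msg_type: u16) -> bool {
    msg_type == response_id(verb) || msg_type == ERROR_RESPONSE_ID
}
```

A client could use a check like is_reply_to as a cheap sanity test before decoding a response body.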

Error codes

QueueErrorCode (in celeriant_queue_proto::responses):

| Code | Variant | When |
|------|---------|------|
| 1 | UnknownQueue | queue_key not registered. |
| 2 | QueueAlreadyExists | CreateQueue with different DLQ or config. |
| 3 | InvalidConfig | Validation failure on QueueConfig or AssignRange overlap. |
| 4 | InvalidHandle | Ack/Nack/Extend against a stale lease_id, parked version, below trim, or no live lease. |
| 5 | AckHoleCapExceeded | Per-queue ack-hole range count above max_ack_holes. |
| 6 | InFlightCapExceeded | Per-queue in-flight above max_in_flight. |
| 7 | OccConflict | Produce.expected_last_version mismatch. |
| 8 | IdempotencyConflict | Produce.client_seq is not strictly greater than prior writes for the same client_id. |
| 9 | NotLeader | This shard isn't the leader for the queue's aggregates. |
| 10 | SchemaValidation | Reserved for schema enforcement (Tier 2). |
| 11 | PayloadTooLarge | Frame exceeded --queue-max-frame-size. |
| 12 | QuotaExceeded | --queue-max-queues-per-org-per-shard hit. |
| 99 | Internal | Server-side bug or unexpected storage error. |
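
As an illustration of the IdempotencyConflict rule (code 8), the "strictly greater" check might look like the sketch below. It follows only the wording in the table. ClientSeqTracker, ProduceCheck, and the u64 widths for client_id and client_seq are hypothetical, and the real server may track more state or treat exact retries differently.

```rust
use std::collections::HashMap;

/// Outcome of the per-message sequence check (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum ProduceCheck {
    Accept,
    /// Corresponds to QueueErrorCode 8 in the table above.
    IdempotencyConflict,
}

/// Remembers the highest accepted client_seq per client_id.
/// Field widths are assumptions; the page does not state them.
#[derive(Default)]
pub struct ClientSeqTracker {
    last_seq: HashMap<u64, u64>,
}

impl ClientSeqTracker {
    /// Accepts `client_seq` only if it is strictly greater than every
    /// previously accepted value for the same `client_id`.
    pub fn check_and_record(&mut self, client_id: u64, client_seq: u64) -> ProduceCheck {
        match self.last_seq.get(&client_id) {
            // Equal or lower than the last accepted value: reject.
            Some(&prev) if client_seq <= prev => ProduceCheck::IdempotencyConflict,
            // First write for this client, or a strictly greater sequence.
            _ => {
                self.last_seq.insert(client_id, client_seq);
                ProduceCheck::Accept
            }
        }
    }
}
```

Under this reading, a producer that retries a message with the same (client_id, client_seq) gets IdempotencyConflict back instead of a duplicate append, and sequences are tracked independently per client_id.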

Control event enum

ControlEvent discriminants are positional in bincode. NEW VARIANTS MUST BE APPENDED AT THE END. Inserting mid-enum breaks every stored WAL.

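| Idx | Variant |
|-----|---------|
| 0 | Lease |
| 1 | Ack |
| 2 | Nack |
| 3 | Park |
| 4 | TrimStart |
| 5 | SnapshotMarker |
| 6 | Register |
| 7 | AckBatch |
| 8 | Snapshot |
| 9 | RangeAssign |
| 10 | Block |
| 11 | Unblock |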
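
To see why insertion order matters, the sketch below models the variant index by hand instead of calling bincode. The three enums are payload-free, hypothetical stand-ins (NewVariant is invented for the example). The only property relied on is that a derived enum is identified on the wire by its declaration-order index.

```rust
/// What was on disk when the log was written (first three variants only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventV1 { Lease, Ack, Nack }

/// Safe evolution: the new variant is appended, so old indices keep their meaning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventAppended { Lease, Ack, Nack, NewVariant }

/// Unsafe evolution: the new variant is inserted mid-enum, shifting later indices.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventInserted { Lease, NewVariant, Ack, Nack }

/// Decode a stored variant index with the appended layout.
pub fn decode_appended(idx: u32) -> Option<EventAppended> {
    match idx {
        0 => Some(EventAppended::Lease),
        1 => Some(EventAppended::Ack),
        2 => Some(EventAppended::Nack),
        3 => Some(EventAppended::NewVariant),
        _ => None,
    }
}

/// Decode a stored variant index with the mid-enum layout.
pub fn decode_inserted(idx: u32) -> Option<EventInserted> {
    match idx {
        0 => Some(EventInserted::Lease),
        1 => Some(EventInserted::NewVariant),
        2 => Some(EventInserted::Ack),
        3 => Some(EventInserted::Nack),
        _ => None,
    }
}
```

An Ack written under the old layout has index 1. The appended layout still reads it as Ack, while the inserted layout silently reads it as NewVariant, which is the kind of corruption the append-only rule prevents.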

Snapshot schema

QueueSnapshot carries schema_version: u32 (currently 2) as the FIRST field. Recovery rejects mismatches with SnapshotSchemaMismatch and falls back to genesis-fold of the control log.
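
Because schema_version is the first field, a reader can in principle check it before decoding anything else. The sketch below assumes the field is encoded as a fixed-width little-endian u32 (bincode's legacy fixed-int layout). Whether the queue uses that bincode configuration is not stated here, and check_schema_version and SnapshotError are hypothetical names.

```rust
/// Current snapshot schema version, per the text above.
pub const CURRENT_SCHEMA_VERSION: u32 = 2;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum SnapshotError {
    /// Stored version differs from the one this build understands.
    /// Per the docs, recovery then falls back to a genesis fold of the control log.
    SchemaMismatch { found: u32, expected: u32 },
    /// Fewer than 4 bytes available, so no version could be read.
    Truncated,
}

/// Reads the leading u32 and compares it to CURRENT_SCHEMA_VERSION.
/// ASSUMPTION: fixed-width little-endian encoding of the first field.
pub fn check_schema_version(bytes: &[u8]) -> Result<(), SnapshotError> {
    let head = match bytes.get(..4) {
        Some(h) => h,
        None => return Err(SnapshotError::Truncated),
    };
    let found = u32::from_le_bytes([head[0], head[1], head[2], head[3]]);
    if found == CURRENT_SCHEMA_VERSION {
        Ok(())
    } else {
        Err(SnapshotError::SchemaMismatch {
            found,
            expected: CURRENT_SCHEMA_VERSION,
        })
    }
}
```

Keeping the version first means a mismatch is detected without interpreting any of the fields whose layout may have changed between versions.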

Canonical client shape

No first-class client crate yet. The reference RPC helper lives in celeriant_queue_integration_tests/tests/common/mod.rs::rpc:

pub async fn rpc(tcp: &mut TcpStream, req: &QueueRequest, codec: &DictCodec) -> QueueResponse {
    // Write the request frame, then flush so it is not left sitting in a buffer.
    write_request_uncompressed(tcp, req, 1024 * 1024, PROTOCOL_VERSION_V2).await.unwrap();
    tcp.flush().await.unwrap();
    // Read the response header first, then decode the body it describes.
    let header = WireHeader::from_reader(tcp, 16 * 1024 * 1024).await.unwrap();
    read_response(header, tcp, codec).await.unwrap()
}

Connections are persistent. Open once and pipeline as many requests as you want. To send further requests before reading earlier responses, multiplex by correlation_id.
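
One way a client might keep track of pipelined requests is a small table keyed by correlation_id. This is a sketch only: the Pending type, its methods, and the u64 width of correlation_id are assumptions, and it leaves out the socket I/O that would carry the ID in the frame header.

```rust
use std::collections::HashMap;

/// Tracks requests that have been sent but not yet answered.
/// `T` is whatever the caller needs to resume work (a label, a channel, ...).
pub struct Pending<T> {
    next_id: u64,
    waiting: HashMap<u64, T>,
}

impl<T> Pending<T> {
    pub fn new() -> Self {
        Pending { next_id: 1, waiting: HashMap::new() }
    }

    /// Allocates a fresh correlation ID for an outgoing request and
    /// remembers the caller's context under it.
    pub fn register(&mut self, ctx: T) -> u64 {
        let id = self.next_id;
        self.next_id = self.next_id.wrapping_add(1);
        self.waiting.insert(id, ctx);
        id
    }

    /// Called when a response arrives: returns the context stored for its
    /// correlation ID, or None if the ID is unknown or already completed.
    pub fn complete(&mut self, correlation_id: u64) -> Option<T> {
        self.waiting.remove(&correlation_id)
    }

    /// Number of requests still awaiting a response.
    pub fn in_flight(&self) -> usize {
        self.waiting.len()
    }
}
```

With a table like this, responses can be matched to their requests in whatever order they arrive, so the writer does not have to wait for each reply before sending the next request.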