Cloud Pub/Sub¶
Cloudbox emulates the Pub/Sub REST API (v1) and the gRPC streaming API used by
google-cloud-pubsub. Both transports run concurrently and share the same in-memory state.
Connection¶
| Transport | Port | Environment variable |
|---|---|---|
| gRPC | 8085 | PUBSUB_EMULATOR_HOST=localhost:8085 |
| REST (HTTP/1.1) | 8086 | PUBSUB_REST_HOST=localhost:8086 |
```python
from google.auth.credentials import AnonymousCredentials
from google.cloud import pubsub_v1

# gRPC transport (default)
publisher = pubsub_v1.PublisherClient(
    credentials=AnonymousCredentials(),
    client_options={"api_endpoint": "localhost:8085"},
)
subscriber = pubsub_v1.SubscriberClient(
    credentials=AnonymousCredentials(),
    client_options={"api_endpoint": "localhost:8085"},
)
```
For REST transport (useful for environments that cannot use gRPC):
```python
publisher = pubsub_v1.PublisherClient(
    credentials=AnonymousCredentials(),
    client_options={"api_endpoint": "http://localhost:8086"},
    transport="rest",
)
```
Use sdk_compat/clients.py for pre-configured client factories with correct Cloudbox settings.
Topics¶
Create topic¶
```json
{
  "labels": { "env": "local" },
  "schemaSettings": {
    "schema": "projects/local-project/schemas/my-schema",
    "encoding": "JSON"
  },
  "messageRetentionDuration": "86400s",
  "messageStoragePolicy": { "allowedPersistenceRegions": ["us-central1"] }
}
```
All fields are optional. Returns the topic resource. If the topic already exists, returns the existing resource (idempotent).
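For example, with the Python client from the Connection section (a sketch; the topic name and labels are illustrative):

```python
# publisher is the PublisherClient configured in the Connection section.
topic_path = publisher.topic_path("local-project", "my-topic")

topic = publisher.create_topic(
    request={"name": topic_path, "labels": {"env": "local"}}
)
print(topic.name)  # projects/local-project/topics/my-topic
```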
Get topic¶
Returns the topic resource. 404 if not found.
Patch topic¶
Updates the fields named in updateMask. Returns the updated topic resource.
List topics¶
Returns { "topics": [...], "nextPageToken": "..." }.
Delete topic¶
204 on success. 404 if not found.
Publishing¶
Publish messages¶
```json
{
  "messages": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "attributes": { "key": "value" },
      "orderingKey": "user-123"
    }
  ]
}
```
data must be base64-encoded. Returns the assigned message IDs as { "messageIds": [...] }.
Each published message is fanned out to all subscriptions on the topic. Filters are evaluated per subscription during fan-out; BigQuery and Cloud Storage writes happen synchronously at publish time, while push dispatch runs as a background task.
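With the Python client, data is passed as raw bytes (the library handles the base64 encoding) and attributes as string keyword arguments. A sketch reusing the publisher from the Connection section:

```python
topic_path = publisher.topic_path("local-project", "my-topic")

# publish() returns a future that resolves to the server-assigned message ID.
future = publisher.publish(topic_path, b"Hello World", key="value")
print(future.result())
```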
Subscriptions¶
Create subscription¶
```json
{
  "topic": "projects/local-project/topics/my-topic",
  "ackDeadlineSeconds": 60,
  "retainAckedMessages": false,
  "messageRetentionDuration": "604800s",
  "filter": "attributes.env = \"prod\"",
  "enableMessageOrdering": false,
  "deadLetterPolicy": {
    "deadLetterTopic": "projects/local-project/topics/dead-letter",
    "maxDeliveryAttempts": 5
  },
  "retryPolicy": {
    "minimumBackoff": "10s",
    "maximumBackoff": "600s"
  },
  "pushConfig": {
    "pushEndpoint": "http://localhost:8080/push"
  }
}
```
All configuration fields are optional except topic. Returns the subscription resource. If
the subscription already exists, returns the existing resource (idempotent).
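A sketch using the subscriber client from the Connection section (fields follow the Pub/Sub v1 Subscription resource in snake_case; the names are illustrative):

```python
subscription_path = subscriber.subscription_path("local-project", "my-sub")

subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": "projects/local-project/topics/my-topic",
        "ack_deadline_seconds": 60,
        "filter": 'attributes.env = "prod"',
        "dead_letter_policy": {
            "dead_letter_topic": "projects/local-project/topics/dead-letter",
            "max_delivery_attempts": 5,
        },
    }
)
```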
Get subscription¶
List subscriptions¶
Delete subscription¶
204 on success. Removes the subscription's message queue.
Pulling and acknowledging¶
Pull messages¶
Returns up to maxMessages unacknowledged messages:
```json
{
  "receivedMessages": [
    {
      "ackId": "ack-uuid",
      "message": {
        "data": "SGVsbG8=",
        "attributes": {},
        "messageId": "msg-uuid",
        "publishTime": "2024-01-01T00:00:00Z",
        "orderingKey": ""
      },
      "deliveryAttempt": 1
    }
  ]
}
```
Pull is only available on subscriptions without a pushConfig.pushEndpoint set.
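For example, a synchronous pull with the Python client (a sketch reusing subscriber and subscription_path from the earlier sketches):

```python
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 10}
)
for received in response.received_messages:
    print(received.message.message_id, received.message.data)
```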
Acknowledge messages¶
Removes the acknowledged messages from the queue. Returns {}.
Modify ack deadline¶
Extends the deadline for messages that need more processing time. Returns {}.
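Continuing from the pull sketch above, deadlines can be extended and messages acknowledged like this:

```python
ack_ids = [received.ack_id for received in response.received_messages]
if ack_ids:
    # Give in-flight messages more processing time...
    subscriber.modify_ack_deadline(
        request={
            "subscription": subscription_path,
            "ack_ids": ack_ids,
            "ack_deadline_seconds": 120,
        }
    )
    # ...then remove them from the queue once processing succeeds.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )
```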
Push subscriptions¶
When a subscription has pushConfig.pushEndpoint set, Cloudbox dispatches each message to
the endpoint as a POST request immediately after publishing. The body follows the Pub/Sub
push format:
```json
{
  "message": {
    "data": "SGVsbG8=",
    "attributes": {},
    "messageId": "msg-uuid",
    "publishTime": "2024-01-01T00:00:00Z"
  },
  "subscription": "projects/local-project/subscriptions/my-sub"
}
```
If the push endpoint returns a 2xx status, the message is acknowledged automatically.
Non-2xx responses re-enqueue the message for retry.
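A minimal push receiver for local testing might look like this. It is an illustrative sketch using only the Python standard library, not part of Cloudbox; the port matches the pushEndpoint in the create-subscription example above.

```python
import base64
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class PushHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        envelope = json.loads(self.rfile.read(length))
        data = base64.b64decode(envelope["message"]["data"])
        print("push delivery:", data, envelope["message"]["attributes"])
        # Any 2xx status acknowledges the message; anything else triggers a retry.
        self.send_response(204)
        self.end_headers()

HTTPServer(("", 8080), PushHandler).serve_forever()
```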
Message filtering¶
Subscriptions accept a filter expression that is evaluated against each incoming message.
Messages that do not match the filter are dropped for that subscription.
Filter syntax supports the operators =, !=, >, <, >=, <=, hasPrefix(), NOT, AND, OR.
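Some illustrative filter expressions (the attribute names are made up; the hasPrefix form follows the Cloud Pub/Sub filter grammar):

```
attributes.env = "prod"
attributes.env != "staging" AND hasPrefix(attributes.region, "us-")
NOT attributes.priority = "low"
```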
Message ordering¶
When enableMessageOrdering: true is set on a subscription, messages published with the
same orderingKey are delivered in the order they were published. Messages without an
ordering key are delivered in an unspecified order.
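With the Python client, ordering must also be enabled in the publisher options before ordering_key can be used. A sketch (PublisherOptions comes from google-cloud-pubsub):

```python
from google.auth.credentials import AnonymousCredentials
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import PublisherOptions

publisher = pubsub_v1.PublisherClient(
    credentials=AnonymousCredentials(),
    client_options={"api_endpoint": "localhost:8085"},
    publisher_options=PublisherOptions(enable_message_ordering=True),
)

topic_path = publisher.topic_path("local-project", "my-topic")
# Messages sharing an ordering key are delivered in publish order.
publisher.publish(topic_path, b"first", ordering_key="user-123").result()
publisher.publish(topic_path, b"second", ordering_key="user-123").result()
```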
Dead-letter policies¶
Subscriptions support a deadLetterPolicy, configured as in the create-subscription request above.
After maxDeliveryAttempts failed delivery attempts, the message is forwarded to the
dead-letter topic. The deliveryAttempt field on each ReceivedMessage tracks the current
attempt count.
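A rough sketch of observing delivery attempts by nacking a message, i.e. setting its ack deadline to 0 so it is redelivered (whether Cloudbox counts a nack as a failed delivery attempt toward maxDeliveryAttempts is an assumption here):

```python
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 1}
)
for received in response.received_messages:
    print("delivery attempt:", received.delivery_attempt)
    # Nack: an ack deadline of 0 makes the message immediately available again.
    subscriber.modify_ack_deadline(
        request={
            "subscription": subscription_path,
            "ack_ids": [received.ack_id],
            "ack_deadline_seconds": 0,
        }
    )
```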
Retry policies¶
Retry policy configuration is stored and returned in the subscription resource. The local emulator does not enforce actual backoff delays — messages are available for re-pull immediately after ack deadline expiry.
Seek¶
Reset a subscription to an earlier point to replay messages.
A seek targets either a snapshot or a timestamp (RFC 3339). Messages published to the topic after the given time are replayed into the subscription
queue from the topic log. Returns {}.
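A sketch of seeking to a timestamp with the Python client (reusing subscriber and subscription_path from earlier; the snapshot variant passes snapshot instead of time):

```python
import datetime
from google.protobuf import timestamp_pb2

# Replay everything published in roughly the last hour.
target = timestamp_pb2.Timestamp()
target.FromDatetime(
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
)

subscriber.seek(request={"subscription": subscription_path, "time": target})
```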
Snapshots¶
Snapshots capture the state of a subscription's unacknowledged messages at a point in time. They can be used to seek a subscription back to that point.
Create snapshot¶
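A sketch with the Python client (snapshot and subscription names are illustrative):

```python
snapshot_path = subscriber.snapshot_path("local-project", "my-snapshot")

subscriber.create_snapshot(
    request={"name": snapshot_path, "subscription": subscription_path}
)

# Later: rewind the subscription to the captured state.
subscriber.seek(
    request={"subscription": subscription_path, "snapshot": snapshot_path}
)
```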
Get snapshot¶
Patch snapshot¶
List snapshots¶
Delete snapshot¶
Schemas¶
Schemas enforce message structure at publish time. When a topic has schemaSettings set,
every published message is validated against the schema before being accepted.
Create schema¶
```json
{
  "id": "my-schema",
  "type": "AVRO",
  "definition": "{\"type\": \"record\", \"name\": \"Msg\", \"fields\": [{\"name\": \"id\", \"type\": \"string\"}]}"
}
```
Supported schema types: AVRO, PROTOCOL_BUFFER.
Supported encodings (on schemaSettings): JSON, BINARY, ENCODING_UNSPECIFIED.
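A sketch using the schema service client shipped with google-cloud-pubsub (the endpoint and names are illustrative):

```python
from google.auth.credentials import AnonymousCredentials
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

schema_client = SchemaServiceClient(
    credentials=AnonymousCredentials(),
    client_options={"api_endpoint": "localhost:8085"},
)

avro_definition = (
    '{"type": "record", "name": "Msg", '
    '"fields": [{"name": "id", "type": "string"}]}'
)
schema = schema_client.create_schema(
    request={
        "parent": "projects/local-project",
        "schema_id": "my-schema",
        "schema": Schema(type_=Schema.Type.AVRO, definition=avro_definition),
    }
)
print(schema.name)
```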
Get schema¶
List schemas¶
Delete schema¶
Validate schema¶
Returns {} if valid, or 400 with an error message.
Validate message against schema¶
```json
{
  "name": "projects/local-project/schemas/my-schema",
  "encoding": "JSON",
  "message": { "data": "eyJpZCI6ICIxMjMifQ==" }
}
```
Returns {} if the message matches the schema, or 400 if validation fails.
BigQuery subscriptions¶
Messages can be routed directly to a BigQuery table by setting bigqueryConfig on the
subscription:
```json
{
  "topic": "projects/local-project/topics/my-topic",
  "bigqueryConfig": {
    "table": "local-project:my_dataset.my_table",
    "writeMetadata": true,
    "useTopicSchema": false
  }
}
```
Messages are written to the BigQuery emulator (DuckDB) synchronously at publish time.
The table reference format is project:dataset.table.
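With the Python client this corresponds to the bigquery_config field on the subscription (a sketch; BigQueryConfig support requires a reasonably recent google-cloud-pubsub):

```python
subscriber.create_subscription(
    request={
        "name": subscriber.subscription_path("local-project", "bq-sub"),
        "topic": "projects/local-project/topics/my-topic",
        "bigquery_config": {
            "table": "local-project:my_dataset.my_table",
            "write_metadata": True,
        },
    }
)
```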
Cloud Storage subscriptions¶
Messages can be batched and written to GCS by setting cloudStorageConfig on the
subscription:
```json
{
  "topic": "projects/local-project/topics/my-topic",
  "cloudStorageConfig": {
    "bucket": "my-bucket",
    "filenamePrefix": "output/",
    "filenameSuffix": ".json",
    "maxDuration": "300s",
    "maxBytes": "10000000",
    "avroConfig": {}
  }
}
```
Messages are written to the GCS emulator synchronously at publish time. Avro and text
formats are supported via avroConfig and textConfig respectively.
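The Python client exposes this as cloud_storage_config (a sketch; field availability depends on the google-cloud-pubsub version):

```python
subscriber.create_subscription(
    request={
        "name": subscriber.subscription_path("local-project", "gcs-sub"),
        "topic": "projects/local-project/topics/my-topic",
        "cloud_storage_config": {
            "bucket": "my-bucket",
            "filename_prefix": "output/",
            "filename_suffix": ".json",
        },
    }
)
```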
gRPC streaming¶
The gRPC server on port 8085 supports:
- StreamingPull: bidirectional streaming. The subscriber sends StreamingPullRequest messages (with ack IDs and deadline modifications) and receives StreamingPullResponse messages (with batches of messages) continuously over a single connection.
The gRPC surface is compatible with the google-cloud-pubsub SDK's default transport.
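The high-level subscribe() API in google-cloud-pubsub uses StreamingPull under the hood; a sketch against the emulator (reusing subscriber and subscription_path from earlier):

```python
from concurrent.futures import TimeoutError

def callback(message):
    print("received:", message.data)
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
try:
    # Block for a few seconds, then shut the stream down.
    streaming_pull_future.result(timeout=5)
except TimeoutError:
    streaming_pull_future.cancel()
    streaming_pull_future.result()
```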
Known limitations¶
| Feature | Notes |
|---|---|
| Exactly-once delivery | Duplicate redelivery within an ack deadline window is possible |
| Seek to timestamp (full fidelity) | Replay is approximate; only messages still in the topic log after the target timestamp are replayed |
| OIDC / SAML push auth | Push requests are sent without authentication headers |
| Subscription-level IAM | Not enforced |
Examples¶
```bash
# Run Pub/Sub examples (requires Cloudbox running on ports 8085 / 8086)
uv run python examples/pubsub/publish_subscribe.py
uv run python examples/pubsub/batch_publish.py
```
| Example | What it demonstrates |
|---|---|
| publish_subscribe.py | Create topic and subscription, publish, pull, acknowledge |
| batch_publish.py | Publishing many messages in batches; pull and acknowledge in pages |