Pub/Sub

Topics with push delivery: a publisher runner publishes; Pub/Sub POSTs each message to a subscriber runner's path.

Prerequisites

Declare a topic

Use the pubsub namespace from the wrapper SDK to declare a topic connection:

import { pubsub } from "@org/fluffy-chainsaw-pubsub";

pubsub.topic("orders");   // declares orders topic connection
topics:
  orders: {}               # provider: gcp (default) | memory

provider: memory is an in-process bus for hermetic tests / one process only — it cannot cross services or containers. Use gcp (the default) for anything real; under fluffy-chainsaw local that means the emulator.

Publish

List the topic under the runner's uses.publishes (that's what grants publish rights and injects the descriptor), then use the topic client:

import { pubsub } from "@org/fluffy-chainsaw-pubsub";

await pubsub.topic("orders").publish({ orderId: 42 }, { kind: "created" });
// .unwrap() returns the raw @google-cloud/pubsub client if you need it

Subscribe (push)

The application base SDK registers the push listener handler, while yaml configuration binds the topic to it:

import { pubsub as core } from "@org/fluffy-chainsaw";
import { pubsub } from "@org/fluffy-chainsaw-pubsub";

// Scan marker
pubsub.topic("orders");

// Push subscription
core.subscription("/on-order", async (msg) => {
  handle(msg.data);   // JSON-parsed body; also msg.attributes, msg.messageId, msg.raw
  // return normally to ACK; throw to NACK -> redelivered per the retry policy
});
runners:
  fulfillment:                    # private runner — no trigger.http
    trigger:
      subscriptions:
        - topic: orders
          path: /on-order
          dead_letter: orders-dlq   # optional; must be a declared topic
          max_retries: 5            # optional; 5–100, only with dead_letter
          ack_deadline: 30          # optional; 10–600 seconds

Multiple runners may subscribe to one topic (fan-out).

Sharing a topic across apps

The topic is owned by whichever app declares it with a pubsub.topic() marker. Any other app references it with existing: true and no marker:

# consumer app (different repo) — uses the producer's topic, never creates/destroys it
topics:
  orders: { existing: true }
runners:
  worker:
    trigger:
      subscriptions:
        - { topic: orders, path: /events }   # + core.subscription("/events", h) in code

An existing topic is looked up at deploy — if it doesn't exist you find out then, not at runtime — and destroy-ing this app can never delete it. Deploying into a project where your own topics already exist? Run fluffy-chainsaw adopt first so they're reconciled instead of adopted (see cli.md).

Rules