# WebSocket Pipeline Handler

:::note

The WebSocket Pipeline Handler is an Enterprise-only feature. Please contact us
to trial this or sign up for an Enterprise account.

:::

The `webSocketPipelineHandler` proxies WebSocket connections exactly like the
[WebSocket Handler](./websocket-handler.mdx), but additionally passes every
WebSocket message through a pipeline of **policy functions** before forwarding
it. Each policy can inspect, transform, or drop the message. Use this to redact
sensitive fields, enforce a message schema, filter events, or add observability
to real-time traffic.

Messages are intercepted in both directions, configured independently:

- **`inbound`** policies process messages traveling from the client to your
  backend.
- **`outbound`** policies process messages traveling from your backend to the
  client.

:::note

The pipeline only intercepts **message** events. Connection lifecycle events are
handled automatically: when either side closes, the other side is closed, and
socket errors are logged and forwarded. There is no policy hook for `close` or
`error` events.

:::

## Configuration

Use the `webSocketPipelineHandler` export and add an `inbound` and/or `outbound`
array under `options.policies`. Each entry points to an exported function in one
of your project's modules.

```json
"/my-websocket": {
  "x-zuplo-path": {
    "pathMode": "open-api"
  },
  "get": {
    "summary": "WebSocket route with message interception",
    "x-zuplo-route": {
      "corsPolicy": "none",
      "handler": {
        "export": "webSocketPipelineHandler",
        "module": "$import(@zuplo/runtime)",
        "options": {
          "rewritePattern": "https://myservice.com/websocket",
          "policies": {
            "inbound": [
              {
                "module": "$import(./modules/websocket-policies)",
                "export": "redactInbound"
              }
            ],
            "outbound": [
              {
                "module": "$import(./modules/websocket-policies)",
                "export": "filterOutbound"
              }
            ]
          }
        }
      },
      "policies": {
        "inbound": []
      }
    },
    "operationId": "b2c3d4e5-f6a7-4b8c-9d0e-1f2a3b4c5d6e"
  }
}
```

:::caution{title="Two different policies blocks"}

The `policies` object inside `handler.options` configures the **message**
policies that run on each WebSocket frame. This is separate from the route-level
`policies` block (the `inbound` array next to `handler`), which configures the
standard request policies — such as [API Key](../policies/api-key-inbound.mdx)
or [Rate Limiting](../policies/rate-limit-inbound.mdx) — that run once during
the initial connection upgrade.

:::

The `rewritePattern` option behaves identically to the
[WebSocket Handler](./websocket-handler.mdx#handler-options), including
JavaScript string interpolation.

## Writing a message policy

A message policy is an exported function that matches the following signature:

```ts
import { ZuploContext, ZuploRequest } from "@zuplo/runtime";

async function webSocketPolicy(
  data: string,
  target: WebSocket,
  source: WebSocket,
  request: ZuploRequest,
  context: ZuploContext,
): Promise<unknown>;
```

| Parameter | Description                                                                                                                                                                                                                                                             |
| --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `data`    | The message payload. For text protocols this is a string; binary frames arrive in the platform's binary form (for example, an `ArrayBuffer`). For an `inbound` policy this is the message from the client; for an `outbound` policy it is the message from the backend. |
| `target`  | The destination socket the message is being forwarded to. For `inbound` this is the backend connection; for `outbound` it is the client connection.                                                                                                                     |
| `source`  | The socket the message originated from. Call `source.send(...)` to send a message back to the originator, such as an acknowledgement or error.                                                                                                                          |
| `request` | The original [`ZuploRequest`](../programmable-api/zuplo-request.mdx) from the connection upgrade.                                                                                                                                                                       |
| `context` | The [`ZuploContext`](../programmable-api/zuplo-context.mdx) for the connection.                                                                                                                                                                                         |

The return value controls what happens next:

- **Return the data** (modified or unchanged) to forward it to `target`. When
  multiple policies are configured, the return value is passed as `data` to the
  next policy in the array.
- **Return `undefined`** to drop the message. It is not forwarded, and no
  further policies run for that message.

Policies run in the order they appear in the `inbound` / `outbound` array, and
each policy may be asynchronous. If a policy throws, Zuplo logs the error and
drops the message.

## Example: redact fields from inbound messages

This inbound policy parses each JSON message from the client, removes a
sensitive field, and forwards the result to the backend.

```ts
import { ZuploContext, ZuploRequest } from "@zuplo/runtime";

export async function redactInbound(
  data: string,
  target: WebSocket,
  source: WebSocket,
  request: ZuploRequest,
  context: ZuploContext,
) {
  let message: Record<string, unknown>;
  try {
    message = JSON.parse(data);
  } catch (err) {
    context.log.warn("Dropping non-JSON WebSocket message");
    return undefined; // drop messages that aren't valid JSON
  }

  delete message.ssn;

  return JSON.stringify(message);
}
```

## Example: filter outbound messages

This outbound policy inspects messages from the backend and drops internal
events so they never reach the client. Other messages pass through unchanged.

```ts
import { ZuploContext, ZuploRequest } from "@zuplo/runtime";

export async function filterOutbound(
  data: string,
  target: WebSocket,
  source: WebSocket,
  request: ZuploRequest,
  context: ZuploContext,
) {
  const message = JSON.parse(data);

  if (message.type === "internal") {
    return undefined; // drop internal events; the client never sees them
  }

  return data;
}
```

You can configure multiple policies in each direction to compose behavior — for
example, one policy to validate a message against a schema and a second to
redact fields. Because each policy receives the previous policy's output, order
matters.
