Subscribe to incoming observations

You can subscribe to a real-time stream of incoming observations via websocket, enabling workflows requiring sub-second latency. This is a beta feature and is subject to API changes or removal.

Opening a connection

You can open a websocket connection by hitting the following endpoint with a valid bearer token:

```
GET "wss://$HOSTNAME/api/ws/gotham/v1/observations/$OBSERVATION_SPEC_ID?preview=true" \
  -H "Sec-Websocket-Protocol: Bearer-$TOKEN"
```

Note that the preview query flag is required while this feature is in beta.
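As a minimal sketch, the endpoint URL and the subprotocol value from the request above could be assembled as follows. The helper names `build_endpoint` and `build_subprotocol` are illustrative and not part of the API; only the URL shape, the `preview=true` flag, and the `Bearer-` prefix come from the request shown.

```python
def build_endpoint(hostname: str, observation_spec_id: str) -> str:
    """Return the websocket URL for one observation spec (hypothetical helper)."""
    # preview=true is required while the feature is in beta.
    return (
        f"wss://{hostname}/api/ws/gotham/v1/observations/"
        f"{observation_spec_id}?preview=true"
    )


def build_subprotocol(token: str) -> str:
    """Return the Sec-Websocket-Protocol value carrying the bearer token."""
    return f"Bearer-{token}"
```

Most websocket clients take the second value as a subprotocol rather than as a raw header, which is how the example script at the end of this page passes it.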

Interacting with the websocket

Messages sent through the websocket (sent to the server or received from the server) will be JSON objects.

Subscribing

Once the connection is established, you must send a subscribe message to inform the server how to filter to the data you're interested in receiving. The query language supports the same filtering capability as the search observations endpoint.

For example, to subscribe to observations within a polygon, you would send the following message:

```json
{
  "type": "subscribe",
  "query": {
    "location": [
      {"longitude":-77.05974824713265,"latitude":38.903335277742656},
      {"longitude":-77.06130631105295,"latitude":38.90278989613124},
      {"longitude":-77.0628268311872,"latitude":38.905175909773085},
      {"longitude":-77.06011742685993,"latitude":38.905195387105344},
      {"longitude":-77.05974824713265,"latitude":38.903335277742656}
    ]
  }
}
```
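A polygon subscribe message like the one above could be built programmatically. The sketch below is only an illustration: `polygon_subscribe_message` is a hypothetical helper, and closing the ring by repeating the first vertex simply mirrors the example; whether the server requires a closed ring is an assumption.

```python
import json


def polygon_subscribe_message(points):
    """Build a subscribe message from (longitude, latitude) pairs (hypothetical helper)."""
    ring = [{"longitude": lon, "latitude": lat} for lon, lat in points]
    # Assumption: mirror the example above, which repeats the first vertex last.
    if ring and ring[0] != ring[-1]:
        ring.append(dict(ring[0]))
    return json.dumps({"type": "subscribe", "query": {"location": ring}})
```

The returned string can be sent over the open websocket as-is.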

The server will then reply with a subscribed message informing you whether the subscription was created successfully or not. If successful, a UUID is included to correlate future messages from the server with the subscription they relate to.

```json
{
  "type": "subscribed",
  "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf",
  "errors": []
}
```

If there are errors, no subscription ID will be given, and instead the errors array will contain items of the form:

```
Error:
  error: string      # a name for the error
  args: array<Arg>   # parameters helping describe the failure, for example, enumerating invalid inputs

Arg:
  name: string       # name of the parameter
  value: string      # value of the parameter
```
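One way a client might handle the subscribed reply is sketched below, under the message shapes described above. `SubscriptionError`, `format_error`, and `parse_subscribed` are hypothetical names introduced for illustration.

```python
import json


class SubscriptionError(Exception):
    """Raised when the server reports errors in a subscribed reply (hypothetical)."""


def format_error(error: dict) -> str:
    """Render one Error item as name(arg=value, ...)."""
    args = ", ".join(f"{arg['name']}={arg['value']}" for arg in error.get("args", []))
    return f"{error['error']}({args})"


def parse_subscribed(raw: str) -> str:
    """Return the subscription ID, or raise if the reply carries errors."""
    message = json.loads(raw)
    if message.get("type") != "subscribed":
        raise ValueError(f"unexpected message type: {message.get('type')!r}")
    errors = message.get("errors") or []
    if errors:
        # No subscription ID is given when errors are present.
        raise SubscriptionError("; ".join(format_error(e) for e in errors))
    return message["id"]
```

Raising on any non-empty `errors` array keeps the calling code from waiting on a subscription that was never created.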

Once you are subscribed, the server will begin forwarding any newly-ingested observation data matching your subscription via data messages (see Receiving data).

Multiple subscriptions for an observation spec can share the same websocket, so you do not need to open a new websocket for each one. To subscribe to additional data, send another subscribe message with the query you wish to subscribe to. If successful, the server will respond with a different subscription ID to represent that subscription and begin sending data.

Receiving data

Once you've successfully subscribed, the server will begin forwarding all newly-ingested data that matches your subscription via data messages. These messages adhere to the following schema:

```
id: uuid                           # the subscription ID this data relates to
observations: array<Observation>   # the data
```

For example, a data message for the example subscription above may look like:

```json
{
  "type": "data",
  "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf",
  "observations": [
    {
      "sourceSystemId": "foo",
      "collectionId": "bar",
      "observationSpecId": "baz",
      "trackId": "track0",
      "position": {
        "longitude": -77.05974824713265,
        "latitude": 38.903335277742656
      },
      "timestamp": "2024-06-24T17:00:00Z",
      "name": "name0",
      "staticProperties": [],
      "liveProperties": [
        {
          "propertyType": "liveProperty",
          "value": 0
        }
      ]
    }
  ]
}
```
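When several subscriptions share one websocket, the `id` field is what ties each data message back to its subscription. The following sketch groups incoming observations by subscription ID; `group_observations` is a hypothetical helper, and it assumes data messages carry `"type": "data"` as in the example above.

```python
import json
from collections import defaultdict


def group_observations(raw_messages):
    """Collect observations per subscription ID from raw JSON messages."""
    by_subscription = defaultdict(list)
    for raw in raw_messages:
        message = json.loads(raw)
        if message.get("type") != "data":
            continue  # skip non-data messages such as subscribed replies
        by_subscription[message["id"]].extend(message["observations"])
    return dict(by_subscription)
```

In a live client the same lookup would typically happen per message inside the receive loop rather than over a collected list.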

Unsubscribing

You can unsubscribe from subscriptions that you've successfully subscribed to by sending an unsubscribe message to the server.

For example, to unsubscribe from the subscription above, you would send:

```json
{
  "type": "unsubscribe",
  "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf"
}
```

The server will then unsubscribe your websocket from that subscription and will stop sending newly-ingested data matching that subscription.
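A client holding several subscriptions may want to track which IDs are still active. The sketch below is one possible approach; `ActiveSubscriptions` is a hypothetical class, and the local `is_active` check is purely defensive, since this page does not say whether data messages already in flight can arrive after an unsubscribe.

```python
import json


class ActiveSubscriptions:
    """Track subscription IDs on one websocket (hypothetical helper)."""

    def __init__(self):
        self._ids = set()

    def add(self, subscription_id: str) -> None:
        """Record an ID returned in a successful subscribed reply."""
        self._ids.add(subscription_id)

    def unsubscribe_message(self, subscription_id: str) -> str:
        """Forget the ID locally and return the unsubscribe message to send."""
        self._ids.discard(subscription_id)
        return json.dumps({"type": "unsubscribe", "id": subscription_id})

    def is_active(self, subscription_id: str) -> bool:
        return subscription_id in self._ids
```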

Example

Below is an example script that opens a websocket, subscribes to all newly-ingested observations for the observation spec, and prints the incoming messages from the server.

```python
# NOTE: you must install the websockets module
# pip3 install websockets

import asyncio
import websockets

# Replace the placeholders below with your own values.
HOSTNAME = "<your_hostname>"
OBSERVATION_SPEC_ID = "<your_observation_spec_id>"
TOKEN = "<your_token>"

endpoint = f"wss://{HOSTNAME}/api/ws/gotham/v1/observations/{OBSERVATION_SPEC_ID}?preview=true"
websocket_token = f"Bearer-{TOKEN}"


async def connect_and_listen():
    # The bearer token is passed as a websocket subprotocol.
    # read_limit is a parameter of the legacy websockets client;
    # remove it if your installed version rejects it.
    async with websockets.connect(endpoint, subprotocols=[websocket_token], max_size=2**31, read_limit=2**31) as websocket:
        print("Websocket opened")

        # An empty query subscribes to all newly-ingested observations for the spec.
        subscribe = '{"type": "subscribe", "query": {}}'
        await websocket.send(subscribe)

        while True:
            try:
                response = await websocket.recv()
                print(response)
            except websockets.ConnectionClosed as e:
                print("Connection closed.")
                print(e)
                break


async def main():
    await asyncio.create_task(connect_and_listen())
    print("Terminated")


if __name__ == "__main__":
    asyncio.run(main())
```