You can subscribe to a real-time stream of incoming observations via websocket, enabling workflows requiring sub-second latency. This is a beta feature and is subject to API changes or removal.
You can open a websocket connection by hitting the following endpoint with a valid bearer token:

    GET "wss://$HOSTNAME/api/ws/gotham/v1/observations/$OBSERVATION_SPEC_ID?preview=true" \
        -H "Sec-Websocket-Protocol: Bearer-$TOKEN"

Note that the preview query flag is required while this feature is in beta.
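As a minimal sketch, the endpoint URL and the subprotocol value can be assembled in Python as shown here. The helper names `build_endpoint` and `build_subprotocol` are hypothetical and not part of the API; the URL shape, the `preview=true` flag, and the `Bearer-` prefix are taken from the request above.

```python
from urllib.parse import quote


def build_endpoint(hostname: str, observation_spec_id: str) -> str:
    """Return the websocket URL for one observation spec.

    The preview=true flag is always appended because it is required
    while the feature is in beta.
    """
    # Percent-encode the spec ID so unexpected characters cannot alter the path.
    spec = quote(observation_spec_id, safe="")
    return f"wss://{hostname}/api/ws/gotham/v1/observations/{spec}?preview=true"


def build_subprotocol(token: str) -> str:
    """Return the Sec-Websocket-Protocol value that carries the bearer token."""
    return f"Bearer-{token}"


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(build_endpoint("example.com", "my-spec"))
    print(build_subprotocol("my-token"))
```

Keeping these two values in small helpers makes it harder to forget the preview flag or the token prefix when the connection code is reused.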
All messages exchanged over the websocket, whether sent to the server or received from it, are JSON objects.
Once the connection is established, you must send a subscribe message to tell the server how to filter for the data you want to receive. The query language supports the same filtering capabilities as the search observations endpoint.
For example, to subscribe to observations within a polygon, you would send the following message:

    {
      "type": "subscribe",
      "query": {
        "location": [
          {"longitude":-77.05974824713265,"latitude":38.903335277742656},
          {"longitude":-77.06130631105295,"latitude":38.90278989613124},
          {"longitude":-77.0628268311872,"latitude":38.905175909773085},
          {"longitude":-77.06011742685993,"latitude":38.905195387105344},
          {"longitude":-77.05974824713265,"latitude":38.903335277742656}
        ]
      }
    }

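A polygon subscription like the one above could be generated from a list of coordinates, as in this sketch. The function name `polygon_subscribe_message` is hypothetical. The example above repeats the first vertex as the last one, so the helper closes the ring in the same way; whether the server actually requires a closed ring is an assumption.

```python
import json


def polygon_subscribe_message(points):
    """Build a subscribe message from (longitude, latitude) pairs."""
    ring = [{"longitude": lon, "latitude": lat} for lon, lat in points]
    # ASSUMPTION: mirror the documented example, where the last vertex
    # repeats the first. Append it only if the caller left the ring open.
    if ring and ring[0] != ring[-1]:
        ring.append(dict(ring[0]))
    return json.dumps({"type": "subscribe", "query": {"location": ring}})


if __name__ == "__main__":
    message = polygon_subscribe_message([
        (-77.05974824713265, 38.903335277742656),
        (-77.06130631105295, 38.90278989613124),
        (-77.0628268311872, 38.905175909773085),
        (-77.06011742685993, 38.905195387105344),
    ])
    print(message)
```

The returned string can be passed directly to the websocket's send call.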
The server will then reply with a subscribed message indicating whether the subscription was created successfully. If it was, the message includes a UUID that correlates future messages from the server with the subscription they relate to.

    {
      "type": "subscribed",
      "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf",
      "errors": []
    }

If there are errors, no subscription ID is given; instead, the errors array contains items of the form:

    Error:
      error: string     # a name for the error
      args: array<Arg>  # parameters helping describe the failure, for example, enumerating invalid inputs

    Arg:
      name: string   # name of the parameter
      value: string  # value of the parameter

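One way to handle the subscribed reply is to return the subscription ID on success and raise an exception that summarizes the errors array otherwise. This is only a sketch: `SubscriptionError` and `parse_subscribed` are hypothetical names, and the error name used in the usage example (`InvalidQuery`) is made up for illustration rather than taken from the API.

```python
import json


class SubscriptionError(Exception):
    """Raised when a subscribed reply carries one or more errors."""


def parse_subscribed(raw: str) -> str:
    """Return the subscription ID, or raise SubscriptionError on failure."""
    message = json.loads(raw)
    if message.get("type") != "subscribed":
        raise ValueError(f"expected a subscribed message, got {message.get('type')!r}")
    errors = message.get("errors") or []
    if errors:
        # Render each error as name(arg=value, ...) following the Error/Arg schema.
        details = "; ".join(
            f"{err['error']}("
            + ", ".join(f"{arg['name']}={arg['value']}" for arg in err.get("args", []))
            + ")"
            for err in errors
        )
        raise SubscriptionError(details)
    return message["id"]


if __name__ == "__main__":
    ok = '{"type": "subscribed", "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf", "errors": []}'
    print(parse_subscribed(ok))
    # Hypothetical failure payload; the error name and args are illustrative only.
    bad = '{"type": "subscribed", "errors": [{"error": "InvalidQuery", "args": [{"name": "field", "value": "locaton"}]}]}'
    try:
        parse_subscribed(bad)
    except SubscriptionError as exc:
        print("subscription failed:", exc)
```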
Once you are subscribed, the server will begin forwarding any newly-ingested observation data matching your subscription via data messages (see Receiving data).
Multiple subscriptions for an observation spec can share the same websocket, so you do not need to open a new one for each. To subscribe to additional data, send another subscribe message with the query you wish to subscribe to. If successful, the server will respond with a different subscription ID representing that subscription and begin sending data.
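When several subscriptions share one websocket, the client needs to remember which query each subscription ID belongs to. The sketch below shows one possible bookkeeping approach; `SubscriptionRegistry` is a hypothetical name. Because the subscribe message carries no client-side correlation ID, the sketch assumes that subscribed replies arrive in the same order as the subscribe messages were sent. That ordering is an assumption, not something the API is stated to guarantee.

```python
from collections import deque


class SubscriptionRegistry:
    """Track which query each subscription ID on a websocket belongs to."""

    def __init__(self):
        self._pending = deque()  # queries sent, still awaiting a subscribed reply
        self.active = {}         # subscription ID -> query

    def sent(self, query):
        """Record that a subscribe message with this query was sent."""
        self._pending.append(query)

    def on_subscribed(self, message):
        """Handle a parsed subscribed reply; return the new ID, or None on failure."""
        # ASSUMPTION: replies arrive in the order the requests were sent.
        query = self._pending.popleft()
        if message.get("errors"):
            return None
        self.active[message["id"]] = query
        return message["id"]


if __name__ == "__main__":
    registry = SubscriptionRegistry()
    registry.sent({})
    registry.on_subscribed(
        {"type": "subscribed", "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf", "errors": []}
    )
    print(registry.active)
```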
Once you've successfully subscribed, the server will begin forwarding all newly-ingested data that matches your subscription via data messages. These messages adhere to the following schema:

    id: uuid                          # the subscription ID this data relates to
    observations: array<Observation>  # the data

For example, a data message for the example subscription above may look like:

    {
      "type": "data",
      "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf",
      "observations": [
        {
          "sourceSystemId": "foo",
          "collectionId": "bar",
          "observationSpecId": "baz",
          "trackId": "track0",
          "position": {
            "longitude": -77.05974824713265,
            "latitude": 38.903335277742656
          },
          "timestamp": "2024-06-24T17:00:00Z",
          "name": "name0",
          "staticProperties": [],
          "liveProperties": [
            {
              "propertyType": "liveProperty",
              "value": 0
            }
          ]
        }
      ]
    }

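Since every data message carries the ID of the subscription it relates to, a client can route observations to a per-subscription callback. The following is a sketch under that reading of the schema; `route_data_message` and the `handlers` mapping are hypothetical names, and silently ignoring messages for unknown IDs is a design choice of this example, not documented server behavior.

```python
import json


def route_data_message(raw, handlers):
    """Deliver each observation to the handler registered for its subscription.

    `handlers` maps subscription ID -> callable taking one observation dict.
    Returns the number of observations delivered.
    """
    message = json.loads(raw)
    if message.get("type") != "data":
        return 0  # not a data message (for example, a subscribed reply)
    handler = handlers.get(message["id"])
    if handler is None:
        return 0  # no local handler for this subscription ID
    observations = message.get("observations", [])
    for observation in observations:
        handler(observation)
    return len(observations)


if __name__ == "__main__":
    seen = []
    handlers = {"3255ebeb-a897-437c-ae7b-99c5d64d17cf": seen.append}
    raw = json.dumps({
        "type": "data",
        "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf",
        "observations": [{"trackId": "track0", "name": "name0"}],
    })
    route_data_message(raw, handlers)
    print(seen)
```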
You can unsubscribe from any subscription you have successfully created by sending an unsubscribe message to the server.
For example, to unsubscribe from the subscription above, you would send:

    {
      "type": "unsubscribe",
      "id": "3255ebeb-a897-437c-ae7b-99c5d64d17cf"
    }

The server will then unsubscribe your websocket from that subscription and will stop sending newly-ingested data matching that subscription.
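The unsubscribe message can be built with a small helper, sketched here. The name `unsubscribe_message` is hypothetical, and the local UUID check is an extra safeguard added by this example (subscription IDs are described above as UUIDs), not a requirement stated by the API.

```python
import json
import uuid


def unsubscribe_message(subscription_id: str) -> str:
    """Build the unsubscribe message for one subscription ID."""
    # Fail early on IDs that are not well-formed UUIDs (raises ValueError).
    uuid.UUID(subscription_id)
    return json.dumps({"type": "unsubscribe", "id": subscription_id})


if __name__ == "__main__":
    print(unsubscribe_message("3255ebeb-a897-437c-ae7b-99c5d64d17cf"))
```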
Below is an example script that opens a websocket, subscribes to all newly-ingested data for the observation spec, and prints the incoming messages from the server.

    # NOTE: you must install the websockets module
    # pip3 install websockets

    import asyncio
    import websockets

    # Replace the placeholders with your own values.
    HOSTNAME = "<your_hostname>"
    OBSERVATION_SPEC_ID = "<your_observation_spec_id>"
    TOKEN = "<your_token>"

    endpoint = f"wss://{HOSTNAME}/api/ws/gotham/v1/observations/{OBSERVATION_SPEC_ID}?preview=true"
    websocket_token = f"Bearer-{TOKEN}"


    async def connect_and_listen():
        # The bearer token is passed as a websocket subprotocol.
        # max_size raises the limit on incoming message size above the library default.
        async with websockets.connect(endpoint, subprotocols=[websocket_token], max_size=2**31) as websocket:
            print("Websocket opened")
            # An empty query subscribes to all newly-ingested data for the observation spec.
            subscribe = '{"type": "subscribe", "query": {}}'
            await websocket.send(subscribe)
            while True:
                try:
                    response = await websocket.recv()
                    print(response)
                except websockets.ConnectionClosed as e:
                    print("Connection closed.")
                    print(e)
                    break


    async def main():
        await asyncio.create_task(connect_and_listen())
        print("Terminated")


    if __name__ == "__main__":
        asyncio.run(main())
