Streaming Events
An example of how to use the REST API to stream sensor events in real-time.

Overview

In many applications, a continuous stream of data is preferable to periodically polling for chunks, for example when plotting live graphs or triggering alarms. This example shows how to set up a stream with the REST API, following some best practices and using a simple retry policy.

Preliminaries

    Data Connectors: If you want to forward your data in a server-to-server integration, consider using Data Connectors for a simpler and more reliable service with an added at-least-once guarantee.
    Basic Auth: For simplicity, we use Basic Auth for authentication here. We recommend replacing this with an OAuth2 flow for integrations more complex than local experimentation.
    Service Account Credentials: You must create a Service Account and know its credentials. Any role will suffice.

Stream API

To stream events from one or several devices, the following endpoint will be used.
https://api.disruptive-technologies.com/v2/projects/{project}/devices:stream
For full details on the endpoint, see the REST API Reference.

Parameters

The following parameters are useful to know about for streaming events.
    device_ids: A list of device identifiers to limit the query to a set of specific devices.
    device_types: Filter streaming events to one or several device types only.
    event_types: Filter streaming events to one or several types of events only.
    label_filters: Filter streaming events to one or more labels. You can either specify just a label key (labelKey) or a key-value pair (labelKey=labelValue).
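
As a rough illustration of how these filters end up in the request, the sketch below builds a stream URL with query parameters. The helper name `build_stream_url` and the project ID are hypothetical. Encoding list values as repeated keys is an assumption that mirrors how the requests library encodes the list passed in the example code further down.

```python
from urllib.parse import urlencode

# Endpoint from the Stream API section; {project} is filled in per call.
STREAM_URL = (
    'https://api.disruptive-technologies.com/v2'
    '/projects/{project}/devices:stream'
)


def build_stream_url(project_id, **filters):
    """Return the stream URL with all non-empty filters as query parameters."""
    # Drop filters that were not provided so they do not appear in the URL.
    params = {key: value for key, value in filters.items() if value}
    url = STREAM_URL.format(project=project_id)
    if not params:
        return url
    # doseq=True repeats the key for each list element (key=a&key=b).
    return url + '?' + urlencode(params, doseq=True)


if __name__ == '__main__':
    # 'my-project-id' is a placeholder, not a real project.
    print(build_stream_url(
        'my-project-id',
        event_types=['temperature', 'touch'],
        label_filters=['room=kitchen'],
    ))
```

Narrowing the stream on the server side this way keeps the client from having to discard events it does not need.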

Response Format

The :stream endpoints support two response formats, text/event-stream and application/json. The format is selected with the Accept header and defaults to application/json.
    headers = { "Accept": "application/json" }   # default
    headers = { "Accept": "text/event-stream" }  # alternative
Each format suits a different use case.
    text/event-stream: A Server-Sent Events specific format. Used by any Server-Sent Events libraries.
    application/json: Returns a JSON object for each event, separated by line-breaks. Easy to parse in any high-level language and used in the following example.
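
To make the difference between the two formats concrete, here is a minimal parsing sketch. The function names are hypothetical, and the payload shape (`result` wrapping `event`) is taken from the example code further down. The Server-Sent Events parser assumes each event fits on a single `data:` line, which is a simplification of the full format.

```python
import json


def parse_json_line(line):
    """Decode one line of an application/json stream; None for blank lines."""
    text = line.decode('utf-8').strip()
    if not text:
        return None
    return json.loads(text)


def parse_sse_line(line):
    """Decode one 'data:' line of a text/event-stream; None for other lines."""
    text = line.decode('utf-8').strip()
    if not text.startswith('data:'):
        return None  # Blank separators, comments, and other SSE fields.
    return json.loads(text[len('data:'):].strip())


if __name__ == '__main__':
    sample = b'{"result": {"event": {"eventType": "ping"}}}'
    print(parse_json_line(sample))
    print(parse_sse_line(b'data: ' + sample))
```

In practice, a dedicated Server-Sent Events library is the safer choice for text/event-stream, since it also handles multi-line data fields and reconnection.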

Stream Best Practices

Consider the following practices, all of which are used in the example code.
    Dropped Connections: The connection to a stream can be lost at any moment for a number of reasons. Handle this by implementing a retry policy.
    Query Parameters: Use the various query parameters to reduce the number of events that need to be processed.
    Authorization Header: Use the Authorization header if possible. Otherwise, such as when using the EventSource library in JavaScript, the token query parameter can be used.
    Pinging: An optional pingInterval query parameter may be used to make sure the client can still receive messages from the server. If no ping messages have been received in the specified interval, the client should reconnect to the stream.
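
The pinging practice can be sketched as a small watchdog that records when the last message arrived. The class `PingWatchdog` is hypothetical, and the two-second jitter allowance is an assumption borrowed from the `PING_JITTER` constant in the example code. The clock is injectable so the logic can be checked without waiting.

```python
import time


class PingWatchdog:
    """Tracks the last received message and reports when a reconnect is due."""

    def __init__(self, ping_interval, jitter=2.0, clock=time.monotonic):
        # Allow some slack on top of the ping interval before giving up.
        self.deadline = ping_interval + jitter
        self.clock = clock
        self.last_seen = clock()

    def touch(self):
        """Call for every received message, including pings."""
        self.last_seen = self.clock()

    def expired(self):
        """Return True if no message arrived within the allowed window."""
        return self.clock() - self.last_seen > self.deadline


if __name__ == '__main__':
    watchdog = PingWatchdog(ping_interval=10)
    watchdog.touch()
    print('Reconnect needed:', watchdog.expired())
```

A client would call `touch()` for each incoming line and reconnect once `expired()` returns True.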

Example Code

The example code does the following.
    Sends a GET request to the REST API to initialize an event stream.
    Keeps the TCP connection open while receiving events.
    Retries up to N times before giving up if the connection is lost or too much time passes between pings.
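
The retry step on its own can be sketched as follows. The function `run_with_retries` and its `connect` argument are hypothetical: `connect` stands for any callable that opens the stream, processes events, and raises when the connection is lost. Unlike the full example, this sketch does not reset the retry counter after a successful event.

```python
import time


def run_with_retries(connect, max_retries=5, sleep=time.sleep):
    """Call connect() until it returns normally or retries are exhausted.

    Returns True on a clean exit and False after max_retries failed retries.
    """
    nth_retry = 0
    while nth_retry <= max_retries:
        try:
            connect()
            return True
        except KeyboardInterrupt:
            return True
        except Exception as error:
            if nth_retry == max_retries:
                break
            print('Connection lost ({}). Retry #{}'.format(
                error, nth_retry + 1))
            sleep(2 ** nth_retry)  # Exponential backoff: 1, 2, 4, ... seconds.
            nth_retry += 1
    return False
```

Passing `sleep` as an argument makes the backoff schedule easy to inspect in isolation, for example by passing `delays.append` for a list named `delays`.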

Environment Setup

If you wish to run the code locally, make sure you have a working runtime environment.

Python 3.9
The following packages are required by the example code and must be installed.
    pip install requests==2.25.1

Python API
The latest version of our Python API can be installed through pip.
    pip install --upgrade disruptive

Node.js 14
The following modules are required by the example code and must be installed.
    npm install eventsource
    npm install jsonwebtoken
    npm install axios

Add environment variables for authentication details.
    export DT_SERVICE_ACCOUNT_KEY_ID=<YOUR_SERVICE_ACCOUNT_KEY_ID> # [string]
    export DT_SERVICE_ACCOUNT_SECRET=<YOUR_SERVICE_ACCOUNT_SECRET> # [string]
    export DT_SERVICE_ACCOUNT_EMAIL=<YOUR_SERVICE_ACCOUNT_EMAIL>   # [string]
    export DT_PROJECT_ID=<YOUR_PROJECT_ID>                         # [string]
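
Because the examples read these variables with `os.environ.get`, a missing variable silently becomes `None` and only surfaces later as an authentication or URL error. A small check like the sketch below can catch that early. The helper `missing_variables` is hypothetical and not part of the example code.

```python
import os

# Variable names as exported above.
REQUIRED_VARIABLES = (
    'DT_SERVICE_ACCOUNT_KEY_ID',
    'DT_SERVICE_ACCOUNT_SECRET',
    'DT_SERVICE_ACCOUNT_EMAIL',
    'DT_PROJECT_ID',
)


def missing_variables(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARIABLES if not environ.get(name)]


missing = missing_variables()
print('Missing variables:', ', '.join(missing) if missing else 'none')
```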

Source

The following code snippets implement streaming in three variants, in this order: Python 3.9 using requests, the Python API, and Node.js 14.

Python 3.9

    import os
    import time
    import json
    import requests  # pip install requests==2.25.1

    # Service Account credentials.
    SERVICE_ACCOUNT_KEY_ID = os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID')
    SERVICE_ACCOUNT_SECRET = os.environ.get('DT_SERVICE_ACCOUNT_SECRET')

    # Construct API URL.
    PROJECT_ID = os.environ.get('DT_PROJECT_ID')
    BASE_URL = 'https://api.d21s.com/v2'
    DEVICES_STREAM_URL = '{}/projects/{}/devices:stream'.format(
        BASE_URL,
        PROJECT_ID
    )

    MAX_CONNECTION_RETRIES = 5
    PING_INTERVAL = 10
    PING_JITTER = 2

    if __name__ == '__main__':
        # Set up a simple catch-all retry policy.
        nth_retry = 0
        while nth_retry <= MAX_CONNECTION_RETRIES:
            try:
                print('Streaming... Press CTRL+C to terminate.')
                # Set up a stream connection.
                # Connection will timeout and reconnect if no single event
                # is received in an interval of PING_INTERVAL + PING_JITTER.
                stream = requests.get(
                    url=DEVICES_STREAM_URL,
                    auth=(SERVICE_ACCOUNT_KEY_ID, SERVICE_ACCOUNT_SECRET),
                    stream=True,
                    timeout=PING_INTERVAL + PING_JITTER,
                    params={
                        'event_types': ['temperature'],
                        'ping_interval': '{}s'.format(PING_INTERVAL),
                    },
                )

                # Iterate through the events as they come in (one per line).
                for line in stream.iter_lines():
                    # Decode the response payload and break on error.
                    payload = json.loads(line.decode('ascii'))
                    if 'result' not in payload:
                        print(payload)
                        break
                    event = payload['result']['event']

                    # Skip ping events.
                    if event['eventType'] == 'ping':
                        continue

                    # Print temperature events.
                    if event['eventType'] == 'temperature':
                        temperature = event['data']['temperature']
                        print('\t- {:<5}°C from {} at {}'.format(
                            temperature['value'],
                            event['targetName'].split('/')[-1],
                            temperature['updateTime']
                        ))

                    # Reset retry counter.
                    nth_retry = 0

            except KeyboardInterrupt:
                break

            except requests.exceptions.ConnectionError:
                print('TIMEOUT. Reconnecting...')
                nth_retry += 1

            except Exception:
                # Print the error and try again up to MAX_CONNECTION_RETRIES.
                if nth_retry < MAX_CONNECTION_RETRIES:
                    print('Connection lost. Retry #{}'.format(nth_retry+1))

                    # Exponential backoff in sleep time.
                    time.sleep(2**nth_retry)
                    nth_retry += 1
                else:
                    break

Python API

    import os
    import disruptive as dt

    # Environment variables for authentication credentials and target.
    KEY_ID = os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID')
    SECRET = os.environ.get('DT_SERVICE_ACCOUNT_SECRET')
    EMAIL = os.environ.get('DT_SERVICE_ACCOUNT_EMAIL')
    PROJECT_ID = os.environ.get('DT_PROJECT_ID')

    if __name__ == '__main__':
        # Authenticate the package using Service Account credentials.
        dt.default_auth = dt.Auth.service_account(KEY_ID, SECRET, EMAIL)

        # Enable logging to see what's happening under the hood.
        dt.log_level = 'debug'

        # Initialize a stream generator for all temperature events in a project.
        for e in dt.Stream.event_stream(PROJECT_ID, event_types=['temperature']):
            # Print event information.
            print('- {:<5}°C from {} at {}'.format(
                e.data.celsius,
                e.device_id,
                e.data.timestamp,
            ))

Node.js 14

    const EventSource = require("eventsource") // npm install eventsource
    const jwt = require('jsonwebtoken')        // npm install jsonwebtoken
    const axios = require('axios').default     // npm install axios

    // Service Account credentials
    const serviceAccountKeyID = process.env.DT_SERVICE_ACCOUNT_KEY_ID
    const serviceAccountSecret = process.env.DT_SERVICE_ACCOUNT_SECRET
    const serviceAccountEmail = process.env.DT_SERVICE_ACCOUNT_EMAIL

    // Construct API URL
    const projectID = process.env.DT_PROJECT_ID
    const apiBase = 'https://api.d21s.com/v2/'
    const devicesStreamUrl = apiBase + `projects/${projectID}/devices:stream`

    // Constants
    const maxConnectionRetries = 3 // Max retries without any received messages
    const pingInterval = 10        // Expected interval between pings in seconds
    const pingJitter = 2000        // Expected ping jitter in milliseconds

    async function main() {

        let retryCount = 0
        let stream
        setupStream()

        // Sets up a timer that will restart the stream if there has passed too much
        // time between ping events. This timer is reset every time we receive a ping.
        const pingTimer = setTimeout(() => {
            console.log("Too long between pings. Reconnecting...")
            setupStream()
        }, pingInterval * 1000 + pingJitter)

        async function setupStream() {
            // If we've retried too many times without getting any messages, exit
            if (retryCount >= maxConnectionRetries) {
                console.log("Retried too many times. Exiting")
                process.exit(1)
            }
            retryCount += 1

            // Since EventSource does not support setting HTTP headers, we need to use
            // OAuth for authentication. The received access token will be set as a
            // `token` query parameter when setting up the stream in the next step.
            const accessToken = await getAccessToken(
                serviceAccountKeyID,
                serviceAccountEmail,
                serviceAccountSecret,
            )

            // Add query parameters to the URL
            let url = devicesStreamUrl
            url += `?eventTypes=temperature`        // Filters temperature events
            url += `&pingInterval=${pingInterval}s` // Specifies ping interval
            url += `&token=${accessToken}`          // Access token for authentication

            // Close the existing stream if we have one
            if (stream) {
                stream.close()
            }

            // Set up a new stream with callback functions for
            // messages and errors
            console.log('Streaming... Press CTRL+C to exit.')
            stream = new EventSource(url)
            stream.onmessage = handleStreamMessage
            stream.onerror = handleStreamError
        }

        function handleStreamError(err) {
            console.error("Got error from stream:")
            console.error(err)
            console.log("Reconnecting...")

            setupStream()
        }

        function handleStreamMessage(message) {
            // Parse the payload as JSON
            const data = JSON.parse(message.data)
            if (!data || !data.result) {
                return
            }

            // Reset the retry counter now that we got an event
            retryCount = 0

            // Parse the event object
            const event = data.result.event
            if (event.eventType === "ping") {
                // We got a ping event. Reset the ping timer
                pingTimer.refresh()
            } else if (event.eventType === "temperature") {
                // We got a temperature event
                handleTemperatureEvent(event)
            }
        }

        // Prints out the values of new incoming temperature events
        function handleTemperatureEvent(event) {
            const temp = event.data.temperature
            const deviceID = event.targetName.split("/")[3]
            console.log(`Got ${temp.value}°C from ${deviceID} at ${temp.updateTime}`)
        }

    }
    main().catch((err) => {console.log(err)})

    // Fetches an access token. See the following guide for documentation:
    // https://developer.disruptive-technologies.com/docs/authentication/oauth2
    async function getAccessToken(keyID, email, secret) {
        // Construct the JWT header
        let jwtHeaders = {
            'alg': 'HS256',
            'kid': keyID,
        }

        // Construct the JWT payload
        let jwtPayload = {
            'iat': Math.floor(Date.now() / 1000),        // current unixtime
            'exp': Math.floor(Date.now() / 1000) + 3600, // expiration unixtime
            'aud': 'https://identity.disruptive-technologies.com/oauth2/token',
            'iss': email,
        }

        // Sign and encode JWT with the secret
        const jwtEncoded = jwt.sign(
            jwtPayload,
            secret,
            {
                header: jwtHeaders,
                algorithm: 'HS256',
            },
        )

        // Prepare POST request data
        const requestObject = {
            'assertion': jwtEncoded,
            'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        }

        // Converts the requestObject to a Form URL-Encoded string
        const requestData = Object.keys(requestObject).map(function(key) {
            return encodeURIComponent(key)+'='+encodeURIComponent(requestObject[key])
        }).join('&')

        // Exchange JWT for access token
        const accessTokenResponse = await axios({
            method: 'POST',
            url: 'https://identity.disruptive-technologies.com/oauth2/token',
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            data: requestData,
        }).catch(function (error) {
            if (error.response) {
                console.log(error.response.data)
            }
            throw error
        })

        // Return the access token in the request
        return accessTokenResponse.data.access_token
    }

Expected Output

For each new event in the stream, a line will be printed to stdout.
    Streaming... Press CTRL+C to terminate.
    - 21.85°C from bjei75vbluqg00dltkig at 2021-02-15T13:27:58.536000Z
    - 22.25°C from bjejgi7bluqg00dlu0g0 at 2021-02-15T13:28:16.911000Z
    - 21.75°C from bjei55fbluqg00dltjvg at 2021-02-15T13:28:33.965000Z
    - 23.2 °C from bfuj75ho5b7g0093bbk0 at 2021-02-15T13:28:57.709000Z
    - 21.5 °C from bjehnue7gpvg00cjo000 at 2021-02-15T13:29:06.409000Z
    - 21   °C from bjei8rgpismg008hqdu0 at 2021-02-15T13:29:19.768000Z
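
The step from a decoded stream payload to one of these output lines can be sketched in isolation. The function `format_temperature_line` is hypothetical, and the sample payload is hand-written for illustration: its field names follow the Python example code, and the project name in `targetName` is a placeholder.

```python
def format_temperature_line(payload):
    """Build one output line from a decoded temperature event payload."""
    event = payload['result']['event']
    temperature = event['data']['temperature']
    # The device ID is the last path segment of the target name.
    device_id = event['targetName'].split('/')[-1]
    # {:<5} left-aligns the value in a field of width 5.
    return '- {:<5}°C from {} at {}'.format(
        temperature['value'], device_id, temperature['updateTime'])


# Hand-written sample payload; 'my-project' is a placeholder.
sample = {
    'result': {'event': {
        'eventType': 'temperature',
        'targetName': 'projects/my-project/devices/bjei75vbluqg00dltkig',
        'data': {'temperature': {
            'value': 21.85,
            'updateTime': '2021-02-15T13:27:58.536000Z',
        }},
    }},
}
print(format_temperature_line(sample))
```

The fixed-width field is why shorter values such as 23.2 are followed by padding before the unit.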