Polling the Events API
An example of how to poll sensor events at a set interval.

Overview

If your application requires somewhat up-to-date data, but not to the extent where a stream is necessary, polling chunks periodically may be an option. In this example, we will see how the REST API can be used to poll events with some best practices and simple deduplication.

Preliminaries

    Data Connectors: If you want to forward your data in a server-to-server integration, consider using Data Connectors for a simpler and more reliable service with an added at-least-once guarantee.
    Basic Auth: For simplicity, this example uses Basic Auth for authentication. We recommend replacing this with an OAuth2 flow for integrations more complex than local experimentation.
    Service Account Credentials: You must create and know the credentials of a Service Account. Any role will suffice.

Events API

All device events are stored and are accessible through the API for 31 days. To perform polling, the following endpoint will be used.
https://api.disruptive-technologies.com/v2/projects/{project}/devices/{device}/events
For full details on the endpoint, see the REST API Reference.

Parameters

Key parameters for polling the Events API are as follows.
    start_time: ISO format (YYYY-MM-DDThh:mm:ssZ). Start of the time interval to fetch events for. Defaults to 24h ago.
    end_time: ISO format (YYYY-MM-DDThh:mm:ssZ). End of the time interval to fetch events for. Defaults to now.
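
As a minimal sketch of how these two parameters could be produced, the snippet below formats timezone-aware datetimes in the YYYY-MM-DDThh:mm:ssZ form. The helper names to_iso and time_range_params are our own for illustration and are not part of the API.

```python
from datetime import datetime, timedelta, timezone


def to_iso(moment: datetime) -> str:
    """Format an aware datetime as YYYY-MM-DDThh:mm:ssZ in UTC."""
    return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def time_range_params(end: datetime, length: timedelta) -> dict:
    """Build start_time and end_time for an interval of the given length."""
    return {'start_time': to_iso(end - length), 'end_time': to_iso(end)}


end = datetime(2021, 2, 13, 13, 12, 10, tzinfo=timezone.utc)
print(time_range_params(end, timedelta(minutes=1)))
```

Converting to UTC before formatting keeps the trailing Z truthful even when the input datetime carries another timezone.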

Indexing Delay

There is a delay from when our servers receive an event until it is indexed and available via this endpoint. This is typically 1-2 seconds but can be up to 10 seconds. To account for this, each polling interval can start slightly before the previous one ended, so that events indexed late are still picked up.
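
The overlap can be sketched as a small function that derives each polling window from the end of the previous one. This is only an illustration; the name next_window is hypothetical, and the 10 second value is the upper bound on the indexing delay mentioned above.

```python
from datetime import datetime, timedelta, timezone

# Upper bound on the indexing delay, used as the overlap between windows.
MAX_INDEXING_DELAY = timedelta(seconds=10)


def next_window(previous_end, now, overlap=MAX_INDEXING_DELAY):
    """Return (start, end) for the next poll, reaching back into the previous window."""
    return previous_end - overlap, now


previous_end = datetime(2021, 2, 13, 13, 12, 10, tzinfo=timezone.utc)
now = previous_end + timedelta(seconds=60)
start, end = next_window(previous_end, now)
print(start.isoformat(), '->', end.isoformat())
```

Because consecutive windows share their last and first 10 seconds, an event inside that overlap can be returned twice, so the caller has to deduplicate by event ID.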

Polling Best Practices

The following practices should be considered. Both are used in the example code.
    Always include the start_time and end_time parameters. This gives you total control over the interval limits and iteration thereof.
    Using the event_types parameter, fetch only events of interest to avoid wasted time and load.
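
To make both practices concrete, here is a sketch that builds the full request URL with explicit interval limits and an event type filter, using only the Python standard library. The function name events_url and the project and device identifiers are placeholders, and 'humidity' is assumed as a second event type purely to show how a list is encoded.

```python
from urllib.parse import urlencode

API_URL_BASE = 'https://api.disruptive-technologies.com/v2'


def events_url(project_id, device_id, start_time, end_time, event_types):
    """Build the events URL with explicit interval limits and event types."""
    path = f'/projects/{project_id}/devices/{device_id}/events'
    # doseq=True repeats the key once per list item:
    # event_types=temperature&event_types=humidity
    query = urlencode({
        'start_time': start_time,
        'end_time': end_time,
        'event_types': event_types,
    }, doseq=True)
    return f'{API_URL_BASE}{path}?{query}'


url = events_url(
    'my-project',   # placeholder project ID
    'my-device',    # placeholder device ID
    '2021-02-13T13:11:10Z',
    '2021-02-13T13:12:10Z',
    ['temperature', 'humidity'],
)
print(url)
```

The requests library encodes a list passed through params in the same repeated-key form, so the example code does not need to build the query string by hand.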

Example Code

The following points summarize the provided example code.
    Defines the time range to fetch events from.
    Polls the Events API for events from the given device, printing a summary of the result.
    Checks for duplicates caused by indexing delay by matching event ids.
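
The example code remembers every event it has polled for as long as it runs. For a long-running poller, one possible variation is to cap that memory and keep only the most recent event IDs. The sketch below is an assumption on our part, not something the API requires; the class name SeenEvents and the max_size default are hypothetical.

```python
from collections import OrderedDict


class SeenEvents:
    """Remember the most recent event IDs, discarding the oldest beyond max_size."""

    def __init__(self, max_size=10_000):
        self.max_size = max_size
        self._ids = OrderedDict()

    def is_new(self, event_id):
        """Return True the first time an ID is seen, False for a duplicate."""
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # drop the oldest remembered ID
        return True


seen = SeenEvents(max_size=2)
for event_id in ['a', 'b', 'a', 'c', 'a']:
    print(event_id, 'new' if seen.is_new(event_id) else 'duplicate')
```

Duplicates can only come from the overlap between two consecutive polls, so max_size only needs to comfortably exceed the number of events expected within one overlap. An ID that has already been evicted is reported as new again, which is why the cap should not be set too tight.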

Environment Setup

If you wish to run the code locally, make sure you have a working runtime environment.
Python 3.9
The following packages are required by the example code and must be installed.
pip install requests==2.25.1
Python API
The latest version of our Python API can be installed through pip.
pip install --upgrade disruptive
Node.js 14
The following modules are required by the example code and must be installed.
npm install axios
Add the following environment variables. They are used to authenticate with the API and to define which device is polled from the given project. This can, of course, be changed to work for more than one device.
Bash
export DT_SERVICE_ACCOUNT_KEY_ID=<YOUR_SERVICE_ACCOUNT_KEY_ID> # [string]
export DT_SERVICE_ACCOUNT_SECRET=<YOUR_SERVICE_ACCOUNT_SECRET> # [string]
export DT_SERVICE_ACCOUNT_EMAIL=<YOUR_SERVICE_ACCOUNT_EMAIL> # [string]
export DT_PROJECT_ID=<YOUR_PROJECT_ID> # [string]
export DT_DEVICE_ID=<YOUR_DEVICE_ID> # [string]
export DT_POLL_INTERVAL=N_SECONDS # [seconds]
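
Since a missing variable otherwise only surfaces as an error deep inside the polling code, it can help to validate the environment up front. The following is a sketch under our own assumptions; load_config and REQUIRED are hypothetical names, and DT_SERVICE_ACCOUNT_EMAIL is left out because only the Python API variant uses it.

```python
import os

# Variables that every variant of the example reads.
REQUIRED = (
    'DT_SERVICE_ACCOUNT_KEY_ID',
    'DT_SERVICE_ACCOUNT_SECRET',
    'DT_PROJECT_ID',
    'DT_DEVICE_ID',
    'DT_POLL_INTERVAL',
)


def load_config(environ=os.environ):
    """Return the settings as a dict, or raise with a readable message."""
    missing = [name for name in REQUIRED if not environ.get(name)]
    if missing:
        raise RuntimeError('Missing environment variables: ' + ', '.join(missing))

    config = {name: environ[name] for name in REQUIRED}
    config['DT_POLL_INTERVAL'] = int(config['DT_POLL_INTERVAL'])
    if config['DT_POLL_INTERVAL'] <= 0:
        raise ValueError('DT_POLL_INTERVAL must be a positive number of seconds')
    return config


# Demonstration with an explicit mapping of placeholder values.
# Call load_config() without arguments to read the real environment.
example = {name: 'placeholder' for name in REQUIRED}
example['DT_POLL_INTERVAL'] = '60'
print(load_config(example)['DT_POLL_INTERVAL'])
```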

Source

The following code snippet implements the polling loop in a few languages.
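Three variants are provided: Python 3.9 and Node.js 14, which call the REST API directly, and Python API, which uses our disruptive package.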
Python 3.9
import os
import time
import datetime

import requests  # pip install requests==2.25.1

# Credentials used to authenticate the request.
SERVICE_ACCOUNT_KEY_ID = os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID')
SERVICE_ACCOUNT_SECRET = os.environ.get('DT_SERVICE_ACCOUNT_SECRET')

# Details related to the device that is to be polled.
PROJECT_ID = os.environ.get('DT_PROJECT_ID')
DEVICE_ID = os.environ.get('DT_DEVICE_ID')

# Polling parameters.
# Note that the inclusion of MAX_INDEXING_DELAY may cause events to
# be polled twice due to overlap. This is handled by checking the event ID.
INTERVAL = int(os.environ.get('DT_POLL_INTERVAL'))  # [seconds]
MAX_INDEXING_DELAY = 10  # [seconds]

# Base URL of the REST API, which is extended for each call.
API_URL_BASE = 'https://api.disruptive-technologies.com/v2'


def to_iso(timestamp):
    # Format a UTC datetime as YYYY-MM-DDThh:mm:ssZ.
    return "{:%Y-%m-%dT%H:%M:%SZ}".format(timestamp)


def main():
    # Initialize an empty set of previously polled event IDs.
    polled = set()

    # Start polling and suspend in a while loop.
    endtime = datetime.datetime.utcnow() - datetime.timedelta(seconds=INTERVAL)
    while True:
        # Update the polling time range, overlapping the previous one.
        starttime = endtime - datetime.timedelta(seconds=MAX_INDEXING_DELAY)
        endtime = datetime.datetime.utcnow()

        # Print some logging information to the console.
        print('Polling: {} -> {}'.format(
            to_iso(starttime),
            to_iso(endtime),
        ))

        # Send GET request for temperature events for device in defined range.
        response = requests.get(
            url=API_URL_BASE + '/projects/{}/devices/{}/events'.format(
                PROJECT_ID,
                DEVICE_ID,
            ),
            auth=(SERVICE_ACCOUNT_KEY_ID, SERVICE_ACCOUNT_SECRET),
            params={
                'start_time': to_iso(starttime),
                'end_time': to_iso(endtime),
                'event_types': ['temperature'],
            }
        )

        # Parse the response body once and reuse it below.
        body = response.json()

        # List out events in poll response.
        events = body.get('events', [])
        if events:
            for event in events:
                # Remember the eventId if it has not been seen before.
                if event['eventId'] not in polled:
                    polled.add(event['eventId'])
                    print('\t- Got event {}'.format(event['eventId']))
                else:
                    print('\t- Got event {} [duplicate]'.format(
                        event['eventId']
                    ))
        else:
            print('\t- Response: {}'.format(body))

        # Wait for next polling interval.
        time.sleep(INTERVAL)


if __name__ == '__main__':
    main()
Python API
import os
import time
from datetime import datetime, timedelta

import disruptive as dt  # pip install disruptive

# Authenticate the disruptive package using Service Account credentials.
dt.default_auth = dt.Auth.service_account(
    key_id=os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID'),
    secret=os.environ.get('DT_SERVICE_ACCOUNT_SECRET'),
    email=os.environ.get('DT_SERVICE_ACCOUNT_EMAIL'),
)

# Polling parameters.
# Note that the inclusion of MAX_INDEXING_DELAY may cause events to
# be polled twice due to overlap. This is handled by checking the event ID.
INTERVAL = int(os.environ.get('DT_POLL_INTERVAL'))  # [seconds]
MAX_INDEXING_DELAY = 10  # [seconds]
PROJECT_ID = os.environ.get('DT_PROJECT_ID')
DEVICE_ID = os.environ.get('DT_DEVICE_ID')


def main():
    # Initialize an empty set of previously polled event IDs.
    polled = set()

    # Start polling and suspend in a while loop.
    end_time = datetime.utcnow() - timedelta(seconds=INTERVAL)
    while True:
        # Update the polling time range, overlapping the previous one.
        start_time = end_time - timedelta(seconds=MAX_INDEXING_DELAY)
        end_time = datetime.utcnow()

        # Print some logging information to the console.
        print('Polling: {} -> {}'.format(start_time, end_time))

        # Fetch temperature events for the specified device and time range.
        events = dt.EventHistory.list_events(
            device_id=DEVICE_ID,
            project_id=PROJECT_ID,
            start_time=start_time,
            end_time=end_time,
            event_types=[dt.events.TEMPERATURE],
        )

        # Iterate events in response.
        for event in events:
            # Remember the event ID if it has not been seen before.
            if event.event_id not in polled:
                polled.add(event.event_id)
                print('\t- Got event {}.'.format(event.event_id))
            else:
                print('\t- Got event {} [duplicate].'.format(event.event_id))

        # Wait for next polling interval.
        time.sleep(INTERVAL)


if __name__ == '__main__':
    main()
Node.js 14
const axios = require('axios').default // npm install axios

// Credentials used to authenticate the request.
const serviceAccountKeyID = process.env.DT_SERVICE_ACCOUNT_KEY_ID
const serviceAccountSecret = process.env.DT_SERVICE_ACCOUNT_SECRET

// Details related to the device that is to be polled.
const projectID = process.env.DT_PROJECT_ID
const deviceID = process.env.DT_DEVICE_ID

// Polling parameters. Note that the inclusion of maxIndexingDelay may cause
// events to be polled twice due to overlap. This is handled by checking the event ID.
const interval = Number(process.env.DT_POLL_INTERVAL) // [seconds]
const maxIndexingDelay = 10 // [seconds]

// Base URL of the REST API, which is extended for each call.
const apiUrlBase = 'https://api.disruptive-technologies.com/v2'

// Function for async sleeping.
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms))
}

// Function for converting unixtime in milliseconds to ISO-time format.
function toISO(unixtime) {
    return new Date(unixtime).toISOString()
}

async function main() {
    // Initialize an empty set of previously polled event IDs.
    const polled = new Set()

    // Start polling and suspend in a while loop.
    let endtime = Date.now() - interval*1000
    while (true) {
        // Update the polling time range, overlapping the previous one.
        const starttime = endtime - maxIndexingDelay*1000
        endtime = Date.now()

        // Print some logging information to the console.
        console.log(`Polling: ${toISO(starttime)} -> ${toISO(endtime)}`)

        // Send GET request for temperature events for device in defined range.
        const response = await axios({
            method: 'GET',
            url: apiUrlBase + `/projects/${projectID}/devices/${deviceID}/events`,
            auth: {
                username: serviceAccountKeyID,
                password: serviceAccountSecret,
            },
            params: {
                start_time: toISO(starttime),
                end_time: toISO(endtime),
                event_types: ['temperature'],
            },
        })

        // List out events in poll response.
        const events = response.data.events
        if (events && events.length > 0) {
            for (const e of events) {
                // Remember the eventId if it has not been seen before.
                if (!polled.has(e.eventId)) {
                    polled.add(e.eventId)
                    console.log(`\t- Got event ${e.eventId}`)
                } else {
                    console.log(`\t- Got event ${e.eventId} [duplicate]`)
                }
            }
        } else {
            console.log(`\t- Response: ${JSON.stringify(response.data)}`)
        }

        // Wait for next polling interval.
        await sleep(interval*1000)
    }
}
main().catch((err) => {console.log(err)})

Expected Output

The events returned by each polling interval are printed to stdout. Notice the tagging of duplicate events, which, as mentioned earlier, may occur due to the time range overlap.
Polling: 2021-02-13T13:10:59.297Z -> 2021-02-13T13:12:10.409Z
    - Got event c0jt09hm5np98ql379vg
    - Got event c0jt07hm5np98ql379ug
    - Got event c0jt05pm5np98ql379tg
    - Got event c0jt04hm5np98ql379sg
    - Got event c0jt039m5np98ql379rg

Polling: 2021-02-13T13:12:00.409Z -> 2021-02-13T13:13:11.681Z
    - Got event c0jt0i1m5np98ql37a1g
    - Got event c0jt0e1m5np98ql37a0g
    - Got event c0jt09hm5np98ql379vg [duplicate]