Receiving Events
A few things to consider at the receiving side when forwarding events with Data Connectors.

Request Contents

Both the header and body of the incoming request contain information of interest that can be extracted and, depending on the configuration, used to verify the content and origin of the request.
If a signature secret is set in the configuration, the following header will be included.
  • X-Dt-Signature: Contains a JSON Web Token (JWT). Once decoded using the signature secret, it contains all the information necessary to verify the request content and origin.
Note that the header name casing may differ depending on the framework used to receive the Data Connector events. Some services force header names to lowercase, for example x-dt-signature.
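Since the casing can vary, the header can be looked up case-insensitively. The following is a minimal sketch that assumes the headers are available as a plain dictionary; the helper name get_signature_header is hypothetical and not part of any library.

```python
def get_signature_header(headers):
    """Return the X-Dt-Signature value regardless of header name casing.

    `headers` is assumed to be a plain dict of header names to values.
    Returns None if the header is not present.
    """
    for name, value in headers.items():
        if name.lower() == "x-dt-signature":
            return value
    return None


# Both spellings resolve to the same token.
print(get_signature_header({"X-Dt-Signature": "<JWT>"}))
print(get_signature_header({"x-dt-signature": "<JWT>"}))
```

Many web frameworks already expose case-insensitive header objects, in which case a helper like this is not needed.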

Body

The request body contains three fields, event, labels, and metadata. The following snippet shows an example request body of a touch event for a humidity sensor forwarded by a Data Connector.
{
  "event": {
    "eventId": "<EVENT_ID>",
    "targetName": "projects/<PROJECT_ID>/devices/<DEVICE_ID>",
    "eventType": "touch",
    "data": {
      "touch": {
        "updateTime": "2021-05-28T08:34:06.225872Z"
      }
    },
    "timestamp": "2021-05-28T08:34:06.225872Z"
  },
  "labels": {
    "room-number": "99"
  },
  "metadata": {
    "deviceId": "<DEVICE_ID>",
    "projectId": "<PROJECT_ID>",
    "deviceType": "humidity",
    "productNumber": "102081"
  }
}

Field | Type | Description
--- | --- | ---
event | struct | Contains event data. See the Event documentation, where the structure is explained in detail for each event type.
labels | struct | Device label key-value pairs included by the Data Connector. See the Advanced Configuration page for details about including labels.
metadata | struct | Contains metadata about the event. See the section below for more details.

Event Metadata

Each event has a metadata field that includes details about the event. The event metadata has the following structure.
Field | Type | Description
--- | --- | ---
deviceId | string | The identifier of the device that published the event.
projectId | string | The identifier of the project the device is in.
deviceType | string | The device type that published the event.
productNumber | string | The product number of the device that published the event.
Note
The structure of the metadata field might change in the future if event types are added that are not published by devices. Always check event.eventType first to confirm that it is a known device event before processing the metadata field.
See the code sample below for an example of how to do this.
The event metadata makes it possible to check which device type has published the event, even for event types like touch or networkStatus which are published by many types of devices. This makes it possible to add new devices to a database without having to first look up the device using the REST API.
The metadata also provides a more convenient way to get the deviceId and projectId of the device that published the event, without having to parse the event.targetName field.
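As a rough illustration of the two approaches, the sketch below reads the identifiers from the metadata field after checking the event type, and compares that with splitting event.targetName. The function names and the KNOWN_DEVICE_EVENTS set are assumptions made for this example; extend the set with the event types your integration handles.

```python
# Assumption: the device event types this integration knows how to handle.
KNOWN_DEVICE_EVENTS = {"touch"}


def device_identity(body):
    """Return (projectId, deviceId, deviceType) from the event metadata.

    Returns None when the event type is not a known device event, since
    the metadata structure is only relied upon for those.
    """
    if body["event"]["eventType"] not in KNOWN_DEVICE_EVENTS:
        return None
    metadata = body["metadata"]
    return metadata["projectId"], metadata["deviceId"], metadata["deviceType"]


def identity_from_target_name(target_name):
    """Return (projectId, deviceId) by splitting a targetName of the form
    projects/<PROJECT_ID>/devices/<DEVICE_ID>."""
    parts = target_name.split("/")
    return parts[1], parts[3]


body = {
    "event": {
        "eventType": "touch",
        "targetName": "projects/<PROJECT_ID>/devices/<DEVICE_ID>",
    },
    "metadata": {
        "deviceId": "<DEVICE_ID>",
        "projectId": "<PROJECT_ID>",
        "deviceType": "humidity",
        "productNumber": "102081",
    },
}
print(device_identity(body))
print(identity_from_target_name(body["event"]["targetName"]))
```

The metadata route also yields the device type, which targetName does not contain.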

Acknowledging Received Event

A request-reply flow on the Endpoint URL should be implemented as follows:
  1. Your endpoint receives an HTTPS POST request.
  2. Your service processes the data in some way.
  3. Your service replies to the event request with a 200 OK response.
Most importantly, your endpoint should never return an HTTP 200 OK response before it has finished processing the event. When DT Cloud receives a response with a 200 status code, the event is taken off the internal Data Connector queue and marked as received.
Best Practice
Do not reply 200 OK until you have finished processing your data.
Note that any status code in the range 2xx will be accepted as OK in the response.
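The ordering can be sketched independently of any web framework. In the minimal example below, respond_to_event and process are hypothetical names, and returning 500 on failure is an assumption: any status code outside the 2xx range would leave the event unacknowledged.

```python
def respond_to_event(body, process):
    """Return the HTTP status code to reply with for a received event.

    `process` is a caller-supplied function that handles the event body.
    A 200 is returned only after processing has completed successfully.
    """
    try:
        process(body)
    except Exception as err:
        # Processing failed, so do not acknowledge the event.
        print(f"Processing failed, event is not acknowledged: {err}")
        return 500
    return 200


def store_event(body):
    # Placeholder for real work such as writing to a database.
    print(f"Stored event {body['event']['eventId']}")


status = respond_to_event({"event": {"eventId": "<EVENT_ID>"}}, store_event)
print(status)
```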

Verifying Signed Events

When using a Signature Secret, the X-Dt-Signature header is included and contains a JWT signed with the Signature Secret. The JWT contains a checksum of the request body, which can be used to check for tampering.
The following steps sum up the process of verifying the received request at the receiving endpoint.
  1. Extract the signed JWT from the HTTP header X-Dt-Signature of the received request.
  2. Verify the JWT's signature with the signature secret.
  3. Calculate a SHA256 checksum over the entire request body.
  4. Compare the body checksum with the checksum_sha256 field contained in the JWT. The JWT also contains a checksum field that uses SHA1, which is less secure than SHA256 and is kept only for backward compatibility.
  5. If these checksums are identical, you can be certain that the event has not been tampered with and originated from your Data Connector.
The following snippets from our Google Cloud Function example integration implement this verification process. Variants are given, in this order, for Python 3.9, the Python API, Node.js 16, and Go 1.16.
Python 3.9

# This Python example is built on Flask. Docs are available here:
# https://flask.palletsprojects.com/en/2.0.x/quickstart/

import os
import hashlib
import jwt  # pip install pyjwt==2.3.0
from flask import Flask, request  # pip install Flask==2.0.2

app = Flask(__name__)

# Read environment variable.
SIGNATURE_SECRET = os.environ.get('DT_SIGNATURE_SECRET')


@app.route('/', methods=["POST"])
def data_connector_endpoint():
    # Extract the body as a bytestring and the signed JWT.
    # We'll use these values to verify the request.
    payload = request.get_data()
    token = request.headers['x-dt-signature']

    # Verify request origin and content integrity.
    if not verify_request(payload, token):
        return ('Could not verify request.', 400)

    # We now know the request came from DT Cloud, and the integrity
    # of the body has been verified. We can now handle the event safely.
    handle_event(request.get_json())

    # Respond with a 200 status code to ack the event. Any status codes
    # that are outside the 2xx range will nack the event, meaning it will
    # be retried later.
    return ('OK', 200)


def verify_request(body, token):
    """
    Verifies that the request originated from DT, and that the body
    hasn't been modified since it was sent. This is done by verifying
    that the checksum field of the JWT token matches the checksum of the
    request body, and that the JWT is signed with the signature secret.
    """

    # Decode the JWT, and verify that it was signed using the
    # signature secret. Also verifies that the algorithm used was HS256.
    try:
        payload = jwt.decode(token, SIGNATURE_SECRET, algorithms=["HS256"])
    except Exception as err:
        print(err)
        return False

    # Verify the request body checksum.
    checksum = hashlib.sha256(body).hexdigest()
    if payload.get("checksum_sha256") != checksum:
        print('Checksum Mismatch')
        return False

    return True


def handle_event(body):
    """
    Processes the event itself. For this example, we will just
    decode a touch event, and print out the timestamp, device ID,
    and the device type.
    """
    # First, check if the event type is one of the event
    # types we're expecting.
    # As an example, we'll check for touch events here.
    if body['event']['eventType'] == 'touch':
        # Now that we know this is a device event, we can
        # check for the device type and device identifier
        # in the event metadata.
        device_type = body['metadata']['deviceType']
        device_id = body['metadata']['deviceId']
        timestamp = body['event']['data']['touch']['updateTime']

        print(f"Received touch event at {timestamp} from {device_type} sensor with id {device_id}")

Python API

# This Python example is built on Flask. Docs are available here:
# https://flask.palletsprojects.com/en/2.0.x/quickstart/

import os
from dtintegrations import data_connector, provider
from flask import Flask, request  # pip install Flask==2.0.2

app = Flask(__name__)

# Read environment variable.
DT_SIGNATURE_SECRET = os.getenv('DT_SIGNATURE_SECRET')


@app.route('/', methods=['POST'])
def dataconnector_endpoint():
    # Validate and decode the incoming request.
    payload = data_connector.HttpPush.from_provider(
        request,
        provider=provider.FLASK,
        secret=DT_SIGNATURE_SECRET,
    )

    # Print the event data.
    handle_event(payload)

    # If all is well, return 200 response.
    return ('OK', 200)


def handle_event(payload):
    """
    Processes the event itself. For this example, we will just
    decode a touch event, and print out the timestamp, device ID,
    and the device type.
    """
    # First, check if the event type is one of the event
    # types we're expecting.
    # As an example, we'll check for touch events here.
    if payload.event.event_type == 'touch':
        # Now that we know this is a device event, we can
        # check for the device type and device identifier
        # in the event metadata.
        metadata = payload.get_device_metadata()
        if metadata is not None:
            device_type = metadata.device_type
            device_id = metadata.device_id
            timestamp = payload.event.data.timestamp

            print(f"Received touch event at {timestamp} from {device_type} sensor with id {device_id}")

Node.js 16

const crypto = require('crypto')
const express = require('express') // npm install express
const jwt = require('jsonwebtoken') // npm install jsonwebtoken

// Read environment variable
const signatureSecret = process.env.DT_SIGNATURE_SECRET

// dataConnectorEndpoint receives, validates, and returns a response
// for the forwarded event.
const dataConnectorEndpoint = (req, res) => {

    // Validate request origin and content integrity.
    // Note that the checksum is calculated over the re-serialized body.
    const token = req.headers['x-dt-signature']
    if (verifyRequest(JSON.stringify(req.body), token) === false) {
        res.sendStatus(400)
        return
    }

    // We now know the request came from DT Cloud, and the integrity
    // of the body has been verified. We can now handle the event safely.
    handleEvent(req.body)

    // Respond with a 200 status code to ack the event. Any status codes
    // that are outside the 2xx range will nack the event, meaning it will
    // be retried later.
    res.sendStatus(200)
}

// Verifies that the request originated from DT, and that the body
// hasn't been modified since it was sent. This is done by verifying
// that the checksum field of the JWT token matches the checksum of the
// request body, and that the JWT is signed with the signature secret.
const verifyRequest = (payload, token) => {
    // Decode the JWT, and verify that it was signed using the
    // signature secret. Also verifies that the algorithm used was HS256.
    let decoded
    try {
        decoded = jwt.verify(token, signatureSecret, { algorithms: ["HS256"] })
    } catch (err) {
        console.log(err)
        return false
    }

    // Verify the request body checksum.
    const hash = crypto.createHash("sha256")
    const checksum = hash.update(payload).digest("hex")
    if (checksum !== decoded.checksum_sha256) {
        console.log('Checksum Mismatch')
        return false
    }

    return true
}

// handleEvent processes the event itself. For this example,
// we will just decode a touch event, and print out the timestamp,
// device ID, and the device type.
const handleEvent = (payload) => {
    // First, check if the event type is one of the event
    // types we're expecting.
    // As an example, we'll check for touch events here.
    switch (payload.event.eventType) {
        case 'touch': {
            // Now that we know this is a device event, we can
            // check for the device type and device identifier
            // in the event metadata.
            const deviceType = payload.metadata.deviceType
            const deviceId = payload.metadata.deviceId
            const timestamp = payload.event.data.touch.updateTime

            console.log(`Received touch event at ${timestamp} from ${deviceType} sensor with id ${deviceId}`)
            break
        }
        default:
            break
    }
}

// Sets up a bare-bones server that listens on port 8080, and routes
// all requests to the path "/" to the `dataConnectorEndpoint` function
const app = express()
app.use(express.json())
app.post('/', dataConnectorEndpoint)
app.listen(8080)

Go 1.16

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"

	// go get github.com/golang-jwt/jwt/v4
	jwt "github.com/golang-jwt/jwt/v4"
)

// Read environment variable
var signatureSecret = os.Getenv("DT_SIGNATURE_SECRET")

// DataConnectorEndpoint receives, validates, and returns a response
// for the forwarded event.
func DataConnectorEndpoint(w http.ResponseWriter, r *http.Request) {

	// Extract the body and the signed JWT.
	// We'll use these values to verify the request.
	tokenString := r.Header.Get("x-dt-signature")
	bodyBytes, err := io.ReadAll(r.Body)
	if err != nil {
		fmt.Println(err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Validate request origin and content integrity.
	if err := verifyRequest(bodyBytes, tokenString); err != nil {
		fmt.Println(err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// We now know the request came from DT Cloud, and the integrity
	// of the body has been verified. We can now handle the event safely.
	if err := handleEvent(bodyBytes); err != nil {
		fmt.Println(err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Respond with a 200 status code to ack the event. Any status codes
	// that are outside the 2xx range will nack the event, meaning it
	// will be retried later.
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
}

// verifyRequest verifies that the request originated from DT, and that
// the body hasn't been modified since it was sent. This is done by
// verifying that the checksum field of the JWT token matches the checksum
// of the request body, and that the JWT is signed with the signature secret.
func verifyRequest(bodyBytes []byte, tokenString string) error {
	// Decode the JWT, and verify that it was signed using the
	// signature secret. Also verifies the algorithm used to sign the JWT.
	token, err := jwt.Parse(
		tokenString,
		func(token *jwt.Token) (interface{}, error) {
			// Return our signature secret to verify that it was used to
			// sign the JWT.
			return []byte(signatureSecret), nil
		},
		jwt.WithValidMethods([]string{"HS256"}),
	)
	if err != nil {
		return err
	}

	// Verify the request body checksum.
	sha256Bytes := sha256.Sum256(bodyBytes)
	sha256String := hex.EncodeToString(sha256Bytes[:])
	claims := token.Claims.(jwt.MapClaims)
	if sha256String != claims["checksum_sha256"] {
		return errors.New("checksum mismatch")
	}

	return nil
}

// handleEvent processes the event itself. For this example,
// we will just decode a touch event, and print out the timestamp,
// device ID, and the device type.
func handleEvent(payload []byte) error {
	// The structure of the events we'll receive from a Data Connector.
	type Event struct {
		Event struct {
			EventId   string          `json:"eventId"`
			EventType string          `json:"eventType"`
			Data      json.RawMessage `json:"data"`
			Timestamp string          `json:"timestamp"`
		} `json:"event"`
		Labels   map[string]string `json:"labels"`
		Metadata map[string]string `json:"metadata"`
	}

	// The structure of the `Event.Data` field for a touch event.
	// We'll be using touch events for this example.
	type TouchData struct {
		Touch struct {
			Timestamp string `json:"updateTime"`
		} `json:"touch"`
	}

	// Decode the event
	var event Event
	if err := json.Unmarshal(payload, &event); err != nil {
		return err
	}

	// First, check if the event type is one of the event
	// types we're expecting.
	// As an example, we'll check for touch events here.
	switch event.Event.EventType {
	case "touch":
		// Now that we know this is a touch event, we can decode
		// the `Event.Data` field.
		var touchData TouchData
		err := json.Unmarshal(event.Event.Data, &touchData)
		if err != nil {
			return err
		}

		// Also, since we now know this is a device event, we can
		// check for the device type and device identifier
		// in the event metadata.
		deviceType := event.Metadata["deviceType"]
		deviceId := event.Metadata["deviceId"]
		timestamp := touchData.Touch.Timestamp

		fmt.Printf("Received touch event at %s from %s sensor with id %s\n",
			timestamp,
			deviceType,
			deviceId,
		)
	}

	return nil
}

func main() {
	// Sets up a bare-bones server that listens on port 8080, and
	// routes all requests to the path "/" to the
	// `DataConnectorEndpoint` function.
	http.HandleFunc("/", DataConnectorEndpoint)

	fmt.Println("Started listening on localhost:8080 ...")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Printf("Closed with error: %v\n", err)
	} else {
		fmt.Println("Closed server successfully")
	}
}

Handling Duplicates

Every event received by DT Cloud is put in a dedicated, per-Data Connector queue. Messages are removed from this queue once acknowledged, or if the message is older than 12 hours.
A side effect of this delivery guarantee is that, under certain conditions, you may receive duplicates of the same event. While duplicates are rare, you should deduplicate on the receiving end by checking event IDs.
Best Practice
Use the included eventId field to check for duplicated events.
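One possible way to do this is to remember recently seen event IDs. The sketch below keeps them in memory and forgets them after 12 hours, matching the maximum age of a queued message. The class name EventDeduplicator is hypothetical, and the in-memory store is an assumption that only holds for a single long-running process; a shared database or cache would be needed otherwise.

```python
import time


class EventDeduplicator:
    """Tracks seen event IDs and forgets them after `ttl_seconds`."""

    def __init__(self, ttl_seconds=12 * 60 * 60):
        self.ttl_seconds = ttl_seconds
        self._seen = {}  # eventId -> time the ID was first seen

    def is_duplicate(self, event_id, now=None):
        """Return True if `event_id` was already seen within the TTL.

        The ID is recorded as seen when it is new.
        """
        now = time.time() if now is None else now

        # Drop IDs that are older than the TTL.
        expired = [
            eid for eid, seen_at in self._seen.items()
            if now - seen_at > self.ttl_seconds
        ]
        for eid in expired:
            del self._seen[eid]

        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False


dedup = EventDeduplicator()
for event_id in ["<EVENT_ID>", "<EVENT_ID>"]:
    if dedup.is_duplicate(event_id):
        print(f"Skipping duplicate event {event_id}")
    else:
        print(f"Processing event {event_id}")
```

A duplicate should still be answered with a 2xx status code so that it is removed from the queue.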

Retry Policy

Any time a Data Connector does not receive a successful response (HTTP status code 2xx), the event will be retried. If an event has not been successfully acknowledged after 12 hours, it will be discarded.
The retry interval follows an exponential backoff policy, given by $t_0 \cdot 2^{n-1}$, where $t_0$ is the initial interval of 8 seconds and $n$ is the retry counter. The interval will not exceed 1 hour. For very slow endpoints, the minimum retry interval will be 4x the response time.
Attempt | Retry Interval [s]
--- | ---
1 | 8
2 | 16
3 | 32
... | ...
9 | 2048
10 | 3600
11 | 3600
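The intervals listed above can be reproduced with a few lines of Python. This is a sketch of the stated policy, not the actual DT Cloud implementation: the function name retry_interval is hypothetical, and applying the 4x response time rule as a lower bound after the 1 hour cap is an assumption about how the two rules combine.

```python
INITIAL_INTERVAL_S = 8   # initial interval of 8 seconds
MAX_INTERVAL_S = 3600    # the interval will not exceed 1 hour


def retry_interval(attempt, response_time_s=0.0):
    """Return the retry interval in seconds for a 1-based attempt number."""
    if attempt < 1:
        raise ValueError("attempt must be 1 or greater")
    # Exponential backoff, capped at the maximum interval.
    backoff = min(INITIAL_INTERVAL_S * 2 ** (attempt - 1), MAX_INTERVAL_S)
    # Assumption: slow endpoints get at least 4x their response time.
    return max(backoff, 4 * response_time_s)


for attempt in (1, 2, 3, 9, 10, 11):
    print(attempt, retry_interval(attempt))
```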