Receiving Events
This page covers what to consider on the receiving side when events are forwarded by a Data Connector.

Request Contents

Both the header and body of the incoming request contain information of interest that can be extracted and, depending on the configuration, used to verify the content and origin of the request.
If a signature secret is set in the configuration, the following header will be included.
  • X-Dt-Signature: Contains a JSON Web Token (JWT). Once decoded using the signature secret, it contains all the information necessary to verify the request content and origin.
Note that, depending on the framework used to receive the Data Connector events, the header name casing may differ. Some services force header names to lowercase, such as x-dt-signature.
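Because the header name casing can vary, a case-insensitive lookup is one way to read the signature reliably. The following is a minimal sketch that assumes the headers are available as a plain mapping of names to values; the helper name get_signature is hypothetical and not part of any Data Connector library.

```python
from typing import Mapping, Optional

# Lowercase form of the signature header name.
SIGNATURE_HEADER = "x-dt-signature"


def get_signature(headers: Mapping[str, str]) -> Optional[str]:
    """Return the signature header value regardless of name casing, or None."""
    for name, value in headers.items():
        if name.lower() == SIGNATURE_HEADER:
            return value
    return None
```

Many web frameworks already expose case-insensitive header objects, in which case a helper like this is unnecessary.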

Body

The request body contains two fields, event and labels. The following snippet shows an example request body of a touch event forwarded by a Data Connector.

    {
        "event": {
            "eventId": "EVENT_ID",
            "targetName": "projects/PROJECT_ID/devices/DEVICE_ID",
            "eventType": "touch",
            "data": {
                "touch": {
                    "updateTime": "2021-05-28T08:34:06.225872Z"
                }
            },
            "timestamp": "2021-05-28T08:34:06.225872Z"
        },
        "labels": {
            "room-number": "99"
        }
    }

| Field | Type | Description |
|--------|--------|-------------|
| event | struct | Contains event data. See the Event documentation, where the structure is explained in detail for each event type. |
| labels | struct | Device label key-value pairs included by the Data Connector. See the Advanced Configuration page for details about included labels. |
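As an illustration of reading these two fields, the sketch below parses the example body shown earlier with Python's standard json module. The helper name parse_body is hypothetical, and treating labels as optional is an assumption made for robustness rather than documented behavior.

```python
import json

# Example request body, matching the touch event shown earlier.
raw_body = b"""
{
    "event": {
        "eventId": "EVENT_ID",
        "targetName": "projects/PROJECT_ID/devices/DEVICE_ID",
        "eventType": "touch",
        "data": {"touch": {"updateTime": "2021-05-28T08:34:06.225872Z"}},
        "timestamp": "2021-05-28T08:34:06.225872Z"
    },
    "labels": {"room-number": "99"}
}
"""


def parse_body(body: bytes):
    """Split a Data Connector request body into its event and labels fields."""
    payload = json.loads(body)
    event = payload["event"]
    labels = payload.get("labels", {})  # assumption: fall back to no labels
    return event, labels


event, labels = parse_body(raw_body)
# The device ID is the last segment of targetName.
device_id = event["targetName"].split("/")[-1]
print(event["eventType"], device_id, labels)
```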

Acknowledging Received Event

A request-reply flow on the Endpoint URL should be implemented as follows:
  1. Your endpoint receives an HTTPS POST request.
  2. Your service processes the data in some way.
  3. Your service replies to the event request with a 200 OK response.
Note that your endpoint should never return an HTTP 200 OK response before it has finished processing the event. When our cloud receives a 200 OK, the event is taken off the internal Data Connector queue and marked as received.
Best Practice
Do not reply 200 OK until you have finished processing your data.
Note that the status code range 2xx will also be accepted as OK in the response.
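The process-then-acknowledge order can be sketched in a framework-independent way as follows. The function name handle_request, the process callback, and the choice of status code 500 on failure are all assumptions for illustration; any non-2xx status leaves the event unacknowledged.

```python
import json
from typing import Callable, Tuple


def handle_request(body: bytes, process: Callable[[dict], None]) -> Tuple[str, int]:
    """Process the event first and acknowledge with 200 only if that succeeded."""
    try:
        payload = json.loads(body)
        process(payload["event"])
    except Exception as err:
        # A non-2xx reply means the event is not acknowledged and stays queued.
        print(f"Processing failed: {err}")
        return ("Processing failed", 500)

    # Processing is complete, so it is now safe to acknowledge the event.
    return ("OK", 200)
```

Returning the response tuple only after process has returned keeps an event from being acknowledged when, for example, a database write fails midway.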

Verifying Signed Events

When a Signature Secret is configured, the X-Dt-Signature header is included and contains a JWT signed with the Signature Secret. The JWT contains a checksum of the request body, which can be used to check for tampering.
The following steps sum up the process of verifying the received request at the receiving endpoint.
  1. Extract the signed JWT from the HTTP header X-Dt-Signature of the received request.
  2. Verify the JWT's signature with the signature secret.
  3. Calculate a SHA1 checksum over the entire request body.
  4. Compare the body checksum with the checksum contained in the JWT.
  5. If these checksums are identical, you can be certain that the event has not been tampered with and originated from your Data Connector.
The following snippet implements the verification process. It is written to run on a Google Cloud Function.
Examples are provided for Python 3.9, the Python API, Node.js 14, and Go 1.13, in that order.
Python 3.9

    import hashlib
    import os

    import jwt  # PyJWT

    # Fetch environment variables.
    SIGNATURE_SECRET = os.environ.get('DT_SIGNATURE_SECRET')


    def verify_request(body, token):
        # Verify the token signature and decode it using the signature secret.
        try:
            payload = jwt.decode(token, SIGNATURE_SECRET, algorithms=["HS256"])
        except Exception as err:
            print(err)
            return False

        # Verify the request body checksum.
        checksum = hashlib.sha1(body).hexdigest()
        if payload.get("checksum") != checksum:
            print('Checksum Mismatch')
            return False

        return True


    def dataconnector_endpoint(request):
        # Extract necessary request information.
        # A missing header yields None, which fails verification below.
        body = request.get_data()
        token = request.headers.get('x-dt-signature')

        # Validate request origin and content integrity.
        if not verify_request(body, token):
            return ('Could not verify request.', 400)

        #
        # Further processing here.
        #

        return ('OK', 200)

Python API

    import os
    from dtintegrations import data_connector, provider

    DT_SIGNATURE_SECRET = os.getenv('DT_SIGNATURE_SECRET')


    def dataconnector_endpoint(request):
        # Validate and decode the incoming request.
        event = data_connector.http_push.decode_request(
            request,
            provider=provider.GCLOUD,
            secret=DT_SIGNATURE_SECRET,
        )

        # Print the event data.
        print(event)

        # If all is well, return 200 response.
        return ('OK', 200)

Node.js 14

    const crypto = require('crypto')
    const jwt = require('jsonwebtoken') // npm install jsonwebtoken

    // Fetch environment variables.
    const signatureSecret = process.env.DT_SIGNATURE_SECRET

    function verifyRequest(body, token) {
        // Verify the token signature and decode it using the signature secret.
        let decoded
        try {
            decoded = jwt.verify(token, signatureSecret, { algorithms: ['HS256'] })
        } catch (err) {
            console.log(err)
            return false
        }

        // Verify the request body checksum.
        const checksum = crypto.createHash('sha1').update(body).digest('hex')
        if (checksum !== decoded.checksum) {
            console.log('Checksum Mismatch')
            return false
        }

        return true
    }

    exports.dataconnectorEndpoint = (req, res) => {
        // Extract necessary request information.
        // req.rawBody holds the unparsed request bytes on Cloud Functions, so the
        // checksum is calculated over exactly what was sent.
        const body = req.rawBody
        const token = req.get('X-Dt-Signature')

        // Validate request origin and content integrity.
        if (verifyRequest(body, token) === false) {
            res.sendStatus(400)
            return
        }

        //
        // Further processing here.
        //

        res.sendStatus(200)
    }

Go 1.13

    package triggerfunction

    import (
        "crypto/sha1"
        "encoding/hex"
        "errors"
        "fmt"
        "io/ioutil"
        "log"
        "net/http"
        "os"

        jwt "github.com/dgrijalva/jwt-go" // go get github.com/dgrijalva/jwt-go
    )

    // Environment variables.
    var signatureSecret = os.Getenv("DT_SIGNATURE_SECRET")

    // verifyRequest validates the request origin and content integrity.
    func verifyRequest(bodyBytes []byte, tokenString string) error {
        // Verify the token signature and decode it using the signature secret.
        claims := jwt.MapClaims{}
        _, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
            // Only accept HMAC-signed tokens.
            if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
                return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
            }
            return []byte(signatureSecret), nil
        })
        if err != nil {
            return err
        }

        // Verify the request body checksum.
        sha1Bytes := sha1.Sum(bodyBytes)
        sha1String := hex.EncodeToString(sha1Bytes[:])
        if sha1String != claims["checksum"] {
            return errors.New("checksum mismatch")
        }

        return nil
    }

    // DataconnectorEndpoint receives, validates, and returns a response for the forwarded event.
    func DataconnectorEndpoint(w http.ResponseWriter, r *http.Request) {
        // Extract necessary request information.
        tokenString := r.Header.Get("x-dt-signature")
        bodyBytes, err := ioutil.ReadAll(r.Body)
        if err != nil {
            // Reply with an error instead of terminating the process.
            log.Println(err)
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        // Validate request origin and content integrity.
        if err := verifyRequest(bodyBytes, tokenString); err != nil {
            log.Println(err)
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        //
        // Further processing here.
        //

        log.Println("OK")
        fmt.Fprintf(w, "OK\n")
    }

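To exercise a verification endpoint locally without waiting for a real event, it can help to build a signed test request yourself. The sketch below uses only the Python standard library to create an HS256 JWT whose checksum claim is the SHA1 hex digest of the body, mirroring what the verification steps check. The function name sign_test_request is hypothetical, and real Data Connector tokens may carry additional claims that are not reproduced here.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_test_request(body: bytes, secret: str) -> str:
    """Build an HS256 JWT whose 'checksum' claim is the SHA1 hex digest of body."""
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"checksum": hashlib.sha1(body).hexdigest()}

    # The signing input is the encoded header and claims joined by a dot.
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"


# Usage: send test_body as the POST body and test_token as X-Dt-Signature.
test_body = b'{"event":{"eventId":"EVENT_ID"},"labels":{}}'
test_token = sign_test_request(test_body, "test-secret")
```

The endpoint under test must be configured with the same secret, and the body must be sent byte for byte as it was signed.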

Handling Duplicates

Every event received by DT Cloud is put in a dedicated, per-Data Connector queue. Messages are removed from this queue once acknowledged, or if the message is older than 12 hours.
A side effect of this delivery guarantee is that, under certain conditions, you may receive duplicates of the same event. Although duplicates are rare, the receiving end should deduplicate events by checking their event IDs.
Best Practice
Use the included eventId field to check for duplicated events.
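One possible way to deduplicate is to remember recently seen eventId values for as long as an event can remain queued. The sketch below keeps them in memory with a 12-hour retention window, matching the queue lifetime described above. The class name EventDeduplicator is hypothetical, and the in-memory store is an assumption: a service running several instances, or a serverless function, would need a shared store instead.

```python
import time
from collections import OrderedDict
from typing import Optional

# Events older than 12 hours are removed from the Data Connector queue.
RETENTION_SECONDS = 12 * 60 * 60


class EventDeduplicator:
    """Remembers event IDs for a retention window to detect duplicates."""

    def __init__(self, retention: float = RETENTION_SECONDS):
        self._retention = retention
        self._seen = OrderedDict()  # event ID -> time first seen, oldest first

    def is_duplicate(self, event_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now

        # Forget IDs that have fallen outside the retention window.
        while self._seen:
            oldest_id, seen_at = next(iter(self._seen.items()))
            if now - seen_at <= self._retention:
                break
            self._seen.popitem(last=False)

        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False
```

A handler would call is_duplicate(event["eventId"]) and, for a duplicate, skip processing but still reply with a 2xx status so that the event is acknowledged.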

Retry Policy

Any time a Data Connector does not receive a successful response (HTTP status code 2xx), the event will be retried. If an event has not been successfully acknowledged after 12 hours, it will be discarded.
The retry interval follows an exponential backoff policy, given by $t_0 \cdot 2^{n-1}$, where $t_0$ is the initial interval of 8 seconds and $n$ is the retry counter. The interval will not exceed 1 hour. For very slow endpoints, the minimum retry interval will be 4x the response time.
| Attempt | Retry Interval [s] |
|---------|--------------------|
| 1 | 8 |
| 2 | 16 |
| 3 | 32 |
| ... | ... |
| 9 | 2048 |
| 10 | 3600 |
| 11 | 3600 |
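The intervals in the table can be reproduced with a few lines of code. This sketch implements only the exponential backoff and the 1-hour cap; the rule for very slow endpoints is not modeled, and the function name retry_interval is hypothetical.

```python
INITIAL_INTERVAL_S = 8   # initial interval t_0, in seconds
MAX_INTERVAL_S = 3600    # the interval never exceeds 1 hour


def retry_interval(attempt: int) -> int:
    """Return the retry interval in seconds for the given retry counter (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be 1 or greater")
    return min(INITIAL_INTERVAL_S * 2 ** (attempt - 1), MAX_INTERVAL_S)


for attempt in (1, 2, 3, 9, 10, 11):
    print(attempt, retry_interval(attempt))
```

The cap takes effect from the tenth attempt, where the uncapped value of 4096 seconds would exceed one hour.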