Comment on page
Refreshing Access Token
An example of how the authentication access token can be refreshed automatically.
When running integrations continuously, authentication towards the API must be refreshed periodically. By default, this must be done every hour and should happen automatically without interfering with the integration. In this example, a simple access token refresh routine that keeps the API authenticated has been implemented.
- Service Account Credentials: You must create and know the credentials of a Service Account. Any role will suffice.
- OAuth2: This example builds heavily on the OAuth2 Authentication Guide. It is recommended that you understand that routine before attempting to expand upon it.
The following points summarize the provided example code.
- A simulated REST API call is triggered every 5 seconds.
- An instance of the Auth class provides the access token as required.
- If the access token is within 1 minute of expiration, the token is first refreshed.
If you wish to run the code locally, make sure you have a working runtime environment.
Python 3.11
Node.js 20
The following packages are required by the example code and must be installed.
pip install pyjwt==2.7.0
pip install requests==2.31.0
The following packages are required by the example code and must be added.
npm install jsonwebtoken
npm install axios
Add the following credentials to your environment as they will be used to authenticate the API.
export DT_SERVICE_ACCOUNT_KEY_ID=<YOUR_SERVICE_ACCOUNT_KEY_ID> # [string]
export DT_SERVICE_ACCOUNT_EMAIL=<YOUR_SERVICE_ACCOUNT_EMAIL> # [string]
export DT_SERVICE_ACCOUNT_SECRET=<YOUR_SERVICE_ACCOUNT_SECRET> # [string]
The following code snippet implements the token refresh routine.
Python 3.11
Node.js 20
import os
import time
import jwt # pip install pyjwt==2.7.0
import requests # pip install requests==2.31.0
# Service Account credentials, read from the environment.
# Each one falls back to an empty string when its variable is not set.
KEY_ID = os.getenv('DT_SERVICE_ACCOUNT_KEY_ID', '')
SECRET = os.getenv('DT_SERVICE_ACCOUNT_SECRET', '')
EMAIL = os.getenv('DT_SERVICE_ACCOUNT_EMAIL', '')
class Auth():
    """
    Hands out an access token and refreshes it automatically.

    Every call to get_token() checks how long the current token has
    left and fetches a new one when less than `refresh_buffer` seconds
    remain before it expires.
    """

    refresh_buffer = 60  # s
    request_timeout = 30  # s, upper bound on waiting for the token endpoint
    auth_url = 'https://identity.disruptive-technologies.com/oauth2/token'

    def __init__(self, key_id: str, email: str, secret: str):
        """
        Store the Service Account credentials.

        Parameters
        ----------
        key_id : str
            Service Account key ID, sent as `kid` in the JWT header.
        email : str
            Service Account email, sent as `iss` in the JWT payload.
        secret : str
            Service Account secret used to sign the JWT.
        """
        # Set attributes.
        self.key_id = key_id
        self.email = email
        self.secret = secret

        # No token yet. An expiration of 0 makes the first
        # get_token() call trigger a refresh.
        self.token = ''
        self.expiration = 0

    def refresh(self) -> None:
        """
        Exchange a freshly signed JWT for a new access token.

        On success, `self.token` and `self.expiration` are updated.

        Raises
        ------
        RuntimeError
            If the token endpoint answers with anything but status 200.
            The stored token and expiration are left untouched.
        """
        # Read the clock once so that 'iat' and 'exp' are consistent.
        now = int(time.time())

        # Construct the JWT header.
        jwt_headers = {
            'alg': 'HS256',
            'kid': self.key_id,
        }

        # Construct the JWT payload.
        jwt_payload = {
            'iat': now,         # current unixtime
            'exp': now + 3600,  # expiration unixtime
            'aud': self.auth_url,
            'iss': self.email,
        }

        # Sign and encode JWT with the secret.
        encoded_jwt = jwt.encode(
            payload=jwt_payload,
            key=self.secret,
            algorithm='HS256',
            headers=jwt_headers,
        )

        # Prepare HTTP POST request data.
        # note: The requests package applies Form URL-Encoding by default.
        request_data = {
            'assertion': encoded_jwt,
            'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer'
        }

        # Exchange the JWT for an access token. The timeout keeps a
        # stalled connection from blocking the integration forever.
        access_token_response = requests.post(
            url=self.auth_url,
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data=request_data,
            timeout=self.request_timeout,
        )

        # Halt if response contains an error. Raising (instead of
        # returning) prevents get_token() from handing out a stale or
        # empty token. The raw body is used because an error response
        # is not guaranteed to be valid JSON.
        if access_token_response.status_code != 200:
            raise RuntimeError(
                'Access token refresh failed with status code {}: {}'.format(
                    access_token_response.status_code,
                    access_token_response.text,
                )
            )

        # Unpack the response dictionary.
        token_json = access_token_response.json()
        self.token = token_json['access_token']
        self.expiration = time.time() + token_json['expires_in']

    def get_token(self) -> str:
        """
        Return a valid access token, refreshing it first if needed.

        Returns
        -------
        str
            The current access token.

        Raises
        ------
        RuntimeError
            Propagated from refresh() when a required refresh fails.
        """
        # Check if access token needs a refresh. A 1 minute buffer is added.
        if self.expiration - time.time() < self.refresh_buffer:
            # Print expiration message to console.
            print('Refreshing...')

            # Fetch a brand new access token and expiration.
            self.refresh()

        # Print time until expiration (with the buffer subtracted, so
        # this is the time until the next refresh).
        print('Token expires in {}s.'.format(
            int(self.expiration - time.time() - self.refresh_buffer),
        ))

        # Return the token to user.
        return self.token
def function_that_calls_rest_api(access_token: str):
    """
    Stand-in for a routine that would call the REST API.

    The access token is accepted but not used here; the function
    only pauses for 5 seconds.
    """
    pause_seconds = 5
    time.sleep(pause_seconds)
if __name__ == '__main__':
    # Create the object that hands out access tokens.
    auth = Auth(KEY_ID, EMAIL, SECRET)

    # Stand-in for a long-running task: loop forever.
    while True:
        # Get a valid token, then run the routine that needs it.
        access_token = auth.get_token()
        function_that_calls_rest_api(access_token)
const jwt = require('jsonwebtoken')
const axios = require('axios').default
// Service Account credentials, read from the environment.
const {
    DT_SERVICE_ACCOUNT_KEY_ID: serviceAccountKeyID,
    DT_SERVICE_ACCOUNT_SECRET: serviceAccountSecret,
    DT_SERVICE_ACCOUNT_EMAIL: serviceAccountEmail,
} = process.env

// State shared by the token refresh routine.
const refreshBuffer = 60 // seconds
let accessToken = ''
let expiration = 0 // unixtime
/**
 * Returns a valid access token, refreshing it first when needed.
 *
 * A refresh happens when the stored token has less than `refreshBuffer`
 * seconds left before it expires.
 *
 * @param {string} keyID - Service Account key ID.
 * @param {string} secret - Service Account secret.
 * @param {string} email - Service Account email.
 * @returns {Promise<string>} The current access token.
 * @throws Propagates any error thrown by refreshAccessToken().
 */
async function getAccessToken(keyID, secret, email) {
    // Check if access token needs a refresh. A buffer is added.
    if (expiration - Math.floor(Date.now() / 1000) < refreshBuffer) {
        // Print expiration message to console.
        console.log('Refreshing...')

        // Fetch a new token response.
        const res = await refreshAccessToken(keyID, secret, email)

        // Update token and expiration.
        expiration = Math.floor(Date.now() / 1000) + res.expires_in
        accessToken = res.access_token
    }

    // Print time until expiration (with the buffer subtracted, so this
    // is the time until the next refresh).
    const sLeft = expiration - Math.floor(Date.now() / 1000) - refreshBuffer
    console.log(`Token expires in ${sLeft}s.`)

    // Hand the token to the caller; without this the function
    // would resolve to undefined.
    return accessToken
}
/**
 * Exchanges a freshly signed JWT for a new access token.
 *
 * @param {string} keyID - Service Account key ID, sent as `kid` in the JWT header.
 * @param {string} secret - Service Account secret used to sign the JWT.
 * @param {string} email - Service Account email, sent as `iss` in the JWT payload.
 * @returns {Promise<Object>} The body of the token endpoint response.
 * @throws Re-throws whatever error the request fails with.
 */
async function refreshAccessToken(keyID, secret, email) {
    // The endpoint is both the JWT audience and the request target.
    const authUrl = 'https://identity.disruptive-technologies.com/oauth2/token'
    const requestTimeoutMs = 30000

    // Read the clock once so that 'iat' and 'exp' are consistent.
    const now = Math.floor(Date.now() / 1000)

    // Construct the JWT header.
    const jwtHeaders = {
        alg: 'HS256',
        kid: keyID,
    }

    // Construct the JWT payload.
    const jwtPayload = {
        iat: now, // current unixtime
        exp: now + 3600, // expiration unixtime
        aud: authUrl,
        iss: email,
    }

    // Sign and encode JWT with the secret.
    const jwtEncoded = jwt.sign(
        jwtPayload,
        secret,
        {
            header: jwtHeaders,
            algorithm: 'HS256',
        },
    )

    // Build the Form URL-Encoded request body.
    const requestData = new URLSearchParams({
        assertion: jwtEncoded,
        grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer',
    }).toString()

    try {
        // Exchange JWT for access token. The timeout keeps a stalled
        // connection from blocking the integration forever.
        const accessTokenResponse = await axios({
            method: 'POST',
            url: authUrl,
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            data: requestData,
            timeout: requestTimeoutMs,
        })

        // Return the response data.
        return accessTokenResponse.data
    } catch (error) {
        // Prints the error response (if any), and re-throws the error.
        if (error.response) {
            console.log(error.response.data)
        }
        throw error
    }
}
/**
 * Stand-in for a routine that would call the REST API.
 *
 * The access token is accepted but not used here; the function only
 * waits for 5 seconds before resolving.
 *
 * @param {string} accessToken - Token a real implementation would send.
 * @returns {Promise<void>}
 */
async function functionThatCallsAPI(accessToken) {
    const pauseMs = 5000
    await new Promise((resolve) => {
        setTimeout(resolve, pauseMs)
    })
}
/**
 * Runs the example: repeatedly obtains an access token and passes it
 * to the routine that needs it.
 *
 * The loop never ends on its own; the returned promise only settles
 * (by rejecting) when one of the awaited calls throws.
 *
 * @returns {Promise<void>}
 */
async function main() {
    // Do some task, here simulated by an infinite loop.
    while (true) {
        // Wait for the token itself. Without the await, a pending
        // Promise would be passed on instead of the token.
        const token = await getAccessToken(
            serviceAccountKeyID,
            serviceAccountSecret,
            serviceAccountEmail,
        )

        // Call some routine that needs the access token.
        await functionThatCallsAPI(token)
    }
}
// Start the loop. If it ever rejects, report the error and make the
// process exit with a non-zero code instead of leaving the promise
// unhandled.
main().catch((error) => {
    console.error(error)
    process.exitCode = 1
})
For visualization purposes, an expiration timer is printed each time the token is used.
Refreshing...
Token expires in 3538s.
Token expires in 3533s.
Token expires in 3528s.
...
Token expires in 13s.
Token expires in 8s.
Token expires in 3s.
Refreshing...
Token expires in 3538s.
Token expires in 3533s.
...
Last modified 4mo ago