To provide custom logic in Camunda BPM workflows, we use External Service Tasks, which are then implemented by a NestJS or Python worker. We prefer pull-based workers over “push” REST API endpoints because:
- it’s more scalable
- easier to debug locally
- potential for better (centralized) logging
- simpler authentication: workers authenticate to the Camunda REST API, instead of Camunda authenticating to each external task REST API
- no need to worry about SSL etc. for individual workers
- does not require the http-connector
Cons:
- a worker is a long-running process, i.e. it must be deployed in Kubernetes and cannot be deployed serverless to AWS Lambda. We reduce this overhead by bundling multiple workers into one project (see the sketch below) and by keeping workers lightweight (no unnecessary dependencies).
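For illustration, bundling simply means one process subscribing to several topics on a single client. This is a minimal sketch; the topic names are hypothetical, and the full worker setup follows below:

import { Client, logger } from 'camunda-external-task-client-js';

const client = new Client({ baseUrl: process.env.CAMUNDA_API_URL, use: logger });

// Each subscription acts as one worker; they all share a single process and deployment.
client.subscribe('infra.rocketchat.PostMessage', async ({ task, taskService }) => {
  await taskService.complete(task);
});
client.subscribe('infra.mailer.SendEmail', async ({ task, taskService }) => {
  await taskService.complete(task);
});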
NestJS (TypeScript)
It may seem overkill to use NestJS for a worker that typically does not serve an HTTP API. However, NestJS ensures a uniform project structure, Jest testing, and a standardized TypeScript configuration, and it leaves room to add more transports in the future (including HTTP with Fastify). Here’s how to set up a minimal NestJS project for a Camunda worker.
sudo yarn global add @nestjs/cli
nest new --package-manager yarn lovia-[name]
Delete app.controller.spec.ts and app.controller.ts.
yarn remove @nestjs/platform-express @types/express
yarn add dotenv @sentry/node camunda-external-task-client-js axios
yarn add --dev @types/camunda-external-task-client-js
Prepend to the top of .gitignore:
/.env
Create files .env and .env.dev:
SENTRY_DSN=
CAMUNDA_API_URL=https://camunda.lovia.life/engine-rest
CAMUNDA_USERNAME=
CAMUNDA_PASSWORD=
Tweak app.module.ts:
import { Module } from '@nestjs/common';
import { AppService } from './app.service';

@Module({
  imports: [],
  controllers: [],
  providers: [AppService],
})
export class AppModule {}
Implement workers in app.service.ts:
import { Injectable } from '@nestjs/common';
import { Client, logger, Variables, ClientConfig } from "camunda-external-task-client-js";
import * as camunda from "camunda-external-task-client-js";
const { BasicAuthInterceptor } = camunda as any; // TODO: https://jira.camunda.com/browse/CAM-11830
// const { BasicAuthInterceptor } = require("camunda-external-task-client-js");
import axios from 'axios';

@Injectable()
export class AppService {
  /**
   * Subscribe Camunda workers.
   */
  async subscribeWorkers() {
    const basicAuthentication = new BasicAuthInterceptor({
      username: process.env.CAMUNDA_USERNAME,
      password: process.env.CAMUNDA_PASSWORD,
    });
    const config: ClientConfig = {
      baseUrl: process.env.CAMUNDA_API_URL,
      interceptors: basicAuthentication,
      use: logger,
      // recommended to avoid issues, see https://forum.camunda.org/t/external-workers-scalability/11384/4
      maxTasks: 1,
    };
    const client = new Client(config);
    // TODO: Change topic name
    client.subscribe('infra.rocketchat.PostMessage', this.postMessage.bind(this));
  }

  /**
   * Posts a message to Rocket.Chat chatroom.
   * @param args Task and task service.
   */
  async postMessage({ task, taskService }: camunda.HandlerArgs) {
    const userId: number = task.variables.get('userId');
    try {
      console.info(task.topicName, userId, '...');
      // TODO: Implement
      const processVariables = new Variables();
      // processVariables.set('user', userDoc);
      await taskService.complete(task, processVariables);
    } catch (e) {
      logger.error(`${task.topicName}: ${e}`);
      await taskService.handleFailure(task, { errorMessage: e.toString() });
    } finally {
      // cleanup
    }
  }
}
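Since NestJS scaffolds Jest, handlers can be unit tested without a running engine. Here is a minimal sketch (app.service.spec.ts is a suggested name; the mocks stand in for the library’s task and task service objects):

import { AppService } from './app.service';

describe('AppService', () => {
  it('completes the external task', async () => {
    const service = new AppService();
    // Minimal stand-ins for camunda-external-task-client-js objects
    const task: any = {
      topicName: 'infra.rocketchat.PostMessage',
      variables: { get: jest.fn().mockReturnValue(42) }, // userId
    };
    const taskService: any = {
      complete: jest.fn().mockResolvedValue(undefined),
      handleFailure: jest.fn(),
    };

    await service.postMessage({ task, taskService });

    expect(taskService.complete).toHaveBeenCalledWith(task, expect.anything());
    expect(taskService.handleFailure).not.toHaveBeenCalled();
  });
});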
Edit main.ts:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { AppService } from './app.service';
import { config } from 'dotenv';
import * as Sentry from '@sentry/node';

config();
Sentry.init({ dsn: process.env.SENTRY_DSN });

async function bootstrap() {
  // https://www.petermorlion.com/nestjs-aws-lambda-without-http/
  const app = await NestFactory.createApplicationContext(AppModule);
  const appService = app.get(AppService);
  appService.subscribeWorkers();
}
bootstrap();
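Optionally, the worker can stop polling cleanly when Kubernetes sends SIGTERM. This sketch assumes subscribeWorkers() is changed to return the Client it creates; client.stop() is the library call that stops fetching new tasks:

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const client = await app.get(AppService).subscribeWorkers();
  process.on('SIGTERM', async () => {
    client.stop(); // stop long-polling for new external tasks
    await app.close();
  });
}
bootstrap();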
Now you can start the workers (in development):
yarn start:dev
The worker user in Camunda needs to be authorized on the Process Definition for: TASK_WORK, READ_INSTANCE, READ_TASK, UPDATE_TASK_VARIABLE, UPDATE_INSTANCE_VARIABLE, READ_INSTANCE_VARIABLE, READ_TASK_VARIABLE, UPDATE_INSTANCE, UPDATE_TASK. (This may be more than strictly needed, but too few authorizations fail silently.)
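These can be granted in one call to the engine’s authorization API instead of clicking through the Admin webapp. A sketch assuming an admin user (type 1 is a GRANT authorization, resourceType 6 is Process Definition, resourceId * means all process definitions):

curl -u ADMIN_USER:ADMIN_PASSWORD -H 'Content-Type: application/json' \
  -d '{"type": 1, "userId": "worker", "resourceType": 6, "resourceId": "*", "permissions": ["TASK_WORK", "READ_INSTANCE", "READ_TASK", "UPDATE_TASK_VARIABLE", "UPDATE_INSTANCE_VARIABLE", "READ_INSTANCE_VARIABLE", "READ_TASK_VARIABLE", "UPDATE_INSTANCE", "UPDATE_TASK"]}' \
  https://camunda.lovia.life/engine-rest/authorization/create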
GitLab CI/CD
To deploy this worker to production, you’ll need:
- .dockerignore
- Dockerfile
- kube/*.yaml
- .gitlab-ci.yml
.dockerignore
.env
node_modules
Dockerfile
FROM node:lts-slim
WORKDIR /app
COPY ./package.json ./
COPY ./yarn.lock ./
RUN yarn install
COPY . .
ENV NODE_ENV production
RUN yarn build
CMD ["yarn", "start:prod"]
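To sanity-check the image before pushing, you can build and run it locally (the lovia-infra tag is just an example; --env-file reuses the development .env):

docker build -t lovia-infra .
docker run --rm --env-file .env lovia-infra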
kube/[PROJECT].yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lovia-infra # FIXME: change this
spec:
  selector:
    matchLabels:
      app: lovia-infra # FIXME: change this
  replicas: 1
  template:
    metadata:
      labels:
        app: lovia-infra # FIXME: change this
    spec:
      containers:
        - name: lovia-infra # FIXME: change this
          image: registry.gitlab.com/lovia/lovia-infra:latest # FIXME: change this
          env:
            - name: SENTRY_DSN
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-infra # FIXME: change this
                  key: sentry-dsn
            - name: CAMUNDA_API_URL
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-camunda-worker
                  key: api-url
            - name: CAMUNDA_USERNAME
              value: worker
            - name: CAMUNDA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-camunda-worker
                  key: password
          resources:
            requests:
              memory: 100Mi
              cpu: 100m
            limits:
              memory: 100Mi
      imagePullSecrets:
        - name: regcred
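Until the deploy job in .gitlab-ci.yml below is enabled, the manifest can be applied manually (NAMESPACE being the environment’s Kubernetes namespace):

kubectl apply -f kube/lovia-infra.yaml -n NAMESPACE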
.gitlab-ci.yml
image: docker:latest
services:
  - docker:dind
variables:
  # Use TLS https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  DOCKER_HOST: tcp://docker:2376
  DOCKER_TLS_CERTDIR: "/certs"
build:
  stage: build
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker pull $CI_REGISTRY_IMAGE:latest || true
    - docker build --cache-from $CI_REGISTRY_IMAGE:latest --tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE:latest .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker push $CI_REGISTRY_IMAGE:latest
  only:
    - prod
deploy_prod:
  stage: deploy
  image:
    name: bitnami/kubectl
    # https://gitlab.com/gitlab-org/gitlab-foss/-/issues/65110#note_198073241
    entrypoint: [""]
  script:
    # FIXME: change the "lovia-infra" below
    #- kubectl apply -f kube/lovia-infra.yaml
    # https://github.com/kubernetes/kubernetes/issues/27081
    #- kubectl patch deployment lovia-infra -p \
    #  "{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"date\":\"`date +'%s'`\"}}}}}"
  environment:
    name: production
  only:
    - prod
As we’re using the GitLab Kubernetes integration for deploying to environments, each environment is its own Kubernetes namespace. You need to create the secrets for both the GitLab private Docker container registry and all app secrets.
kubectl create secret docker-registry regcred -n NAMESPACE --docker-server=registry.gitlab.com --docker-username=DOCKER_USER --docker-password=PERSONAL_ACCESS_TOKEN
kubectl create secret generic lovia-prod-infra -n NAMESPACE --from-literal=sentry-dsn=...
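The deployment manifest above also reads a lovia-prod-camunda-worker secret with api-url and password keys; it is created the same way:

kubectl create secret generic lovia-prod-camunda-worker -n NAMESPACE --from-literal=api-url=https://camunda.lovia.life/engine-rest --from-literal=password=...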
Python Worker
Unfortunately, there is no official Python Camunda worker library yet; there are some community discussions, and a Camunda External Task Python module.
So, based on camundacon2019’s code, here’s a template:
import requests
import json
import time


class client:
    def __init__(self, url, workerid="defaultid"):
        self.url = url
        self.workerid = workerid
        self.body = "[]"  # last fetchAndLock response body, shared by the calls below

    def subscribe(self, topic, lockDuration=1000, longPolling=5000):
        # Endpoint for fetch and lock
        endpoint = str(self.url) + "/external-task/fetchAndLock"
        # JSON body for the request
        task = {
            "workerId": str(self.workerid),
            "maxTasks": 1,
            "usePriority": "true",
            "asyncResponseTimeout": longPolling,
            "topics": [{"topicName": topic, "lockDuration": lockDuration}],
        }
        try:
            fetch_and_lock = requests.post(endpoint, json=task)
            print(fetch_and_lock.status_code)
            self.body = fetch_and_lock.text
        except requests.RequestException:
            print("Engine is down")
            return
        # Keep polling until a task is fetched and locked
        while self.body == "[]":
            print("polling")
            fetch_and_lock = requests.post(endpoint, json=task)
            self.body = fetch_and_lock.text
            time.sleep(5)
        data = json.loads(self.body)
        # TODO: Do your processing here with 'data', then call self.complete(**vars) when done

    def complete(self, **kwargs):
        response_body = json.loads(self.body)
        taskid = str(response_body[0]["id"])
        endpoint = str(self.url) + "/external-task/" + taskid + "/complete"
        workerid = str(response_body[0]["workerId"])
        # Put the variables from the kwargs dictionary into the nested JSON format
        variables_for_response = {key: {"value": val} for key, val in kwargs.items()}
        response = {"workerId": workerid, "variables": variables_for_response}
        try:
            complete = requests.post(endpoint, json=response)
            print(complete.text)
            print(complete.status_code)
        except requests.RequestException:
            print("fail")

    def error(self, bpmn_error, error_message="not defined", **kwargs):
        # Report a BPMN error
        response_body = json.loads(self.body)
        taskid = str(response_body[0]["id"])
        endpoint = str(self.url) + "/external-task/" + taskid + "/bpmnError"
        workerid = str(response_body[0]["workerId"])
        variables_for_response = {key: {"value": val} for key, val in kwargs.items()}
        response = {
            "workerId": workerid,
            "errorCode": bpmn_error,
            "errorMessage": error_message,
            "variables": variables_for_response,
        }
        try:
            error = requests.post(endpoint, json=response)
            print(error.status_code)
        except requests.RequestException:
            print("fail")

    def fail(self, error_message, retries=0, retry_timeout=0):
        # Create an incident
        response_body = json.loads(self.body)
        taskid = str(response_body[0]["id"])
        endpoint = str(self.url) + "/external-task/" + taskid + "/failure"
        workerid = str(response_body[0]["workerId"])
        response = {
            "workerId": workerid,
            "errorMessage": error_message,
            "retries": retries,
            "retryTimeout": retry_timeout,
        }
        try:
            fail = requests.post(endpoint, json=response)
            print(fail.status_code)
        except requests.RequestException:
            print("fail")

    def new_lockduration(self, new_duration):
        # Extend the lock on the current task
        response_body = json.loads(self.body)
        taskid = str(response_body[0]["id"])
        endpoint = str(self.url) + "/external-task/" + taskid + "/extendLock"
        workerid = str(response_body[0]["workerId"])
        response = {"workerId": workerid, "newDuration": new_duration}
        try:
            new_lock = requests.post(endpoint, json=response)
            print(new_lock.status_code)
        except requests.RequestException:
            print("fail")
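A minimal usage sketch (the topic name and result variable are made up; subscribe() blocks until a task has been fetched and locked):

worker = client("https://camunda.lovia.life/engine-rest", workerid="python-worker-1")
worker.subscribe("infra.example.DoSomething")  # returns once a task is locked
# ... do the actual work here ...
worker.complete(result="done")  # passes 'result' back as a process variable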