There are two ways to integrate your algorithm into Hkube:
1) No code involved: no WebSocket and no Docker.
2) Involves writing code and working with WebSocket and Docker.
Integrating an algorithm into Hkube requires 3 steps:
1) Implement connectivity with Hkube using WebSocket.
2) Build the algorithm image and push it to Docker registry.
3) Add the algorithm to Hkube.
We can do the first two steps for you, so you won't have to deal with WebSocket and Docker. The algorithm (your code) needs a way to communicate with Hkube (receive input, report results and errors).
Hkube communicates with algorithms via WebSocket, using its full-duplex communication support.
All messages between Hkube and the algorithm are in JSON format.
Using the hkubectl
```
hkubectl algorithm apply --f algorithm.yml
```
Create a basic algorithm yaml/json file
```yaml
name: my-alg
env: python
resources:
  cpu: 0.5
  gpu: 1
  mem: 512Mi
code:
  path: /home/user/my-alg.tar.gz
  entryPoint: main.py
```
Your algorithm must include a method named start:

```
<T any> start(Dictionary<T string, T any> data)
```
There are also some optional methods:

```
<T> initialize(Dictionary data)
<T> stop(Dictionary data)
```
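As an illustration only, here is a minimal Python sketch of an entry point (like the main.py referenced in the descriptor above) implementing these methods. The function bodies are placeholders, and the assumption that the wrapper passes a plain dictionary is exactly that: an assumption; the precise invocation details depend on the chosen env and wrapper version.

```python
# main.py - hypothetical sketch of the required start method and the optional
# initialize/stop methods; the wrapper is assumed to call them with a dict.

def initialize(data):
    # optional: one-time setup; `data` carries the task input
    print('initializing with input:', data.get('input'))

def start(data):
    # required: the algorithm itself; the return value is reported back to Hkube
    numbers = [x for x in data.get('input', []) if isinstance(x, (int, float))]
    return {'sum': sum(numbers)}

def stop(data):
    # optional: clean up when Hkube aborts the running task
    print('stopping')
```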
The data argument contains the following keys:
If the response contains a buildId, it means that a build was triggered, and you can follow the build status.
You can do the same using our API.
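For illustration, a rough sketch of the same apply flow over HTTP with Python's requests library is shown below. The base URL, API route, and payload layout are assumptions (the payload simply mirrors the YAML above); check the API reference of your Hkube installation for the exact endpoint and schema.

```python
import json
import requests  # assumption: using the requests library for the HTTP call

API_BASE = "https://<your-cluster>/hkube/api-server/api/v1"  # adjust to your deployment

algorithm = {
    "name": "my-alg",
    "env": "python",
    "resources": {"cpu": 0.5, "gpu": 1, "mem": "512Mi"},
    "entryPoint": "main.py"
}

with open("/home/user/my-alg.tar.gz", "rb") as code:
    # assumed route: POST .../store/algorithms/apply with the spec and the code archive
    res = requests.post(
        f"{API_BASE}/store/algorithms/apply",
        data={"payload": json.dumps(algorithm)},
        files={"file": code}
    )

print(res.json())  # a buildId in the response means a build was triggered
```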
These events are sent from Hkube to your algorithm.
{ "command": "<string>", // one of the above "data": "<Object>" }
The first event sent to the algorithm; it is sent on every task activation.

```json
{
  "command": "initialize",
  "data": {
    "input": ["str", 512, false, { "foo": "bar" }]
  }
}
```

The data object includes an input array, the same input as written in the pipeline descriptor.
The event that invokes the algorithm task.

```json
{ "command": "start" }
```

This event includes no data.
Event to abort the running algorithm task.

```json
{ "command": "stop" }
```

Event sent before taking the algorithm container down. As a best practice, make the process running the algorithm exit when this event is received.

```json
{ "command": "exit" }
```
Event informing the algorithm that a sub-pipeline (Raw or Stored) has started.

```json
{
  "command": "subPipelineStarted",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```

The subPipelineId property holds the sub-pipeline's internal ID in the algorithm (as given in the startRawSubPipeline/startStoredSubPipeline events).
Event informing the algorithm that a sub-pipeline (Raw or Stored) has failed.

```json
{
  "command": "subPipelineError",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "error": "error-message"
  }
}
```

Event informing the algorithm that a sub-pipeline (Raw or Stored) has completed successfully.

```json
{
  "command": "subPipelineDone",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "response": ["array", "of", "subpipeline", "output", "values"]
  }
}
```

Event informing the algorithm that a sub-pipeline has stopped.

```json
{
  "command": "subPiplineStopped",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "reason": "<stopping-reason>"
  }
}
```
These events are sent from the algorithm to Hkube.

```json
{
  "command": "<string>", // one of the above
  "data": "<Any>",
  "error": {
    "code": "<string>",
    "message": "<string>",
    "details": "<string>"
  }
}
```
Response event after initialization completes.

```json
{ "command": "initialized" }
```

Response event after start completes.

```json
{ "command": "started" }
```

Response event after stop completes.

```json
{ "command": "stopped" }
```

Response event after the algorithm finishes the task.

```json
{ "command": "done" }
```

If you want to report progress from your algorithm, send this event.

```json
{ "command": "progress", "data": "optional extra details" }
```

If any error occurs in your algorithm, send this event.

```json
{
  "command": "errorMessage",
  "error": {
    "code": "<YOUR_CODE>",
    "message": "<YOUR_MESSAGE>",
    "details": "<YOUR_DETAILS>"
  }
}
```
If you want to start a Raw sub-pipeline from your algorithm, use this event.

```json
{
  "command": "startRawSubPipeline",
  "data": {
    "subPipeline": {
      "name": "<sub-pipeline-name>",
      "nodes": [
        {
          "nodeName": "<first-node-name>",
          "algorithmName": "<alg-name>",
          "input": ["@flowInput.data"]
        }
      ],
      "options": {},
      "webhooks": {},
      "flowInput": {
        "data": ["array", "of", "subpipeline", "input", "values"]
      }
    },
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```

If you want to start a Stored sub-pipeline from your algorithm, use this event.

```json
{
  "command": "startStoredSubPipeline",
  "data": {
    "subPipeline": {
      "name": "<stored-sub-pipeline-name>",
      "flowInput": {
        "data": ["array", "of", "subpipeline", "input", "values"]
      }
    },
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```

If you want to stop a sub-pipeline (Raw or Stored) from your algorithm, use this event.

```json
{
  "command": "stopSubPipeline",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "reason": "<reason>"
  }
}
```
To start a tracer span, use this event:

```json
{
  "command": "startSpan",
  "data": {
    "name": "<span-name>",
    "tags": {
      "<key1>": <value1>,
      "<key2>": <value2>
    }
  }
}
```

To finish the last opened tracer span, use this event:

```json
{
  "command": "finishSpan",
  "data": {
    "tags": {
      "<key1>": <value1>,
      "<key2>": <value2>
    },
    "error": "<error-text>"
  }
}
```
Hkube communicates with your algorithm via WebSocket (native WebSocket or socketio).
This tutorial explains how to create a WebSocket client that works with Hkube.
You can implement the WebSocket client in any language (PRs are welcome).
The first thing your algorithm should do is create a WebSocket client that connects to ws://localhost:3000.
```javascript
// create a ws client and connect to ws://localhost:3000
this._socket = new WebSocket(this._url);
this._socket.on('open', () => {
    log.debug("connected");
});
```
Here we register for the events coming from Hkube.
Each event has a specific handler, as described below.
```javascript
this._socket.on('message', (message) => {
    const payload = JSON.parse(message);
    switch (payload.command) {
        case messages.incoming.initialize:
            this._initialize(payload);
            break;
        case messages.incoming.start:
            this._start();
            break;
        case messages.incoming.stop:
            this._stop();
            break;
        default:
            log.debug("unknown message payload.command");
    }
});
```
The initialize event is the first event that Hkube sends to your algorithm.
The payload of this event includes the pipeline data and the input for your algorithm.
You need to store the input in a local variable for later use.
It is the same input as written in the pipeline descriptor.
```javascript
_initialize(payload) {
    this._input = payload.data.input;            // store the input
    this._send(messages.outgoing.initialized);   // send ack event
}
```
The start event is the second event that Hkube sends to your algorithm.
As you can see, the first step of this handler is to tell Hkube that your algorithm has started.
Then you let the algorithm do its work, and finally you send the done event with the algorithm result.
```javascript
async _start() {
    this._send(messages.outgoing.started);       // send ack event
    try {
        // your code goes here...
        const output = await myAlgorithm.process(this._input);   // use the input
        // your code goes here...

        // send response
        this._send(messages.outgoing.done, { data: output });
    }
    catch (error) {
        this._onError(error);                    // send error event
    }
}
```
Hkube will send this event to your algorithm only if a stop request was made by a Hkube user.
```javascript
_stop() {
    // your code goes here...
    // e.g. myAlgorithm.stop();
    // your code goes here...

    this._send(messages.outgoing.stopped);       // send ack event
}
```
WebSockets do not reconnect automatically, so it is important to handle connection loss.
```javascript
// connect handler
_connect() {
    this._socket = new WebSocket(this._url);
    this._socket.on('close', (code, reason) => {
        switch (code) {
            case 1000:
                log.debug("socket normal closed");
                break;
            default:
                this._reconnect();
                break;
        }
    });
    this._socket.on('error', (e) => {
        switch (e.code) {
            case 'ECONNREFUSED':
                this._reconnect();
                break;
            default:
                log.error("error");
                break;
        }
    });
}

// reconnect handler
_reconnect() {
    log.debug("socket reconnecting");
    this._socket.removeAllListeners();
    setTimeout(() => {
        this._connect();
    }, this._reconnectInterval);
}
```
It is highly recommended to catch any error in your algorithm and send it to Hkube.
```javascript
_onError(error) {
    // report the error to Hkube via the error event (see the _send helper below)
    this._send(messages.outgoing.error, null, {
        code: 'Failed',
        message: error.message || error,
        details: error.stack
    });
}
```
This is a simple handler for sending responses back to Hkube.
```javascript
_send(command, data, error) {
    try {
        this._socket.send(JSON.stringify({ command, data, error }));
    }
    catch (e) {
        log.error(e);
    }
}
```
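The same protocol can be implemented in any other language. Below is a rough Python sketch of a client for the events described above, using the third-party websocket-client package (an assumption; any WebSocket library will do). It omits reconnect handling and the sub-pipeline/tracing events for brevity.

```python
import json
import websocket  # assumption: pip install websocket-client


class AlgorithmWrapper:
    def __init__(self, url="ws://localhost:3000"):
        self._url = url
        self._input = None

    def _send(self, ws, command, data=None, error=None):
        # every message is a JSON object with command/data/error fields
        ws.send(json.dumps({"command": command, "data": data, "error": error}))

    def _on_message(self, ws, message):
        payload = json.loads(message)
        command = payload.get("command")
        if command == "initialize":
            self._input = payload["data"]["input"]   # store the input
            self._send(ws, "initialized")            # ack
        elif command == "start":
            self._send(ws, "started")                # ack
            try:
                output = my_algorithm(self._input)   # your code goes here
                self._send(ws, "done", data=output)
            except Exception as e:
                self._send(ws, "errorMessage",
                           error={"code": "Failed", "message": str(e)})
        elif command == "stop":
            self._send(ws, "stopped")                # ack
        elif command == "exit":
            ws.close()

    def run(self):
        ws = websocket.WebSocketApp(self._url, on_message=self._on_message)
        ws.run_forever()


def my_algorithm(input_data):
    # placeholder algorithm: echo the input back
    return {"echo": input_data}


if __name__ == "__main__":
    AlgorithmWrapper().run()
```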
Algorithms using TensorFlow can generate metrics for a Tensorboard view. Later, upon request, a Tensorboard web server will start, serving a dashboard that compares different runs of the algorithm. To allow Hkube to display your algorithm's Tensorboard metrics, write the metrics in your algorithm code to the folder path set in the ALGO_METRICS_DIR environment variable. To run Tensorboard, use the 'board' API in the Hkube spec to start a Tensorboard web server that visualizes the metrics.
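For example, in a TensorFlow 2 algorithm the summary writer might be pointed at that folder roughly as sketched below; the training loop, the train_step helper, and the metric name are placeholders, not part of Hkube.

```python
import os
import tensorflow as tf

# Hkube collects Tensorboard logs from the folder set in ALGO_METRICS_DIR
log_dir = os.environ.get("ALGO_METRICS_DIR", "./logs")
writer = tf.summary.create_file_writer(log_dir)

def train_step(step):
    # placeholder for the real training step; returns a dummy loss value
    return 1.0 / (step + 1)

with writer.as_default():
    for step in range(100):
        loss = train_step(step)
        tf.summary.scalar("loss", loss, step=step)  # written under ALGO_METRICS_DIR
    writer.flush()
```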