HKube offers two ways to integrate your algorithm:
HKube communicates with algorithms using WebSocket and JSON messages.
Integrating an algorithm into HKube involves three main steps:
1) Implement connectivity with HKube using WebSocket.
2) Build the algorithm image and push it to a Docker registry.
3) Add the algorithm to HKube.
We can handle the first two steps for you, so you don't have to deal with WebSocket or Docker yourself.
HKube and your algorithm communicate using WebSocket's full-duplex messaging system.
All messages between them are in JSON format.
If you want a simple, code-free setup, use hkubectl to register an algorithm without handling WebSockets or Docker manually.
- No code involved.
- No WebSocket.
- No Docker.
Steps to Register an Algorithm:
1. Create an algorithm configuration file (YAML or JSON).
2. Apply the algorithm using hkubectl.
Example YAML Configuration:
```yaml
name: my-alg
env: python
resources:
  cpu: 0.5
  gpu: 1
  mem: 512Mi
code:
  path: /home/user/my-alg.tar.gz
  entryPoint: main.py
```
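Since the configuration file can also be JSON, here is a JSON version of the same configuration (same fields as the YAML above, just a different format):

```json
{
  "name": "my-alg",
  "env": "python",
  "resources": {
    "cpu": 0.5,
    "gpu": 1,
    "mem": "512Mi"
  },
  "code": {
    "path": "/home/user/my-alg.tar.gz",
    "entryPoint": "main.py"
  }
}
```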
Apply Algorithm: Run the following command:
hkubectl algorithm apply --f algorithm.yml
Key Parameters:
- env - Supported environments: python, nodejs, jvm.
- resources - Define CPU, GPU, and memory allocation.
- code.path - Path to the algorithm's source code.
- code.entryPoint - The file containing the algorithm methods.

Algorithm Methods:
start method:

```
<T any> start(Dictionary<T string, T any> data)
```

initialize(data) and stop(data) methods:

```
<T> initialize(Dictionary data)
<T> stop(Dictionary data)
```

The data argument contains the following keys:

- input: Array<Any>
- pipelineName: String
- algorithmName: String
- nodeName: String
- jobId: String
- taskId: String
- info: Object
- extraData: Object
- lastRunResult: Object

If the response contains a buildId, it means a build was triggered. You can monitor its progress under Build Status.
You can do the same using our API.
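To make the start/initialize/stop interface above concrete, here is a minimal, hypothetical sketch of an algorithm module in Node.js. The method names and the data keys follow the description above; the wiring to HKube (wrapper or the WebSocket client shown later in this guide) is omitted:

```js
// Hypothetical sketch of an algorithm exposing the methods described above.
module.exports = {
    initialize(data) {
        // data carries the keys listed above (input, pipelineName, nodeName, jobId, ...)
        console.log(`initialize: node ${data.nodeName}, job ${data.jobId}`);
    },
    async start(data) {
        // data.input is the input array from the pipeline descriptor
        const [first] = data.input;
        return { echo: first }; // the return value becomes the task result
    },
    stop(data) {
        // stop any in-flight work and release resources here
    }
};
```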
This section explains the events exchanged between HKube and your algorithm, including sub-pipeline events.
These events are sent from HKube to your algorithm.
Example Event Format:
```json
{
  "command": "<string>", // one of the events below
  "data": "<Object>"
}
```
This is the first event sent to the algorithm when a task starts.
```json
{
  "command": "initialize",
  "data": {
    "input": ["str", 512, false, { "foo": "bar" }]
  }
}
```
The data field includes an input array, the same input as written in the pipeline descriptor.
Invokes the algorithm task.
{ "command": "start" }
This event includes no data.
Sends a request to stop the algorithm task.
{ "command": "stop" }
This event is invoked before the algorithm container is taken down. As a best practice, exit the process running the algorithm when this event is received.
{ "command": "exit" }
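For example, a hypothetical exit handler in the WebSocket client described later in this guide might simply terminate the process:

```js
// Hypothetical handler for the incoming "exit" event:
// clean up and terminate the process so the container can shut down gracefully.
_exit() {
    // release any resources your algorithm holds here...
    process.exit(0);
}
```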
Event to inform the algorithm that a sub-pipeline (Raw or Stored) has started.

```json
{
  "command": "subPipelineStarted",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```
The subPipelineId property holds the sub-pipeline's internal ID in the algorithm (as given in the startRawSubPipeline/startStoredSubPipeline events).
Event to inform the algorithm that a sub-pipeline (Raw or Stored) has failed.

```json
{
  "command": "subPipelineError",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "error": "error-message"
  }
}
```
Event to inform the algorithm that a sub-pipeline (Raw or Stored) has completed successfully.

```json
{
  "command": "subPipelineDone",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "response": ["array", "of", "subpipeline", "output", "values"]
  }
}
```
Event to inform the algorithm that a sub-pipeline has stopped.

```json
{
  "command": "subPipelineStopped",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "reason": "<stopping-reason>"
  }
}
```
These events are sent from the algorithm to HKube.
Example Event Format:
```json
{
  "command": "<string>", // one of the events below
  "data": "<Any>",
  "error": {
    "code": "<string>",
    "message": "<string>",
    "details": "<string>"
  }
}
```
Response event after initialization completes.
{ "command": "initialized" }
Response event after start completes.
{ "command": "started" }
Response event after stop completes.
{ "command": "stopped" }
Sent when the algorithm completes its task.
{ "command": "done" }
If you want to report progress about your algorithm, send this event.
{ "command": "progress", "data": "optional extra details" }
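For example, a long-running algorithm could report progress periodically through the same _send helper shown later in this guide (the message format here is just an illustration):

```js
// Hypothetical progress reporting from inside the algorithm's main loop.
for (let i = 0; i < items.length; i++) {
    processItem(items[i]);
    if (i % 100 === 0) {
        this._send('progress', `processed ${i} of ${items.length} items`);
    }
}
```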
Sent if an error occurs.
```json
{
  "command": "errorMessage",
  "error": {
    "code": "<YOUR_CODE>",
    "message": "<YOUR_MESSAGE>",
    "details": "<YOUR_DETAILS>"
  }
}
```
You can trigger and manage sub-pipelines directly from your algorithm.
If you want to start a Raw sub-pipeline from your algorithm, use this event.
```json
{
  "command": "startRawSubPipeline",
  "data": {
    "subPipeline": {
      "name": "<sub-pipeline-name>",
      "nodes": [
        {
          "nodeName": "<first-node-name>",
          "algorithmName": "<alg-name>",
          "input": ["@flowInput.data"]
        }
      ],
      "options": {},
      "webhooks": {},
      "flowInput": {
        "data": ["array", "of", "subpipeline", "input", "values"]
      }
    },
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```
If you want to start a Stored sub-pipeline from your algorithm, use this event.
```json
{
  "command": "startStoredSubPipeline",
  "data": {
    "subPipeline": {
      "name": "<stored-sub-pipeline-name>",
      "flowInput": {
        "data": ["array", "of", "subpipeline", "input", "values"]
      }
    },
    "subPipelineId": "<alg-subPipeline-internal-id>"
  }
}
```
If you want to stop a sub-pipeline (Raw or Stored) from your algorithm, use this event.
```json
{
  "command": "stopSubPipeline",
  "data": {
    "subPipelineId": "<alg-subPipeline-internal-id>",
    "reason": "<reason>"
  }
}
```
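Putting the sub-pipeline events together, here is a hedged sketch of how an algorithm might start a stored sub-pipeline and track it by its internal ID. The pipeline name, input values, and the send/handle helpers are hypothetical; only the command names and payload shapes come from the events above:

```js
// Hypothetical sub-pipeline bookkeeping inside the algorithm.
const pendingSubPipelines = new Map();

// Start a stored sub-pipeline and remember our own internal id for it.
function startSubPipeline(send) {
    const subPipelineId = `sub-${Date.now()}`; // internal id chosen by the algorithm
    pendingSubPipelines.set(subPipelineId, { status: 'starting' });
    send('startStoredSubPipeline', {
        subPipeline: {
            name: 'my-stored-pipeline',     // hypothetical stored pipeline name
            flowInput: { data: [1, 2, 3] }  // hypothetical input values
        },
        subPipelineId
    });
    return subPipelineId;
}

// React to the incoming sub-pipeline events described earlier.
function handleSubPipelineEvent(payload) {
    const entry = pendingSubPipelines.get(payload.data.subPipelineId);
    if (!entry) {
        return;
    }
    switch (payload.command) {
        case 'subPipelineStarted':
            entry.status = 'running';
            break;
        case 'subPipelineDone':
            entry.status = 'done';
            entry.response = payload.data.response;
            break;
        case 'subPipelineError':
            entry.status = 'failed';
            entry.error = payload.data.error;
            break;
        case 'subPipelineStopped':
            entry.status = 'stopped';
            entry.reason = payload.data.reason;
            break;
    }
}
```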
To start a tracer span, use this event:
```json
{
  "command": "startSpan",
  "data": {
    "name": "<span-name>",
    "tags": {
      "<key1>": "<value1>",
      "<key2>": "<value2>"
    }
  }
}
```
To finish the last opened tracer span, use this event:
```json
{
  "command": "finishSpan",
  "data": {
    "tags": {
      "<key1>": "<value1>",
      "<key2>": "<value2>"
    },
    "error": "<error-text>"
  }
}
```
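For instance, an algorithm could wrap a processing step in a tracer span like this. The _send helper is the one shown later in this guide; the span name, tags, and processBatch function are hypothetical:

```js
// Hypothetical tracing of a single processing step (inside an async method).
this._send('startSpan', {
    name: 'process-batch',
    tags: { batchSize: items.length }
});
try {
    const output = await processBatch(items);
    this._send('finishSpan', { tags: { status: 'ok' } });
    return output;
}
catch (error) {
    this._send('finishSpan', { tags: { status: 'failed' }, error: error.message });
    throw error;
}
```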
HKube communicates with your algorithm via WebSocket (native WebSocket or socketio).
This tutorial explains how to create a WebSocket client that works with HKube.
You can implement the WebSocket client in any language (PRs are welcome).
The first thing your algorithm should do is create a WebSocket client that connects to: ws://localhost:3000.
```js
// create ws client and listen to ws://localhost:3000
this._socket = new WebSocket(this._url);
this._socket.on('open', () => {
    log.debug("connected");
});
```
Here we are registering to events from HKube.
Each event has a specific handler, as described below.
```js
this._socket.on('message', (message) => {
    const payload = JSON.parse(message);
    switch (payload.command) {
        case messages.incoming.initialize:
            this._initialize(payload);
            break;
        case messages.incoming.start:
            this._start();
            break;
        case messages.incoming.stop:
            this._stop();
            break;
        default:
            log.debug(`unknown command ${payload.command}`);
    }
});
```
The initialize event is the first event that HKube sends to your algorithm.
The payload of this event includes the pipeline data and the input for your algorithm.
You need to store the input in a local variable for later use.
This is the same input as written in the pipeline descriptor.
```js
_initialize(payload) {
    this._input = payload.data.input;          // store the input
    this._send(messages.outgoing.initialized); // send ack event
}
```
The start event is the second event that HKube sends to your algorithm.
The first step of this handler is to tell HKube that your algorithm has started.
Then you let the algorithm do its work, and finally you send the done event with the algorithm result.
```js
async _start() {
    this._send(messages.outgoing.started); // send ack event
    try {
        // your code goes here...
        const output = await myAlgorithm.process(this._input); // use the input
        // your code goes here...

        // send response
        this._send(messages.outgoing.done, { data: output });
    }
    catch (error) {
        this._onError(error); // send error event
    }
}
```
HKube will send this event to your algorithm only if a stop request was made by an HKube user.
```js
_stop() {
    // your code goes here...
    // e.g. myAlgorithm.stop();
    // your code goes here...

    this._send(messages.outgoing.stopped); // send ack event
}
```
WebSockets do not reconnect automatically, so it's important to handle connection loss.
```js
// connect handler
_connect() {
    this._socket = new WebSocket(this._url);
    this._socket.on('close', (code, reason) => {
        switch (code) {
            case 1000:
                log.debug("socket normal closed");
                break;
            default:
                this._reconnect();
                break;
        }
    });
    this._socket.on('error', (e) => {
        switch (e.code) {
            case 'ECONNREFUSED':
                this._reconnect();
                break;
            default:
                log.error("error");
                break;
        }
    });
}

// reconnect handler
_reconnect() {
    log.debug("socket reconnecting");
    this._socket.removeAllListeners();
    setTimeout(() => {
        this._connect();
    }, this._reconnectInterval);
}
```
It’s recommended to catch and report errors in your algorithm to HKube.
```js
_onError(error) {
    // pass the error object as the third argument so it matches
    // the errorMessage format shown above
    this._send(messages.outgoing.error, null, {
        code: 'Failed',
        message: error.message || error,
        details: error.stack
    });
}
```
This is a simple handler for sending responses back to HKube.
```js
_send(command, data, error) {
    try {
        this._socket.send(JSON.stringify({ command, data, error }));
    }
    catch (e) {
        log.error(e);
    }
}
```
If your algorithm uses TensorFlow, you can generate metrics for visualization in TensorBoard. HKube supports this by allowing you to store TensorBoard-compatible logs, which can later be viewed in a dedicated dashboard.
How It Works: Write the TensorBoard log files to the directory given by the ALGO_METRICS_DIR environment variable.
Running TensorBoard: To launch TensorBoard in HKube, use the 'board' API in the HKube spec. This will start a web server where you can visualize and analyze the recorded metrics.
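As a rough sketch (not taken verbatim from the HKube docs), a Node.js algorithm using @tensorflow/tfjs-node could write scalar logs to that directory like this; the metric name and values are made up:

```js
// Hypothetical sketch: write TensorBoard-compatible scalar logs
// into the directory HKube exposes via ALGO_METRICS_DIR.
const tf = require('@tensorflow/tfjs-node');

const logDir = process.env.ALGO_METRICS_DIR || '/tmp/metrics';
const writer = tf.node.summaryFileWriter(logDir);

for (let step = 0; step < 100; step++) {
    const loss = 1 / (step + 1); // made-up metric value
    writer.scalar('loss', loss, step);
}
```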