Otoroshi Workflows
Workflows in Otoroshi provide a powerful way to describe and execute sequences of logic using a JSON-based pseudo-language. Think of it as a lightweight alternative to tools like n8n or Apache NiFi. Workflows let you orchestrate actions, transform data, trigger functions, and control flow based on conditional logic, all within the Otoroshi ecosystem.
Key Concepts
There are three primary building blocks:
- Nodes: The execution units that form the workflow steps.
- Functions: Reusable logic units that can be invoked by
callnodes. - Operators: Data transformation helpers that let you compute, compare, transform, and extract values within the workflow.
Otoroshi comes with a set of default implementations, but you can easily extend workflows with custom plugins:
WorkflowFunction.registerFunction("custom.function", new MyFunction())
Node.registerNode("custom-node", new MyNode())
WorkflowOperator.registerOperator("$custom_op", new MyOperator())
Basic Example
{
"kind": "workflow",
"steps": [
{
"kind": "call",
"description": "Say hello to the name passed as input",
"function": "core.hello",
"args": {
"name": "${workflow_input.name}"
},
"result": "call_res"
}
],
"returned": {
"$mem_ref": {
"name": "call_res"
}
}
}
Input:
{
"name": "foo"
}
Output:
{
"returned": "Hello foo !",
"error": null
}
Memory state:
{
"call_res": "Hello foo !",
"input": {
"name": "foo"
}
}
Memory
Each workflow execution comes with its own memory, where variables can be read from or written to. This memory enables communication between steps.
The assign node lets you manipulate memory directly, often combined with operators to compute values.
At any moment you can access the memory using an expression language like ${workflow_input.name} given a memory containing something like:
{
"input": {
"name": "foo"
}
}
Nodes
Each node must declare a kind field and can optionally define:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Full List of Nodes
Workflow (workflow)
This node executes a sequence of nodes sequentially
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlysteps(array) - the nodes to be executedkind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "workflow",
"description" : "This node executes says hello, waits and says hello again",
"steps" : [ {
"kind" : "call",
"function" : "core.hello"
}, {
"kind" : "wait",
"duration" : 10000
}, {
"kind" : "call",
"function" : "core.hello"
} ]
}
Switch paths (switch)
This node executes the first path matching a predicate
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)paths(array) - the nodes to be executedpredicate(boolean) - The predicate defining if the path is run or not- required fields are:
returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "switch",
"description" : "This node will say 'Hello Otoroshi 1'",
"paths" : [ {
"predicate" : true,
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 1"
}
}, {
"predicate" : false,
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 2"
}
} ]
}
Pause (pause)
This node pauses the current workflow
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "pause",
"description" : "Pause the workflow at this point."
}
Wait (wait)
This node waits a certain amount of time
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "wait",
"description" : "This node waits 20 seconds",
"duration" : 20000
}
Parallel paths (parallel)
This node executes multiple nodes in parallel
expected configuration:
-
description(string) - The description of what this node does in the workflow (optional). for debug purposes only -
result(string) - The name of the memory that will be assigned with the result of this node (optional) -
enabled(boolean) - Is the node enabled (optional) -
paths(array) - the nodes to be executed- required fields are:
-
returned(string) - Overrides the output of the node with the result of an operator (optional) -
id(string) - id of the node (optional). for debug purposes only -
kind(string) - The kind of the node -
required fields are: kind
Usage example
{
"kind" : "parallel",
"description" : "This node call 3 core.hello function in parallel",
"paths" : [ {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 1"
}
}, {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 2"
}
}, {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 3"
}
} ]
}
Flatmap (flatmap)
This node transforms an array by applying a node on each value
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "map",
"description" : "This node extract all emails from users",
"values" : "${users}",
"node" : {
"kind" : "value",
"value" : "${foreach_value.emails}"
}
}
Map (map)
This node transforms an array by applying a node on each value
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "map",
"description" : "This node will transform user names",
"values" : "${users}",
"node" : {
"kind" : "value",
"value" : {
"$str_upper_case" : "${foreach_value.name}"
}
}
}
For each (foreach)
This node executes a node for each element in an array
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "foreach",
"description" : "This node execute the core.hello function for each user",
"values" : "${users}",
"node" : {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "${foreach_value.name}"
}
}
}
Assign in memory (assign)
This node with executes a sequence of memory assignation operations sequentially
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlyvalues(array) - the assignation operations sequencename(string) - the name of the assignment in memoryvalue(any) - the value of the assignment- required fields are: name, value
kind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "assign",
"description" : "set a counter, increment it of 2 and initialize an array in memory",
"values" : [ {
"name" : "count",
"value" : 0
}, {
"name" : "count",
"value" : {
"$incr" : {
"value" : "count",
"increment" : 2
}
}
}, {
"name" : "items",
"value" : [ 1, 2, 3 ]
} ]
}
Value (value)
This node returns a value
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the nodevalue(any) - the returned value- required fields are: kind
Usage example
{
"kind" : "value",
"description" : "This node returns 'foo'",
"value" : "foo"
}
If then else (if)
This executes a node if the predicate matches or another one if not
expected configuration:
predicate(boolean) - The predicate defining if the path is run or notdescription(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)else(object) - The node run if the predicate does not matchesreturned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlythen(object) - The node run if the predicate matcheskind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "if",
"description" : "This node will say 'Hello Otoroshi 1'",
"predicate" : "${truthyValueInMemory}",
"then" : {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 1"
}
},
"else" : {
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi 2"
}
}
}
Stop and Error (error)
This node returns an error
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "error",
"description" : "This node fails the workflow with an error",
"message" : "an error occurred",
"details" : {
"foo" : "bar"
}
}
Call (call)
This node calls a function an returns its result
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlyfunction(string) - the function nameargs(object) - the arguments of the callkind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "call",
"description" : "This node call the core.hello function",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi"
}
}
Filter (filter)
This node transforms an array by filtering values based on a node execution
expected configuration:
description(string) - The description of what this node does in the workflow (optional). for debug purposes onlyresult(string) - The name of the memory that will be assigned with the result of this node (optional)enabled(boolean) - Is the node enabled (optional)returned(string) - Overrides the output of the node with the result of an operator (optional)id(string) - id of the node (optional). for debug purposes onlykind(string) - The kind of the node- required fields are: kind
Usage example
{
"kind" : "filter",
"description" : "This node will filter out users that are not admins",
"values" : "${users}",
"predicate" : {
"kind" : "value",
"value" : "${foreach_value.admin}"
}
}
Functions
Workflow functions are reusable logic blocks invoked via call nodes.
Prototype:
{
"kind": "call",
"function": "<function_name>",
"args": { ... },
"result": "<memory_var_name>"
}
Full List of Functions
Read from Otoroshi config. (core.config_read)
This function retrieves values from otoroshi config.
expected configuration:
path(string) - The path of the config. to read- required fields are: path
Usage example
{
"kind" : "call",
"function" : "core.config_read",
"args" : {
"path" : "otoroshi.domain"
}
}
Datastore get (core.store_get)
This function gets keys from the store
expected configuration:
key(string) - The key to get- required fields are: key
Usage example
{
"kind" : "call",
"function" : "core.store_get",
"args" : {
"key" : "my_key"
}
}
Read a file (core.file_read)
This function reads a file
expected configuration:
path(string) - The path of the file to readparse_json(boolean) - Whether to parse the file as JSONencode_base64(boolean) - Whether to encode the file content in base64- required fields are: path
Usage example
{
"kind" : "call",
"function" : "core.file_read",
"args" : {
"path" : "/path/to/file.txt",
"parse_json" : true,
"encode_base64" : true
}
}
Hello function (core.hello)
This function returns a hello message
expected configuration:
name(string) - The name of the person to greet- required fields are: name
Usage example
{
"kind" : "call",
"function" : "core.hello",
"args" : {
"name" : "Otoroshi"
}
}
Datastore matching keys (core.store_match)
This function gets keys from the datastore matching a pattern
expected configuration:
pattern(string) - The pattern to match- required fields are: pattern
Usage example
{
"kind" : "call",
"function" : "core.store_match",
"args" : {
"pattern" : "my_pattern:*"
}
}
Datastore set (core.store_set)
This function sets a key in the datastore
expected configuration:
key(string) - The key to setvalue(string) - The value to setttl(number) - The optional time to live in seconds- required fields are: key, value
Usage example
{
"kind" : "call",
"function" : "core.store_set",
"args" : {
"key" : "my_key",
"value" : "my_value",
"ttl" : 3600
}
}
Get environment variable (core.env_get)
This function retrieves values from environment variables
expected configuration:
name(string) - The environment variable name- required fields are: name
Usage example
{
"kind" : "call",
"function" : "core.env_get",
"args" : {
"name" : "OPENAI_APIKEY"
}
}
Compute a resume token for the current workflow (core.compute_resume_token)
This function computes a resume token for the current workflow
expected configuration:
Usage example
{
"kind" : "call",
"function" : "core.compute_resume_token",
"args" : { }
}
Wasm call (core.wasm_call)
This function calls a wasm function
expected configuration:
wasm_plugin(string) - The wasm plugin to usefunction(string) - The function to callparams(object) - The parameters to passed to the function- required fields are: wasm_plugin, function
Usage example
{
"kind" : "call",
"function" : "core.wasm_call",
"args" : {
"wasm_plugin" : "my_wasm_plugin",
"function" : "my_function",
"params" : {
"foo" : "bar"
}
}
}
Delete a file (core.file_del)
This function deletes a file
expected configuration:
path(string) - The path of the file to delete- required fields are: path
Usage example
{
"kind" : "call",
"function" : "core.file_delete",
"args" : {
"path" : "/path/to/file.txt"
}
}
Datastore list keys (core.store_keys)
This function lists keys from the datastore
expected configuration:
pattern(string) - The pattern to match- required fields are: pattern
Usage example
{
"kind" : "call",
"function" : "core.store_keys",
"args" : {
"pattern" : "my_pattern:*"
}
}
Log a message (core.log)
This function writes whatever the user want to the otoroshi logs
expected configuration:
message(string) - The message to logparams(array) - The parameters to log- required fields are: message
Usage example
{
"kind" : "call",
"function" : "core.log",
"args" : {
"message" : "Hello",
"params" : [ "World" ]
}
}
HTTP client (core.http_client)
This function makes a HTTP request
expected configuration:
tls_config(object) - The TLS configurationmethod(string) - The HTTP method to usebody(string) - The body (string) to sendurl(string) - The URL to callbody_bytes(array) - The body (bytes array) to sendbody_json(object) - The body (json) to sendbody_str(string) - The body (string) to sendbody_base64(string) - The body (base64) to sendheaders(object) - The headers to sendtimeout(number) - The timeout in milliseconds- required fields are: url
Usage example
{
"kind" : "call",
"function" : "core.http_client",
"args" : {
"url" : "https://httpbin.org/get",
"method" : "GET",
"headers" : {
"User-Agent" : "Otoroshi"
},
"timeout" : 30000,
"body_json" : {
"foo" : "bar"
}
}
}
Send an email (core.send_mail)
This function sends an email
expected configuration:
mailer_config(object) - The mailer configurationsubject(string) - The email subjectto(array) - The recipient email addressesfrom(string) - The sender email addresshtml(string) - The email HTML content- required fields are: from, to, subject, html, mailer_config
Usage example
{
"kind" : "call",
"function" : "core.send_mail",
"args" : {
"from" : "sender@example.com",
"to" : [ "recipient@example.com" ],
"subject" : "Test email",
"html" : "Hello, this is a test email",
"mailer_config" : {
"kind" : "mailgun",
"api_key" : "your_api_key",
"domain" : "your_domain"
}
}
}
System call (core.system_call)
This function calls a system command
expected configuration:
command(array) - The command to execute- required fields are: command
Usage example
{
"kind" : "call",
"function" : "core.system_call",
"args" : {
"command" : [ "ls", "-l" ]
}
}
Get all resources from the state (core.state_get_all)
This function gets all resources from the state
expected configuration:
name(string) - The name of the resourcegroup(string) - The group of the resourceversion(string) - The version of the resource- required fields are: name, group, version
Usage example
{
"kind" : "call",
"function" : "core.state_get_all",
"args" : {
"name" : "my_resource",
"group" : "my_group",
"version" : "my_version"
}
}
Datastore delete (core.store_del)
This function deletes keys from the store
expected configuration:
keys(array) - The keys to delete- required fields are: keys
Usage example
{
"kind" : "call",
"function" : "core.store_del",
"args" : {
"keys" : [ "key1", "key2" ]
}
}
Write a file (core.file_write)
This function writes a file
expected configuration:
path(string) - The path of the file to writevalue(string) - The value to writeprettify(boolean) - Whether to prettify the JSONfrom_base64(boolean) - Whether to decode the base64 content- required fields are: path, value
Usage example
{
"kind" : "call",
"function" : "core.file_write",
"args" : {
"path" : "/path/to/file.txt",
"value" : "my_value",
"prettify" : true,
"from_base64" : true
}
}
Call a workflow (core.workflow_call)
This function calls another workflow stored in otoroshi
expected configuration:
workflow_id(string) - The ID of the workflow to callinput(object) - The input of the workflow- required fields are: workflow_id, input
Usage example
{
"kind" : "call",
"function" : "core.workflow_call",
"args" : {
"workflow_id" : "my_workflow_id",
"input" : {
"foo" : "bar"
}
}
}
Get a resource from the state (core.state_get)
This function gets a resource from the state
expected configuration:
id(string) - The ID of the resourcename(string) - The name of the resourcegroup(string) - The group of the resourceversion(string) - The version of the resource- required fields are: id, name, group, version
Usage example
{
"kind" : "call",
"function" : "core.state_get_one",
"args" : {
"id" : "my_id",
"name" : "my_resource",
"group" : "my_group",
"version" : "my_version"
}
}
Datastore get multiple keys (core.store_mget)
This function gets multiple keys from the datastore
expected configuration:
keys(array) - The keys to get- required fields are: keys
Usage example
{
"kind" : "call",
"function" : "core.store_mget",
"args" : {
"keys" : [ "key1", "key2" ]
}
}
Emit an event (core.emit_event)
This function emits an event
expected configuration:
event(object) - The event to emit- required fields are: event
Usage example
{
"kind" : "call",
"function" : "core.emit_event",
"args" : {
"event" : {
"type" : "object",
"properties" : {
"name" : {
"type" : "string",
"description" : "The name of the event"
}
}
}
}
}