flyde.node
Component
Bases: Node
A node that runs a function when executed.
Source code in flyde/node.py
class Component(Node):
    """A node that runs a function when executed.

    The default `run()` drives a `process()` method supplied by the subclass
    from a background worker thread.
    """

    def __init__(self, **kwargs):
        """Initialize the node and create the stop event and the lock.

        Args:
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self._stop = Event()
        self._mutex = Lock()

    def run(self):
        """Run the main component function.

        Default implementation looks for a reactive method called `process()`
        and calls it passing the input values as keyword arguments. The loop
        runs in a non-daemon worker thread; this method returns right after
        starting that thread.

        A `dict` or named tuple returned by `process()` is sent to the outputs
        named by its keys/fields. Any other return value is ignored.

        Raises:
            NotImplementedError: If the component has no `process()` method.
        """
        if not hasattr(self, "process"):
            raise NotImplementedError("Component does not have neither run() nor process() method. No code to run.")

        def worker():
            logger.debug(f"Running {self._id} worker")

            # If there is at least one input and none of them is a queue
            # (i.e. all are sticky or static), the loop runs only once.
            all_sticky_or_static = all(inp._input_mode != InputMode.QUEUE for inp in self.inputs.values())
            run_once = len(self.inputs) > 0 and all_sticky_or_static

            while not self._stop.is_set():
                logger.debug(f"Waiting for inputs on {self._id}")
                inputs = {}
                queue_count = 0
                queue_closed_count = 0
                skip_iteration = False

                for key, inp in self.inputs.items():
                    is_queue = inp._input_mode == InputMode.QUEUE
                    value = inp.get()
                    inputs[key] = value

                    # Count EOFs received on non-static inputs
                    if is_queue:
                        queue_count += 1
                        if is_EOF(value):
                            # The input may be connected to multiple outputs,
                            # so we need to count the references
                            if inp.ref_count > 0:
                                inp.dec_ref_count()
                            if inp.ref_count == 0:
                                queue_closed_count += 1
                            else:
                                # Ignore this EOF, it's not the last one
                                inputs[key] = None
                                skip_iteration = True

                if skip_iteration:
                    continue

                # If all of the queue input values are EOF, stop the component
                if queue_count > 0 and queue_count == queue_closed_count:
                    logger.debug(f"All queue inputs are EOF, stopping {self._id}")
                    self.stop()
                    break

                logger.debug(f"Processing {self._id} with inputs: {inputs}")
                res = self.process(**inputs)  # type: ignore

                # Named tuples have no `.items()`; turn them into a
                # field-name -> value mapping so both shapes share one path.
                if isinstance(res, tuple) and hasattr(res, "_fields"):
                    res = res._asdict()  # type: ignore

                if isinstance(res, dict):
                    # Send values to the outputs named as keys
                    for k, v in res.items():
                        if k not in self.outputs:
                            # We are in a worker thread: stop and finish the
                            # node before raising so that connected outputs
                            # still receive EOF and the stopped event is set.
                            e = ValueError(
                                f'{self._node_type}.process(): sending to non-existing output "{k}" from return value'
                            )
                            self.stop()
                            self.finish()
                            raise e

                        logger.debug(f"Sending value '{v}' to output {k} of {self._id}")
                        if self.outputs[k].connected:
                            self.outputs[k].send(v)

                # If all inputs are sticky/static, exit after the first iteration
                if run_once:
                    logger.debug(f"All inputs are sticky or static for {self._id}, stopping after first execution")
                    break

            self.finish()

        logger.debug(f"Starting {self._id} thread")
        thread = Thread(target=worker, daemon=False)
        thread.start()

    def stop(self):
        """Stop the component execution."""
        logger.debug(f"Stopping {self._id}")
        self._stop.set()
stop()
Stop the component execution.
Source code in flyde/node.py
def stop(self):
    """Request the component to stop by setting its internal stop event."""
    logger.debug(f"Stopping {self._id}")
    self._stop.set()
Graph
Bases: Node
A visual graph node that contains other nodes.
Source code in flyde/node.py
class Graph(Node):
    """A visual graph node that contains other nodes."""

    def __init__(
        self,
        /,
        id: str = "",
        node_type: str = "",
        config: Optional[dict[str, InputConfig]] = None,
        display_name: str = "",
        instances: Optional[dict[str, Node]] = None,
        instances_stopped: Optional[dict[str, Event]] = None,
        connections: Optional[list[Connection]] = None,
        inputs: Optional[dict[str, GraphPort]] = None,
        outputs: Optional[dict[str, GraphPort]] = None,
        stopped: Optional[Event] = None,
    ):
        """Create the graph and wire all of its connections.

        Every container/event argument defaults to ``None`` and is replaced
        by a fresh object, so graphs never share state through default
        arguments.

        Args:
            id: Graph identifier; passed on to the parent constructor.
            node_type: Node type identifier.
            config: Input pin configurations.
            display_name: Human-readable name.
            instances: Child nodes keyed by instance id.
            instances_stopped: Per-instance events that `run()` waits on.
            connections: Connections to wire between ports.
            inputs: Graph input ports.
            outputs: Graph output ports.
            stopped: Event set when the graph has finished.

        Raises:
            ValueError: If a connection references an unknown instance or pin.
        """
        super().__init__(
            id=id,
            node_type=node_type,
            config={} if config is None else config,
            display_name=display_name,
            stopped=Event() if stopped is None else stopped,
        )
        self.inputs: dict[str, GraphPort] = {} if inputs is None else inputs  # type: ignore
        self.outputs: dict[str, GraphPort] = {} if outputs is None else outputs  # type: ignore
        self._connections = [] if connections is None else connections
        self._instances = {} if instances is None else instances
        self._instances_stopped = {} if instances_stopped is None else instances_stopped

        # Wire all connections
        for conn in self._connections:
            from_id = conn.from_node.ins_id
            from_pin = conn.from_node.pin_id
            to_id = conn.to_node.ins_id
            to_pin = conn.to_node.pin_id

            # Validate the ids ("__this" refers to the graph's own ports)
            if from_id == "__this":
                if from_pin not in self.inputs:
                    raise ValueError(f"Input {from_pin} not found in graph {self._id}")
            else:
                self._check_pin("out", from_id, from_pin)

            if to_id == "__this":
                if to_pin not in self.outputs:
                    raise ValueError(f"Output {to_pin} not found in graph {self._id}")
            else:
                self._check_pin("in", to_id, to_pin)

            # Graph ports are reversed: we read from graph inputs and write to
            # graph outputs. Resolving both ends independently also covers a
            # direct graph-input -> graph-output connection.
            if from_id == "__this":
                source = self.inputs[from_pin]
            else:
                source = self._instances[from_id].outputs[from_pin]

            if to_id == "__this":
                sink = self.outputs[to_pin]
            else:
                sink = self._instances[to_id].inputs[to_pin]

            source.connect(sink.queue)
            sink.inc_ref_count()

    def _check_pin(self, pin_type: str, instance_id: str, pin_id: str):
        """Check if the instance and pin exist.

        Args:
            pin_type: ``"in"`` to look in the instance inputs, ``"out"`` for outputs.
            instance_id: Id of the child instance.
            pin_id: Name of the pin on that instance.

        Raises:
            ValueError: If the instance or the pin is not found.
        """
        if instance_id not in self._instances:
            raise ValueError(f"Instance {instance_id} not found")
        if pin_type == "in" and pin_id not in self._instances[instance_id].inputs:
            raise ValueError(f"Input {pin_id} not found in instance {instance_id}")
        if pin_type == "out" and pin_id not in self._instances[instance_id].outputs:
            raise ValueError(f"Output {pin_id} not found in instance {instance_id}")

    def run(self):
        """Run the graph.

        Calls `run()` on every instance, then starts a non-daemon thread that
        waits for each per-instance stopped event before finishing the graph.
        """
        for instance in self._instances.values():
            logger.debug(f"Running instance {instance._id} of type {instance._node_type}")
            instance.run()

        def worker():
            logger.debug(f"Running {self._id} worker")
            # Wait for all instances to finish
            for k, v in self._instances_stopped.items():
                logger.debug(f"Waiting for instance {k} to stop")
                v.wait()
                logger.debug(f"Instance {k} stopped")
            self.finish()
            logger.debug(f"Graph {self._id} finished")

        logger.debug(f"Starting {self._id} thread")
        thread = Thread(target=worker, daemon=False)
        thread.start()

    def shutdown(self):
        """Call shutdown handlers on all instances.

        This method is called from the main thread to allow cleanup and things like UI."""
        for instance in self._instances.values():
            # If the instance has a `shutdown()` handler method, call it at this point
            if hasattr(instance, "shutdown"):
                instance.shutdown()

    def stop(self):
        """Stop all instances gracefully.

        Puts EOF on the queue of every graph input that has a queue and a
        reference count of zero. This method itself does not wait.
        """
        for v in self.inputs.values():
            if v.queue is not None and v.ref_count == 0:
                v.queue.put(EOF)

    def terminate(self):
        """Terminate all instances immediately."""
        for instance in self._instances.values():
            instance.stop()
        self.stop()
        self.finish()

    @property
    def stopped(self) -> Event:
        """Return the stopped event which is set when the node has stopped."""
        return self._stopped

    @stopped.setter
    def stopped(self, value: Event):
        """Set the stopped event."""
        self._stopped = value

    @classmethod
    def from_yaml(cls, create: InstanceFactory, yml: dict):
        """Create a Graph node from a parsed YAML dictionary.

        Note that `yml` is modified in place: a ``"stopped"`` event is stored
        on every instance entry and an input's ``"mode"`` key is replaced by
        ``"required"``.

        Args:
            create: Factory forwarded to `Node.from_yaml` for child instances.
            yml: Parsed YAML description of the graph.

        Returns:
            The constructed graph.
        """
        # Load metadata
        node_type = yml.get("nodeId", __name__)
        id = yml["id"] if "id" in yml else create_instance_id(node_type)
        config = {k: InputConfig(**v) for k, v in yml.get("config", {}).items()}
        display_name = yml.get("displayName", node_type)

        # Load instances and macros
        instances = {}
        instances_stopped = {}
        for ins in yml.get("instances", []):
            ins_id = ins["id"]
            # The event travels to the instance through its YAML dictionary
            stopped = Event()
            ins["stopped"] = stopped
            logger.debug(f"Creating instance {ins_id}")
            instances[ins_id] = Node.from_yaml(create, ins)
            instances_stopped[ins_id] = stopped
            logger.debug(f"Loaded instance {ins_id}")

        # Load connections and graph inputs/outputs
        connections = [Connection.from_yaml(conn) for conn in yml.get("connections", [])]
        inputs = {}
        for k, v in yml.get("inputs", {}).items():
            if "mode" in v:
                v["required"] = Requiredness(v["mode"])
                del v["mode"]
            inputs[k] = GraphPort(id=k, **v)
        outputs = {k: GraphPort(id=k, **v) for k, v in yml.get("outputs", {}).items()}

        # Initialize the stopped event
        stopped = Event()
        if "stopped" in yml:
            logger.debug(f"Creating graph {id} from yaml with stopped event")
            stopped = yml["stopped"]

        # Instantiate through the constructor
        return cls(
            id=id,
            node_type=node_type,
            config=config,
            display_name=display_name,
            instances=instances,
            instances_stopped=instances_stopped,
            connections=connections,
            inputs=inputs,
            outputs=outputs,
            stopped=stopped,
        )

    def to_dict(self) -> dict:
        """Return a dictionary representation of the node."""
        return {
            "id": self._id,
            "nodeId": self._node_type,
            "config": self._config,
            "displayName": self._display_name,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "instances": [v.to_dict() for v in self._instances.values()],
            "connections": [conn.to_dict() for conn in self._connections],
        }
stopped: Event
property
writable
Return the stopped event which is set when the node has stopped.
from_yaml(create, yml)
classmethod
Create a Graph node from a parsed YAML dictionary.
Source code in flyde/node.py
@classmethod
def from_yaml(cls, create: InstanceFactory, yml: dict):
    """Build a Graph from a parsed YAML dictionary.

    `yml` is modified in place: each instance entry receives a ``"stopped"``
    event, and an input's ``"mode"`` key is replaced by ``"required"``.
    """
    # Metadata
    node_type = yml.get("nodeId", __name__)
    graph_id = yml["id"] if "id" in yml else create_instance_id(node_type)
    config = {name: InputConfig(**raw) for name, raw in yml.get("config", {}).items()}
    display_name = yml.get("displayName", node_type)

    # Child instances, each paired with its own stopped event
    instances = {}
    instances_stopped = {}
    for spec in yml.get("instances", []):
        ins_id = spec["id"]
        ins_stopped = Event()
        spec["stopped"] = ins_stopped
        logger.debug(f"Creating instance {ins_id}")
        instances[ins_id] = Node.from_yaml(create, spec)
        instances_stopped[ins_id] = ins_stopped
        logger.debug(f"Loaded instance {ins_id}")

    # Connections and graph ports
    connections = [Connection.from_yaml(raw) for raw in yml.get("connections", [])]

    inputs = {}
    for name, port_args in yml.get("inputs", {}).items():
        if "mode" in port_args:
            port_args["required"] = Requiredness(port_args["mode"])
            del port_args["mode"]
        inputs[name] = GraphPort(id=name, **port_args)

    outputs = {}
    for name, port_args in yml.get("outputs", {}).items():
        outputs[name] = GraphPort(id=name, **port_args)

    # Reuse the stopped event carried by the YAML dictionary, if any
    if "stopped" in yml:
        logger.debug(f"Creating graph {graph_id} from yaml with stopped event")
        stopped = yml["stopped"]
    else:
        stopped = Event()

    # Instantiate through the constructor
    return cls(
        id=graph_id,
        node_type=node_type,
        config=config,
        display_name=display_name,
        instances=instances,
        instances_stopped=instances_stopped,
        connections=connections,
        inputs=inputs,
        outputs=outputs,
        stopped=stopped,
    )
run()
Run the graph.
Source code in flyde/node.py
def run(self):
    """Start every instance, then wait for all of them on a separate thread.

    The waiting thread is non-daemon; once every per-instance stopped event
    has been set it calls `finish()` on the graph.
    """
    for node in self._instances.values():
        logger.debug(f"Running instance {node._id} of type {node._node_type}")
        node.run()

    def worker():
        logger.debug(f"Running {self._id} worker")
        # Block until each instance reports it has stopped
        for ins_id, stopped_event in self._instances_stopped.items():
            logger.debug(f"Waiting for instance {ins_id} to stop")
            stopped_event.wait()
            logger.debug(f"Instance {ins_id} stopped")
        self.finish()
        logger.debug(f"Graph {self._id} finished")

    logger.debug(f"Starting {self._id} thread")
    Thread(target=worker, daemon=False).start()
shutdown()
Call shutdown handlers on all instances.
This method is called from the main thread to allow cleanup and things like UI.
Source code in flyde/node.py
def shutdown(self):
    """Invoke the `shutdown()` handler of every instance that has one.

    This method is called from the main thread to allow cleanup and things like UI."""
    for node in self._instances.values():
        # Instances without a `shutdown()` handler are skipped
        if not hasattr(node, "shutdown"):
            continue
        node.shutdown()
stop()
Stop all instances gracefully.
Source code in flyde/node.py
def stop(self):
    """Stop all instances gracefully.

    Puts EOF on the queue of every graph input that has a queue and a
    reference count of zero.
    """
    for port in self.inputs.values():
        if port.queue is None or port.ref_count != 0:
            continue
        port.queue.put(EOF)
terminate()
Terminate all instances immediately.
Source code in flyde/node.py
def terminate(self):
    """Stop every instance right away, then stop and finish the graph itself."""
    for node in self._instances.values():
        node.stop()

    self.stop()
    self.finish()
to_dict()
Return a dictionary representation of the node.
Source code in flyde/node.py
def to_dict(self) -> dict:
    """Build a dictionary describing the graph, its instances and connections."""
    data = {
        "id": self._id,
        "nodeId": self._node_type,
        "config": self._config,
        "displayName": self._display_name,
        "inputs": self.inputs,
        "outputs": self.outputs,
    }
    # Children and connections serialize themselves
    data["instances"] = [node.to_dict() for node in self._instances.values()]
    data["connections"] = [link.to_dict() for link in self._connections]
    return data
InstanceArgs
dataclass
Arguments to pass to the instance factory.
Source code in flyde/node.py
@dataclass
class InstanceArgs:
    """Arguments to pass to the instance factory."""

    id: str
    display_name: str
    stopped: Optional[Event]
    config: dict[str, Any]
    type: InstanceType = InstanceType.CODE
    source: Optional[InstanceSource] = None

    def to_dict(self) -> dict:
        """Return `id`, `display_name`, `stopped` and `config` as a dictionary.

        `type` and `source` are not part of the result.
        """
        exported = ("id", "display_name", "stopped", "config")
        return {name: getattr(self, name) for name in exported}
to_dict()
Convert the instance arguments to a dictionary.
Source code in flyde/node.py
def to_dict(self) -> dict:
    """Return `id`, `display_name`, `stopped` and `config` as a dictionary."""
    exported = ("id", "display_name", "stopped", "config")
    return {name: getattr(self, name) for name in exported}
InstanceSource
dataclass
Source configuration of an instance.
Source code in flyde/node.py
@dataclass
class InstanceSource:
    """Describes where an instance comes from: a source type plus its data string."""

    # Kind of source (see InstanceSourceType)
    type: InstanceSourceType
    # Source payload; its interpretation presumably depends on `type` — TODO confirm
    data: str
InstanceSourceType
Bases: Enum
InstanceSourceType is the source type of an instance.
FILE: The instance is created from a file. PACKAGE: The instance is created from a built in package. CUSTOM: The instance is created from a custom module with path format.
Source code in flyde/node.py
class InstanceSourceType(Enum):
    """Source type of an instance.

    FILE: The instance is created from a file.
    PACKAGE: The instance is created from a built in package.
    CUSTOM: The instance is created from a custom module with path format.
    """

    FILE = "file"
    PACKAGE = "package"
    CUSTOM = "custom"
InstanceType
Bases: Enum
InstanceType is the type of an instance.
VISUAL: The instance is a visual node. CODE: The instance is a code node.
Source code in flyde/node.py
class InstanceType(Enum):
    """Type of an instance.

    VISUAL: The instance is a visual node.
    CODE: The instance is a code node.
    """

    VISUAL = "visual"
    CODE = "code"
Node
Bases: ABC
Node is the main building block of an application.
Attributes: id (str): A unique identifier for the node. node_type (str): The node type identifier. config (dict): A dictionary of input pin configurations. display_name (str): A human-readable name for the node. inputs (dict[str, Input]): Node input map. outputs (dict[str, Output]): Node output map.
Source code in flyde/node.py
class Node(ABC):
    """Node is the main building block of an application.

    Attributes:
        id (str): A unique identifier for the node.
        node_type (str): The node type identifier.
        config (dict): A dictionary of input pin configurations.
        display_name (str): A human-readable name for the node.
        inputs (dict[str, Input]): Node input map.
        outputs (dict[str, Output]): Node output map.
    """

    # Class-level port definitions; instances receive deep copies of these
    # when no ports are passed to the constructor.
    inputs: dict[str, Input] = {}
    outputs: dict[str, Output] = {}

    def __init__(
        self,
        /,
        id: str,
        node_type: str = "",
        display_name: str = "",
        inputs: Optional[dict[str, Input]] = None,
        outputs: Optional[dict[str, Output]] = None,
        stopped: Optional[Event] = None,
        config: Optional[dict[str, InputConfig]] = None,
    ):
        """Initialize the node metadata, ports and stopped event.

        Args:
            id: Node identifier; when empty one is generated from the node type.
            node_type: Node type identifier; defaults to the class name.
            display_name: Human-readable name; defaults to the node type.
            inputs: Input ports. When empty or omitted, the class-level
                `inputs` are deep-copied instead.
            outputs: Output ports. When empty or omitted, the class-level
                `outputs` are deep-copied instead.
            stopped: Event set by `finish()`. When omitted or ``None`` a new
                event is created for this node, so nodes never share one.
            config: Raw input configuration, parsed with `parse_config()`.
        """
        node_type = node_type if node_type else self.__class__.__name__
        self._node_type = node_type
        self._id = id if id else create_instance_id(node_type)
        self._display_name = display_name if display_name else node_type
        self._config_raw = config or {}
        self._config = self.parse_config(self._config_raw)

        if inputs:
            self.inputs = inputs
        elif hasattr(self.__class__, "inputs") and len(self.__class__.inputs) > 0:
            # Copy from class definition, but instance will have own connections
            self.inputs = deepcopy(self.__class__.inputs)
        else:
            self.inputs = {}

        for k, v in self.inputs.items():
            v.id = f"{self._id}.{k}"
            if k in self._config:
                v.apply_config(self._config[k])

        if outputs:
            self.outputs = outputs
        elif hasattr(self.__class__, "outputs") and len(self.__class__.outputs) > 0:
            # Copy from class definition, but instance will have own connections
            self.outputs = deepcopy(self.__class__.outputs)
        else:
            self.outputs = {}

        for k, vv in self.outputs.items():
            vv.id = f"{self._id}.{k}"

        # A default of `Event()` in the signature would be created once and
        # shared by every node, so the event is created here instead.
        self._stopped = stopped if stopped is not None else Event()

    def parse_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Parse the raw config into a typed config dictionary.

        Dictionaries whose ``"type"`` is one of the `InputType` values become
        `InputConfig` objects; every other value is passed through unchanged.
        """
        result = {}
        for key, value in config.items():
            if isinstance(value, dict) and "type" in value and value["type"] in [item.value for item in InputType]:
                config_value = value.get("value", None)
                # If config_value is `{{key}}`, reset it to None, as this is the editor default for dynamic inputs
                if config_value == f"{{{{{key}}}}}":
                    config_value = None
                result[key] = InputConfig(
                    type=InputType(value["type"]),
                    value=config_value,
                )
            else:
                result[key] = value
        return result

    @abstractmethod
    def run(self):
        """Run the node. This method should be overridden by subclasses."""
        pass

    @abstractmethod
    def stop(self):
        """Stop the node. This method should be overridden by subclasses."""
        pass

    def finish(self):
        """Finish the component execution gracefully by closing all its outputs and notifying others."""
        logger.debug(f"Sending EOF to all outputs of {self._id}")
        for output in self.outputs.values():
            if output.connected:
                output.send(EOF)
        logger.debug(f"Node {self._id} finished, sending stopped event")
        self._stopped.set()
        logger.debug(f"Stop event set for node {self._id}")

    @property
    def stopped(self) -> Event:
        """Return the stopped event which is set by `finish()`."""
        return self._stopped

    def shutdown(self):
        """Shutdown the component. This method is optional and can be overridden by subclasses."""
        pass

    def send(self, output_id: str, value: Any):
        """Send a value to an output.

        Raises:
            ValueError: If the node has no output named `output_id`.
        """
        if output_id not in self.outputs:
            raise ValueError(f"Output {output_id} not found in node {self._id}")
        self.outputs[output_id].send(value)

    def receive(self, input_id: str) -> Any:
        """Receive a value from an input.

        Raises:
            ValueError: If the node has no input named `input_id`.

        A received EOF value is raised rather than returned.
        """
        if input_id not in self.inputs:
            raise ValueError(f"Input {input_id} not found in node {self._id}")
        value = self.inputs[input_id].get()
        if is_EOF(value):
            raise value
        return value

    @classmethod
    def from_yaml(cls, create: InstanceFactory, yml: dict):
        """Create a node from a parsed YAML dictionary.

        Entries without a ``"nodeId"`` (or with ``"VisualNode"``) are built by
        `Graph.from_yaml`; all others go through the `create` factory.
        """
        node_class_name = yml.get("nodeId", "VisualNode")
        if node_class_name == "VisualNode":
            return Graph.from_yaml(create, yml)

        config = yml.get("config", {})
        source = InstanceSource(
            type=InstanceSourceType(yml.get("source", {}).get("type", "file").lower()),
            data=yml.get("source", {}).get("data", ""),
        )
        args = InstanceArgs(
            id=yml["id"],
            display_name=yml.get("displayName", ""),
            stopped=yml.get("stopped", None),  # It's a hacky way to pass the stopped event to the constructor
            config=config,
            type=InstanceType(yml.get("type", "code").lower()),
            source=source,
        )
        return create(node_class_name, args)

    def to_dict(self) -> dict:
        """Return a dictionary with the node id, type, parsed config and display name."""
        return {
            "id": self._id,
            "nodeId": self._node_type,
            "config": self._config,
            "displayName": self._display_name,
        }
finish()
Finish the component execution gracefully by closing all its outputs and notifying others.
Source code in flyde/node.py
def finish(self):
    """Send EOF to every connected output, then set the node's stopped event."""
    logger.debug(f"Sending EOF to all outputs of {self._id}")
    for port in self.outputs.values():
        # Unconnected outputs are skipped
        if not port.connected:
            continue
        port.send(EOF)

    logger.debug(f"Node {self._id} finished, sending stopped event")
    self._stopped.set()
    logger.debug(f"Stop event set for node {self._id}")
from_yaml(create, yml)
classmethod
Create a node from a parsed YAML dictionary.
Source code in flyde/node.py
@classmethod
def from_yaml(cls, create: InstanceFactory, yml: dict):
    """Build a node from a parsed YAML dictionary.

    Entries without a ``"nodeId"`` (or with ``"VisualNode"``) are delegated to
    `Graph.from_yaml`; everything else is created through the `create` factory.
    """
    node_class_name = yml.get("nodeId", "VisualNode")
    if node_class_name == "VisualNode":
        return Graph.from_yaml(create, yml)

    config = yml.get("config", {})

    # Source description; the type defaults to "file" and is lower-cased
    source_type = InstanceSourceType(yml.get("source", {}).get("type", "file").lower())
    source_data = yml.get("source", {}).get("data", "")
    source = InstanceSource(type=source_type, data=source_data)

    args = InstanceArgs(
        id=yml["id"],
        display_name=yml.get("displayName", ""),
        # It's a hacky way to pass the stopped event to the constructor
        stopped=yml.get("stopped", None),
        config=config,
        type=InstanceType(yml.get("type", "code").lower()),
        source=source,
    )
    return create(node_class_name, args)
parse_config(config)
Parse the raw config into a typed config dictionary.
Source code in flyde/node.py
def parse_config(self, config: dict[str, Any]) -> dict[str, Any]:
    """Turn the raw config into a typed config dictionary.

    A value that is a dict with a ``"type"`` matching one of the `InputType`
    values becomes an `InputConfig`; any other value is copied through as is.
    """
    parsed = {}
    for key, raw in config.items():
        is_typed = (
            isinstance(raw, dict)
            and "type" in raw
            and raw["type"] in [member.value for member in InputType]
        )
        if not is_typed:
            parsed[key] = raw
            continue

        inner = raw.get("value", None)
        # `{{key}}` is the editor default for dynamic inputs; treat it as no value
        if inner == f"{{{{{key}}}}}":
            inner = None
        parsed[key] = InputConfig(type=InputType(raw["type"]), value=inner)
    return parsed
receive(input_id)
Receive a value from an input.
Source code in flyde/node.py
def receive(self, input_id: str) -> Any:
    """Read the next value from the named input.

    Raises ValueError when the input does not exist. A received EOF value is
    raised instead of being returned.
    """
    if input_id not in self.inputs:
        raise ValueError(f"Input {input_id} not found in node {self._id}")

    received = self.inputs[input_id].get()
    if not is_EOF(received):
        return received
    raise received
run()
abstractmethod
Run the node. This method should be overridden by subclasses.
Source code in flyde/node.py
@abstractmethod
def run(self):
    """Run the node. Abstract: subclasses must provide the implementation."""
send(output_id, value)
Send a value to an output.
Source code in flyde/node.py
def send(self, output_id: str, value: Any):
    """Send `value` through the named output.

    Raises ValueError when the node has no output called `output_id`.
    """
    if output_id not in self.outputs:
        raise ValueError(f"Output {output_id} not found in node {self._id}")

    port = self.outputs[output_id]
    port.send(value)
shutdown()
Shutdown the component. This method is optional and can be overridden by subclasses.
Source code in flyde/node.py
def shutdown(self):
    """Optional shutdown hook; does nothing unless a subclass overrides it."""
    return None
stop()
abstractmethod
Stop the node. This method should be overridden by subclasses.
Source code in flyde/node.py
@abstractmethod
def stop(self):
    """Stop the node. Abstract: subclasses must provide the implementation."""
create_instance_id(node_type)
Create a unique instance ID.
Source code in flyde/node.py
def create_instance_id(node_type: str) -> str:
    """Return a new instance ID made of the node type, a dash and a random UUID4."""
    unique_part = uuid4()
    return f"{node_type}-{unique_part}"