flyde.flow

Flow

Flow is a root-level runnable directed acyclic graph of nodes.

Source code in flyde/flow.py
class Flow:
    """Flow is a root-level runnable directed acyclic graph of nodes."""

    def __init__(self, imports: dict[str, list[str]]):
        self._imports = imports
        self._path = ""
        self._base_path = ""
        self._node: Graph
        self._components: dict[str, Callable] = {}
        self._graphs: dict[str, dict] = {}

    def _preload_imports(self, base_path: str, imports: dict[str, list[str]]):
        if not imports:
            return

        for module, classes in imports.items():
            logger.debug(f"Importing {module}")
            # If module name ends with .flyde it's a Graph
            if module.endswith(".flyde"):
                node_id = classes[0]
                logger.debug(f"Importing graph {node_id} from {base_path}/{module}")
                yml = load_yaml_file(f"{base_path}/{module}")
                if not isinstance(yml, dict):
                    raise ValueError(f"Invalid YAML file {module}")
                # Merge the imports from the graph with the current imports recursively
                self._preload_imports(base_path, yml.get("imports", {}))
                # Save the blueprint YAML for the graph to be instantiated later
                self._graphs[node_id] = yml["node"]
                continue
            # Convert module path format
            module = module.replace("/", ".").replace("@", "")
            logger.debug(f"Importing module {module}")
            mod = importlib.import_module(module)
            for class_name in classes:
                logger.debug(f"Importing {class_name} from {module}")
                self._components[class_name] = getattr(mod, class_name)

    def _load_graph(self, name: str, path: str):
        """Loads a graph YAML."""
        full_path = os.path.join(self._base_path, path)
        yml = load_yaml_file(full_path)
        if not isinstance(yml, dict):
            raise ValueError(f"Invalid YAML file {path}")
        # Save the blueprint YAML for the graph to be instantiated later
        self._graphs[name] = yml["node"]
        return

    def _load_component(self, name: str, path: str):
        """Loads a component from a Python module."""
        # If component is already loaded, return
        if name in self._components:
            return

        # Handle custom://path/to/mod.py/ClassName format
        if path.startswith("custom://"):
            custom_path = path[9:]  # Remove "custom://" prefix
            if "/" in custom_path and custom_path.count("/") >= 1:
                # Split into module path and class name
                parts = custom_path.rsplit("/", 1)
                if len(parts) == 2:
                    module_path, class_name = parts

                    # Resolve the module path relative to the flow file's directory
                    if module_path.endswith(".py"):
                        # It's a file path, resolve it relative to the flow file directory
                        absolute_module_path = os.path.join(self._base_path, module_path)
                        # Convert to module name for importing
                        spec = importlib.util.spec_from_file_location(class_name, absolute_module_path)
                        if spec and spec.loader:
                            mod = importlib.util.module_from_spec(spec)
                            spec.loader.exec_module(mod)
                            self._components[name] = getattr(mod, class_name)
                            return
                    else:
                        # It's already a module path, convert file path to module path
                        module_path = module_path.replace("/", ".").replace(".py", "")

                        # Add the flow file's directory to sys.path temporarily for relative imports
                        original_path = sys.path[:]
                        if self._base_path not in sys.path:
                            sys.path.insert(0, self._base_path)

                        try:
                            logger.debug(f"Importing custom module {module_path}, class {class_name}")
                            mod = importlib.import_module(module_path)
                            self._components[name] = getattr(mod, class_name)
                            return
                        finally:
                            # Restore original sys.path
                            sys.path[:] = original_path

        # Handle @flyde/nodes package format for stdlib components
        if path == "@flyde/nodes":
            logger.debug(f"Loading stdlib component {name}")
            from flyde.nodes import Conditional, GetAttribute, Http, InlineValue

            stdlib_components = {
                "InlineValue": InlineValue,
                "Conditional": Conditional,
                "GetAttribute": GetAttribute,
                "Http": Http,
            }
            if name in stdlib_components:
                self._components[name] = stdlib_components[name]
                return

        raise ValueError(
            f"Invalid component source path: {path}. Only custom:// and @flyde/nodes formats are supported."
        )

    def create_graph(self, name: str, args: InstanceArgs):
        if name not in self._graphs:
            if args.source is None:
                raise ValueError(f"Graph {name} does not have a valid source")
            self._load_graph(name, args.source.data)

        # Merge the blueprint YAML with the arguments
        yml = self._graphs[name] | args.to_dict()
        node = Graph.from_yaml(self.factory, yml)
        return node

    def create_component(self, name: str, args: InstanceArgs):
        if name not in self._components:
            if args.source is None:
                raise ValueError(f"Component {name} does not have a valid source")
            self._load_component(name, args.source.data)

        if name not in self._components:
            raise ValueError(f"Component {name} could not be loaded")

        # Create the component instance
        component = self._components[name]
        return component(**args.to_dict())

    def factory(self, class_name: str, args: InstanceArgs):
        """Factory method to create a node from a class name and arguments.

        It is used by the runtime to create nodes from the YAML definition or on the fly.
        """
        if args.type == InstanceType.VISUAL:
            return self.create_graph(class_name, args)

        return self.create_component(class_name, args)

    def run(self):
        """Start the flow running. This is a non-blocking call as the flow runs in a separate thread."""
        self._node.run()

    def run_sync(self):
        """Run the flow synchronously. Shutdown handlers will be executed after the flow has finished."""
        self._node.run()
        self._node.stopped.wait()
        self._node.shutdown()

    @property
    def node(self) -> Graph:
        """The root node of the flow."""
        return self._node

    @property
    def stopped(self) -> Event:
        """Stopped event is set when the flow has finished working."""
        return self._node.stopped

    @classmethod
    def from_yaml(cls, path: str, yml: dict):
        """Load Flyde Flow definition from parsed YAML dict."""
        imports = yml.get("imports", {})

        if "node" not in yml:
            raise ValueError("No node in flow definition")

        ins = cls(imports)
        ins._path = path
        ins._base_path = os.path.dirname(path)
        ins._node = Graph.from_yaml(ins.factory, yml["node"])
        ins._node._stopped = Event()
        return ins

    @classmethod
    def from_file(cls, path: str):
        """Load Flyde Flow definition from a *.flyde YAML file."""
        yml = load_yaml_file(path)
        if not isinstance(yml, dict):
            raise ValueError("Invalid YAML file")

        # Get the absolute path from the yaml_file path and add it to the sys.path for discoverability
        add_folder_to_path(path)

        return cls.from_yaml(path, yml)

    def to_dict(self) -> dict:
        return {"imports": self._imports, "node": self._node.to_dict()}

node: Graph property

The root node of the flow.

stopped: Event property

Stopped event is set when the flow has finished working.

factory(class_name, args)

Factory method to create a node from a class name and arguments.

It is used by the runtime to create nodes from the YAML definition or on the fly.

Source code in flyde/flow.py
def factory(self, class_name: str, args: InstanceArgs):
    """Factory method to create a node from a class name and arguments.

    It is used by the runtime to create nodes from the YAML definition or on the fly.
    """
    if args.type == InstanceType.VISUAL:
        return self.create_graph(class_name, args)

    return self.create_component(class_name, args)

from_file(path) classmethod

Load Flyde Flow definition from a *.flyde YAML file.

Source code in flyde/flow.py
@classmethod
def from_file(cls, path: str):
    """Load Flyde Flow definition from a *.flyde YAML file."""
    yml = load_yaml_file(path)
    if not isinstance(yml, dict):
        raise ValueError("Invalid YAML file")

    # Get the absolute path from the yaml_file path and add it to the sys.path for discoverability
    add_folder_to_path(path)

    return cls.from_yaml(path, yml)

from_yaml(path, yml) classmethod

Load Flyde Flow definition from parsed YAML dict.

Source code in flyde/flow.py
@classmethod
def from_yaml(cls, path: str, yml: dict):
    """Load Flyde Flow definition from parsed YAML dict."""
    imports = yml.get("imports", {})

    if "node" not in yml:
        raise ValueError("No node in flow definition")

    ins = cls(imports)
    ins._path = path
    ins._base_path = os.path.dirname(path)
    ins._node = Graph.from_yaml(ins.factory, yml["node"])
    ins._node._stopped = Event()
    return ins

run()

Start the flow running. This is a non-blocking call as the flow runs in a separate thread.

Source code in flyde/flow.py
def run(self):
    """Start the flow running. This is a non-blocking call as the flow runs in a separate thread."""
    self._node.run()

run_sync()

Run the flow synchronously. Shutdown handlers will be executed after the flow has finished.

Source code in flyde/flow.py
def run_sync(self):
    """Run the flow synchronously. Shutdown handlers will be executed after the flow has finished."""
    self._node.run()
    self._node.stopped.wait()
    self._node.shutdown()