Chapter 3: Workflow Engine (Pocket Flow)#

Welcome back! In the previous chapters, we learned about the Web Interface (Frontend) that takes your request and the Serverless Deployment (Azure Functions) that receives that request and puts it on a waiting list (a queue) for processing.

But what exactly happens inside the Azure Function that takes a job from the queue? Generating a tutorial isn't a single, simple step. It involves fetching code, understanding it, talking to AI, organizing content, and saving files. That's a lot of tasks! How do we make sure they happen in the right order, handle information between them, and deal with potential issues?

This is where our Workflow Engine, a small framework called Pocket Flow, comes in.

The Problem: Orchestrating Complex Tasks#

Imagine you're baking a complicated cake. You don't just mix everything at once. You follow steps: measure flour, add sugar, mix wet ingredients, mix dry ingredients, combine, bake, cool, frost. Each step needs the result of the previous one, and you need to make sure you don't add the frosting before baking!

Similarly, generating a tutorial involves a sequence of operations. We need to:

  1. Get the code.
  2. Figure out the main ideas or "abstractions" in the code.
  3. Understand how those ideas connect.
  4. Decide the best order to explain them in the tutorial.
  5. Write each chapter based on its concept and relevant code.
  6. Put all the chapters together into the final tutorial format.

If any step fails (like the AI call times out), we might need to retry that step without starting everything over. We also need a way for the results of one step (like the fetched code) to be easily available to the next step (like the step that identifies concepts).

The Solution: Pocket Flow#

Pocket Flow acts like a project supervisor or the recipe book for our tutorial generation process. It breaks the entire job into smaller, manageable pieces called Nodes. It then defines the order in which these Nodes should run, making sure the 'ingredients' (data) from one Node are passed correctly to the next.

Pocket Flow is designed to be lightweight and flexible. Its core ideas are simple but powerful:

  • Nodes: The individual steps or tasks (like "Fetch Code" or "Write Chapter").
  • Flows: How the Nodes are connected and the order they run in.
  • Shared Store: A central place where all Nodes can read and write data they need.

Let's dive into these concepts using analogies.

Nodes: The Task Boxes#

Think of a Node as a small, self-contained box that does just one specific job. Our Nodes are written in Python.

Each Node typically has three main phases:

  1. prep(shared): This is the "Get Ready" phase. The Node looks at the shared data (the Shared Store, our central whiteboard) and gathers everything it needs to do its job. It prepares the 'ingredients' and returns this prepared data.
  2. exec(prep_res): This is the "Do the Work" phase. The Node performs its core task using the data prepared in prep. This is where the main computation or external interaction happens (like calling the AI). This phase can be set up to automatically retry if it fails. It returns the result of its work.
  3. post(shared, prep_res, exec_res): This is the "Clean Up and Pass On" phase. The Node takes the result from exec and saves it back into the shared data so other Nodes can use it. It also decides what happens next in the workflow by returning an action name; returning nothing selects the default path.

flowchart LR
    shared_in[(Shared Store)] --> prep_node["Node.prep()"]
    prep_node --> prep_res_data[(Prepared Data)]
    prep_res_data --> exec_node["Node.exec()"]
    exec_node --> exec_res_data[(Execution Result)]
    exec_res_data --> post_node["Node.post()"]
    prep_res_data --> post_node
    shared_in --> post_node
    post_node --> shared_out[(Shared Store)]
    post_node --> action_out[Action]

This diagram shows the flow within a single Node and how it interacts with the Shared Store and returns an Action to decide the next step.
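To make the three phases concrete, here is a minimal sketch in plain Python. The MiniNode and CountWords classes are hypothetical stand-ins written for this illustration, not Pocket Flow's actual source; they only show the order of the calls and what each phase receives.

```python
# Hypothetical stand-in for illustration; not Pocket Flow's real Node class.
class MiniNode:
    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return None  # no explicit action

    def run(self, shared):
        prep_res = self.prep(shared)                    # 1. gather inputs
        exec_res = self.exec(prep_res)                  # 2. do the work
        action = self.post(shared, prep_res, exec_res)  # 3. save results, pick next step
        return action or "default"


class CountWords(MiniNode):
    def prep(self, shared):
        return shared["text"]            # read what we need from the shared store

    def exec(self, text):
        return len(text.split())         # the actual work; no access to shared here

    def post(self, shared, prep_res, exec_res):
        shared["word_count"] = exec_res  # write the result back for later nodes


shared = {"text": "pocket flow runs nodes in order"}
action = CountWords().run(shared)
print(shared["word_count"], action)  # prints: 6 default
```

In this sketch, exec only sees what prep returned, mirroring the exec(prep_res) signature above. Keeping the work phase away from the shared data is one reason it can be retried on its own.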

Flows: The Recipe Steps#

A Flow is like the full recipe. It defines the sequence of Nodes and tells Pocket Flow which Node to run after the current one finishes.

You connect Nodes using a simple >> syntax for the default path, or - "action_name" >> for branching based on what a Node returns from its post method. In our tutorial generation, it's mostly a straightforward sequence.

# Example connection
node_a >> node_b
node_b >> node_c

# Create the flow starting point
my_flow = Flow(start=node_a)

When you run my_flow.run(shared), Pocket Flow starts node_a, waits for it to finish and decide the next action, then runs that next Node, and so on, until there are no more connected Nodes.
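If you are curious how syntax like >> and - "action_name" >> can work in Python at all, the sketch below shows one possible way using operator overloading. MiniNode, MiniFlow, and _Transition are hypothetical names invented for this example, and the real Pocket Flow classes may differ in detail.

```python
# Hypothetical sketch; Pocket Flow's real classes may differ in detail.
class MiniNode:
    def __init__(self, name, action="default"):
        self.name = name
        self.action = action      # action this toy node reports when it finishes
        self.successors = {}      # action name -> next node

    def __rshift__(self, other):  # node_a >> node_b
        self.successors["default"] = other
        return other

    def __sub__(self, action):    # node_a - "action_name" ...
        return _Transition(self, action)

    def run(self, shared):
        shared.setdefault("trace", []).append(self.name)
        return self.action


class _Transition:
    def __init__(self, src, action):
        self.src, self.action = src, action

    def __rshift__(self, other):  # ... >> node_b
        self.src.successors[self.action] = other
        return other


class MiniFlow:
    def __init__(self, start):
        self.start = start

    def run(self, shared):
        node = self.start
        while node is not None:
            action = node.run(shared)
            node = node.successors.get(action)  # no match ends the flow


check = MiniNode("check", action="needs_fix")
fix = MiniNode("fix")
done = MiniNode("done")

check >> done                 # default path
check - "needs_fix" >> fix    # branch taken when check reports "needs_fix"
fix >> done

shared = {}
MiniFlow(start=check).run(shared)
print(shared["trace"])  # prints: ['check', 'fix', 'done']
```

Because - binds more tightly than >> in Python, check - "needs_fix" >> fix is read as (check - "needs_fix") >> fix, which is what lets the action name sit between the two nodes.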

Shared Store: The Central Whiteboard#

The Shared Store is a central place (like a Python dictionary in memory) where all Nodes can access and modify data. This is how information flows between the steps. The prep phase reads from it, and the post phase writes results back to it.

# Example using shared store
from pocketflow import Node

class NodeA(Node):
    def post(self, shared, prep_res, exec_res):
        # Put some data into the shared store
        shared["step_a_result"] = "Data from A"

class NodeB(Node):
    def prep(self, shared):
        # Read data from the shared store
        data_from_a = shared["step_a_result"]
        return data_from_a # Pass to exec

# ... connect NodeA >> NodeB and run flow ...

This central shared store makes it easy for any Node in the workflow to get the information it needs from previous steps without complex direct connections between Nodes.

Our Project's Workflow#

In our tutorial generation project, the generate Azure Function (from Chapter 2) is where a Pocket Flow runs. This Flow orchestrates all the steps needed to turn a GitHub repository or local code into a tutorial.

The main Flow is defined in function_app/flow.py. Let's look at it:

# function_app/flow.py (Simplified)
from pocketflow import Flow
# Import all node classes from nodes.py
from .nodes import ( # Note the '.' for relative import
    FetchRepo,
    IdentifyAbstractions,
    AnalyzeRelationships,
    OrderChapters,
    WriteChapters,
    CombineTutorial
)

def create_tutorial_flow():
    """Creates and returns the codebase tutorial generation flow."""

    # Instantiate nodes
    fetch_repo = FetchRepo()
    # Nodes that interact with LLM often have retries set
    identify_abstractions = IdentifyAbstractions(max_retries=5, wait=20)
    analyze_relationships = AnalyzeRelationships(max_retries=5, wait=20)
    order_chapters = OrderChapters(max_retries=5, wait=20)
    # WriteChapters is a BatchNode (explained below)
    write_chapters = WriteChapters(max_retries=5, wait=20)
    combine_tutorial = CombineTutorial()

    # Connect nodes in sequence
    fetch_repo >> identify_abstractions
    identify_abstractions >> analyze_relationships
    analyze_relationships >> order_chapters
    order_chapters >> write_chapters
    write_chapters >> combine_tutorial

    # Create the flow starting with FetchRepo
    tutorial_flow = Flow(start=fetch_repo)

    return tutorial_flow

This code defines our "recipe" for tutorial generation. It creates instances of each "Task Box" (Node) and connects them one after another using the >> operator. The whole process starts with the fetch_repo node.

Here's a diagram of this flow:

flowchart LR
    A[Fetch Repo] --> B[Identify Abstractions]
    B --> C[Analyze Relationships]
    C --> D[Order Chapters]
    D --> E[Write Chapters]
    E --> F[Combine Tutorial]

This visualizes the sequence: First fetch, then identify, then analyze, then order, then write, and finally combine and save the output.
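The max_retries and wait arguments passed to the LLM-backed nodes can be pictured as a loop around the exec phase. The helper below is a rough sketch under two assumptions: that max_retries counts the total number of attempts, and that wait is a pause in seconds between attempts. exec_with_retries and flaky_llm_call are hypothetical names, not part of Pocket Flow.

```python
import time

# Hypothetical sketch of retry-around-exec; not Pocket Flow's actual implementation.
def exec_with_retries(exec_fn, prep_res, max_retries=5, wait=20, sleep=time.sleep):
    for attempt in range(1, max_retries + 1):
        try:
            return exec_fn(prep_res)
        except Exception:
            if attempt == max_retries:
                raise        # out of attempts: surface the error
            sleep(wait)      # pause before trying again


calls = {"count": 0}

def flaky_llm_call(prompt):
    """Stand-in for an LLM call that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return f"answer to: {prompt}"


waits = []  # record the pauses instead of really sleeping
result = exec_with_retries(flaky_llm_call, "list abstractions", wait=20, sleep=waits.append)
print(result, calls["count"], waits)  # prints: answer to: list abstractions 3 [20, 20]
```

Only the failing step is repeated here. Earlier results, such as the fetched files already sitting in the shared store, do not need to be produced again.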

The Nodes in Detail#

Let's look at simplified examples of what each of these Nodes does (you can see the full code in function_app/nodes.py):

  • FetchRepo(Node):

    • Job: Get the code files from GitHub or a local directory.
    • prep: Reads the repo URL, GitHub token (if any), and file patterns from shared.
    • exec: Calls utility functions (crawl_github_files or crawl_local_files) to actually fetch the files. Returns a list of file paths and content.
    • post: Stores the list of files in shared["files"].
    # function_app/nodes.py (Simplified FetchRepo)
    from pocketflow import Node
    # ... other imports ...
    
    class FetchRepo(Node):
        def prep(self, shared):
            # Reads repo_url, github_token, patterns, etc. from shared
            return {"repo_url": shared["repo_url"], "token": shared.get("github_token")} # Simplified
    
        def exec(self, prep_res):
            print(f"Crawling repository: {prep_res['repo_url']}...")
            # Calls actual fetching logic (omitted)
            result = {"files": {"path/to/file.py": "print('hello')"}, "info": {}} # Dummy result
            files_list = list(result.get("files", {}).items())
            print(f"Fetched {len(files_list)} files.")
            return files_list # List of (path, content)
    
        def post(self, shared, prep_res, exec_res):
            shared["files"] = exec_res # Store list of files
            # returns "default" implicitly
    
  • IdentifyAbstractions(Node):

    • Job: Analyze the fetched code to find the main concepts or "abstractions" (like classes, modules, or key logic areas).
    • prep: Reads the files list from shared and formats the code content into a prompt for the AI.
    • exec: Calls the AI (call_llm) with a prompt asking it to identify abstractions, names, descriptions, and relevant files (by index). Returns the AI's parsed response (a list of dictionaries). This node has max_retries=5, wait=20 because AI calls can be flaky.
    • post: Stores the list of identified abstractions in shared["abstractions"].
    # function_app/nodes.py (Simplified IdentifyAbstractions)
    from pocketflow import Node
    from utils.call_llm import call_llm # Need call_llm
    
    class IdentifyAbstractions(Node):
        def prep(self, shared):
            files_data = shared["files"]
            # Formats files_data into a prompt string (omitted)
            context_prompt = "...formatted file content..."
            return context_prompt # Pass prompt to exec
    
        def exec(self, context_prompt):
            print("Identifying abstractions using LLM...")
            # Calls AI (omitted)
            # Returns parsed AI response, e.g., [{"name": "...", "description": "...", "files": [...]}]
            return [{"name": "Core Concept", "description": "...", "files": [0, 1]}] # Dummy result
    
        def post(self, shared, prep_res, exec_res):
            shared["abstractions"] = exec_res # Store list of abstractions
    
  • AnalyzeRelationships(Node):

    • Job: Figure out how the identified abstractions relate to each other.
    • prep: Reads abstractions and files from shared, formats them into a prompt for the AI.
    • exec: Calls the AI (call_llm) with a prompt asking for a project summary and a list of relationships between abstractions. Returns the parsed AI response. This node also has retries.
    • post: Stores the summary and relationship details in shared["relationships"].
    # function_app/nodes.py (Simplified AnalyzeRelationships)
    from pocketflow import Node
    from utils.call_llm import call_llm
    
    class AnalyzeRelationships(Node):
        def prep(self, shared):
            abstractions = shared["abstractions"]
            # Formats abstractions and files into a prompt string (omitted)
            context_prompt = "...formatted abstractions and relevant files..."
            return context_prompt
    
        def exec(self, context_prompt):
            print("Analyzing relationships using LLM...")
            # Calls AI (omitted)
            # Returns parsed AI response, e.g., {"summary": "...", "details": [...]}
            return {"summary": "...", "details": [{"from": 0, "to": 1, "label": "uses"}]} # Dummy result
    
        def post(self, shared, prep_res, exec_res):
            shared["relationships"] = exec_res # Store summary/relationships
    
  • OrderChapters(Node):

    • Job: Determine the best order to present the identified abstractions as tutorial chapters.
    • prep: Reads abstractions and relationships from shared, formats them into a prompt for the AI.
    • exec: Calls the AI (call_llm) with a prompt asking for an ordered list of abstraction indices. Returns the parsed list of indices. This node has retries.
    • post: Stores the ordered list of indices in shared["chapter_order"].
    # function_app/nodes.py (Simplified OrderChapters)
    from pocketflow import Node
    from utils.call_llm import call_llm
    
    class OrderChapters(Node):
        def prep(self, shared):
            abstractions = shared["abstractions"]
            relationships = shared["relationships"]
            # Formats abstractions and relationships into a prompt string (omitted)
            context_prompt = "...formatted data..."
            return context_prompt
    
        def exec(self, context_prompt):
            print("Determining chapter order using LLM...")
            # Calls AI (omitted)
            # Returns parsed AI response, e.g., [2, 0, 1]
            return [0, 1] # Dummy result (indices of abstractions)
    
        def post(self, shared, prep_res, exec_res):
            shared["chapter_order"] = exec_res # Store ordered indices
    
  • WriteChapters(BatchNode):

    • Job: Write the content for each tutorial chapter based on the ordered abstractions and relevant code.
    • This is a special type of Node called a BatchNode. Instead of exec running once, exec runs multiple times, once for each item returned by prep. This is perfect for tasks that need to process a list of things independently, like writing multiple chapters.
    • prep: Reads chapter_order, abstractions, and files from shared. It prepares a list of data packets, one for each chapter to be written.
    • exec(item): This is called for each chapter data packet prepared by prep. It takes that chapter's data (abstraction details, relevant code) and formats a detailed prompt for the AI to write a single chapter's Markdown content. It calls the AI (call_llm) and returns the generated Markdown string. It also keeps track of previously written chapters to include their summaries in the prompt context.
    • post(shared, prep_res, exec_res_list): This is called after exec has finished for all items in the batch. It receives a list of all the results from the exec calls (the Markdown content for every chapter). It stores this list in shared["chapters"].
    # function_app/nodes.py (Simplified WriteChapters)
    from pocketflow import BatchNode # Note BatchNode
    from utils.call_llm import call_llm
    
    class WriteChapters(BatchNode): # This is a BatchNode
        def prep(self, shared):
            chapter_order = shared["chapter_order"]
            abstractions = shared["abstractions"]
            files_data = shared["files"]
            # Prepares a LIST of chapter-specific data items (simplified)
            items_to_process = [
                {"chapter_num": i + 1, "abstraction_details": abstractions[idx]}
                for i, idx in enumerate(chapter_order)
            ]  # One item per chapter; the real items also carry relevant code
            print(f"Preparing to write {len(items_to_process)} chapters...")
            return items_to_process # Returns an iterable list
    
        def exec(self, item): # exec is called for EACH item
            chapter_num = item["chapter_num"]
            abstraction_name = item["abstraction_details"]["name"]
            print(f"Writing chapter {chapter_num} for: {abstraction_name} using LLM...")
            # Formats prompt for THIS chapter (omitted)
            # Calls AI (omitted)
            chapter_content = f"# Chapter {chapter_num}: {abstraction_name}\n..." # Dummy content
            return chapter_content # Returns content for ONE chapter
    
        def post(self, shared, prep_res, exec_res_list): # exec_res_list is ALL results
            shared["chapters"] = exec_res_list # Store list of ALL chapter contents
            print(f"Finished writing {len(exec_res_list)} chapters.")
    
  • CombineTutorial(Node):

    • Job: Take the individual chapter content, combine it with the project summary and relationship diagram, and save the final output.
    • prep: Reads project_name, relationships (for the summary/diagram), chapter_order, abstractions, and chapters content from shared. It formats the index.md file content (including the project summary and a Mermaid diagram of relationships) and creates a list of dictionaries containing each chapter's filename and content.
    • exec: Takes the prepared index content and chapter files list. It then uploads these files to Azure Blob Storage using a helper function (upload_to_blob_storage), structuring them under a path named after the project. It also creates a minimal local info.txt file pointing to the blob storage URLs for convenience during development/debugging. Returns details about the uploaded files (including URLs). If blob storage fails, it falls back to saving files locally.
    • post: Stores the path where files were saved (either local or a reference to blob storage) in shared["final_output_dir"] and information about the blob upload in shared["blob_storage_info"]. This makes the output path available for potential later steps or logging.
    # function_app/nodes.py (Simplified CombineTutorial)
    import os
    from pocketflow import Node
    # Need upload_to_blob_storage helper
    
    class CombineTutorial(Node):
        def prep(self, shared):
            project_name = shared["project_name"]
            relationships_data = shared["relationships"]
            chapters_content = shared["chapters"]
            # Formats index.md and list of chapter files (omitted)
            index_content = f"# Tutorial: {project_name}\n..."
            chapter_files = [{"filename": "01_chap.md", "content": "..."}] # Dummy list
            return {"output_path": f"output/{project_name}", "index_content": index_content, "chapter_files": chapter_files}
    
        def exec(self, prep_res):
            output_path = prep_res["output_path"]
            index_content = prep_res["index_content"]
            chapter_files = prep_res["chapter_files"]
    
            print("Combining tutorial and uploading...")
            # Calls upload_to_blob_storage or saves locally (omitted)
            # Returns details of saved/uploaded files
            return {"local_path": output_path, "blob_container": "tutorials", "blob_path": output_path} # Dummy result
    
        def post(self, shared, prep_res, exec_res):
            # exec_res might be a dict (blob info) or string (local path)
            if isinstance(exec_res, dict):
                shared["final_output_dir"] = exec_res["local_path"]
                shared["blob_storage_info"] = exec_res # Store blob info
            else:
                shared["final_output_dir"] = exec_res # Store local path
            print("\nTutorial generation complete!")
    

Key Features Used#

  • Task Decomposition: Breaking the large job into smaller Nodes.
  • Workflow: Defining the sequence using Flow and >>.
  • Shared Store: Passing data between Nodes using a central dictionary.
  • Batch Processing: Using BatchNode (WriteChapters) to efficiently process multiple independent items (chapters).
  • Fault Tolerance: Using max_retries and wait on Nodes that call external services (like the LLM) to automatically handle transient errors.
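
The Batch Processing item can be pictured with a small sketch: prep returns a list, exec is called once per item, and post receives all the results together. MiniBatchNode and WriteTitles are hypothetical classes written for this illustration, and this version simply processes the items one after another. It is not Pocket Flow's actual BatchNode.

```python
# Hypothetical sketch of batch behaviour; not Pocket Flow's actual BatchNode.
class MiniBatchNode:
    def prep(self, shared):
        return []

    def exec(self, item):
        return None

    def post(self, shared, prep_res, exec_res_list):
        return None

    def run(self, shared):
        items = self.prep(shared)
        results = [self.exec(item) for item in items]  # one exec call per item
        return self.post(shared, items, results) or "default"


class WriteTitles(MiniBatchNode):
    def prep(self, shared):
        # Build one small data packet per chapter, in the chosen order
        names = [shared["abstractions"][i]["name"] for i in shared["chapter_order"]]
        return [{"chapter_num": n, "name": name} for n, name in enumerate(names, start=1)]

    def exec(self, item):
        return f"# Chapter {item['chapter_num']}: {item['name']}"

    def post(self, shared, prep_res, exec_res_list):
        shared["chapters"] = exec_res_list  # all results, in the same order as the items


shared = {
    "abstractions": [{"name": "Nodes"}, {"name": "Flows"}, {"name": "Shared Store"}],
    "chapter_order": [2, 0, 1],
}
WriteTitles().run(shared)
print(shared["chapters"])
# prints: ['# Chapter 1: Shared Store', '# Chapter 2: Nodes', '# Chapter 3: Flows']
```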

Pocket Flow provides the structure and reliability needed to coordinate these complex operations, ensuring that the tutorial generation pipeline runs step-by-step, handles failures, and correctly passes data from code fetching all the way to saving the final output.
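
One way to see why the order of the Nodes matters is to write down which shared keys each Node reads and writes, then check that every key is written before it is read. The table below is a simplified summary of the node descriptions in this chapter. The find_missing_inputs helper is hypothetical, and the sketch assumes repo_url and project_name are placed in shared before the flow starts.

```python
# Keys each node reads and writes, summarized from this chapter's node
# descriptions (simplified; the real nodes may touch more keys).
PIPELINE = [
    ("FetchRepo", {"repo_url"}, {"files"}),
    ("IdentifyAbstractions", {"files"}, {"abstractions"}),
    ("AnalyzeRelationships", {"abstractions", "files"}, {"relationships"}),
    ("OrderChapters", {"abstractions", "relationships"}, {"chapter_order"}),
    ("WriteChapters", {"chapter_order", "abstractions", "files"}, {"chapters"}),
    (
        "CombineTutorial",
        {"project_name", "relationships", "chapter_order", "abstractions", "chapters"},
        {"final_output_dir", "blob_storage_info"},
    ),
]


def find_missing_inputs(pipeline, initial_keys):
    """Return (node, missing_keys) pairs for nodes whose inputs are not yet available."""
    available = set(initial_keys)
    problems = []
    for name, reads, writes in pipeline:
        missing = reads - available
        if missing:
            problems.append((name, sorted(missing)))
        available |= writes  # later nodes can use what this node wrote
    return problems


start_keys = {"repo_url", "project_name"}
ok = find_missing_inputs(PIPELINE, start_keys)
broken = find_missing_inputs(PIPELINE[1:], start_keys)  # pretend FetchRepo was skipped
print(ok)      # prints: []
print(broken)
```

With FetchRepo skipped, every Node that reads files is reported as missing an input, which is the kind of ordering mistake the fixed sequence in the Flow avoids.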

Conclusion#

The Workflow Engine, powered by Pocket Flow, is the brain that coordinates the entire tutorial generation process. By breaking the job into Nodes, defining the execution Flow, and managing data with a Shared Store, it ensures that each step is performed correctly and in the right order, turning raw code into a structured, beginner-friendly tutorial. The use of features like Batch Nodes and retries makes the process efficient and robust.

Now that we understand the overall orchestration, let's look at the first major step in the process: fetching the code itself.

Next Chapter: Code Fetching


Generated by AI Codebase Knowledge Builder. References: 1(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/.clinerules), 2(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/.cursorrules), 3(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/.windsurfrules), 4(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/flow.py), 5(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/function_app/flow.py), 6(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/function_app/nodes.py), 7(https://github.com/hieuminh65/Tutorial-Codebase-Knowledge/blob/be7f595a38221b3dd7b1585dc226e47c815dec6e/nodes.py)