Workflow orchestration - what goes where?

[ported from Workflow orchestration - what goes where? · sparks-baird/self-driving-lab-demo · Discussion #233 · GitHub]

Let’s take the case of using a Bayesian optimizer with a workflow orchestration package. Assume Ax as the BO package and Prefect as the workflow orchestrator, with the following minimal working examples:

Ax

from ax.service.ax_client import AxClient
from ax.utils.measurement.synthetic_functions import branin


ax_client = AxClient()
ax_client.create_experiment(
    parameters=[
        {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]},
        {"name": "x2", "type": "range", "bounds": [0.0, 10.0]},
    ],
    objective_name="branin",
    minimize=True,
)

for _ in range(15):
    # Ask Ax for the next suggested parameters, evaluate them on the
    # synthetic Branin function, and report the result back.
    parameters, trial_index = ax_client.get_next_trial()
    results = branin(parameters["x1"], parameters["x2"])
    ax_client.complete_trial(trial_index=trial_index, raw_data=results)


best_parameters, metrics = ax_client.get_best_parameters()

Prefect

import httpx
from prefect import flow, task


@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """Get info about a repo - will retry twice after failing"""
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    repo_info = api_response.json()
    return repo_info


@task
def get_contributors(repo_info: dict):
    """Get contributors for a repo"""
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    contributors = response.json()
    return contributors


@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    """
    Given a GitHub repository, logs the number of stargazers
    and contributors for that repo.
    """
    repo_info = get_repo_info(repo_owner, repo_name)
    print(f"Stars 🌠 : {repo_info['stargazers_count']}")

    contributors = get_contributors(repo_info)
    print(f"Number of contributors 👷: {len(contributors)}")


if __name__ == "__main__":
    repo_info()

and knowing that Prefect has a UI and offers a free-tier Prefect managed server, where does Ax fit within Prefect, and vice-versa? At a higher level, this depends on how a user is going to interact with the system (oversight, maintenance, cooperation, etc.). It is also important to consider more complex cases, such as when asynchronous evaluations come into play, or the Prefect managed server doesn’t have the computational power required to build the BO models.

It gets more complex when we start thinking about how hardware/software communication and data management play into this (e.g., MQTT and MongoDB). For example, the Ax script could be modified to pull all pertinent data from a database before suggesting the next experiment. The hardware/software communication is relatively straightforward; after an experiment has been suggested, the function that runs the experiment is the one that communicates with the hardware.
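
For concreteness, a minimal sketch of that “pull all pertinent data before suggesting” pattern might look like the following, assuming a pymongo connection and a collection of documents with x1, x2, and branin fields (all hypothetical names; swap in your own URI and schema):

from ax.service.ax_client import AxClient
from pymongo import MongoClient

# Hypothetical connection details; substitute your own URI, database,
# and collection names.
collection = MongoClient("mongodb://localhost:27017")["sdl"]["observations"]

ax_client = AxClient()
ax_client.create_experiment(
    parameters=[
        {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]},
        {"name": "x2", "type": "range", "bounds": [0.0, 10.0]},
    ],
    objective_name="branin",
    minimize=True,
)

# Seed the optimizer with every previously completed experiment
# before asking for a new suggestion.
for doc in collection.find({}):
    _, trial_index = ax_client.attach_trial(
        parameters={"x1": doc["x1"], "x2": doc["x2"]}
    )
    ax_client.complete_trial(trial_index=trial_index, raw_data=doc["branin"])

parameters, trial_index = ax_client.get_next_trial()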

Ax has its own implementation of interacting with SQL databases, and I struggled with whether to use MongoDB or a SQL solution that would be directly compatible with Ax. Even now, I still think my leaning towards MongoDB is a reasonable decision because of the free tier and the flexible schema.
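
One way to get MongoDB persistence without Ax’s SQL storage is to store the client’s full JSON snapshot. A minimal sketch, assuming the ax_client from the MWE above and a hypothetical database/collection (the snapshot is stored as a JSON string to sidestep BSON key restrictions):

import json

from ax.service.ax_client import AxClient
from pymongo import MongoClient

# Hypothetical database/collection names.
snapshots = MongoClient("mongodb://localhost:27017")["sdl"]["ax_snapshots"]

# After each completed trial, store the full serialized client state ...
snapshots.insert_one(
    {"campaign": "demo", "snapshot": json.dumps(ax_client.to_json_snapshot())}
)

# ... and on restart, restore the most recent snapshot.
doc = snapshots.find_one({"campaign": "demo"}, sort=[("_id", -1)])
ax_client = AxClient.from_json_snapshot(json.loads(doc["snapshot"]))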

Some questions:

  • How does someone ensure that all the desired data is available to the decision-making algorithm, even when there are start/stops?
  • What happens when the BO workflow involves batches and/or asynchrony? What still happens within Ax vs. within Prefect?
  • Would it make sense to use the Prefect task decorator on existing Ax functions (i.e., use the decorator functionally via task(some_ax_function))? What happens when this is a class method? (ax_client.get_next_trial, ax_client.complete_trial, ax_client.get_best_parameters) (NOTE: the recommendation seems to be to use a new class with inheritance; see the sketch after this list)
  • What typically happens directly leading up to the start of a campaign and immediately after the end of a campaign for a user? (seems to usually be “OK, the campaign is finished, let’s focus on writing up a paper and submitting to a high-impact journal”)
  • What happens if, in the middle of a campaign, a user decides something should be changed? (the action of which also implies it’s no longer “autonomous”) How should this mid-campaign change history be stored? (probably by logging the full states of things back to a database) It may be prudent to periodically use Ax’s built-in SQL storage (separate from and in addition to MongoDB) via a Prefect trigger. A start/stop necessitates something at the beginning of the script that picks up wherever it left off in terms of existing data.
  • How does a user specify what data to grab and how to store it? (e.g., allowing users to customize a filter statement as well as other db info like collection name and such)
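
On the decorator question above, here is a minimal sketch of applying task() functionally to the bound methods, assuming the ax_client and branin from the MWE above and Prefect 2-style task calls (the run_experiment task is a hypothetical stand-in for whatever drives the hardware); whether the stateful client behaves well under retries or distributed execution is exactly the open question:

from prefect import flow, task

# Wrap the bound methods functionally; note that the client object
# itself still holds the optimization state.
suggest = task(ax_client.get_next_trial)
complete = task(ax_client.complete_trial)

@task
def run_experiment(parameters: dict) -> float:
    # Hypothetical stand-in for the function that talks to the hardware.
    return branin(parameters["x1"], parameters["x2"])

@flow
def optimization_flow(num_trials: int = 15):
    for _ in range(num_trials):
        parameters, trial_index = suggest()
        result = run_experiment(parameters)
        complete(trial_index=trial_index, raw_data=result)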

Ax also has a Scheduler API, for which there is some overlap with Prefect (scheduling to external systems, resuming from a database backend). Typically I’ve avoided the Scheduler API because it’s quite a bit more complicated than the Service API.

Guiding principles

  • Modular is better than integrated, usually
    • Favor modular approaches that are individually exceptional over using an integrated approach that is inflexible
  • Forever free-tier services are a big plus
    • Forever free-tiers with generous limits based on open source implementations are preferred, even if the pro versions are more expensive than counterparts
  • Think “cloud experimentation”
    • When making design decisions, act as if this is going to become part of a delocalized global network of SDLs
  • Pure Python is better than mixed
    • By using tools that are based on Python, it becomes easier to understand, customize, and modify the tool (e.g., easy switching between code and UI)

(mildly and somewhat jokingly based on the language style in the Zen of Python)

Aside: consider how/whether Ray would fit into the stack, and also how Python queues with MQTT might help simplify/reduce some of the complexity of running AxClient (i.e., using the queue’s get() method and keeping the trial data handy to then pass to complete_trial; see the sketch below)
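
Sketching that queue idea, assuming paho-mqtt’s 1.x-style callback API, a hypothetical broker/topic, and the ax_client from above:

import json
from queue import Queue

import paho.mqtt.client as mqtt

results_queue = Queue()

def on_message(client, userdata, msg):
    # Push each incoming device result onto a thread-safe queue.
    results_queue.put(json.loads(msg.payload))

client = mqtt.Client()
client.on_message = on_message
client.connect("test.mosquitto.org", 1883)  # hypothetical broker
client.subscribe("sdl-demo/results")  # hypothetical topic
client.loop_start()

parameters, trial_index = ax_client.get_next_trial()
client.publish("sdl-demo/parameters", json.dumps(parameters))
# Block until the device reports back, keeping the trial data handy.
result = results_queue.get(timeout=600)
ax_client.complete_trial(trial_index=trial_index, raw_data=result["branin"])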

Checks:

Persistent data/history

Does it allow for persistent storing of workflow data? (E.g., what is the data retention policy? Prefect Cloud retains flow run history for 7 days; self-hosted Prefect is backed by a database specified and operated by the user.)

With Prefect, I might be able to send history/logs to MongoDB or similar. See logging (Prefect docs), how to stream Prefect logs to a file, and flow-runs.flow_run_history. Here is a thread about this from Prefect 1 (old API).
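
One possibility is a plain standard-library logging handler, since Prefect’s loggers live under the “prefect” namespace; a sketch with a hypothetical collection name:

import logging

from pymongo import MongoClient

class MongoHandler(logging.Handler):
    """Mirror each log record into a MongoDB collection."""

    def __init__(self, collection):
        super().__init__()
        self.collection = collection

    def emit(self, record):
        self.collection.insert_one(
            {
                "logger": record.name,
                "level": record.levelname,
                "message": record.getMessage(),
                "created": record.created,
            }
        )

log_collection = MongoClient("mongodb://localhost:27017")["sdl"]["prefect_logs"]
logging.getLogger("prefect").addHandler(MongoHandler(log_collection))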

Human-in-the-loop

Does it allow for manual input/interaction from users? See Flows - Prefect Docs for an example, and https://docs.prefect.io/guides/creating-human-in-the-loop-workflows/. See also this Jan 30, 2024 blog post: Unveiling Interactive Workflows. This is really key for almost fully autonomous workflows, I think

from prefect import flow, get_run_logger, pause_flow_run
from prefect.input import RunInput


class UserNameInput(RunInput):
    name: str


@flow
async def greet_user():
    logger = get_run_logger()

    # Suspend the flow run until a user submits the input form in the UI.
    user_input = await pause_flow_run(
        wait_for_input=UserNameInput
    )

    logger.info(f"Hello, {user_input.name}!")

Pause/resume and cancel/restart

Does it allow for smooth restarts, where it can cache results prior to the failed step, so it doesn’t have to be recalculated? (See Tasks - Prefect Docs)

from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

@flow
def hello_flow(name_input):
    hello_task(name_input)

Avoiding accidental spending

Does it support timeouts, so that a runaway task can’t keep running (and spending) indefinitely? (See Tasks - Prefect Docs)

from prefect import task, get_run_logger
import time

@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)
    logger.info("I will not execute")

Dependencies

Does it have the ability to specify conditions/dependencies between tasks (i.e., even without needing to pass info back and forth between them)? Tasks - Prefect Docs

@task
def task_1():
    pass

@task
def task_2():
    pass

@flow
def my_flow():
    x = task_1()

    # task_2 will wait for task_1 to complete
    y = task_2(wait_for=[x])

Visualizing workflows

How should dynamic workflows be visualized? See visualizing flow structure and mapping (i.e., like a for loop, but where there’s no interaction between iterations; this can be handled natively by Prefect, I think)
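
A sketch of mapping with a hypothetical evaluate task; each element becomes an independent task run that Prefect can schedule concurrently and render in the flow graph:

from prefect import flow, task

@task
def evaluate(parameters: dict) -> float:
    # Hypothetical stand-in for an independent per-iteration evaluation.
    return sum(parameters.values())

@flow
def batch_flow(parameter_sets: list[dict]) -> list[float]:
    # .map submits one task run per element; iterations don't interact,
    # so Prefect can run them concurrently.
    futures = evaluate.map(parameter_sets)
    return [future.result() for future in futures]

if __name__ == "__main__":
    print(batch_flow([{"x1": 1.0}, {"x1": 2.0}, {"x1": 3.0}]))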

Asynchrony

Aside: Native support for await and async (see a general explanation in the context of wetlab experiments)

See also Best practices for using async in Prefect 2.0 - Archive - Prefect Community.
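
A minimal async sketch, with a hypothetical measure task standing in for a slow instrument call:

import asyncio

from prefect import flow, task

@task
async def measure(sample_id: int) -> float:
    await asyncio.sleep(1)  # stand-in for waiting on a slow instrument
    return float(sample_id)

@flow
async def async_flow() -> list[float]:
    # Launch all measurements concurrently and await them together.
    return await asyncio.gather(*(measure(i) for i in range(3)))

if __name__ == "__main__":
    print(asyncio.run(async_flow()))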

Free-tier hosting and long runs

Does it come with free-tier hosting? Does it allow for long-running executions (e.g., a Bayesian optimization script that runs for weeks)?

Integrations with cloud providers

AWS, Google Cloud, etc.

As an aside, it’s nice that Prefect allows you to submit work to certain cloud providers without having to have a separate worker for it. I’m not exactly sure how this plays with the retention limits on flow runs; if it shows up as a flow run, then presumably it’s still limited to the 7 or 14 days.

I agree with the use of flexible schema databases like MongoDB and other NoSQL solutions. Research can be seen as a complex dynamic optimization problem, where people tend to make decisions and discover new data in an autoregressive manner. It’s difficult to define a fixed schema at the start, and modifying schemas in SQL is both complex and expensive. Researchers typically want something easy and flexible to use so they don’t spend too much time managing data. If a tool proves too difficult, they often revert to more intuitive options like Excel. In NoSQL databases, adding a field is as simple as updating the current data, though some may worry about data consistency with historical records that lack the new field. This issue can be addressed by storing all historical logs and data files within a data lakehouse. By storing everything, we can retrieve all data, such as temperature logs from instruments. Even if the data isn’t stored directly in the database, it can easily be accessed from log files within the lakehouse.

Thus, not only is a database like MongoDB necessary, but a data lakehouse infrastructure, combining MongoDB with cloud storage (like AWS S3 or a file server), is also required to handle both data and files.
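
To make the “adding a field is as simple as updating the current data” point concrete, a sketch with hypothetical collection and field names (the S3 path illustrates the lakehouse pointer idea):

from pymongo import MongoClient

runs = MongoClient("mongodb://localhost:27017")["lab"]["runs"]

# Early experiments store only the fields that existed at the time ...
runs.insert_one({"sample": "A1", "yield": 0.42})

# ... later ones add fields with no migration, pointing to raw files
# that live in cloud storage rather than in the database itself.
runs.insert_one(
    {"sample": "B7", "yield": 0.55, "temperature_log": "s3://my-bucket/B7.csv"}
)

# Queries tolerate the missing field in historical records.
for doc in runs.find({"temperature_log": {"$exists": True}}):
    print(doc["sample"], doc["temperature_log"])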

Prefect tasks now support decorating class methods, so it is perfectly fine to do this. Link: Write and run tasks - Prefect
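
Per the linked docs, something like this sketch should work (hypothetical class; recent Prefect versions bind self correctly when the method runs as a task):

from prefect import flow, task

class Campaign:
    @task
    def suggest(self) -> dict:
        # Hypothetical stand-in for ax_client.get_next_trial().
        return {"x1": 0.0, "x2": 0.0}

@flow
def run_campaign():
    campaign = Campaign()
    print(campaign.suggest())

if __name__ == "__main__":
    run_campaign()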

For the central database, I still believe SQL to be a better option. People can push data from distributed databases (NoSQL solutions) to a SQL database (a custom converter needs to be built). This approach ensures flexibility for each individual user/lab while also ensuring integrity for the central DB.
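
A sketch of such a converter, projecting flexible MongoDB documents onto a fixed central SQL schema (hypothetical names; sqlite3 stands in for the central SQL database):

import sqlite3

from pymongo import MongoClient

docs = MongoClient("mongodb://localhost:27017")["lab"]["runs"]
con = sqlite3.connect("central.db")
con.execute("CREATE TABLE IF NOT EXISTS runs (sample TEXT, yield REAL)")

# Project each flexible document onto the fixed schema, ignoring any
# extra fields the central DB doesn't know about.
for doc in docs.find({}):
    con.execute(
        "INSERT INTO runs VALUES (?, ?)",
        (doc.get("sample"), doc.get("yield")),
    )

con.commit()
con.close()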

We (the UBC Hein Lab) have ivoryOS, a plug-and-play orchestrator specially designed for SDL workflows.

Instead of decorating the functions, ivoryOS extracts the class types and features from your instances automatically (you can have as many different instances as you like). The idea is NOT to make any modifications to the current working code.

class MySDL:

    def task_1(self, arg_1: str, arg_2: float):
        return 1

    def task_2(self):
        pass

In a script with all instances, start the orchestration GUI.

import ivoryos
my_sdl = MySDL()
# another_sdl = MyAnotherSDL()
ivoryos.run()

Beyond calling the functions directly, we have a workflow design interface where you can script a workflow using the functions (e.g., task_1, task_2) and fill in either constant or configurable parameters. If you choose to input configurable parameters and the workflow returns numerical value(s), an optimization interface becomes available, generating a form for the user to configure all parameters and the objective.

Then, it generates a campaign via ax_client.create_experiment(...) by wrapping the user input, and completes each trial in the back end. You can stop pending tasks at any time during the process, but you won’t be able to modify them during execution.

parameters, trial_index = ax_client.get_next_trial()
# run the user-scripted workflow with the suggested parameters
output = function(**parameters)
ax_client.complete_trial(trial_index=trial_index, raw_data=output)

We use a SQLite database for workflow management; however, BO/experimental outputs are saved locally as CSV files.

I’m currently working on preparing the complete documentation and will post it here once it’s ready. In the meantime, feel free to reach out if you have any questions—I’m happy to help!