Jobs API (.jobs)#

Estimated reading time: 13’

What you’ll learn#

This guide will help you understand how to use the jobs API to execute your code remotely in a non-blocking way and even parallelize your execution.

Introduction#

PySyft allows data scientists to conduct remote code execution on the data owners’ server. Each time code is executed, a job is created to handle the execution process. This job enables users to monitor its progress and retrieve the results upon completion.

What is a “job”?#

A “job” represents a code request submitted for asynchronous execution. In other words, it is processed in a non-blocking manner, allowing the user to continue their work in the notebook while the server processes the request, without having to wait for it to complete.

This is particularly useful for running requests in parallel to handle large amounts of data. However, since jobs are crucial for remote code execution, they are leveraged by PySyft for other purposes as well.

Understanding Jobs#

When a job is submitted to a server, it follows these stages:

  1. Job Submission: The client submits a code request to the server. This does not immediately start the job, but specifies which code should be run. Whenever a user attempts to run the code, either on mock or private data, they can choose to run it in a non-blocking manner, which submits a new job for execution.

  2. Job Queuing: Upon submission, the job is placed in a queue. This ensures that jobs are managed in an orderly manner and allows the server to handle multiple jobs efficiently. To make sure jobs are run in a timely manner, the data owner can choose to scale the number of workers available.

  3. Job Execution: The server picks up jobs from the queue in order. The job is then executed asynchronously, allowing the server to manage its resources efficiently and simultaneously handle other incoming jobs.

  4. Job Monitoring: During execution, the server keeps track of the job’s progress. This might involve updating the job’s status and providing intermediate results, if applicable.

  5. Job Completion: Once the job is completed, the server updates its status to indicate its completion. The job’s results are then made available to the user’s client.
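Putting these stages together, a typical interaction looks like the sketch below, using the client and the approved syft function that are set up later in this guide:

# Stages 1-2: submit the code request for asynchronous execution;
# blocking=False queues a new job and returns immediately
job = ds_client.code.example_function(data=data_asset, blocking=False)

# Stages 3-4: the server executes the job while you monitor its progress
job.status

# Stage 5: block until completion and retrieve the result
result = job.wait().get()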

Parallelization using jobs#

You can create more complex analyses by scheduling jobs to run the same code multiple times in parallel. This is achieved through nested jobs and code requests, which can emulate a map-reduce pipeline or any other pattern you prefer. An example of how to do this is available here, with more guides coming soon.
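As a simpler illustration of the same idea (not the nested-jobs API from the linked example), you can fan out work by launching several non-blocking jobs and reducing their results client-side. In the sketch below, process_chunk and assets are hypothetical placeholders for an approved syft function and a list of assets:

# Map step: launch one non-blocking job per asset (hypothetical function name)
jobs = [ds_client.code.process_chunk(data=asset, blocking=False) for asset in assets]

# Reduce step: wait for every job and combine the partial results
partials = [job.wait().get() for job in jobs]
total = sum(partials)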

Example of a job’s lifecycle#

Now, let’s experiment with each of the stages a job goes through to understand how to use them appropriately.

Experimenting with jobs in a local dev server

It’s great to experiment with jobs locally to learn how to use them. However, please note that a default local development server will not be able to execute jobs unless you pass at least one job consumer (n_consumers=1) and create a job producer (create_producer=True). You can read more about this in the local deployment guide.

Let’s start by launching a demo setup to allow us to create a dummy example.

Hide code cell source
import syft as sy
import pandas as pd

node = sy.orchestra.launch(
    name="demo_datasite",
    port="auto",
    dev_mode=False,
    reset=True,
    n_consumers=1,         # at least one consumer is needed to execute jobs
    create_producer=True,  # the producer manages the job queue
)

admin_client = sy.login(
    url='localhost',
    port=node.port,
    email="[email protected]",
    password="changethis",
)

df = pd.DataFrame({'A': [1, 2, 3], 'B': [10, 20, 30]})
mock_df = pd.DataFrame({'A': [1, 2, 1], 'B': [20, 10, 20]})

main_contributor = sy.Contributor(
    name='John Doe',
    role='Uploader',
    email='[email protected]'
)

asset = sy.Asset(
    name='demo_asset',
    data=df,
    mock=mock_df,
    contributors=[main_contributor]
)

dataset = sy.Dataset(
    name='Demo Dataset',
    description='Demo Dataset',
    asset_list=[asset],
    contributors=[main_contributor]
)

admin_client.upload_dataset(dataset)
admin_client.settings.allow_guest_signup(enable=True)

admin_client.register(
    email='[email protected]',
    name='Data Scientist',
    password='123',
    password_verify='123'
)

admin_client.users[-1].allow_mock_execution()
Hide code cell output
Starting demo_datasite server on 0.0.0.0:53566
INFO:     Started server process [19618]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:53566 (Press CTRL+C to quit)
Waiting for server to start Done.
SyftInfo:
You have launched a development server at http://0.0.0.0:53566. It is intended only for local use.

Logged into <demo_datasite: High side Datasite> as <[email protected]>
SyftWarning:
You are using a default password. Please change the password using `[your_client].account.set_password([new_password])`.

Uploading: demo_asset: 100%|██████████| 1/1 [00:00<00:00,  4.21it/s]
SyftSuccess:
User details successfully updated.

First, a Data Scientist will connect to the datasite and fetch a reference to the available data.

import syft as sy

ds_client = sy.login(url='localhost', port=node.port, email='[email protected]', password='123')

data_asset = ds_client.datasets[0].assets[0]
mock_asset = ds_client.datasets[0].assets[0].mock
Logged into <demo_datasite: High side Datasite> as <[email protected]>

Submitting and executing a job#

Once the code to run has been specified, you are ready to initiate the execution of a job. The main mechanism for specifying the code is via syft functions.

We define a dummy syft function below to specify the code.

@sy.syft_function_single_use(data=data_asset)
def example_function(data):
    print('Started execution..')
    result = data.sum()
    print('Finalized execution..')
    return result

ds_client.code.request_code_execution(example_function)
SyftSuccess:
Syft function 'example_function' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.

Request

Id: 4d4d9ca9e41a4e2b8a6a93f67627d6d8

Request time: 2024-10-01 18:56:55

Status: RequestStatus.PENDING

Requested on: Demo_datasite of type Datasite

Requested by: Data Scientist ([email protected])

Changes: Request to change example_function (Pool Id: default-pool) to permission RequestStatus.APPROVED. No nested requests.

Note that no job was created up to this point. Let’s initiate two jobs to run remotely on the server:

  • the first one runs on mock data

  • the second one runs on private data

Permissions: who can run jobs on mock data?

Executing a job on mock data is possible only if the appropriate permissions were granted. In particular, the admin must explicitly allow a data scientist to execute experiments against mock datasets on the resources of their server. You can check, in the setup above, how to do that or ask the admin of your server for these permissions.
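For reference, this is the call the admin ran in the setup above, granting mock execution to the most recently registered user:

admin_client.users[-1].allow_mock_execution()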

We use the code API to identify the code we want to run.

ds_client.code

UserCode List

Total: 0

ds_client.code.example_function(data=mock_asset, blocking=False)
demo_datasite/jobs/
JOB
example_function
#94bce081aad0429d8dd1c0e2dd8f441e
UserCode: example_function
Status: Created
Started At: 2024-10-01 18:56:55.88929 by Data Scientist [email protected]
Updated At: --
Subjobs: 0
syft.service.action.action_data_empty.ObjectNotReady

Similarly, we could launch a job to run on private data. However, we won’t be able to unless our request is approved, so let’s have the admin do that first and then we’ll initiate the execution using the client.code API.

admin_client.requests[-1].approve()
Approving request on change example_function for datasite demo_datasite
SyftSuccess:
Request 4d4d9ca9e41a4e2b8a6a93f67627d6d8 changes applied

ds_client.code.example_function(data=data_asset, blocking=False)
demo_datasite/jobs/
JOB
example_function
#6cc340bc281c4d0db023800b311c9611
UserCode: example_function
Status: Created
Started At: 2024-10-01 18:56:56.59065 by Data Scientist [email protected]
Updated At: --
Subjobs: 0
syft.service.action.action_data_empty.ObjectNotReady

We can now see all the submitted jobs and their statuses by accessing:

ds_client.jobs

Job List

Total: 0

Permissions: who can see the jobs?

Intuitively, a data scientist can only monitor the jobs they launched. However, an admin can see all the jobs launched or running on their server.

Monitoring the progress of a job#

Status#

We can see a job’s status in the table above via client.jobs. It is good to know which statuses to expect:

  • JobStatus.CREATED: the job was submitted to the server’s queue; there’s nothing you can do to speed things up other than wait patiently. If your job stays in the queue for an unreasonably long time, please contact the admin.

  • JobStatus.PROCESSING: the job is being executed; during this time, you can inspect the logs of your job, but note that logs for jobs running on private data are not immediately available.

  • JobStatus.ERRORED: the job failed its execution and a stack trace is expected to show up in the logs.

  • JobStatus.COMPLETED: the job completed its execution and you can retrieve the result.

  • JobStatus.TERMINATING: the job is being spun down and its state is being cleared; this occurs only when the user kills the job or when the worker running it is forcibly shut down.

  • JobStatus.INTERRUPTED: the job was interrupted; this also occurs due to a user’s action.

Further guidance for when your job is hanging or was interrupted is given in the last section below.
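A lightweight way to keep an eye on a job is to poll its status until it leaves the queued and processing states. Below is a minimal sketch, assuming the status values mirror the enum reprs shown in this guide ('created', 'processing'):

import time

job = ds_client.jobs[-1]
while job.status.value in ('created', 'processing'):
    time.sleep(5)             # arbitrary polling interval
    job = ds_client.jobs[-1]  # re-fetch the job to refresh its status
print(job.status)             # e.g. <JobStatus.COMPLETED: 'completed'>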

Logs#

To set up logging for a job, simply add print statements within your code; both stdout and stderr are redirected to the logs stash.

Let’s fetch a job from the job API:

job = ds_client.jobs[0]
job
demo_datasite/jobs/
JOB
example_function
#94bce081aad0429d8dd1c0e2dd8f441e
UserCode: example_function
Status: Processing
Started At: 2024-10-01 18:56:55.88929 by Data Scientist [email protected]
Updated At: 2024-10-01 1
Worker Pool: default-pool-1 on worker
#default-pool
Subjobs: 0
syft.service.action.action_data_empty.ObjectNotReady

We can see its status, when it was started, when it was most recently updated, what worker pool it was scheduled on, as well as the results and logs section.

If the result is missing, it is common to see syft.service.action.action_data_empty.ObjectNotReady. Otherwise, your result has already been populated. To see the logs, you can run:

job.logs()

Alternatively, job.wait() blocks until the job finishes, streaming the logs along the way.

Job Results#

Results of jobs in progress#

If you are impatient to see the result, you can block your client until it is available by using the .wait() method.

This can be done with:

ds_client.jobs[0].wait().get()
01/10/24 21:57:01 FUNCTION LOG (94bce081aad0429d8dd1c0e2dd8f441e): Started execution..
01/10/24 21:57:01 FUNCTION LOG (94bce081aad0429d8dd1c0e2dd8f441e): Finalized execution..
A     4
B    50
Name: 0, dtype: int64

Results for completed jobs#

If your job was completed, the result can be easily fetched with:

# Mock execution 
ds_client.jobs[0].result.get()
A     4
B    50
Name: 0, dtype: int64
# Private, yet approved job
ds_client.jobs[1].result.get()
class ObjectNotReady:
  id: str = 9582fde2c88a433f9f92f30bea0fd2c6

If the job has completed, this returns immediately. Otherwise, as for the private job above, an ObjectNotReady placeholder is returned until the result is populated.

Debugging a failed job#

Sometimes things don’t run as you would expect. Let’s look at three situations:

  1. My job is not getting picked up from the queue (CREATED)

  2. My job takes too long to run (PROCESSING)

  3. My job fails (ERRORED)

Situation 1: My job is not getting picked up (CREATED)#

If you notice that your job stays in the CREATED state for a very long time, it is possible that the queue is large and the server cannot handle running many jobs in parallel. As a data scientist, you cannot do much to debug the problem, so please reach out to the admin.

Situation 2: My job takes too long to run (PROCESSING)#

It is possible that your code simply takes a long time to run. Common causes include:

  1. the private data is much larger than the mock version

  2. the private data differs significantly, besides size, making your code inefficient

  3. the hardware you need is not available on the server. For example, training a machine learning model or doing inference might prove impractical on some servers due to hardware limitations.

Let’s see how we can work with this.

@sy.syft_function_single_use(data=data_asset)
def my_func_long_execution(data):
    print('Started execution..')
    result = 0
    for i in range(10000000000):  # this will take a while
        result += i**2
    print('Finalized execution..')
    return result

ds_client.code.request_code_execution(my_func_long_execution)
SyftSuccess:
Syft function 'my_func_long_execution' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.

Request

Id: eb7d27421c4a4723b6df24ab6ccfeec7

Request time: 2024-10-01 18:57:02

Status: RequestStatus.PENDING

Requested on: Demo_datasite of type Datasite

Requested by: Data Scientist ([email protected])

Changes: Request to change my_func_long_execution (Pool Id: default-pool) to permission RequestStatus.APPROVED. No nested requests.

For simplicity, let’s execute it on the mock data:

ds_client.code.my_func_long_execution(data=mock_asset, blocking=False)
job = ds_client.jobs[-1]
job
demo_datasite/jobs/
JOB
my_func_long_execution
#07558529879445799f0d6d92f5ed8db4
UserCode: my_func_long_execution
Status: Created
Started At: 2024-10-01 18:57:03.14252 by Data Scientist [email protected]
Updated At: --
Subjobs: 0
syft.service.action.action_data_empty.ObjectNotReady
        Message
    

We can wait for a while and see that the job has not yet finished.

import time as tt
tt.sleep(5) # wait for the job to be picked up from the queue

job.status
<JobStatus.CREATED: 'created'>

We could try to estimate how long it is reasonable for that code to run. The mock and private data are often not similar in size, so an inefficient implementation might take much longer once approved and run on the private data. However, if it runs beyond your waiting threshold, you can kill it via job.kill().
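A minimal sketch of that last step (the exact status transition may vary, per the status list above):

job = ds_client.jobs[-1]
job.kill()   # ask the server to terminate the job
job.status   # expected to pass through TERMINATING and end up INTERRUPTED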

Situation 3: My job fails (ERRORED)#

If your job fails, you might not know where to start. Here we provide the two main reasons as a starting point:

  1. Case 1 - the code does not work: when you prototype your code based on mock data, it is crucial to test it on the mock data at least once before submitting it, to catch issues early on. However, in some cases, code that works on mock data fails to complete on the private data because of schema or data differences. We encourage inspecting the traceback from the logs as a starting point. If unsuccessful, it is best to reach out to the admin to identify where the differences might stem from.

  2. Case 2 - the code works, but PySyft failed to execute it: in some rare cases, PySyft’s internal components responsible for executing jobs within their appropriate container might fail. A first step is to restart the job. If the issue persists, you might have discovered a bug and we encourage you to either reach out on the #support channel or file an issue on GitHub.

@sy.syft_function_single_use(data=data_asset)
def my_func_bad_execution(data):
    print('Started execution..')
    result = data.mysum()  # typo: DataFrames have no 'mysum' method, so this raises
    print('Finalized execution..')
    return result

ds_client.code.request_code_execution(my_func_bad_execution)
SyftSuccess:
Syft function 'my_func_bad_execution' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.

Request

Id: 0de4c26e2281495288fe93d0f3ac02a7

Request time: 2024-10-01 18:57:09

Status: RequestStatus.PENDING

Requested on: Demo_datasite of type Datasite

Requested by: Data Scientist ([email protected])

Changes: Request to change my_func_bad_execution (Pool Id: default-pool) to permission RequestStatus.APPROVED. No nested requests.

ds_client.code.my_func_bad_execution(data=mock_asset, blocking=False)
job = ds_client.jobs[-1]
job.wait()
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Started execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Finalized execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Started execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Finalized execution..
01/10/24 21:57:22 FUNCTION LOG (07558529879445799f0d6d92f5ed8db4): Started execution..

As you can see, the failed job’s traceback does not show up in the output above; only the admin can access those logs in this configuration. To debug the issue, you can contact the admin, who can check the traceback (if available):

admin_client.jobs[-1].logs()