Workflow Context API

At the moment, this API is available in Java only.

A Workflow’s primary function is to orchestrate interactions with other entities.

Workflow Class and Method

A Workflow is implemented as a class in a Grainite application. This user-defined Workflow class must implement an instance method that accepts a WorkflowContext and an input Value argument, and returns a Value result. This method is the entry point to the Workflow and implements the Workflow logic.

// Sample code for defining a Workflow Class and Method

public class BookingFlow {
  ...
  public Value bookHotel(WorkflowContext ctx, Value bookingRequest) {
    String bookingReference;
    // Implement code to do the booking and assign bookingReference
    return Value.of(bookingReference);
  }
}

When a client starts a Workflow, it provides the instance method as an argument, and an instance of the Workflow class is created. Workflow 'state' consists of the execution state of the Workflow method. This execution state is automatically durable across system restarts. The Workflow logic can keep anything it needs in local and instance variables (they do not need to be serializable). The Workflow method receives a WorkflowContext, which provides access to Grainite Workflow Orchestration functions for executing Activities, awaiting, and sleeping.

Start an Activity

All external interactions must be executed as Activities. Types of Activities in Grainite are:

  • GrainOp - Query/Update/Invoke an action on a grain

  • TopicAppend - Append to a topic

  • Function - Interactions with external data sources

  • Workflow - Start/Query/Signal another Workflow

Invoke an action “getInfo” on a grain (GrainOp)

// create activity spec

GrainOp invokeOp = new GrainOp.Invoke("getInfo", null);
ActivitySpec spec = ctx.activitySpecForGrainOp(ctx.getAppName(), Constants.HOTEL_TABLE, Value.of(hotelId), invokeOp, null);

// start activity

Promise p = ctx.startActivity("get hotel info", spec);

Append an event to a topic (TopicAppend)

// Sample code to append to a topic

ActivitySpec spec = ctx.activitySpecForTopicAppend("someApp", "someTopic", Value.of(key), Value.of(event));
Promise p = ctx.startActivity("append to someTopic", spec);

Interact with an external service (Function)

// workflow method starts the activity

ActivitySpec spec = ctx.activitySpecForFunction(CallExternalService.class, Value.ofObject(serviceUrl), null);
Promise p = ctx.startActivity("call external service", spec);
...

// this class implements the activity.

public class CallExternalService implements FunctionActivity {
  @Override
  public Value perform(FunctionActivityContext ctx, Value input) {
    // serviceUrl is assumed to be extracted from input (extraction not shown).
    // send() can throw IOException/InterruptedException; handling is omitted here.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create(serviceUrl))
          .build();
    HttpResponse<String> response =
          client.send(request, BodyHandlers.ofString());

    if (response.statusCode() != 200) {
      throw new RuntimeException("http status: " + response.statusCode());
    }
    return Value.of(response.body());
  }
}

Start another Workflow from an Activity

// Sample to start another Workflow from an Activity

ActivitySpec spec = ctx.activitySpecForWorkflow("concierge", "BookingFlow", "bookHotel", Value.of(bookingRequest));
Promise p = ctx.startActivity("start booking workflow", spec);

Invoke a signal method on a Workflow from an Activity

// Sample to invoke a Signal method on a Workflow

ActivitySpec spec = ctx.activitySpecForSignalWorkflow(workflowId, "concierge", "BookingFlow", "cancelBooking", Value.of(bookingReference));
Promise p = ctx.startActivity("cancel booking", spec);

Configuring Retries

Grain and Function Activities can be configured for automatic retries upon failure. A retry policy can specify a retry delay and maximum retry duration. If the activity continues to fail after retries, the Activity (Promise) is marked as failed.

// define an activity spec with a retry policy with 5 sec delay between
// retries, and a max retry duration of 60 secs  

RetryPolicy policy = RetryPolicy.newPolicy(Duration.ofSeconds(5), Duration.ofSeconds(60));
ActivitySpec spec = ctx.activitySpecForFunction(CallExternalService.class, Value.ofObject(serviceUrl), policy);

...

// this class implements the activity.
public class CallExternalService implements FunctionActivity {
  @Override
  public Value perform(FunctionActivityContext ctx, Value input) {
    ...
    HttpResponse<String> response =
          client.send(request, BodyHandlers.ofString());
    if (isRetryableHttpStatus(response.statusCode())) {
      // retryable failure: the retry policy decides whether to try again
      throw new RetryableFailureException("http status: " + response.statusCode());
    }
    else if (response.statusCode() != 200) {
      throw new RuntimeException("http status: " + response.statusCode());
    }
    return Value.of(response.body());
  }

}
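
To make the two policy parameters concrete, here is a minimal standalone sketch of retrying with a fixed delay until a maximum retry duration is used up. It does not use or describe Grainite internals: RetrySketch, RetryableFailure, and runWithRetries are hypothetical names introduced only for this illustration, and the exact point at which Grainite gives up may differ.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class RetrySketch {
  // Hypothetical stand-in for a retryable failure; not a Grainite class.
  static class RetryableFailure extends RuntimeException {
    RetryableFailure(String message) { super(message); }
  }

  // Runs the task, retrying retryable failures every `delay` until another
  // attempt would start after `maxDuration` has elapsed. Any other exception
  // propagates immediately without a retry.
  static <T> T runWithRetries(Supplier<T> task, Duration delay, Duration maxDuration) {
    long start = System.nanoTime();
    while (true) {
      try {
        return task.get();
      } catch (RetryableFailure failure) {
        long elapsed = System.nanoTime() - start;
        if (elapsed + delay.toNanos() > maxDuration.toNanos()) {
          throw failure; // retry budget exhausted: report the failure
        }
        sleep(delay);
      }
    }
  }

  private static void sleep(Duration delay) {
    try {
      Thread.sleep(delay.toMillis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting to retry", e);
    }
  }

  public static void main(String[] args) {
    int[] attempts = {0};
    String result = runWithRetries(() -> {
      if (++attempts[0] < 3) {
        throw new RetryableFailure("http status: 503");
      }
      return "ok";
    }, Duration.ofMillis(10), Duration.ofSeconds(1));
    System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
  }
}
```

In this model only RetryableFailure triggers another attempt, which mirrors the sample above where a RuntimeException is thrown for statuses that should not be retried.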

Awaiting activity completion

A Workflow can use the await methods to wait for one or more previously started activities to complete. An await can simultaneously wait for:

  • Promises (previously-started activities to complete with success or failure)

  • Condition expression (to become true)

  • Maximum duration
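
The three wake-up conditions above can be modeled outside Grainite with plain Java futures. The following is only a sketch under stated assumptions: AwaitSketch and its awaitAll are hypothetical, CompletableFuture stands in for Promise, and the polling loop and the meaning of the boolean result (true only when every future completed) are assumptions made for illustration, not a description of how Grainite implements awaits.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

public class AwaitSketch {
  // Returns when all futures are done, the condition becomes true, or
  // maxWait elapses, whichever happens first. A null condition or a null
  // maxWait means "do not wait on this". Returns true only if every
  // future completed.
  static boolean awaitAll(List<CompletableFuture<String>> promises,
                          BooleanSupplier condition, Duration maxWait) {
    long start = System.nanoTime();
    while (true) {
      if (promises.stream().allMatch(CompletableFuture::isDone)) {
        return true;
      }
      if (condition != null && condition.getAsBoolean()) {
        return false;
      }
      if (maxWait != null && System.nanoTime() - start >= maxWait.toNanos()) {
        return false;
      }
      try {
        Thread.sleep(5); // poll interval of this toy model
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> done = CompletableFuture.completedFuture("hotel info");
    CompletableFuture<String> pending = new CompletableFuture<>();
    System.out.println(awaitAll(List.of(done), null, Duration.ofSeconds(1)));          // true
    System.out.println(awaitAll(List.of(done, pending), null, Duration.ofMillis(50))); // false: timed out
    System.out.println(awaitAll(List.of(pending), () -> true, null));                  // false: condition fired
  }
}
```

Workflow code itself should not poll or sleep like this; it calls the await methods on the WorkflowContext, as in the samples that follow.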

Wait for multiple Promises to complete

// Sample code to wait for multiple Promises to complete

ArrayList<Promise> promises = new ArrayList<>();
// start activities, insert in list above.
...
boolean allPromisesAreComplete = ctx.awaitAll("awaiting all promises", promises, ()->someConditionCheck(), Duration.ofSeconds(60));

Wait for any of the multiple Promises to complete

// Sample code to wait for any one of multiple Promises to complete

ArrayList<Promise> promises = new ArrayList<>();
// start activities, insert in list above.
...
Promise completedPromise = ctx.awaitAny("awaiting any promise", promises, null, null);

Check the status of a Promise

ArrayList<Promise> promises = new ArrayList<>();
// start activities, insert each promise in the list above.
...
ctx.awaitAll("awaiting all promises", promises, ()->someConditionCheck(), Duration.ofSeconds(60));
for (Promise promise : promises) {
  if (promise.isSuccess()) {
    Value result = promise.getResult();
  } else if (promise.isFailure()) {
    String failure = promise.getFailure();
  } else {
    // activity is not complete
  }
}

Waiting for a condition

// wait for booking cancellation within cancellableDuration.
ctx.await("allow user time to cancel", () -> isBookingCancelled(), cancellableDuration);
if (isBookingCancelled()) {
  // do cancel-booking flow
} else {
  // do finalize-booking flow
}

Wait for the Maximum duration

// Sleep for a duration by using await without promises or a watch condition

ctx.await("sleeping for a bit", null, null, Duration.ofHours(1));

Logging

Each Workflow instance maintains a log of its progress. Activity starts, awaits, and activity completions are automatically logged by the system. However, Workflow logic can also append to this log, to capture more detail.

// Sample code for logging

ctx.log("booking cancelled: " + bookingReference);
...
ctx.logStackTrace("http call exception", exception);

Query Methods

Query methods allow clients to get information about a Workflow's state and are implemented by the developer. Queries are instance methods and therefore have access to instance variables. Any Workflow state that can be queried should be kept in instance variables. Queries are strictly read-only and are not allowed to use WorkflowContext methods.

// Sample Query Methods

public Value getBookingStatus(Value arg) {
  // return current BookingStatus
  return Value.ofObject(bookingStatus);
}

Signal Methods

Signal methods allow clients to change the execution of a Workflow by modifying Workflow execution state. The Workflow developer decides which signals to implement (if any). Signals are instance methods and therefore have access to instance variables. Any Workflow state that needs to be updatable should be kept in instance variables. Signals can use all WorkflowContext methods except awaits.

// Sample Signal Method

public class BookingFlow {
  boolean bookingCancelled = false;

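  // watch-condition helper used by the workflow method
  private boolean isBookingCancelled() {
    return bookingCancelled;
  }

  // workflow method
  public Value bookHotel(WorkflowContext ctx, Value bookingRequest) {
    // ...
    ctx.await("allow user time to cancel", () -> isBookingCancelled(), cancellableDuration);
    if (isBookingCancelled()) {
      // do cancel-booking flow
    } else {
      // do finalize-booking flow
    }
    // ... return the booking result as a Value
  }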

  // signal method
  public Value cancelBooking(Value arg) {
    // Cancel the booking by setting an instance var.
    // The workflow method is in an await with a watch condition on this var.
    bookingCancelled = true;
    return null;
  }
} // class

Things to avoid

Workflow execution state is made durable not by saving the POJO state, but by saving a log of the events that drive the Workflow. Under the covers, the Workflow method is replayed multiple times against this event log to resurrect the Workflow execution state.
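
The idea of replaying a method against an event log can be illustrated with a toy model. This is a simplified sketch, not Grainite's implementation: ReplaySketch, ReplayContext, and step are hypothetical names, and the "log" here is just an in-memory list of step results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ReplaySketch {
  // Toy context: serves step results from the log while replaying and
  // executes (and records) a step only when the log has no entry for it.
  static class ReplayContext {
    private final List<String> log;
    private int cursor = 0;
    int liveCalls = 0;

    ReplayContext(List<String> log) {
      this.log = log;
    }

    String step(Supplier<String> action) {
      if (cursor < log.size()) {
        return log.get(cursor++); // replay: reuse the recorded result
      }
      String result = action.get(); // live: run the step for real
      liveCalls++;
      log.add(result);
      cursor++;
      return result;
    }
  }

  // Toy workflow method: the second step depends on the first step's result.
  static String workflow(ReplayContext ctx) {
    String hotel = ctx.step(() -> "hotel-42");
    String reference = ctx.step(() -> "ref-for-" + hotel);
    return hotel + "/" + reference;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();

    ReplayContext firstRun = new ReplayContext(log);
    String first = workflow(firstRun); // both steps execute live

    ReplayContext secondRun = new ReplayContext(log);
    String second = workflow(secondRun); // both steps come from the log

    System.out.println(first.equals(second)); // true
    System.out.println(firstRun.liveCalls + " live, then " + secondRun.liveCalls + " live"); // 2 live, then 0 live
  }
}
```

The second run rebuilds the same local state without re-executing any step, but only because the method asks for the same steps in the same order both times.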

The replay approach requires deterministic execution of the Workflow method. That is, every function call that returns data to the Workflow logic must always produce the same data given the same inputs.

This requirement, along with others, places the following constraints on Workflow code:

  • Do not call non-deterministic functions such as random number or UUID generation, or reading the current time. Instead, use the deterministic replacements on WorkflowContext: currentTimeMillis(), newRandom(), and randomUUID().

  • Do not use Thread.sleep as this blocks thread execution. Instead, use WorkflowContext.await()

  • Do not create threads, since this will result in non-deterministic code path execution.

  • Do not throw or catch java.lang.Error, since the workflow framework internally uses subclasses of Error to implement 'yield' semantics (code is free to throw/catch Exception)

  • Do not use a 'finally' clause, since it might intercept these Errors.

  • Do not use non-final static variables.
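
As a rough illustration of the first constraint, the standalone sketch below contrasts a value derived from a fixed seed, which is identical on every run, with UUID.randomUUID(), which differs on every call. DeterminismSketch and bookingCode are hypothetical names, and seeding a java.util.Random is only an analogy: how WorkflowContext.newRandom() and randomUUID() achieve determinism is not described here.

```java
import java.util.Random;
import java.util.UUID;

public class DeterminismSketch {
  // Derives a pseudo-random booking code from a seed: same seed, same code.
  static String bookingCode(long seed) {
    Random random = new Random(seed);
    return "BK-" + random.nextInt(1_000_000);
  }

  public static void main(String[] args) {
    long recordedSeed = 12345L;

    // Replay-safe: both "runs" compute the same value from the same seed.
    System.out.println(bookingCode(recordedSeed).equals(bookingCode(recordedSeed))); // true

    // Not replay-safe: each call yields a different value, so a replayed
    // workflow would see data that differs from the original execution.
    System.out.println(UUID.randomUUID().equals(UUID.randomUUID()));
  }
}
```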
