Thursday, December 06, 2007

State Service Implementation

Zhenhua Guo

Previous posts described the initial design of the system, focusing mainly on high-level abstractions. I have now nearly finished implementing the state server, and the design has changed in a few minor ways. It is time to describe the implementation of each component in the system.

Architecture

Figure 1

Client

Clients initiate job submission. Currently, the Karajan workflow language is the only supported way to represent a workflow. Every user needs an account to submit jobs. However, the user management system at the agent server has not been decided yet, so for now I use the built-in user management and authentication mechanism in Apache Tomcat.
The client invokes functions at the agent server through XML-RPC (the Jsolait JavaScript library is used). On the client side, we provide functionality for converting between JSON and XML.
After authentication, the client sends the user name and the workflow to the agent to submit a job.
The client then periodically sends requests to the state server to check the state of the workflow. The state data is represented in JSON.

Agent
The agent uses the Apache XML-RPC toolkit (http://ws.apache.org/xmlrpc/) to build the server implementation.
After the user is authenticated, the agent receives a job submission request consisting of the user name and the workflow. The agent then transforms the original workflow into an internal format, adding echo messages that report the progress of the workflow's execution.

For example, if the original workflow is:

<project>
<include file="cogkit.xml"/>
<execute executable="/bin/rm" arguments="-f thedate" host="gf1.ucs.indiana.edu" provider="GT2" redirect="false"/>
<execute executable="/bin/date" stdout="thedate" host="gf1.ucs.indiana.edu" provider="GT2" redirect="false"/>
<transfer srchost="gf1.ucs.indiana.edu" srcfile="thedate" desthost="localhost" provider="gridftp"/>
</project>

then the internal workflow is:

<project>
<include file="cogkit.xml"/>
<echo message="/2|job:execute(/bin/rm) started|1"/>
<execute executable="/bin/rm" arguments="-f thedate" host="gf1.ucs.indiana.edu" provider="GT2" redirect="false"/>
<echo message="/2|job:execute(/bin/rm) completed|2"/>
<echo message="/3|job:execute(/bin/date) started|1"/>
<execute executable="/bin/date" stdout="thedate" host="gf1.ucs.indiana.edu" provider="GT2" redirect="false"/>
<echo message="/3|job:execute(/bin/date) completed|2"/>
<echo message="/4|job:transfer started|1"/>
<transfer srchost="gf1.ucs.indiana.edu" srcfile="thedate" desthost="localhost" provider="gridftp"/>
<echo message="/4|job:transfer completed|2"/>
</project>

    The echo statements are state-reporting messages added by the agent. The format of a message is:
       /3/4/2|job:xxx|1
(“/3/4/2” is the path, “job:xxx” is the job state description and “1” is the status code).
    The first part is the path, which consists of all characters before the first ‘|’ character. It indicates the position of the reported element in the whole workflow. The path /3/4/2 in the sample means that the reported element is the second child of the fourth child of the third child of the root element. The root element is <project>.
    The second part is the job state description, which consists of all characters between the two ‘|’ characters. It describes the state of the element.
    The last part is the job state code, which appears after the last ‘|’ character. It is an integer that marks the state. Currently, 1 means the subtask has started and 2 means the subtask has completed.
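
The transformation described above can be sketched in a few lines. This is only an illustration in Python, not the agent's actual code: the function names `describe` and `add_state_messages` are hypothetical, and it assumes that only `execute` and `transfer` elements directly under the root are reported, which is what the example shows.

```python
import xml.etree.ElementTree as ET

STARTED, COMPLETED = 1, 2  # status codes defined in the post


def describe(elem):
    # Mirrors the descriptions in the example: execute elements include the
    # executable, other reported elements use only the tag name.
    if elem.tag == "execute":
        return f"job:execute({elem.get('executable')})"
    return f"job:{elem.tag}"


def add_state_messages(workflow_xml, reported=("execute", "transfer")):
    """Return a new root with echo elements around each reported step.

    Assumption: only direct children of the root are wrapped.
    """
    root = ET.fromstring(workflow_xml)
    out = ET.Element(root.tag, root.attrib)
    for position, child in enumerate(root, start=1):  # paths are 1-based
        if child.tag not in reported:
            out.append(child)
            continue
        prefix = f"/{position}|{describe(child)}"
        ET.SubElement(out, "echo", message=f"{prefix} started|{STARTED}")
        out.append(child)
        ET.SubElement(out, "echo", message=f"{prefix} completed|{COMPLETED}")
    return out


# Shortened version of the workflow from the example.
sample = """<project>
<include file="cogkit.xml"/>
<execute executable="/bin/rm" arguments="-f thedate"/>
<transfer srcfile="thedate" desthost="localhost"/>
</project>"""

for elem in add_state_messages(sample):
    print(elem.tag, elem.get("message", ""))
```

Note that the path is computed from the position in the original workflow, so it stays valid for the state server, which only ever sees the original.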

A unique id is generated for every workflow of a user. I call it the wfid (workflow id). In other words, every workflow is uniquely identified by the combination of user name and workflow id.
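
As a small illustration of this identification scheme, the sketch below hands out a separate counter per user, so the pair (user name, wfid) is unique even though the wfid alone is not. The class name `WorkflowIdGenerator` and the use of a simple counter are assumptions made for the example; the post does not say how ids are actually generated.

```python
import itertools
import threading
from collections import defaultdict


class WorkflowIdGenerator:
    """Hypothetical per-user id source; ids restart at 1 for each user."""

    def __init__(self):
        self._lock = threading.Lock()  # requests may arrive concurrently
        self._counters = defaultdict(lambda: itertools.count(1))

    def next_wfid(self, user):
        with self._lock:
            return next(self._counters[user])


ids = WorkflowIdGenerator()
first = ("alice", ids.next_wfid("alice"))
second = ("alice", ids.next_wfid("alice"))
other = ("bob", ids.next_wfid("bob"))
print(first, second, other)
```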

After the transformation is done, the agent sends the original workflow to the state server and the transformed workflow to the executor. Besides the workflows themselves, the agent also sends the corresponding wfid to the state server and the executor.

Note that the executor and the state service are both wrapped as web services.

Executor

This part is wrapped as a web service. The executor receives the transformed workflow from the agent and submits it to the underlying grid infrastructure using the Java CoG Kit. Currently, I invoke the command-line tool provided by the CoG Kit to submit workflows. This may change in the future, since the Java CoG Kit API may be a better choice.

When execution of the workflow starts, the executor sends a message to the state server to indicate this.

During execution, state messages are generated to report progress. Every time a state message is generated, the executor forwards it to the state server.

When the workflow completes, the executor sends a message to the state server to indicate completion.

State server

This part is wrapped as a web service.

During job submission, the state server receives the workflow from the agent and builds an element tree based on it. State data is stored in every node of the tree. A helper function serializes the element tree into JSON or XML format (XML serialization has not been implemented yet).
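
A minimal sketch of such an element tree with JSON serialization follows. The names `StateNode`, `build_tree` and `to_json`, the JSON field names, and the extra status code 0 for "not started" are all assumptions made for illustration; the post defines only codes 1 and 2 and does not describe the actual JSON layout.

```python
import json
import xml.etree.ElementTree as ET

NOT_STARTED = 0  # assumption: the post defines only 1 (started) and 2 (completed)


class StateNode:
    """One workflow element together with its state data."""

    def __init__(self, tag):
        self.tag = tag
        self.status = NOT_STARTED
        self.description = ""
        self.children = []

    def to_dict(self):
        return {
            "tag": self.tag,
            "status": self.status,
            "description": self.description,
            "children": [child.to_dict() for child in self.children],
        }


def build_tree(workflow_xml):
    """Mirror the XML structure of the original workflow as StateNode objects."""
    def convert(elem):
        node = StateNode(elem.tag)
        node.children = [convert(child) for child in elem]
        return node
    return convert(ET.fromstring(workflow_xml))


def to_json(node):
    return json.dumps(node.to_dict())


tree = build_tree(
    '<project><include file="cogkit.xml"/>'
    '<execute executable="/bin/date"/></project>'
)
print(to_json(tree))
```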

During job execution, the state server receives state messages from the executor, in the format defined above, and updates the state of the corresponding node in the element tree. There are two kinds of state messages: those about the state of the whole workflow, and those about the state of a specific element in the workflow.
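
The update step for element-level messages can be sketched as follows. This is an illustration only: the tree is represented with plain dictionaries to keep the example short, the function names are hypothetical, and treating a path with no indices as the root is an assumption, since the format of workflow-level messages is not specified here.

```python
def parse_state_message(message):
    """Split 'path|description|code' into ([indices], description, code)."""
    path, rest = message.split("|", 1)        # path: everything before the first '|'
    description, code = rest.rsplit("|", 1)   # code: everything after the last '|'
    indices = [int(part) for part in path.split("/") if part]
    return indices, description, int(code)


def apply_state_message(root, message):
    """Walk the 1-based path from the root and record the reported state."""
    indices, description, code = parse_state_message(message)
    node = root
    for index in indices:
        node = node["children"][index - 1]
    node["description"] = description
    node["status"] = code
    return node


workflow = {
    "tag": "project",
    "children": [
        {"tag": "include", "children": []},
        {"tag": "execute", "children": []},
    ],
}
apply_state_message(workflow, "/2|job:execute(/bin/rm) started|1")
print(workflow["children"][1])
```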


Issues:

  1. After a user sends a workflow to the agent, what should be returned?

Option 1: unique id of the submitted workflow

The end user can check the state of the submitted workflow by using the unique id. However, it is then not easy to get the output of the workflow execution: the executor or the agent needs to store the result temporarily so that the agent can return it when the end user asks for it.

Option 2: output of the workflow execution

Because the end user does not know the id of the workflow that was just submitted, the end user can only get the state of all workflows submitted so far. This is the current choice.

  2. Reliable messaging

The executor sends state messages to the state server. Semantically, the order of the state messages should be preserved so that it reflects the true order in which the subtasks were executed. The current system does not guarantee this. I am considering using WS-ReliableMessaging to do this.

  3. Guarantee of the order of messages

From the description of the state server, we know that it should handle the workflow from the agent before it handles state messages from the executor. In terms of Figure 1, the step in which the agent sends the workflow to the state server should occur before the step in which the executor sends state messages. However, this cannot be guaranteed now. Because of unpredictable network delay and process/thread scheduling, there is no way to satisfy that requirement without modifying the code of the state server.

If the state server handles state messages from the executor before it handles the workflow from the agent, it will not find the corresponding workflow in its database.

One solution:

The state server preserves the state messages from the executor. How long should the state messages be preserved? For a fixed time? Or indefinitely, until the state server receives the workflow from the agent?
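
To make the two retention policies concrete, here is a rough sketch of a buffer for early state messages. It is not part of the current state server; the class name `PendingMessages`, its methods and the `max_age` parameter are hypothetical. Passing `max_age=None` corresponds to keeping messages indefinitely, and a number corresponds to a fixed retention time in seconds.

```python
import time
from collections import defaultdict


class PendingMessages:
    """Holds state messages that arrive before their workflow is known."""

    def __init__(self, max_age=None, clock=time.monotonic):
        self.max_age = max_age      # None: keep until the workflow arrives
        self.clock = clock          # injectable so the policy can be tested
        self._pending = defaultdict(list)

    def hold(self, key, message):
        # key is the (user name, wfid) pair that identifies the workflow
        self._pending[key].append((self.clock(), message))

    def release(self, key):
        """Called when the workflow arrives; returns messages still valid, in arrival order."""
        now = self.clock()
        held = self._pending.pop(key, [])
        return [
            message for stamp, message in held
            if self.max_age is None or now - stamp <= self.max_age
        ]


buffer = PendingMessages(max_age=30)
buffer.hold(("alice", 1), "/2|job:execute(/bin/rm) started|1")
print(buffer.release(("alice", 1)))
```

With a fixed time, messages for a workflow that never arrives can eventually be discarded, at the cost of losing state if the agent is very slow. A real implementation would also need a periodic sweep to drop expired entries, which this sketch leaves out.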
