In this chapter we consider the problem of deriving placement information to schedule a program graph (as defined in section ) consisting of services that share raw and result data. Unlike nearly all of the studies in the literature (see chapter ), the aim of this work is not simply to produce an algorithm that finds the best task-to-processor mapping when every processor can execute any of the tasks in the user's job. This work differs from that discussed in section in that not every task may be executed by every processor. In addition to the case where the task (or the service to be used) is not available on all processors (or nodes) in the system, we explicitly consider the case where the data needed by the services is not available on every node (and may not be moved to where the services currently reside).
This chapter describes the model of the placement mechanism developed for our prototype metacomputing framework. The model allows a static mapping to be made between the services required by a processing request and the machines that host them. Once the mapping is made and the system starts execution, the mapping can be modified dynamically to take additional system state information into account. The approach is thus a hybrid of static and dynamic scheduling, as described more fully in section . The implementation of the metacomputing framework is described in chapter .
For the sake of self-containment and completeness, we define our usage of some basic terminology. The metacomputing daemon is the software that controls the local resource in the context of the metacomputing environment: it grants and denies users access, and executes and monitors jobs that are run on behalf of the users and of the system as a whole. We use the terms node and dumb node to refer to a machine in the heterogeneous collection of computers that makes up a metacomputing system. We draw an important distinction between the two terms, however, inasmuch as node denotes a machine on which a copy of the daemon runs, and dumb node a machine on which the daemon cannot run for some reason (such as an inability to run a Java Virtual Machine).
We use the terms job and task to represent an application in execution, as known by the operating system. A job, in turn, consists of a set of threads that execute together to form an application. As in [62], the exception to this usage is when a set of processes, which are autonomous as far as the operating system is concerned, are actually part of a single application. In this case, the operating system regards each process as a separate job. An example is the concurrent execution of code on different machines using message-passing constructs for communication: each process is logically separate as far as the machines' operating systems are concerned, but together they comprise a single job. When we describe programs, the terms ``job'', ``process'' and ``task'' are used interchangeably.
When modelling scheduling, it is beneficial to bear in mind a list of desirable attributes of the final model. In this section, the desired attributes and features of our scheduling model are discussed. The attributes are: distributed scheduling; the ability to schedule task graphs; the ability to characterise processing nodes; restricted placement of services; heuristic optimal schedules; non-preemptive scheduling; and clustering of services onto nodes.
Two further features of this model are that it is designed to perform in the absence of complete, up-to-date system state information, and that it is designed to schedule high-level programs which communicate by sharing complete results in producer/multiple-consumer relationships. The attributes and characteristics are explained more fully in the following subsections.
The distributed environment we consider is that of a loose confederation of distributed computers, separated by significant (in terms of network connectivity) distances. Furthermore, we consider the situation in which a user submits a processing request from a client application to its local DISCWorld node. In most cases, we expect the local DISCWorld node to be listening on a different port of the same physical machine, although it is not infeasible for the local DISCWorld node to be a different machine. Thus, the descriptive term local only refers to the proximity of the node to the machine running the client application, not necessarily the same physical machine.
The use of a centralised scheduler requires that all processing requests are sent to a master node for scheduling. Where the master node has a complete view of the global system state (see section ), scheduling decisions can be made to optimise the execution time of the request or the effective utilisation of distributed resources. When using a centralised scheduler, each node can be instructed to send regular state updates to the scheduler. Because the system under consideration exhibits significant interconnection network distances and wide geographical separation of resources (see Appendix for a discussion of this), and because the system is composed of a loose confederation of machines, we are unable to mandate the use of a centralised scheduler.
Distributed scheduling, then, is another approach to scheduling that may be taken. In this approach, each node can make individual decisions on where processing requests will be executed. This removes the need for constant communication with a master node, and reduces the risk of a single point of failure for the system. However, because nodes make their own decisions about where to place services, the entropy of the distributed system may increase. Of course, where complete system state information is available, nodes are fully aware of the actual load on remote servers, and continual load balancing can be performed with a high degree of accuracy. In the environment under consideration, however, we are unable to guarantee that each node in the distributed system will have the same view of the global system state. Each node relies on its own view of the global system state: each server has its own view of which nodes are available, what services are hosted by each node, and what characteristics the other nodes can offer the local server. This incomplete view of the system is called partial system state, and is discussed in section .
The effects of partial system state are twofold: first, there may be machines participating in the distributed system that are unknown to other machines; and second, the most up-to-date state information about a known remote machine may not be available. The first case is shown in figure , where Server 1 knows about the existence of Server 5; Server 2 knows about Servers 1 and 5; Server 3 knows about Servers 4 and 5; Server 4 knows about Server 3; and Server 5 knows about Servers 1 and 3. It can be seen that Server 2 does not know about Servers 3 and 4. The second case occurs because machines in the system are not static: the load on machines, and the data they possess, are continually changing. Without complete system state information, the node that creates execution schedules must use the most up-to-date information it possesses. If this information is not completely accurate, then the resulting schedule may not be optimal, however good the scheduling algorithm is.
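To make the first effect concrete, the partial views listed above can be encoded in a small Python sketch. It assumes each server's knowledge is simply a set of server numbers, and it computes the unknown servers from an omniscient vantage point that, by definition, no real node in the system has.

```python
# Partial views from the example in the text: server -> remote servers it knows of.
KNOWS = {
    1: {5},
    2: {1, 5},
    3: {4, 5},
    4: {3},
    5: {1, 3},
}


def unknown_to(server, view):
    """Return the participating servers that `server` has no knowledge of.

    `view` maps every participating server to the set of remote servers it
    knows about; the server itself is excluded from the result.
    """
    return set(view) - view[server] - {server}


for s in sorted(KNOWS):
    print(s, "does not know about", sorted(unknown_to(s, KNOWS)))
```

The output confirms that Server 2 cannot consider Servers 3 and 4 as placement targets, however suitable they might be.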

Figure 22: System state information
may propagate throughout the system at different rates. When
up-to-date global system state information is not available, partial
system state information must be used as a basis for
decision-making. For example, Server 1 might know only about services
on Server 5; and Server 2 knows about both Server 1 and Server 5.
Processing requests can be made by both user clients and servers. They are composed of graphs of high-level tasks, or services. Services are programs which can be run in the distributed system, and may be implemented as either platform-independent byte code or platform-dependent applications. They may also be implemented as either parallel or serial code. The actual implementation of a service is not important to its execution by the DISCWorld daemon. Services are connected by data dependencies, which represent the use of the data outputs of the producing service as inputs to the consuming service. Figure shows the relationship between a processing request, services, data dependencies and producer-consumer relationships. We only consider high-level data dependencies: fine-grained parallelism between services is not supported, and data cannot be used until the producing service has terminated.
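The relationship between services, named outputs and producer-consumer dependencies can be sketched as follows. This is a minimal illustration rather than the DISCWorld data structures: the `Service` class, the `ready` function, the service name `Imagery:Crop` and all data names are hypothetical. It encodes the rule that a consumer may start only once every producer it depends on has terminated.

```python
from dataclasses import dataclass


@dataclass
class Service:
    sid: str        # identifier of this service within the request
    name: str       # global service name
    inputs: list    # names of the data items consumed
    outputs: list   # names of the data items produced (a service may have several)


def ready(request, finished):
    """Return the ids of services that may start, given the set of finished ids.

    An input is available if no service in the request produces it (it is raw
    or already-extant data) or if its producer has terminated; partial outputs
    are never consumed.
    """
    producer = {item: s.sid for s in request for item in s.outputs}
    return [
        s.sid
        for s in request
        if s.sid not in finished
        and all(producer.get(i) is None or producer[i] in finished for i in s.inputs)
    ]


request = [
    Service("crop_vis", "Imagery:Crop", ["gms5_vis"], ["vis_crop"]),
    Service("crop_ir", "Imagery:Crop", ["gms5_ir"], ["ir_crop"]),
    Service("cloud", "Imagery:GMS_CloudCover", ["vis_crop", "ir_crop"],
            ["cloud_bitmap", "cloud_percent"]),
]
print(ready(request, set()))                      # ['crop_vis', 'crop_ir']
print(ready(request, {"crop_vis", "crop_ir"}))    # ['cloud']
```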

Figure 23: Relationship between DISCWorld processing
request, services and data dependencies. A processing request consists
of a number of services interconnected with data dependencies. The
service creating the data is termed the producer, while the
service that is to use the data is termed the consumer. Although
it is not shown in this diagram, multiple services may consume the
data produced by a producer.
Process networks [136, ] are a common method of representing computations, in which the inter-task dependencies are explicitly represented by edges between processing nodes. Process networks can specify parallelism between tasks, and can be either cyclic, allowing the possibility of infinite iteration, or acyclic. The subset of process networks with directed edges and no cycles are called directed acyclic graphs (DAGs) [8]. DAGs are possibly the most common mechanism by which computations are represented in systems that reason about jobs with inter-dependencies. They allow the flow of data and any precedence relations to be stated explicitly.
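A standard way to check that a request graph is acyclic, and to obtain an order in which its services can run, is Kahn's topological sort. The sketch below is a generic implementation, not code taken from DISCWorld, and takes vertex names and (producer, consumer) edge pairs. Because its queue is seeded with every vertex that has no incoming edge, it also copes with graphs that have several root nodes.

```python
from collections import defaultdict, deque


def topological_order(vertices, edges):
    """Order `vertices` so that every producer precedes its consumers.

    Uses Kahn's algorithm. The queue starts with *every* vertex that has no
    incoming edge, so graphs with several roots need no special treatment.
    Raises ValueError if the graph contains a cycle.
    """
    indegree = {v: 0 for v in vertices}
    successors = defaultdict(list)
    for producer, consumer in edges:
        successors[producer].append(consumer)
        indegree[consumer] += 1

    queue = deque(v for v in vertices if indegree[v] == 0)
    order = []
    while queue:
        v = queue.popleft()
        order.append(v)
        for w in successors[v]:
            indegree[w] -= 1
            if indegree[w] == 0:
                queue.append(w)

    if len(order) != len(indegree):
        raise ValueError("cycle detected: the graph is not a DAG")
    return order


print(topological_order(["a", "b", "c", "d"], [("a", "c"), ("b", "c"), ("c", "d")]))
```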
The name of a given service remains constant across the DISCWorld. Computations consist of a number of services that are chained together to create a potentially complicated sequence of operations. The presence of a loop in a graph implies the modification of data. DAGs are therefore an ideal notation for describing the flow of control within a DISCWorld processing request, because we restrict data to be write-once (or single-assignment): after data is created, any change to it results in a new derived data item being created. For display purposes, however, we allow the use of loops in conjunction with variable aliases (this is described in section ).
Unlike the assumption made in [205], we are unable to constrain the DAGs to have a single root node. It is quite possible that there will be many extant data items that are to be used as input to services, which will need to be taken into account when devising an appropriate schedule.
We define a DAG as a tuple G = (V, E), where V is the set of task nodes (vertices, or services), v = |V| is the number of nodes, and E is the set of communication edges, each implying a data dependency between its source and target nodes; e = |E| is the number of edges. Thus, if each of the services is placed onto a different node, up to e data transfers may be required to execute the schedule.
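The bound of e transfers can be checked with a small sketch that counts, for a given placement, the edges whose endpoints sit on different nodes. Service and node names are hypothetical, and the sketch counts each edge as one transfer; it does not merge transfers of the same data item to the same node.

```python
def data_transfers(edges, placement):
    """Count the edges whose producer and consumer are on different nodes.

    Each such edge requires one data transfer, so the count is at most
    e = len(edges), reached when every service is on a different node.
    """
    return sum(1 for producer, consumer in edges
               if placement[producer] != placement[consumer])


edges = [("crop_vis", "cloud"), ("crop_ir", "cloud"), ("cloud", "render")]
all_apart = {"crop_vis": "n1", "crop_ir": "n2", "cloud": "n3", "render": "n4"}
clustered = {"crop_vis": "n1", "crop_ir": "n2", "cloud": "n1", "render": "n1"}
print(data_transfers(edges, all_apart))   # 3, i.e. e
print(data_transfers(edges, clustered))   # 1
```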
In the case under consideration, the presence of two or more outputs coming from a node does not represent a conditional branch, as in [55], but indicates the sharing of the node's results. In our system, services (represented by nodes) can produce more than one output, each of which is a separate data item that may be shared amongst more than one node, or may not be used at all. Furthermore, if a service produces more than one output, several of those outputs may each be used by any number of services, and each must then be transferred individually to the remote server. For example, the service Imagery:GMS_CloudCover, which produces a cloud cover bitmap when supplied with a visual and an infra-red GMS5 [133] satellite image of the same date-stamp, also creates a numeric figure giving the percentage of the image covered by cloud. While the second output may not be of interest to the user submitting the processing request, it is still produced and will, in all likelihood, be cached by the producing server, as it is non-trivial to reproduce.
In order to make sensible decisions on where to place services so as to achieve near-optimal execution time and resource usage, it is vital that the components of the system are adequately characterised. For the purposes of this study, the relevant components of the system are: Data; Services; Nodes; and Interconnection Networks.
The fundamental assumptions of this scheduling model are the existence of a platform-independent global naming strategy, and a single-assignment restriction for data. We assume that a mechanism for the platform-independent, canonical and unique naming of data and services exists, and is adhered to by all participating users, nodes and services. In this context, the term unique refers to different data, not copies of the same data. When data is ingested into the system, or created by a service, it is named according to the global naming strategy. By restricting data to be single-assignment, we prevent data from being changed. This restriction allows the simplifying assumption that if two objects have the same global name, they are the same. The execution of a service results in output data, which is named in accordance with the global naming strategy. We assume that the naming strategy is hierarchical, and that output data is named according to the service and input data used to create it. The global naming strategy used in the prototype DISCWorld implementation is discussed in section .
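A minimal sketch of the two assumptions, with a naming format invented purely for illustration (the actual DISCWorld strategy is described elsewhere in the thesis): the hypothetical `derived_name` builds a hierarchical name from the service and its ordered inputs, and the hypothetical `SingleAssignmentStore` refuses to rebind a name to different data, so two items with the same name can be treated as the same item.

```python
def derived_name(service, inputs, index=0):
    """Hypothetical hierarchical name for output `index` of `service` applied
    to the named `inputs` (input order is significant)."""
    return f"{service}({','.join(inputs)})#{index}"


class SingleAssignmentStore:
    """Holds named data items; a name can be bound to exactly one value."""

    def __init__(self):
        self._items = {}

    def put(self, name, value):
        if name in self._items:
            if self._items[name] != value:
                raise ValueError(f"{name!r} is already bound to different data")
            return  # same name, same data: this is just another copy
        self._items[name] = value

    def get(self, name):
        return self._items[name]


name = derived_name("Imagery:GMS_CloudCover", ["gms5_vis", "gms5_ir"], 1)
print(name)  # Imagery:GMS_CloudCover(gms5_vis,gms5_ir)#1
```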
The remainder of this section discusses the minimum amount of information that must be held on each of the components in order to schedule processing requests properly. We require that each of the services be characterised, bearing in mind that the information we are given about the services may be neither static nor completely up-to-date. In addition, we need the ability to collect information about the services, data and nodes, and to incorporate it into the scheduling decisions made by the system.
In our scheduling model, we recognise two different types of data: raw data, which may be injected into the system by a physical device or a user; and derived data, which is the output of a service when executed with either raw or derived data. For the purposes of scheduling and placement, both are considered equivalent. We broadly characterise data as being large and expensive to produce; we also consider the case where data belongs to an organisation and is available for use at a price.
Data is stored on a node, which may be where the data was created, where it is about to be used, or a data repository. There may be multiple copies of the same data at different nodes within the system. Due to the assumed global naming strategy, each copy of the data will have the same name; hence copies can be used interchangeably. We recognise two characteristics of data that are important for our scheduling model: size of data and mobility.
Although the size of a data item is fixed, independent of where it is stored, the mobility of the same item may vary between nodes. For example, a node's administrator may stipulate that the data they own may be copied or moved at a cost premium, but that nodes which take a copy may not pass that copy on, although they may still use it as input to services. These characteristics of the DISCWorld are termed policies, and are discussed in section . When a data item is to be used as an input to a service, the size of the data and its mobility will determine whether it is more feasible (in terms of time or economic units) to copy the data item to where the service presently exists, or to copy the service to where the data exists.
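The decision at the end of this paragraph can be sketched as a comparison of two transfer times. This is a simplification under stated assumptions: mobility is reduced to a yes/no policy flag (ignoring price premiums), both transfers are assumed to use the same link, and transfer time is taken as latency plus size divided by bandwidth.

```python
def choose_movement(data_size, data_mobile, code_size, code_mobile,
                    bandwidth, latency):
    """Decide whether to copy the data to the service or the service to the data.

    Sizes are in bytes, bandwidth in bytes/s and latency in seconds.
    Returns "move data", "move service", or None if policy forbids both moves.
    """
    options = {}
    if data_mobile:
        options["move data"] = latency + data_size / bandwidth
    if code_mobile:
        options["move service"] = latency + code_size / bandwidth
    if not options:
        return None  # neither may move: the service must run elsewhere
    return min(options, key=options.get)


# 500 MB image, 2 MB of service byte code, 1 MB/s link, 0.1 s latency.
print(choose_movement(500e6, True, 2e6, True, 1e6, 0.1))  # move service
```

If the service's policy forbade copying its byte code, the same call with `code_mobile=False` would fall back to moving the data.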
Services are high-level programs, which may be implemented as parallel or serial code, represented by platform-independent byte code or platform-dependent object code. They may be small applications which perform a general task, such as the cropping of images [129] or more specialised programs such as producing cloud cover classifications from satellite imagery [106]. They may even be larger legacy applications which have been optimised to run on specialised equipment [105, ]. Our scheduling model treats services as black boxes, where their actual implementation is not important for their high-level characterisation.
As mentioned above, producer-consumer relationships between services in a processing request are at a high level. Fine-grained message passing is not supported; only complete outputs of a service are able to be shared. Due to the global naming strategy, equivalent services with the same names can be used interchangeably to perform a known function. The characteristics that we consider important for services are: size of byte code; service run time; service run time variance; and mobility.
In contrast to the characteristics of a data object, there is no guarantee that the characteristics of a service will be the same on every node. The run-time information pertains only to the instance of the service at a given node. When a service is considered for transfer to a new node, we assume, for simplicity, that its estimated run time at the new node will be approximately the same as on the node from which it came. When the service is actually run on the new node, any run-time information from the previous node is discarded.
In order to schedule processing requests in a sensible way, it must be possible to profile the services that comprise the request. Profiling is the measuring of the performance and resource requirements of a service. There are a number of ways in which it is possible to profile services.
One method is the collection of historical run-time data for a service. Whenever the service is run on a given machine, its execution time is recorded and the resources it consumes are measured. Over time, the measurements may settle into a steady state, from which predictions can be made. If the service's execution times do not settle into a steady state, then the execution time may be best expressed as either a mathematical distribution or a (mean, variance) tuple (see section ). If the machines on which a service may be run are homogeneous, then the data collected from each of the machines individually may be collated to provide more data from which to make predictions (the larger sample being more statistically significant); care must be taken that data from heterogeneous machines, on which resource requirements may vary, are not combined.
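Historical profiling needs only a running mean and variance per service, which Welford's online algorithm maintains without storing every observation. In this sketch the class, the `record_run` helper and the machine-class labels are hypothetical; profiles are keyed by (service, machine class), so runs on homogeneous machines are pooled while runs on heterogeneous machines never are.

```python
from collections import defaultdict


class RunTimeProfile:
    """Running mean and sample variance of run times (Welford's algorithm)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the running mean

    def record(self, seconds):
        self.n += 1
        delta = seconds - self.mean
        self.mean += delta / self.n
        self._m2 += delta * (seconds - self.mean)

    @property
    def variance(self):
        return self._m2 / (self.n - 1) if self.n > 1 else 0.0


# One profile per (service, machine class): only homogeneous data is pooled.
profiles = defaultdict(RunTimeProfile)


def record_run(service, machine_class, seconds):
    profiles[(service, machine_class)].record(seconds)


for t in (10.0, 12.0, 14.0):
    record_run("Imagery:Crop", "sparc-ultra", t)
p = profiles[("Imagery:Crop", "sparc-ultra")]
print(p.n, p.mean, p.variance)  # 3 12.0 4.0
```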
Another method of predicting the run-time of a service is by the inspection of the source code. Such a source code complexity model is a relative measure, giving rise to an estimate of run time. Unfortunately, this is not a useful measure in the event that the service that is being executed is a wrapper for pre-compiled object code, as is usually the case with commercial programs. Because we treat services as 'black boxes', we do not use this method.
A popular method of predicting the run time of services, used by most queueing systems, is to request that users specify, as accurately as possible, an upper bound on the run time of their service or program. This places the onus of prediction onto the user. A range of incentives is used to encourage the user to predict the run time accurately, such as allowing shorter jobs to be placed on faster processing queues, or automatically killing jobs that overrun their estimate by a certain amount.
Simulation is another method by which a user may estimate the run time and resource requirements of a program, and pass this information on to the queueing system. It may be possible to run the program with a synthetic distribution of input data, generating average run times and resource usage for the synthetic jobs, before executing the real program on a machine that may be expensive in terms of available CPU time or money.
One of the issues that must be addressed when attempting to profile a service is whether the profile can be parameterised. For example, if a service reads a constant amount of data from a given file, it is easy to predict its future run time and resource requirements; if it reads a variable amount of data, it may be possible to parameterise the run time by the amount of data to be read. If the amount of data that must be read depends on some other condition (for example, the service reads from a file until it has found a certain number of sparsely placed tokens), then it may not be possible to parameterise the profile easily.
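When run time does depend mainly on the amount of input data, the simplest parameterisation is a straight line fitted by ordinary least squares. The sketch below assumes such a linear relationship; the sample sizes and times are illustrative only, not measurements.

```python
def fit_linear(sizes, times):
    """Least-squares fit of times ~ intercept + slope * size."""
    n = len(sizes)
    mean_x = sum(sizes) / n
    mean_y = sum(times) / n
    sxx = sum((x - mean_x) ** 2 for x in sizes)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(sizes, times))
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def predict(intercept, slope, size):
    return intercept + slope * size


# Illustrative profile: input size (MB) against observed run time (s).
a, b = fit_linear([1, 2, 3, 4], [1.5, 2.5, 3.5, 4.5])
print(a, b)                  # 0.5 1.0
print(predict(a, b, 10))     # 10.5
```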
One of the main problems with collecting performance and resource utilisation data from programs is the impact that the collection may have on performance. As with deriving schedules, if it takes longer to collect the performance data and form the profile than it would take simply to run the program on any available machine, or if a program is intended to be run only once, then profiling is clearly not worth the effort.
In the system under consideration, the same processing requests will be repeated infrequently, while the services that comprise the requests are sufficiently general that they can be used quite often, possibly in different contexts. Thus, profiling data, while useful for services, is not anticipated to be of much use for complete processing requests. This approach is in direct contrast to those of most batch queueing systems and wide-area managers, which lack the ability to inspect the composition of user-submitted jobs.
In our scheduling model, nodes are machines that run a metacomputing daemon. The metacomputing daemon allows the machine to perform scheduling operations and exchange data and services with the wider metacomputing environment. While running the daemon allows services to be run, and data to be created on behalf of the distributed system, the owner does not yield control of the resource. They still retain absolute authority over their resource.
This model assumes that all nodes participating in the distributed environment are trusted. Thus, a processing request arriving from a remote node will be honoured, providing the node has been configured to do so. We identify four characteristics that facilitate scheduling across the distributed system: the current waiting time to start the execution of a new service; the capacity to accept new service requests; the ability to accept new service byte-code; and the access costs for using a node.
The characteristics representing a node's capacity to accept new service requests and to accept new service byte code are similar to the mobility of a service or data. If the node creating a schedule has a choice between placing a service on two or more remote nodes, then the node with the greatest capacity to accept new requests should be chosen. Similarly, if a copy of the service does not exist on any of the remote nodes, then the node with the greatest capacity for accepting new service byte code should be chosen. A node's access cost is only incurred when it is the source of a data or service byte code transfer.
A node's current waiting time to start new services is an estimate based on the historical behaviour of the daemon (which actually runs the services). While the estimate may not be completely up-to-date, it gives remote nodes an idea of the node's response time. Even though the above characteristics are primarily used to describe a node within the distributed system, a subset of them may be used to describe a dumb node. We use the current waiting time to start a new service as the minimum characteristic of a node.
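The node preferences described above can be sketched as a selection function. The `NodeInfo` fields mirror the characteristics named in the text, but the class itself, the 0 to 1 capacity scale and the tie-break on waiting time are assumptions made for this illustration.

```python
from dataclasses import dataclass


@dataclass
class NodeInfo:
    name: str
    hosts_service: bool       # does the node already hold the service?
    request_capacity: float   # hypothetical 0..1 scale: capacity for new requests
    bytecode_capacity: float  # hypothetical 0..1 scale: capacity for new byte code
    wait_time: float          # estimated seconds before a new service starts


def choose_node(candidates):
    """Pick a node for one service, following the preferences in the text.

    Prefer nodes that already host the service, choosing the one with the
    greatest request capacity; otherwise choose the node most able to accept
    new byte code. Ties are broken by the shorter estimated waiting time.
    """
    hosting = [n for n in candidates if n.hosts_service]
    if hosting:
        best = max(hosting, key=lambda n: (n.request_capacity, -n.wait_time))
    elif candidates:
        best = max(candidates, key=lambda n: (n.bytecode_capacity, -n.wait_time))
    else:
        return None
    return best.name


nodes = [
    NodeInfo("n1", True, 0.3, 0.9, 5.0),
    NodeInfo("n2", True, 0.8, 0.1, 20.0),
    NodeInfo("n3", False, 1.0, 1.0, 0.0),
]
print(choose_node(nodes))  # n2
```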
We assume that the collection of processors that comprise the distributed system is a point-to-point fully connected set, in that while there may be no direct connection between two machines, there will be a route between them (perhaps via the Internet or over a specialised broadband network [140]). Traditionally, two characteristics define an interconnection network: bandwidth and latency.
Advertised bandwidth and latency characteristics are stored for pairs of nodes. Because the actual availability of a link, and the achievable bandwidth between two points, are highly variable [132], we only use bulk characteristics.
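Pairwise bulk characteristics can be held in a simple table and turned into a transfer-time estimate. The sketch assumes that links are symmetric and that any pair without an advertised link falls back to a default route (for example, over the Internet); both are assumptions, as are the `LinkTable` class, the node names and the figures used.

```python
class LinkTable:
    """Advertised (bulk) bandwidth and latency between pairs of nodes.

    Links are assumed symmetric; unknown pairs use a default route.
    Bandwidth is in bytes/s, latency in seconds.
    """

    def __init__(self, default_bandwidth, default_latency):
        self._links = {}
        self._default = (default_bandwidth, default_latency)

    def advertise(self, a, b, bandwidth, latency):
        self._links[frozenset((a, b))] = (bandwidth, latency)

    def transfer_time(self, a, b, size):
        if a == b:
            return 0.0  # co-located: no network transfer needed
        bandwidth, latency = self._links.get(frozenset((a, b)), self._default)
        return latency + size / bandwidth


links = LinkTable(default_bandwidth=1e5, default_latency=0.5)
links.advertise("n1", "n2", bandwidth=1e7, latency=0.25)
print(links.transfer_time("n2", "n1", 2e7))  # 2.25
print(links.transfer_time("n1", "n3", 1e5))  # 1.5 (default route)
```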
Our model differs from those previously discussed in section , in that some of the services and data may not be able to be placed on arbitrary processors. The location of some services is fixed, most often due to their method of implementation: they may use a large amount of scratch space; they may be implemented in a fashion that uses a parallel node; or they may require specific hardware that only exists on a subset of the nodes in the distributed system.
The scheduling model needs to take placement restrictions into account when creating schedules that involve heterogeneous machines in a distributed environment. The characteristics of the system components, listed in the previous subsection, must be used to ensure that any placement of services and data is valid.
Our model seeks to minimise the communication volume while at the same time minimising the parallel cost time across a bounded number of nodes. The parallel cost time is the additional cost (measured in increased processing time on a per-process basis) incurred when unrelated services that are able to be executed in parallel are scheduled to run on the same node at the same time. Parallel cost time is not incurred when related services are co-located on a node, because of the explicit temporal ordering imposed by producer-consumer relationships. Because scheduling tasks across an unbounded number of nodes is well known to be NP-complete in the general case, we restrict the number of nodes that are considered for placement of either data or services.
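The text defines parallel cost time in words rather than by formula, so the following is only a crude pairwise estimate under an assumption made for this sketch: each pair of unrelated services placed on the same node is charged the shorter of the two run times, which is the extra delay if that pair is serialised instead of run on separate nodes. Related services (one reachable from the other) are never charged, matching the text; the function names and figures are hypothetical.

```python
from itertools import combinations


def descendants(edges):
    """Map each vertex with successors to the set of vertices reachable from it."""
    succ = {}
    for u, w in edges:
        succ.setdefault(u, set()).add(w)
    result = {}
    for start in succ:
        seen, stack = set(), [start]
        while stack:
            for w in succ.get(stack.pop(), ()):
                if w not in seen:
                    seen.add(w)
                    stack.append(w)
        result[start] = seen
    return result


def parallel_cost(edges, placement, runtime):
    """Pairwise estimate: each co-located, unrelated pair costs min(run times)."""
    reach = descendants(edges)
    cost = 0.0
    for u, w in combinations(sorted(placement), 2):
        if placement[u] != placement[w]:
            continue  # different nodes: they can genuinely run in parallel
        related = w in reach.get(u, set()) or u in reach.get(w, set())
        if not related:
            cost += min(runtime[u], runtime[w])
    return cost


edges = [("a", "c"), ("b", "c")]
runtime = {"a": 5.0, "b": 3.0, "c": 4.0}
print(parallel_cost(edges, {"a": "n1", "b": "n1", "c": "n1"}, runtime))  # 3.0
print(parallel_cost(edges, {"a": "n1", "b": "n2", "c": "n1"}, runtime))  # 0.0
```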
We restrict the nodes that are considered for placement by only choosing those that either: already host a copy of the service that is to be used; already host the data that is to be used; or will receive a copy of the data or service in a previous processing step. Thus, we choose only those nodes which at least partially satisfy the requirements for a service to be executed. The system characteristics complement this selection of viable nodes by further restricting the available set.
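The candidate-node restriction amounts to a union of three sets filtered by the placement constraints. In this sketch the `candidate_nodes` function, the lookup tables and the `usable` predicate are hypothetical stand-ins for the system characteristics discussed above.

```python
def candidate_nodes(service, inputs, hosts, holds, planned, usable):
    """Nodes worth considering for `service`.

    hosts:   service name -> nodes holding a copy of the service
    holds:   data name -> nodes holding a copy of the data
    planned: nodes that will receive the service or an input in an earlier step
    usable:  predicate encoding placement restrictions (hardware, policy, load)
    """
    nodes = set(hosts.get(service, ())) | set(planned)
    for item in inputs:
        nodes |= set(holds.get(item, ()))
    return {n for n in nodes if usable(n)}


hosts = {"Imagery:GMS_CloudCover": {"n1"}}
holds = {"vis_crop": {"n2"}, "ir_crop": {"n2", "n3"}}
print(sorted(candidate_nodes("Imagery:GMS_CloudCover", ["vis_crop", "ir_crop"],
                             hosts, holds, planned={"n4"},
                             usable=lambda n: n != "n3")))  # ['n1', 'n2', 'n4']
```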
Of course, the use of decentralised scheduling and possibly partial system state information may mean that a node is not aware of all remote nodes that host a service or have the required data. This is addressed in chapter , and in particular section , where the mechanism for schedule execution is discussed.
In minimising the communication volume and parallel cost time, we require that the execution schedule thus created is optimal for the nodes that have been considered for placement. As such, we do not require that the created schedule be optimal for the global system, but instead that it is optimal for a restricted sub-set of considered nodes.
One of the features of our scheduling model is the possibility of duplicating services and data. Such duplication may occur at two levels: the services and data that are to be used may be duplicated across nodes in the distributed system; and multiple instances of the same combination of service(s) may be executed in order to reduce the overall processing request execution time.
At present, we only support the first type of duplication: that of multiple sources for services and data. Because execution schedules can be created by any node within the system, and because nodes are under no obligation to cooperate, the same data may eventually be created on multiple nodes. As discussed in section , if a service requires data that is available from multiple sources, a mechanism exists to choose the optimal source.
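One plausible way to choose among several sources is to minimise the estimated time to obtain the item, counting each source's access cost in seconds (since time is currently the unit of cost). This sketch is an illustration under those assumptions, not the selection mechanism cited above; the `best_source` function, node names and figures are hypothetical.

```python
def best_source(size, sources):
    """Choose the node from which to fetch an item of `size` bytes.

    sources: node -> (bandwidth in bytes/s, latency in s, access cost in s)
    """
    def cost(node):
        bandwidth, latency, access = sources[node]
        return latency + size / bandwidth + access

    return min(sources, key=cost)


sources = {
    "n1": (1e6, 0.1, 0.0),   # slow link, free to use
    "n2": (1e7, 0.3, 2.0),   # fast link, charges an access cost
}
print(best_source(1e7, sources))  # n2: large item, bandwidth dominates
print(best_source(1e3, sources))  # n1: small item, access cost dominates
```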
Although we intend to do so, at present we do not consider the multiple execution of the same service(s) on different nodes in order to reduce the overall execution time, as in [205]. Such a modification would not be complicated, and its impact on the system would be of interest. Presently, the unit of cost for using the system is time. When users are required to ``pay'' for the resources they consume, the use of multiple resources that may provide faster request resolution will introduce trade-offs between the time they wish to spend waiting for a request to be resolved and the amount of resources they wish to be used on the request.
The method by which schedules are created to minimise both communication volume and parallel cost time is the clustering of services, as in [86, , ]. Services that share a large amount of data are placed onto the same node, or onto nodes with high-speed interconnection networks. Clustering is discussed in section .
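As an illustration of clustering under restricted placement, the sketch below applies a simple greedy edge-merging heuristic: it considers edges in decreasing order of data volume and merges the producer's and consumer's clusters whenever at least one node can host every member. It is not the algorithm of [86], and it addresses only communication volume, ignoring parallel cost time; the `greedy_cluster` function, services, sizes and node names are hypothetical.

```python
def greedy_cluster(services, edges, allowed):
    """Greedily merge services that share the most data.

    services: list of service ids
    edges:    (producer, consumer, bytes) triples
    allowed:  service id -> set of nodes on which that service may run
    Returns a list of (nodes usable by the whole cluster, members) pairs.
    """
    parent = {s: s for s in services}
    usable = {s: set(allowed[s]) for s in services}  # kept up to date for roots

    def find(s):
        while parent[s] != s:
            parent[s] = parent[parent[s]]  # path halving
            s = parent[s]
        return s

    for producer, consumer, _size in sorted(edges, key=lambda e: e[2], reverse=True):
        a, b = find(producer), find(consumer)
        if a == b:
            continue
        common = usable[a] & usable[b]
        if common:  # merge only if some node can host every member
            parent[b] = a
            usable[a] = common

    clusters = {}
    for s in services:
        clusters.setdefault(find(s), []).append(s)
    return [(sorted(usable[root]), members) for root, members in clusters.items()]


services = ["a", "b", "c", "d"]
edges = [("a", "c", 500), ("b", "c", 10), ("c", "d", 300)]
allowed = {"a": {"n1", "n2"}, "b": {"n3"}, "c": {"n1", "n2", "n3"}, "d": {"n1"}}
print(greedy_cluster(services, edges, allowed))
```

Here service b stays on its own because its only permitted node, n3, cannot also host d, so the light edge from b to c becomes the only inter-node transfer.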
Clustering algorithms group services together onto a single node to reduce communication delays. When the schedules are executed, clustering approaches such as [86] serialise the execution of services which could be run in parallel. Thus, they assume that the processing hardware is only able to execute a single service at a time. While this approach is adequate for systems in which there exists a centralised scheduling mechanism or complete system state information, the system under consideration in this study has neither.
Their approach was one of brute force, considering the placement of each service on every node. By restricting the placement of services, we constrain the search that must be made to achieve an optimal solution for the restricted case that we consider.
In our scheduling model, we assume that tasks are scheduled in a non-preemptive manner. In this case, when a service begins executing with all its required inputs, it will continue executing until it terminates, without interruption. While it is quite probable that the daemon, under which the services will be executed, will execute tasks in a co-scheduled fashion [175], this level of detail is too fine for the high-level schedules considered here.
If preemptive scheduling is allowed, then other threads of control will be executed together with the currently executing service. The net effect of the multi-threading will be an increase in expected execution time. In order to keep the scheduling model as simple as possible, yet applicable to the real system, we assume that only a single service executes at a time. In some ways, the characteristics that we maintain on a service reflect the case in which the service is executed together with others: multiple services being executed simultaneously will increase both the mean and the variance of the run time of the services running on that node.
One of the fundamental mechanisms that separates metacomputing environments from most batch queueing systems or simple client-server systems is the ability to specify a number of operations that are to be performed on some data, and have the operations individually scheduled. This can be done without the intervention of the user at every step of the processing, whether that intervention is the initialisation of the next processing step, or the submission of raw or derived data to a new program. We term the specification of operations to be performed a processing request. The mechanism used to express processing requests in the DISCWorld prototype is the Distributed Job Placement Language (DJPL) [107].
This section describes the DJPL, and how it is used to specify services on behalf of the client, whether the client is a human user or another DISCWorld daemon. The DJPL grammar is presented and an example is given of its use.
In the DISCWorld metacomputing environment, processing requests are
expressed in the form of services on data, arranged as a directed
acyclic graph. When a processing request is submitted by a user client
or remote daemon, the services that comprise the request have not been
assigned, or placed onto nodes. We term this an
un-annotated DJPL script. When the services have all been assigned to
nodes, we term the resulting DJPL script annotated. An
un-annotated DJPL script is annotated using the placement model
described in section
and is then executed by
DISCWorld daemons (as discussed in chapter
).
The DJPL is designed to encapsulate all the information that is necessary for a client's request to be executed by a daemon within the DISCWorld environment. The client can be either a human user or another DISCWorld daemon that is making an execution request. The information that is needed by the DISCWorld daemon in order to execute the request is:
To simplify the task of parsing the DJPL script, it is expressed in
XML [238]. Transmitting a pre-parsed object version
of the processing request would be more efficient. However, this decision
allows us to use an architecture-independent representation for
requests, and makes the DJPL easy for humans to read. Free XML parsers
for Java are also readily available [124, ]. Figure
shows the XML
document type definition (DTD), which specifies the
manner in which the DJPL script is written.

Figure 24: DJPL XML document type definition (DTD)
For the purposes of this thesis, the most important section of the DJPL is the instruction section. In this section, the services that have been used to decompose the processing request are itemised, and the data they consume and produce are named. If any aliases are defined in the alias section, they are directly substituted into the instruction stream at execution time. Loops, iterating over a pre-defined range of values, are allowed in the instruction stream. They provide a short-hand way of repeating the same instructions across a range of values. This is implemented by a temporary alias, which is substituted into the instruction stream. We impose the restriction that there must be no data dependencies between iterations of the loop.
Figure
shows a DJPL script that describes a
processing request. A loop is used to iterate over the day range
Integer:01 to Integer:30. An Imagery:Crop service is used
to crop the visual and infra-red channels of the GMS5 satellite images
corresponding to those days. For each pair of cropped output images,
the service Imagery:GMS_Classify is executed. On the output of
this service, the final service Imagery:Georectify is executed.
For further explanation of this processing request,
see [130].

Figure 25: Example DJPL script before
annotation. Services, and the parameters that are to be used are named
to allow a data flow graph to be constructed. Outputs of each service
are also named to allow re-use.
Expressing a query using the DJPL is similar to the classad [192] structure used in Condor and the resource specification language (RSL) [48] used in Globus, but it differs from both in an important way. Classads and RSL define a set of constraints that the target must satisfy to execute a job. The DJPL, in contrast, is designed as a general structure from which an execution schedule may be generated. The Helios operating system [185] featured a similar language for shell scripting, which allowed multiple programs to accept inputs from a single producer. The Computing Center Software project used a resource definition language [22] to represent Transputer configurations for parallel programming.
Defining the language in XML allows it to be extensible. If a daemon receives an XML element that its parser does not understand, it is able to ignore it. We are currently investigating methods of specifying the criticality of XML elements, so that if a parser does not understand a critical element, it will terminate parsing.
We are also investigating methods of encoding exception-handling instructions into the DJPL. If an exception occurs, for example if a service cannot be found on any of the known servers, the user may want to be notified immediately, or may want only the maintainer of their local DISCWorld daemon to be notified.
We are currently experimenting with annotating the DJPL script so that it instructs a daemon not to distribute the script further if that daemon cannot provide the service. Alternatively, the script could carry a record of every daemon that has forwarded it. This would prevent daemons from simply passing the request among themselves without making forward progress.
The DJPL comprises an important part of the DISCWorld scheduling and placement model. While designed and implemented specifically for use with DISCWorld, the language itself is general enough to be used in other metacomputing or middleware environments.
In the literature, a task is defined as an indivisible unit of computation, and the tasks are convex, which means that once a task starts its execution, it can run to completion without interruption [201]. We modify the definition of a task slightly so that while it is still the unit of computation, it may itself represent a number of tasks that have been grouped together under a single name.
As discussed in section
, there are two main
models for allocating tasks to processors: static and dynamic. Neither
approach is adequate when the global system state
available to a scheduler is limited, or possibly out of date.
Any two services may be termed either related or unrelated. Two services are related if one of them consumes the output data produced by the other, either directly or through a chain of intermediate services. In other words, one is a transitive consumer of the other. Related services have an implicit ordering of execution between them, so they cannot be executed at the same time. Two services are unrelated if there is no such producer-consumer relationship between them, and they may therefore be executed at the same time.
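Deciding whether two services are related is a reachability test on the data flow graph of the request. The Python sketch below assumes a hypothetical representation in which each service name maps to the list of services that consume its outputs. The names are illustrative labels for one day of the example request, not DJPL identifiers.

```python
from collections import deque


def consumers_closure(edges, start):
    """Return every service reachable from `start` along producer -> consumer edges."""
    seen = set()
    queue = deque(edges.get(start, ()))
    while queue:
        s = queue.popleft()
        if s not in seen:
            seen.add(s)
            queue.extend(edges.get(s, ()))
    return seen


def related(edges, a, b):
    """Two services are related if either is a (transitive) consumer of the other."""
    return b in consumers_closure(edges, a) or a in consumers_closure(edges, b)


# One day of the example request: both crops feed the classifier,
# whose output feeds the georectifier.
edges = {
    "Crop(vis)": ["GMS_Classify"],
    "Crop(ir)": ["GMS_Classify"],
    "GMS_Classify": ["Georectify"],
}
print(related(edges, "Crop(vis)", "Georectify"))  # True: transitive consumer
print(related(edges, "Crop(vis)", "Crop(ir)"))    # False: may run concurrently
```

Pairs for which `related` returns `False` are exactly the pairs that the co-location penalty described later applies to.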
One approach to disseminating global system state information is to define a strict hierarchy of nodes, or state brokers, from which the most recent global state information may be retrieved. The state broker for a cluster of nodes will, in turn, have its own state broker that supplies it with global information on the remainder of the distributed system. Information must flow in both directions, so that a node's broker can be updated with the node's information at the same time as the node retrieves information on the remainder of the system.
We consider this approach to disseminating system state information inappropriate for our model, for a number of reasons. There is too much system information that must be shared. Nodes are truly dynamic in their availability. We are also unable to define a strict hierarchical relationship that all resource owners are likely to agree upon. We have therefore adopted a system that takes advantage of partial system information, in which nodes are updated through the receipt of routine information and control messages.
In our model, a node's accept new jobs and accept new
service byte code characteristics each take a value in the range
[0,2]. The value 0 indicates that the
characteristic is not supported by the node. The value 2 indicates
full support of the characteristic, with no cost penalty. The value 1
indicates that the characteristic is supported reluctantly,
with a cost penalty for use. The cross-product of the possible values
for a node's accept new jobs and accept new service byte code
characteristics is shown in
table
. A description and
possible example of each combination is given. A node's willingness
is represented by a numerical value, but this does not imply the use of
probabilities in decision-making. Instead, these values weight the
cost functions, so that the objective function takes each node's
willingness into account when a placement decision is made.

Table 6: Description
and examples of node accept new service byte code and
accept new service request characteristics cross-product
The state of the system is stored on a per-node basis. Each node in the distributed environment collects and uses information about the state of the rest of the system. This information allows a node to make placement decisions restricted to the nodes it knows about. More nodes, and more information, are incorporated as the node becomes aware of other nodes. Collecting system state knowledge in this way removes the need for each node to be aware of every other node. It also removes the need for the designated control nodes or brokers that other systems feature. This allows a truly dynamic system to be constructed, in which membership of the confederation of cooperating nodes is based on the current state of each node. In other systems, if a designated control node fails, the remainder of the system cannot access the characteristics of the nodes that the control node manages. In our decentralised system, information is propagated by a protocol that is outwith the scope of this work.
The variables used in the minimisation function are:
The result of each step in the placement process is a tuple (q,h)
which is unique for a given processing request component q:
. Thus, each step
produces a mapping of a processing request component to a node in the
system. The component is only assigned to a single node within the
system.
The objective of the placement process is the minimisation of the
overall execution time for each component in a given processing
request, q. The minimisation function is
, with
placement being defined as (c,h) = cost(q,H). This function
returns a tuple (c,h) such that c represents the minimum cost for
the component processing step and h is the node chosen to host the
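component. The set of nodes over which assignment is deemed
viable, H, is the union of two sets. The first contains the nodes that host the service
featured in the component, hosts(h,s). The second contains the nodes that host the data needed by
the component, hosts(h,i). Thus,
$H = \{\, h \mid hosts(h,s) \,\} \cup \{\, h \mid hosts(h,i) \,\}$.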
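The sketch below computes H. It assumes that the hosts(h,s) and hosts(h,i) relations are available locally as dictionaries mapping a service or data name to the set of nodes known to host it. This representation is hypothetical, and the node and data names are illustrative.

```python
def viable_nodes(hosts_service, hosts_data, service, inputs):
    """H: nodes hosting the component's service, united with nodes hosting any of its inputs."""
    H = set(hosts_service.get(service, ()))
    for item in inputs:
        H |= set(hosts_data.get(item, ()))
    return H


hosts_service = {"Imagery:GMS_Classify": {"n1"}}
hosts_data = {"vis01": {"n2"}, "ir01": {"n2", "n3"}}
print(sorted(viable_nodes(hosts_service, hosts_data,
                          "Imagery:GMS_Classify", ["vis01", "ir01"])))
# ['n1', 'n2', 'n3']
```

A service or data item that is unknown locally contributes no nodes. If every element of a component is unknown, H is empty and the component cannot yet be placed.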
The cost of placing a processing request component, q, onto a member
of the viable nodes, H is

where
. We define the cost to transfer a service or data between nodes in the
distributed system,
, as
where the cost to
transfer an object is given below.

From the equation above, the cost of transferring an object is the time that the transfer will take, multiplied by the sum of the object's mobility and the destination node's willingness to accept new service byte codes. Transferring an object to the node on which it already resides costs nothing. An error condition is raised in two cases: if the destination node does not accept new service processing requests, or if a service is to be transferred and the destination node does not accept new service byte codes.
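The transfer cost can be sketched as below. The exact form of the equation is an assumption in this sketch. `transfer_time` is a precomputed estimate, and `mobility` is a non-negative factor. The willingness term enters as the penalty 2 - w, which is zero for full support and larger for reluctant nodes, so that the [0,2] scale behaves as described for the table above. `Node`, `PlacementError` and `transfer_cost` are hypothetical names.

```python
from dataclasses import dataclass

NOT_SUPPORTED, RELUCTANT, FULL = 0, 1, 2  # willingness scale [0, 2]


@dataclass
class Node:
    name: str
    accept_jobs: int      # willingness to accept new service requests
    accept_bytecode: int  # willingness to accept new service byte code


class PlacementError(Exception):
    pass


def transfer_cost(transfer_time, mobility, is_service, src, dst):
    """Cost of moving one object (data item or service byte code) from src to dst."""
    if src.name == dst.name:
        return 0.0  # the object already resides on the destination
    if dst.accept_jobs == NOT_SUPPORTED:
        raise PlacementError(f"{dst.name} accepts no new service requests")
    if is_service and dst.accept_bytecode == NOT_SUPPORTED:
        raise PlacementError(f"{dst.name} accepts no new service byte code")
    # Assumption: willingness enters as a penalty (2 - w), zero for full support.
    penalty = FULL - dst.accept_bytecode
    return transfer_time * (mobility + penalty)


a = Node("a", FULL, FULL)
b = Node("b", FULL, RELUCTANT)
print(transfer_cost(4.0, 1.0, True, a, b))  # 8.0: reluctant destination
print(transfer_cost(4.0, 1.0, True, b, a))  # 4.0: fully willing destination
```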
Execution schedules are created with a request-centric view of the distributed system, where the scheduling node uses the most up-to-date information that it possesses to generate schedules. In the event that two services are placed onto a node, if the services are unrelated, the execution times of both services are increased by their run time variance; if they are related, they cannot be run together (by definition due to their temporal ordering). We call the increase in time the co-location penalty. It is shown in the equation below.

The cost of a given schedule is only loosely based on the time that it should take to execute. The final cost that the optimisation function reports is generated from estimations of execution time and transfer time for objects, increased by factors commensurate with the source- and destination-nodes' willingness to perform given operations. The sum of the current node's waiting time, the run time of the service, and the co-location factor gives an estimate of the time that the service might take to execute. The node's willingness to accept new service requests is used as a modifier to this estimate; if the node is reluctant to accept new service requests, this translates into a higher cost to execute the service on the chosen node. Therefore, the total cost to execute a service s on a node h is given by the equation below.

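The sketch below estimates execution cost by combining waiting time, run time and the co-location penalty. Two details are assumptions rather than the model's definition. First, the penalty is the service's run-time variance, counted once for each unrelated service already placed on the node. Second, the willingness modifier multiplies the estimate by 3 - w, so a fully willing node (2) leaves the estimate unchanged and a reluctant node (1) doubles it. `ServiceEstimate`, `colocation_penalty` and `execution_cost` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class ServiceEstimate:
    mean_runtime: float
    runtime_variance: float


def colocation_penalty(est, unrelated_colocated):
    """Extra time: run-time variance per unrelated co-located service (assumption)."""
    return est.runtime_variance * unrelated_colocated


def execution_cost(est, wait_time, unrelated_colocated, accept_jobs):
    """Estimated cost of running one service on one node."""
    if accept_jobs == 0:
        raise ValueError("node does not accept new service requests")
    time_estimate = wait_time + est.mean_runtime + colocation_penalty(est, unrelated_colocated)
    # Assumption: reluctant node (1) doubles the cost; fully willing node (2) leaves it unchanged.
    modifier = 3 - accept_jobs
    return time_estimate * modifier


est = ServiceEstimate(mean_runtime=10.0, runtime_variance=2.0)
print(execution_cost(est, 5.0, 0, 2))  # 15.0
print(execution_cost(est, 5.0, 1, 2))  # 17.0: one unrelated neighbour
print(execution_cost(est, 5.0, 1, 1))  # 34.0: same, on a reluctant node
```

Related services never incur the penalty, because their temporal ordering prevents them from running together.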
This scheduling model is based on clustering. Sarkar [201] defined clustering as processor assignment for an unbounded number of processors on a clique architecture. Gerasoulis and Yang [86] classify a clustering as nonlinear if independent tasks are assigned to the same node, and as linear otherwise. The clustering used in this model is nonlinear.
In order to keep the complexity of the clustering algorithms polynomially bounded, Gerasoulis and Yang only consider those clustering algorithms that do not involve backtracking. In other words, if a cluster is created at step i, then the nodes contained within it are not able to be separated in step j, j > i. We also impose this restriction.
The algorithm that we use is similar to the branch-and-bound algorithm [236]: the first complete path is found, and its cost is used as an upper bound. The remaining placement combinations are then tried. If any partial path has a cost greater than or equal to the lowest complete-path cost found so far, the search along that path is terminated, because no complete path extending it can cost less than the current minimum. In the current prototype there is no practical limit on the number of nodes considered when creating complete-path costs. A production system may have a large number of nodes that could host a service or data. In that case, we anticipate that a heuristic would be used to further restrict the nodes considered for placement.
As seen in section
, the DJPL grammar
contains loop constructs and aliases. We are unable to mandate that
all users must interact with DISCWorld via a graphical interface and
automated request-creation tools. The loop construct and aliases are
provided to reduce the burden when manually creating processing
requests. These constructs exist solely for the benefit of the clients;
they are not used by the tools that assign services to nodes and
execute processing requests. Alias substitution is performed at the
outset of parsing.
The parsing of a DJPL script is recursive. Information present at a given level of nested recursion is available to all lower levels. Any aliases in the DJPL script are added at level 0 of the parsing, and instructions are added beginning at level 1.
After any aliases are added to the system, all loops are unravelled. Because we do not allow dependencies between loop iterations, the order of unravelling is not critical. There is no limit on the depth of loop nesting. The current value of the loop control variable is added to the system as a temporary alias, and is therefore undefined outside the scope of the loop.
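Alias substitution and loop unravelling can be sketched as a recursive expansion in which each loop value is bound as a temporary alias. The in-memory representation is hypothetical: a `Loop` record holds the loop body, and `$`-style placeholders are substituted with Python's `string.Template`. The instruction strings only mimic the example request and are not DJPL syntax.

```python
from dataclasses import dataclass
from string import Template


@dataclass
class Loop:
    var: str
    values: list
    body: list  # instruction strings or nested Loop objects


def unravel(instructions, scope):
    """Expand loops recursively; each loop value is a temporary alias visible only in its body."""
    out = []
    for ins in instructions:
        if isinstance(ins, Loop):
            for value in ins.values:
                inner = {**scope, ins.var: value}  # temporary alias, dropped after this iteration
                out.extend(unravel(ins.body, inner))
        else:
            out.append(Template(ins).substitute(scope))  # KeyError if an alias is undefined
    return out


aliases = {"sat": "GMS5"}  # level 0: aliases from the alias section
script = [Loop("day", ["01", "02"],
               ["Imagery:Crop $sat vis $day", "Imagery:Crop $sat ir $day"])]
for line in unravel(script, aliases):
    print(line)
# Imagery:Crop GMS5 vis 01
# Imagery:Crop GMS5 ir 01
# Imagery:Crop GMS5 vis 02
# Imagery:Crop GMS5 ir 02
```

Each iteration copies the enclosing scope instead of mutating it. As a result, the loop variable is undefined once its loop has been expanded, and nesting depth is limited only by recursion depth.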
As discussed in section
, one of the
fundamental features of this system is a global naming scheme. Because
the names of all input and output parameters are explicitly stated in
the DJPL script, it is a simple operation to query a local database of
available data products and services. If all the outputs of a service
are available to the scheduler, the service invocation is replaced
with a reference to the data in the store. Because partial results of
processing requests are considered to be as important to the system as
the final results, if not all of the outputs of a service are
available, the service invocation is not replaced.
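The all-or-nothing rule for reusing stored results can be sketched as follows. The sketch assumes that each instruction is a (service, output names) pair and that the local store is a set of globally unique data names. Both representations, and the names used, are hypothetical.

```python
def resolve_from_store(instructions, store):
    """Replace an invocation by a data reference only if ALL of its outputs are stored."""
    resolved = []
    for service, outputs in instructions:
        if outputs and all(name in store for name in outputs):
            resolved.append(("reference", tuple(outputs)))
        else:
            # Some outputs are missing: partial results matter, so the service still runs.
            resolved.append((service, tuple(outputs)))
    return resolved


store = {"vis01_crop", "class01"}
instructions = [
    ("Imagery:Crop", ["vis01_crop"]),
    ("Imagery:GMS_Classify", ["class01", "class01_meta"]),
]
print(resolve_from_store(instructions, store))
# [('reference', ('vis01_crop',)), ('Imagery:GMS_Classify', ('class01', 'class01_meta'))]
```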
Each service that still needs to be executed is placed according to the scheduling algorithm. A list of candidate nodes is generated, consisting of the nodes that already host the service and the nodes that host the data to be used as input to the service. Restricting the search to such a candidate list is one way to reduce the complexity of the scheduling algorithm.
For each node in the candidate list, the cost of transferring the service and any necessary data to that node is calculated, using the most up-to-date information available to the local node. Each data item needed by the service, as well as the service code itself, is considered in turn. If the data item or service does not already reside on the candidate node, the minimum-cost transfer for copying the data or service byte code to that node is determined. This minimum is found by enumerating over each of the nodes that purport to host the data or service. If the service or data item is already on the candidate node, the incremental cost is zero.
After the cost of transferring any data and the service byte-code to
the chosen node is determined, the incremental cost of this
configuration is updated. If it is greater than or equal to the lowest cost found so far for
the complete request, this branch of the search is abandoned, and the process
continues with the next node to be considered. Otherwise, the algorithm
recursively places instructions until there are no more instructions. The
pseudo-algorithm is shown in figure
. The nodes
that lead to the lowest overall execution cost for the processing
request are chosen.

Figure 26: Pseudo-algorithm
for determining service and data placement in our scheduling model.
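The recursive branch-and-bound search described above can be sketched in Python. This is a simplified sketch, not the prototype's implementation, and it makes several assumptions:

- The cost of a step is the cheapest transfer of each missing object, plus the node's waiting time, plus the service's run time. The mobility, willingness and co-location terms are omitted.
- Components are listed in a valid execution order.
- Costs are non-negative, which the pruning requires in order to be safe.

`Component`, `place` and the example names are hypothetical.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Component:
    service: str
    inputs: tuple
    output: str
    runtime: float


def place(components, locations, link_cost, wait):
    """Branch-and-bound over host choices; returns (best cost, [(service, node), ...])."""
    best = {"cost": math.inf, "plan": None}

    def move_cost(obj, dst, locs):
        # Cheapest copy of obj to dst from any node currently holding it.
        if dst in locs[obj]:
            return 0.0
        return min(link_cost[(src, dst)] for src in locs[obj])

    def search(i, cost, locs, queue, plan):
        if cost >= best["cost"]:
            return  # prune: cannot beat the best complete placement found so far
        if i == len(components):
            best["cost"], best["plan"] = cost, plan
            return
        q = components[i]
        objects = (q.service, *q.inputs)
        # Candidate nodes: those hosting the service or any of its inputs.
        candidates = set(locs[q.service]).union(*(locs[d] for d in q.inputs))
        for h in sorted(candidates):
            step = sum(move_cost(o, h, locs) for o in objects) + queue[h] + q.runtime
            # Prospective state passed down the recursion: copied objects now live
            # on h, the new output lives on h, and h's queue grows by this run time.
            new_locs = {k: set(v) for k, v in locs.items()}
            for o in objects:
                new_locs[o].add(h)
            new_locs[q.output] = {h}
            new_queue = {**queue, h: queue[h] + q.runtime}
            search(i + 1, cost + step, new_locs, new_queue, plan + [(q.service, h)])

    search(0, 0.0, locations, wait, [])
    return best["cost"], best["plan"]


components = [
    Component("Crop", ("vis",), "vis_c", 2.0),
    Component("Classify", ("vis_c",), "cls", 3.0),
]
locations = {"Crop": {"a"}, "Classify": {"b"}, "vis": {"a"}}
link_cost = {("a", "b"): 5.0, ("b", "a"): 5.0}
wait = {"a": 0.0, "b": 1.0}
print(place(components, locations, link_cost, wait))
# (11.0, [('Crop', 'a'), ('Classify', 'b')])
```

In this example, the first complete path (both services on `a`, cost 12.0) becomes the initial bound. It is then beaten by sending the cropped image to `b`, where the classifier already resides. Passing copies of the location and queue maps down the recursion is what lets later steps reuse objects moved by earlier placement decisions, as described in the text.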
In our model, if the scheduler does not know of any nodes that host a particular service, it creates the most efficient schedule it can for all services up to the unknown service, and then starts the computation executing. The partial schedule may later arrive at a remote node that can re-create a schedule, or at a node where the next service is not assigned to a processor. We expect that a new schedule is then created, if at all possible. If data or services are referenced in a processing request but are unknown to all nodes participating in the request execution, the behaviour of the system is not defined. We expect to introduce exception-handling instructions into the DJPL grammar in the near future.
Although we are exploring other algorithms, the current method for determining the placement of services and data is by searching the whole state space of those nodes that may be considered to host the service. The algorithm was designed to be recursive to allow additional prospective system state information to be passed to each subsequent instruction that is to be tested. Included in the system state information are details about previous placement decisions that have been made, and the locations of any services or data that have been transferred as part of the placement process. This prevents the placement algorithm from having to pay extra to use a data item that was transferred to the local node in a previous iteration. Using a recursive algorithm also allows the information regarding the remote DISCWorld nodes to be prospectively updated in terms of the services that they host and the expected waiting time that any new service will endure if it uses the node.
Loops were added for the benefit of human readers of DJPL scripts, but they are still explicitly expanded. With a large number of iterations, it may be more economical to transfer some of the data away from the remainder of the processing in order to achieve a lower waiting time for a service on a node.
As the system under consideration is service-oriented and does not accept arbitrary user code for execution, it is possible, to some extent, to abstract away from the node on which a service is executing. This means that each server need not be aware of the relative speeds of all other nodes that participate in the infrastructure, or of their relative memory sizes, as is required in some other systems [95, , ].
We do need to record some physical characteristics to help us make decisions on where to execute services, though, such as the current load on a remote node, perhaps in terms of how much of its queue length is full, or perhaps by providing an estimate on the time that an incoming request is likely to start executing. This allows us to break away from the problem of how to properly define a node, especially in the light of having so many different architectures in the processor pool.
Later research may involve storing input graphs and comparing derived placement schedules with new graphs, as detailed in [212]. In that work, predictions derived from the run-times of previous parallel applications are used to refine the predictions for current workloads.
It may be possible to use a critical path method (CPM) together with a form of task duplication, such as that used in [205]. We would, however, have to relax the assumptions made in that work. We cannot assume an unbounded number of processors, nor that the program is running in isolation, and the DAG must be allowed to have more than one root node. Future work on the proposed scheduling and placement algorithm may also consider service run-times proportional to the size of the input data, as detailed by Lee, Yang and Wang [144].
This chapter has introduced two important ideas for the adaptive generation and execution of schedules in a metacomputing context. The first is a theoretical model of scheduling in our prototype metacomputing infrastructure. The second is a prototype language with which computations may be structured and specified in a programming-language independent fashion.
We have presented the mechanism by which processing requests within the DISCWorld prototype are represented. The Distributed Job Placement Language (DJPL) is intended to be automatically generated by a graphical user interface client or a DISCWorld daemon. While users will not be prevented from viewing the generated DJPL script, there should be no need for them to do so.
We believe that the proposed DJPL is sufficiently general that it could be implemented and used in other metacomputing and middleware systems for exactly the same purposes. This language, together with a standardised method for the inter-operation of data and services, would allow a number of different environments to properly cooperate.
Designed for use in systems that feature only partial system state
information, the model relies on mechanisms by which information about
services and data may be passed about the system. An implementation of
this mechanism, and of the model, is presented in
chapter
, and performance analysis of the
implementation is provided in chapter
.