Titanium Threads

Andrew Begel
CS267 Final Project
Spring 1998

 

Abstract

    Parallel programming has been a difficult task since the first parallel computing architectures were introduced in the 1960s. Many different programming metaphors, from shared memory to message passing, have been designed and implemented, but none has fully satisfied the programming community in both usability and efficiency across problem domains. Titanium, an explicitly parallel, SPMD, object-oriented language created at UC Berkeley, aims to give the programmer a simple metaphor in which to work while providing an efficient runtime implementation. While SPMD programming is easy to understand, it lacks the expressiveness of multi-threading, which allows the programmer to separate functionally distinct segments of control flow. This project introduces a multi-threading architecture that can coexist with SPMD programming, and discusses the required changes to Titanium and their consequences for ensuring correct, deadlock-free code. 

 

Introduction

    Cheap, efficient, parallel computing has been a dream of computer scientists since the early pioneers in the 1960s. While great strides have been made in parallel architecture and hardware design, software has made much less progress. Parallel languages and toolkits either take a low-level, essentially assembly-like approach to programming shared memory and distributed memory machines, as MPI and PVM do, or, like NESL and Matlab, try to be very high-level and incorporate implicit parallel constructs.

    The Titanium language tries to take a middle ground between low-level explicit parallelism and high-level language abstractions. It is a cross between Split-C, a SPMD, explicitly parallel, shared memory, low-level language, and Java, a popular, object-oriented, multi-threaded descendant of C++. The designers of Titanium hope that by taking an explicitly parallel language and giving it higher-level language constructs, they can achieve high-performance, easy-to-program parallel computing.

    Explicit parallelism tends to be expressed in two forms, SPMD or multi-threading. SPMD programming has one thread per processing unit, each thread starting at the same time and running the same code. Communication and coordination happen through shared memory abstractions, explicit barriers and synchronization. Multi-threaded programs, on the other hand, have many dynamically created threads, each of which may start at an arbitrary time and may be running different code. Communication between threads happens via shared memory abstractions and explicit synchronization on shared locks.

    Titanium takes the SPMD approach to parallelism. This makes it easy to rewrite vector-parallel programs merely by mapping portions of the data to the processing units available. If you have n units of data and p processing units, each processing unit will then perform n/p of the work. Many programs tend to have a computation-communication two-phase cycle. This allows each processor to do the maximum work possible by itself, and then communicate boundary data to neighboring processors.
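
    The n/p mapping described above can be sketched in plain Java. This is only an illustration, not Titanium code: the class BlockPartition and its methods are hypothetical names, and the choice to give the first n mod p processing units one extra element is an assumption about how to handle a remainder.

```java
// Hypothetical sketch (plain Java, not Titanium API): block-partition
// n data units across p processing units.
final class BlockPartition {
    // First index owned by processor `proc` (0-based).
    static int start(int n, int p, int proc) {
        int base = n / p;
        int extra = n % p;              // the first `extra` processors get one more unit
        return proc * base + Math.min(proc, extra);
    }

    // Number of units owned by processor `proc`.
    static int count(int n, int p, int proc) {
        return n / p + (proc < n % p ? 1 : 0);
    }

    public static void main(String[] args) {
        int n = 10, p = 4;
        for (int proc = 0; proc < p; proc++) {
            int s = start(n, p, proc);
            int e = s + count(n, p, proc);
            System.out.println("proc " + proc + " owns [" + s + ", " + e + ")");
        }
    }
}
```

    Each processing unit then iterates only over its own half-open range and exchanges boundary data with its neighbors in the communication phase.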

    Titanium also takes advantage of object-orientation as a high-level program organization strategy. The designers hope this will encourage code modularity and code reuse between different projects. Since many of the current benchmark projects tend to be quite mathematical, for example, reuse of standard BLAS functions is imperative for building useful libraries for potential users.

    While Titanium is useful for converting sequential and vector code to parallel code, the lack of multi-threading can cause some abstraction problems. Let's say we have a model of the climate over the United States and we want to see how global warming will affect the crops in Nebraska. Various scientific disciplines can provide good models for wind flow, humidity, air chemistry, solar radiation and reflection. Each of these models can be developed independently and then joined together to create a complete climate simulation.

    In Titanium, there is one dimension of parallelism. Either each piece of the climate model runs on a different processor, or we run each piece on every processor and split the grid across processors. The first option is not ideal since the pieces interact with each other at every grid point. If the pieces are split this way, every computational interaction between them must involve communication across processors. The second option is better at preserving the locality of the model interactions, but it leads to muddled programs. Since we only have one thread of control, we have to rewrite our entire program so that this single thread of control can call all of the different models in the system. Each time we add a new component, we have to add its code to every control point in the program. If the control flows originally developed for the individual models do not match one another, the programmer has to reengineer the combined system from scratch!

    The lack of multi-threading also prevents the programmer from hiding I/O latency. In a large, dense matrix decomposition or matrix-multiply application, a lot of time will be spent in disk I/O, reading and writing temporary matrices to disk. One would like to spawn independent threads to perform this I/O so the main program can perform useful computation at the same time. 
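
    As a rough illustration of the latency hiding described above, the following plain Java sketch starts a writer thread and computes while the write is in flight. It is not Titanium code; the class OverlapIo, its method names, and the temporary file name are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch (plain Java, not Titanium): overlap a disk write with computation.
final class OverlapIo {
    // Starts a writer thread for `bytes`, computes a sum of squares while it
    // runs, then waits for the writer before returning the sum.
    static double sumSquaresWhileWriting(double[] block, byte[] bytes, Path out) {
        Thread writer = new Thread(() -> {
            try {
                Files.write(out, bytes);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();
        double sum = 0.0;
        for (double v : block) sum += v * v;   // useful work during the I/O
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    // Self-contained demo: writes 1 KiB to a temp file and returns the file size in bytes.
    static long demo() {
        try {
            Path out = Files.createTempFile("tmpmatrix", ".bin");
            sumSquaresWhileWriting(new double[] {1.0, 2.0, 3.0}, new byte[1024], out);
            long size = Files.size(out);
            Files.delete(out);
            return size;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + demo());
    }
}
```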

 

Specification

    This project adds a multi-threading extension to Titanium to allow creation and control of multiple SPMD thread groups.  Each SPMD thread group (hereafter called a ThreadGroup) consists of Ti.numProcs() threads. All threads in a ThreadGroup are started at the same time and synchronize only with each other when a barrier, broadcast, exchange, reduction or scan is performed. We adopt a modification of the java.lang.Thread API, called ti.lang.ThreadGroup, to support control of ThreadGroups. We also implement and extend the Java framework for method serialization and synchronization. Several mechanisms currently used in Titanium must be modified and extended to support ThreadGroups. These modifications are described below.

    When Titanium starts up, the initial flow of control is initiated within a ThreadGroup. This ThreadGroup is the master ThreadGroup. It may execute sequentially to the end, or it may spawn separate threads of control and optionally rejoin them at any point in its execution. The master ThreadGroup is started by the system by calling the public static single void main(String single []) method. A new ThreadGroup may be created and started the same way a Java thread is created and started, by calling new ThreadGroup(instanceofRunnableClass).start(). One caveat is that the creation of a ThreadGroup is a single operation; all processing units in the current ThreadGroup must create the new ThreadGroup together. There is an implicit barrier at the end of the start() method for ThreadGroup creation.

    public static single void main(String single [] argv) {
        System.out.println("Hello from the main ThreadGroup!");
        ThreadGroup tg = new ThreadGroup(new RunnableClass());
        tg.start(); // implicit barrier at the end of start()
        System.out.println("Now, two ThreadGroups are running!");
        tg.join();
        System.out.println("The new ThreadGroup has terminated. Goodbye.");
    }

    Each ThreadGroup, once spawned, runs independently of all other ThreadGroups. All barriers, broadcasts, exchanges, reductions and scans take place only among the threads that make up a particular ThreadGroup. ThreadGroups are defined by their Runnable class and time of birth. If two ThreadGroups are spawned on the same Runnable class but at different times, they are, for all intents and purposes, two distinct ThreadGroups.
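
    The group-scoped barrier semantics described above can be approximated in plain Java. The sketch below is an emulation under stated assumptions, not the ti.lang.ThreadGroup implementation: SpmdGroup, Body, and demo are hypothetical names, and each group simply owns a private java.util.concurrent.CyclicBarrier so that a barrier in one group never waits on threads of another.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ThreadGroup: numProcs Java threads sharing one barrier.
final class SpmdGroup {
    interface Body {
        void run(int rank, SpmdGroup group) throws Exception;
    }

    private final Thread[] threads;
    private final CyclicBarrier barrier;

    SpmdGroup(int numProcs, Body body) {
        barrier = new CyclicBarrier(numProcs);
        threads = new Thread[numProcs];
        for (int r = 0; r < numProcs; r++) {
            final int rank = r;
            threads[r] = new Thread(() -> {
                try {
                    body.run(rank, this);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    // Starts every thread of the group.
    void start() {
        for (Thread t : threads) t.start();
    }

    // Barrier among the threads of this group only.
    void barrier() throws InterruptedException, BrokenBarrierException {
        barrier.await();
    }

    // Waits for every thread of the group to terminate.
    void join() {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Demo: two groups run the same body with independent barriers.
    // Returns how many threads passed the first barrier.
    static int demo(int numProcs) {
        AtomicInteger crossings = new AtomicInteger();
        Body body = (rank, g) -> {
            g.barrier();
            crossings.incrementAndGet();
            g.barrier();
        };
        SpmdGroup a = new SpmdGroup(numProcs, body);
        SpmdGroup b = new SpmdGroup(numProcs, body);
        a.start();
        b.start();
        a.join();
        b.join();
        return crossings.get();
    }

    public static void main(String[] args) {
        System.out.println("threads past barrier: " + demo(4));
    }
}
```

    The two groups in demo share a Runnable-like body but are still distinct groups, mirroring the rule that a ThreadGroup is identified by its class and its time of birth.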

 

ThreadGroup Communication

    ThreadGroups communicate by modifying shared variables. These variables can be static fields, single static fields, local fields accessible via global pointers, or single local fields accessible via single global pointers. A contribution of this work is to implement and extend Java's synchronization framework to support intra-ThreadGroup and inter-ThreadGroup object synchronization. Synchronization is needed to prevent race conditions between threads in a ThreadGroup and between threads in competing ThreadGroups.

    Let's say we have a computation grid that is spread across the processing units of a ThreadGroup. Each grid point contains a data structure that holds our particles. On each computation step, a particle might feel a force due to its neighbors and move across the grid. When a particle moves, it must be removed from the data structure at its origin and placed in the data structure at its destination. If a particle moves out of a processor's local grid, the processor must place it in another processor's grid point via a global pointer. This means that multiple processing units can access the grid data structures at the same time, so we must synchronize access to them. We can do this by simply adding the synchronized keyword to the method signature of the accessors:

class Grid {

	public synchronized void addFish(Point<2> p, Fish f) {
		int ownerproc = split.pointToProc(p);
		fishlistgrid[ownerproc][p] = fishlistgrid[ownerproc][p].push(f);
	}

	public synchronized void removeFish(Point<2> p, Fish f) {
		int ownerproc = split.pointToProc(p);
		FishList l = fishlistgrid[ownerproc][p];
		fishlistgrid[ownerproc][p] = l.remove(f);
	}

	public single void initialize() {
		fishlistgrid.exchange(new FishList[mygrid]);

		foreach (p within mygrid) {
			fishlistgrid[Ti.thisProc()][p] = FishList.nil;
		}
		Ti.barrier();
	}

	public single Grid(double single cellsize, int single inminx, 
        		   int single inmaxx, int single inminy, int single inmaxy) {
		this.cellsize = cellsize;
		minx = inminx;
		miny = inminy;
		maxx = inmaxx - 1;
		maxy = inmaxy - 1;

		grid = [minx:maxx, miny:maxy];

		numprocs = Ti.numProcs();

		split = new Split(grid, numprocs, Split.DONTCARE);

		mygrid = split.getDomain(Ti.thisProc());

		myminx = mygrid.min()[1]; myminy = mygrid.min()[2];
		mymaxx = mygrid.max()[1]; mymaxy = mygrid.max()[2];

		fishlistgrid = new FishList[allProcs][2d];
		Ti.barrier();
	}

	double single cellsize;
	int single minx, miny;
	int single maxx, maxy;

	int myminx, myminy;
	int mymaxx, mymaxy;

	int numprocs;

	RectDomain<2> grid;
	Split split;
	RectDomain<2> mygrid;
	FishList [1d] single [2d] fishlistgrid;

	RectDomain<1> single allProcs = [0:Ti.numProcs() - 1];

}

    Calling Ti.barrier() or any other single primitive or method from within a synchronized method or block is prohibited. Consider the following case: if one processing unit inside a ThreadGroup were to successfully grab a lock on a shared object and call Ti.barrier(), it would wait until all of the other threads in the ThreadGroup also called Ti.barrier(). But since this first thread holds the lock, no other thread may enter the synchronized region to call the barrier, so every thread in the ThreadGroup deadlocks. Therefore, invoking a barrier inside any globally synchronized context is illegal. Semantically, synchronization is non-single, and therefore no single methods or primitives may be invoked while the lock is held.
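
    The deadlock argument above can be reproduced in plain Java with a two-party barrier. This is a sketch, not Titanium code: BarrierInLock is a hypothetical name, and a 200 ms timeout is used as an assumption so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demonstration: a barrier awaited while holding a shared lock cannot complete.
final class BarrierInLock {
    // Returns true if the first thread's barrier wait timed out inside the lock.
    static boolean barrierStallsInsideLock() {
        Object lock = new Object();
        CyclicBarrier barrier = new CyclicBarrier(2);
        CountDownLatch firstHoldsLock = new CountDownLatch(1);
        AtomicBoolean stalled = new AtomicBoolean(false);

        Thread first = new Thread(() -> {
            synchronized (lock) {
                firstHoldsLock.countDown();
                try {
                    // Waits for the second thread, which cannot get past the lock.
                    barrier.await(200, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    stalled.set(true);
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Not expected in this demonstration.
                }
            }
        });
        Thread second = new Thread(() -> {
            try {
                firstHoldsLock.await();
                synchronized (lock) {   // blocks for as long as `first` holds the lock
                    barrier.await(200, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                // The barrier was already broken by the first thread's timeout.
            }
        });
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stalled.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier stalled inside lock: " + barrierStallsInsideLock());
    }
}
```

    Without the timeout, both threads would wait forever, which is why the restriction is best enforced statically rather than detected at run time.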

 

Single Analysis

    After synchronization, the final topic to be discussed is the interaction of single with multiple ThreadGroups. The single modifier is used to identify coherent values of replicated data and is used to ensure that all processors synchronize correctly. The definition of coherent is taken from the Titanium reference manual:

    The formal definition of coherent is in terms of pairs of storage locations: two storage locations a and b, in regions Ra and Rb respectively, and containing values of static type t are consistent if t is not single, or if:

    ThreadGroups act as individual SPMD programs. We can extend the single analysis to start at multiple roots beginning at each Runnable class' run() method. However, while the single analysis may prove that each ThreadGroup individually maintains the singleness of a value, multiple ThreadGroups modifying the value at the same time may cause it to have different values on each processor, which would violate its single property.

    We might like to use the synchronized construct to ensure this mutual exclusion, but there is a danger, which is illustrated by the following case. Let's say we have two ThreadGroups, A and B. Each processor runs one thread from both A and B. If there is a synchronized block on a local or replicated object, such as a FIFO queue for transferring data between the ThreadGroups, there is no guarantee as to which ThreadGroup will grab it first. On one processor, ThreadGroup A may take the lock, and on another processor, ThreadGroup B may take the lock. Inside these synchronized blocks, the different ThreadGroups may decide to modify a single value. That variable may then be mutated to a different value on each processor, which would violate the single invariant.

public class BrokenQueue {

	int[] queue;
	int queuesize = 100;
	int readpointer = 0;
	int writepointer = 0;
	int single numelements = 0;

	public BrokenQueue() {
		queue = new int[queuesize];
	}

	public boolean empty() {
		synchronized (this) {
			return (numelements == 0);
		}
	}

	public int readElement() {
		synchronized (this) {
			if (numelements > 0) {
			    numelements--; // May get incorrect value
			    int value = queue[readpointer++];
			    if (readpointer > 99) readpointer = 0;
			    return value;
			}
			return -1; // Queue empty; -1 is a placeholder sentinel added so every path returns
		}
	}

	public void writeElement(int value) {
		synchronized (this) {
			if (numelements < 100) {
			    numelements++; // May get incorrect value
			    queue[writepointer++] = value;
			    if (writepointer > 99) writepointer = 0;
			}
		}
	}

}

    Thus, we add singlesynchronized. Singlesynchronized is a keyword that behaves much like synchronized. Just as a thread can use synchronized to exclusively hold a lock that no other thread can take, singlesynchronized allows a ThreadGroup to exclusively hold a lock that no other ThreadGroup can take. The first thread of any group to hold a singlesynchronized lock establishes the lock for the entire ThreadGroup. Singlesynchronized is a single operation -- we can statically prove that only one ThreadGroup may be inside. Within this block, we can now perform single operations such as barriers, reductions, broadcasts, or modification of a single variable. At the end of the block, just before giving up the lock, all threads in the ThreadGroup must stop at a barrier, to ensure that all threads in the ThreadGroup have completed their single operations.

public class FixedQueue {

	int[] queue;
	int queuesize = 100;
	int readpointer = 0;
	int writepointer = 0;
	int single numelements = 0;

	public FixedQueue() {
		queue = new int[queuesize];
	}

	public boolean empty() {
		singlesynchronized (this) {
			return (numelements == 0);
		}
	}

	public int readElement() {
		singlesynchronized (this) {
			if (numelements > 0) {
			    numelements--; // Value is now consistent
			    int value = queue[readpointer++];
			    if (readpointer > 99) readpointer = 0;
			    return value;
			}
			return -1; // Queue empty; -1 is a placeholder sentinel added so every path returns
		}
	}

	public void writeElement(int value) {
		singlesynchronized (this) {
			if (numelements < 100) {
			    numelements++; // Value is now consistent
			    queue[writepointer++] = value;
			    if (writepointer > 99) writepointer = 0;
			}
		}
	}
} 
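
    The singlesynchronized semantics (the first thread claims the lock for its whole group, and a closing barrier precedes release) can be emulated on a single shared-memory machine in plain Java. The sketch below is one possible reading under stated assumptions, not the Titanium implementation: GroupLock, enter, exit, and copiesAgree are hypothetical names, a group is identified by an arbitrary token object, and every thread of the owning group is assumed to enter the block.

```java
// Hypothetical Java emulation of the singlesynchronized idea; not Titanium API.
final class GroupLock {
    private final int groupSize;   // threads per ThreadGroup
    private Object owner;          // token of the group holding the lock, or null
    private int exited;            // owner threads that reached the closing barrier
    private long generation;       // bumped each time the lock is released

    GroupLock(int groupSize) {
        this.groupSize = groupSize;
    }

    // The first thread of a group to arrive claims the lock for the whole group.
    synchronized void enter(Object group) throws InterruptedException {
        while (owner != null && owner != group) wait();
        owner = group;
    }

    // Closing barrier: the lock is released only after every thread of the group arrives.
    synchronized void exit() throws InterruptedException {
        if (++exited == groupSize) {
            owner = null;
            exited = 0;
            generation++;
            notifyAll();
        } else {
            long gen = generation;
            while (generation == gen) wait();
        }
    }

    // Demo: group A doubles and group B increments one replicated value. The
    // operations do not commute, so the per-processor copies agree only if
    // every processor applies the two groups' updates in the same order.
    static boolean copiesAgree(int numProcs, int rounds) {
        GroupLock lock = new GroupLock(numProcs);
        int[] copy = new int[numProcs];   // one replicated copy per "processor"
        Object groupA = new Object(), groupB = new Object();
        Thread[] threads = new Thread[2 * numProcs];
        for (int i = 0; i < threads.length; i++) {
            final int rank = i % numProcs;
            final boolean isA = i < numProcs;
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        lock.enter(isA ? groupA : groupB);
                        copy[rank] = isA ? copy[rank] * 2 : copy[rank] + 1;
                        lock.exit();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        for (int c : copy) if (c != copy[0]) return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println("copies agree: " + copiesAgree(4, 5));
    }
}
```

    Because each hold of the lock applies exactly one update to every copy, the sequence of holds defines a single global order of updates, which is the property the single invariant needs.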

    With the current specification, without much compiler analysis, we must require that all accesses to single variables be protected by a singlesynchronized lock. This ensures that only one ThreadGroup may read or modify a single value at a time. Thus, to each individual ThreadGroup, the singleness of a variable is preserved. With more extensive compiler analysis, it may be provable for a given single value that no two ThreadGroups both read or modify it during their lifetimes, which would preserve the value's single quality without a lock.

 

Implementation

    The current implementation provides synchronization using Solaris mutexes and condition variables. This scheme works on Solaris uniprocessor and SMP machines. In order to extend this work to the Berkeley NOW, which has a distributed memory architecture, new distributed locks and condition variables must be implemented. Singlesynchronization has not yet been implemented. More code must be written to support POSIX threads in order to make the uniprocessor and SMP versions portable to other operating systems. In addition, the current Titanium compiler analysis has not been extended to support single's interaction with synchronized, nor has it been extended to support singlesynchronized.

    Distributed synchronization primitives have been implemented many times before. The singlesynchronized primitive is somewhat new, however, and its implementation is interesting. Singlesynchronized requires that only one ThreadGroup per Titanium program be able to hold a given lock at a time. On different processors, however, threads from different ThreadGroups may try to take the lock at the same time. A distributed consensus algorithm is therefore necessary to arbitrate ownership of the distributed lock. The Paxos consensus algorithm is on the same order of complexity as a normal barrier. However, since under simple compiler analysis we require that all single variable accesses be mediated by a singlesynchronized lock, using single values becomes much more costly than before. Single had been intended to ensure that all processors which called a barrier would call the same barrier at the same time, and was implemented solely as a compiler analysis. Since multiple ThreadGroups force single to have effects in the runtime, we go from a zero-cost modifier to a high-cost modifier. This necessitates a more aggressive compiler analysis to help prove ThreadGroup mutual exclusion on single variables in order to remove the need for singlesynchronized and its costly global synchronization.
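
    To make the arbitration problem concrete, here is a deliberately simplified stand-in for the consensus step. It is not Paxos and assumes no failed processors or lost messages: each processor proposes the id of the ThreadGroup whose thread asked for the lock first locally, and all processors adopt the smallest proposed id, as a min-reduction would compute. The class LockArbiter, the method chooseOwner, and the use of non-negative integer group ids are all assumptions made for this sketch.

```java
// Hypothetical sketch (not Paxos): pick one lock owner from per-processor proposals.
final class LockArbiter {
    // proposals[i] is the id of the ThreadGroup that requested the lock first
    // on processor i, or -1 if no thread on that processor has requested it.
    // Returns the smallest proposed id, or -1 if nobody wants the lock.
    static int chooseOwner(int[] proposals) {
        int winner = -1;
        for (int id : proposals) {
            if (id >= 0 && (winner < 0 || id < winner)) winner = id;
        }
        return winner;
    }

    public static void main(String[] args) {
        // Processors 0 and 2 saw group 2 first, processor 1 saw group 1 first.
        int[] proposals = {2, 1, 2, -1};
        System.out.println("lock owner: group " + chooseOwner(proposals));
    }
}
```

    Every processor evaluates the same function on the same set of proposals, so all of them grant the lock to the same ThreadGroup even though their local arrival orders differ.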

 

Conclusion

    This paper introduces a viable scheme for adding thread support to the Titanium parallel programming language. The ease of programming gained by adding the multi-threading abstraction will allow users to express more complicated, hierarchically parallel simulations and more easily hide I/O latency. The interaction of SPMD ThreadGroups and preexisting Titanium constructs is easily understood, but may have synchronization and performance implications under straightforward implementations. The potential for deadlocking threads within and between ThreadGroups is greater than before. The compiler can statically prevent a few obvious deadlock cases, but others will surely appear until a more complete analysis of all potential deadlocks is undertaken. Likewise, the use of single, which had been introduced as a compiler analysis to prove coherent barrier access, causes performance problems when used in a multi-threaded context. The hope here is that the benefits of multi-threaded programming outweigh the implementation difficulties, and will help make Titanium an easier-to-program language.



Date Modified: 11/24/03

Andrew Begel     abegel@cs.berkeley.edu