2
* This program is free software; you can redistribute it and/or modify
3
* it under the terms of the GNU General Public License as published by
4
* the Free Software Foundation; either version 2 of the License, or
5
* (at your option) any later version.
7
* This program is distributed in the hope that it will be useful,
8
* but WITHOUT ANY WARRANTY; without even the implied warranty of
9
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
* GNU General Public License for more details.
12
* You should have received a copy of the GNU General Public License
13
* along with this program; if not, write to the Free Software
14
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19
* Copyright (C) 2000 University of Waikato, Hamilton, New Zealand
24
package weka.experiment;
26
import weka.core.Queue;
28
import java.net.InetAddress;
30
import java.net.URLClassLoader;
31
import java.rmi.Naming;
32
import java.rmi.RMISecurityManager;
33
import java.rmi.RemoteException;
34
import java.rmi.server.UnicastRemoteObject;
35
import java.util.Enumeration;
36
import java.util.Hashtable;
39
* A general purpose server for executing Task objects sent via RMI.
41
* @author Mark Hall (mhall@cs.waikato.ac.nz)
42
* @version $Revision: 1.9 $
44
public class RemoteEngine
45
extends UnicastRemoteObject
48
/** for serialization */
49
private static final long serialVersionUID = -1021538162895448259L;
51
/** The name of the host that this engine is started on */
52
private String m_HostName = "local";
54
/** A queue of waiting tasks */
55
private Queue m_TaskQueue = new Queue();
57
/** A queue of corresponding IDs for tasks */
58
private Queue m_TaskIdQueue = new Queue();
60
/** A hashtable of experiment status */
61
private Hashtable m_TaskStatus = new Hashtable();
63
/** Is there a task running */
64
private boolean m_TaskRunning = false;
68
* @param hostName name of the host
69
* @exception RemoteException if something goes wrong
71
public RemoteEngine(String hostName) throws RemoteException {
73
m_HostName = hostName;
75
/* launch a clean-up thread. Will purge any failed or finished
76
tasks still in the TaskStatus hashtable after an hour */
79
cleanUpThread = new Thread() {
84
Thread.sleep(3600000);
85
} catch (InterruptedException ie) {}
87
if (m_TaskStatus.size() > 0) {
90
System.err.println("RemoteEngine : purge - no tasks to check.");
95
cleanUpThread.setPriority(Thread.MIN_PRIORITY);
96
cleanUpThread.start();
100
* Takes a task object and queues it for execution
101
* @param t the Task object to execute
102
* @return an identifier for the Task that can be used when querying
105
public synchronized Object executeTask(Task t) throws RemoteException {
106
String taskId = ""+System.currentTimeMillis()+":";
107
taskId += t.hashCode();
108
addTaskToQueue(t, taskId);
111
// return t.execute();
115
* Returns status information on a particular task
117
* @param taskId the ID of the task to check
118
* @return a <code>TaskStatusInfo</code> encapsulating task status info
119
* @exception Exception if an error occurs
121
public Object checkStatus(Object taskId) throws Exception {
123
TaskStatusInfo inf = (TaskStatusInfo)m_TaskStatus.get(taskId);
126
throw new Exception("RemoteEngine ("+m_HostName+") : Task not found.");
129
TaskStatusInfo result = new TaskStatusInfo();
130
result.setExecutionStatus(inf.getExecutionStatus());
131
result.setStatusMessage(inf.getStatusMessage());
132
result.setTaskResult(inf.getTaskResult());
134
if (inf.getExecutionStatus() == TaskStatusInfo.FINISHED ||
135
inf.getExecutionStatus() == TaskStatusInfo.FAILED) {
136
System.err.println("Finished/failed Task id : "
137
+ taskId + " checked by client. Removing.");
138
inf.setTaskResult(null);
140
m_TaskStatus.remove(taskId);
147
* Adds a new task to the queue.
149
* @param t a <code>Task</code> value to be added
150
* @param taskId the id of the task to be added
152
private synchronized void addTaskToQueue(Task t, String taskId) {
153
TaskStatusInfo newTask = t.getTaskStatus();
154
if (newTask == null) {
155
newTask = new TaskStatusInfo();
158
m_TaskIdQueue.push(taskId);
159
newTask.setStatusMessage("RemoteEngine ("
161
+") : task queued at postion: "
162
+m_TaskQueue.size());
163
// add task status to HashTable
164
m_TaskStatus.put(taskId, newTask);
165
System.err.println("Task id : " + taskId + "Queued.");
166
if (m_TaskRunning == false) {
172
* Checks to see if there are any waiting tasks, and if no task is
173
* currently running starts a waiting task.
175
private void startTask() {
177
if (m_TaskRunning == false && m_TaskQueue.size() > 0) {
178
Thread activeTaskThread;
179
activeTaskThread = new Thread() {
181
m_TaskRunning = true;
182
Task currentTask = (Task)m_TaskQueue.pop();
183
String taskId = (String)m_TaskIdQueue.pop();
184
TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId);
185
tsi.setExecutionStatus(TaskStatusInfo.PROCESSING);
186
tsi.setStatusMessage("RemoteEngine ("
188
+") : task running...");
190
System.err.println("Launching task id : "
192
currentTask.execute();
193
TaskStatusInfo runStatus = currentTask.getTaskStatus();
194
tsi.setExecutionStatus(runStatus.getExecutionStatus());
195
tsi.setStatusMessage("RemoteExperiment ("
197
+runStatus.getStatusMessage());
198
tsi.setTaskResult(runStatus.getTaskResult());
199
} catch (Exception ex) {
200
tsi.setExecutionStatus(TaskStatusInfo.FAILED);
201
tsi.setStatusMessage("RemoteEngine ("
203
+") : task failed.");
204
System.err.println("Task id " + taskId + "Failed!");
206
if (m_TaskStatus.size() == 0) {
209
m_TaskRunning = false;
210
// start any waiting tasks
215
activeTaskThread.setPriority(Thread.MIN_PRIORITY);
216
activeTaskThread.start();
221
* Attempts to purge class types from the virtual machine. May take some
222
* time as it relies on garbage collection
224
private void purgeClasses() {
226
// see if we can purge classes
228
Thread.currentThread().getContextClassLoader();
230
URLClassLoader.newInstance(new URL[] {new URL("file:.")}, prevCl);
231
Thread.currentThread().setContextClassLoader(urlCl);
232
} catch (Exception ex) {
233
ex.printStackTrace();
238
* Checks the hash table for failed/finished tasks. Any that have been
239
* around for an hour or more are removed. Clients are expected to check
240
* on the status of their remote tasks. Checking on the status of a
241
* finished/failed task will remove it from the hash table, therefore
242
* any failed/finished tasks left lying around for more than an hour
243
* suggest that their client has died.
246
private void purge() {
247
Enumeration keys = m_TaskStatus.keys();
248
long currentTime = System.currentTimeMillis();
249
System.err.println("RemoteEngine purge. Current time : " + currentTime);
250
while (keys.hasMoreElements()) {
251
String tk = (String)keys.nextElement();
252
System.err.print("Examining task id : " + tk + "...");
253
String timeString = tk.substring(0, tk.indexOf(':'));
254
long ts = Long.valueOf(timeString).longValue();
255
if (currentTime - ts > 3600000) {
256
TaskStatusInfo tsi = null;
257
tsi = (TaskStatusInfo)m_TaskStatus.get(tsi);
259
&& (tsi.getExecutionStatus() == TaskStatusInfo.FINISHED ||
260
tsi.getExecutionStatus() == TaskStatusInfo.FAILED)) {
261
System.err.println("\nTask id : "
262
+ tk + " has gone stale. Removing.");
263
m_TaskStatus.remove(tk);
264
tsi.setTaskResult(null);
268
System.err.println("ok.");
271
if (m_TaskStatus.size() == 0) {
277
* Main method. Gets address of the local host, creates a remote engine
278
* object and binds it in the RMI registry. If there is no RMI registry,
279
* then it tries to create one with default port 1099.
283
public static void main(String[] args) {
284
if (System.getSecurityManager() == null) {
285
System.setSecurityManager(new RMISecurityManager());
287
InetAddress localhost = null;
289
localhost = InetAddress.getLocalHost();
290
System.err.println("Host name : "+localhost.getHostName());
291
} catch (Exception ex) {
292
ex.printStackTrace();
295
if (localhost != null) {
296
name = "//"+localhost.getHostName()+"/RemoteEngine";
298
name = "//localhost/RemoteEngine";
302
Compute engine = new RemoteEngine(name);
303
Naming.rebind(name, engine);
304
System.out.println("RemoteEngine bound in RMI registry");
305
} catch (Exception e) {
306
System.err.println("RemoteEngine exception: " +
308
// try to bootstrap a new registry
310
System.err.println("Attempting to start rmi registry...");
311
java.rmi.registry.LocateRegistry.createRegistry(1099);
312
Compute engine = new RemoteEngine(name);
313
Naming.rebind(name, engine);
314
System.out.println("RemoteEngine bound in RMI registry");
315
} catch (Exception ex) {
316
ex.printStackTrace();