~ubuntu-branches/ubuntu/trusty/weka/trusty-proposed

« back to all changes in this revision

Viewing changes to weka/experiment/RemoteEngine.java

  • Committer: Bazaar Package Importer
  • Author(s): Soeren Sonnenburg
  • Date: 2008-02-24 09:18:45 UTC
  • Revision ID: james.westby@ubuntu.com-20080224091845-1l8zy6fm6xipbzsr
Tags: upstream-3.5.7+tut1
ImportĀ upstreamĀ versionĀ 3.5.7+tut1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *    This program is free software; you can redistribute it and/or modify
 
3
 *    it under the terms of the GNU General Public License as published by
 
4
 *    the Free Software Foundation; either version 2 of the License, or
 
5
 *    (at your option) any later version.
 
6
 *
 
7
 *    This program is distributed in the hope that it will be useful,
 
8
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
 *    GNU General Public License for more details.
 
11
 *
 
12
 *    You should have received a copy of the GNU General Public License
 
13
 *    along with this program; if not, write to the Free Software
 
14
 *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 
15
 */
 
16
 
 
17
/*
 
18
 *    RemoteEngine.java
 
19
 *    Copyright (C) 2000 University of Waikato, Hamilton, New Zealand
 
20
 *
 
21
 */
 
22
 
 
23
 
 
24
package weka.experiment;
 
25
 
 
26
import weka.core.Queue;
 
27
 
 
28
import java.net.InetAddress;
 
29
import java.net.URL;
 
30
import java.net.URLClassLoader;
 
31
import java.rmi.Naming;
 
32
import java.rmi.RMISecurityManager;
 
33
import java.rmi.RemoteException;
 
34
import java.rmi.server.UnicastRemoteObject;
 
35
import java.util.Enumeration;
 
36
import java.util.Hashtable;
 
37
 
 
38
/**
 
39
 * A general purpose server for executing Task objects sent via RMI.
 
40
 *
 
41
 * @author Mark Hall (mhall@cs.waikato.ac.nz)
 
42
 * @version $Revision: 1.9 $
 
43
 */
 
44
public class RemoteEngine
 
45
  extends UnicastRemoteObject
 
46
  implements Compute {
 
47
 
 
48
  /** for serialization */
 
49
  private static final long serialVersionUID = -1021538162895448259L;
 
50
 
 
51
  /** The name of the host that this engine is started on */
 
52
  private String m_HostName = "local";
 
53
 
 
54
  /** A queue of waiting tasks */
 
55
  private Queue m_TaskQueue = new Queue();
 
56
 
 
57
  /** A queue of corresponding ID's for tasks */
 
58
  private Queue m_TaskIdQueue = new Queue();
 
59
 
 
60
  /** A hashtable of experiment status */
 
61
  private Hashtable m_TaskStatus = new Hashtable();
 
62
 
 
63
  /** Is there a task running */
 
64
  private boolean m_TaskRunning = false;
 
65
 
 
66
  /**
 
67
   * Constructor
 
68
   * @param hostName name of the host
 
69
   * @exception RemoteException if something goes wrong
 
70
   */
 
71
  public RemoteEngine(String hostName) throws RemoteException {
 
72
    super();
 
73
    m_HostName = hostName;
 
74
 
 
75
    /* launch a clean-up thread. Will purge any failed or finished 
 
76
       tasks still in the TaskStatus hashtable after an hour */
 
77
       
 
78
    Thread cleanUpThread;
 
79
    cleanUpThread = new Thread() {
 
80
        public void run() {
 
81
          while (true) {
 
82
            try {
 
83
              // sleep for an hour
 
84
              Thread.sleep(3600000);
 
85
            } catch (InterruptedException ie) {}
 
86
 
 
87
            if (m_TaskStatus.size() > 0) {
 
88
              purge();
 
89
            } else {
 
90
              System.err.println("RemoteEngine : purge - no tasks to check.");
 
91
            }
 
92
          }
 
93
        }
 
94
      };
 
95
    cleanUpThread.setPriority(Thread.MIN_PRIORITY);
 
96
    cleanUpThread.start();
 
97
  }
 
98
  
 
99
  /**
 
100
   * Takes a task object and queues it for execution
 
101
   * @param t the Task object to execute
 
102
   * @return an identifier for the Task that can be used when querying
 
103
   * Task status
 
104
   */
 
105
  public synchronized Object executeTask(Task t) throws RemoteException {
 
106
    String taskId = ""+System.currentTimeMillis()+":";
 
107
    taskId += t.hashCode();
 
108
    addTaskToQueue(t, taskId);
 
109
 
 
110
    return taskId;
 
111
    //    return t.execute();
 
112
  }
 
113
 
 
114
  /**
 
115
   * Returns status information on a particular task
 
116
   *
 
117
   * @param taskId the ID of the task to check
 
118
   * @return a <code>TaskStatusInfo</code> encapsulating task status info
 
119
   * @exception Exception if an error occurs
 
120
   */
 
121
  public Object checkStatus(Object taskId) throws Exception {
 
122
    
 
123
    TaskStatusInfo inf = (TaskStatusInfo)m_TaskStatus.get(taskId);
 
124
 
 
125
    if (inf == null) {
 
126
      throw new Exception("RemoteEngine ("+m_HostName+") : Task not found.");
 
127
    }
 
128
    
 
129
    TaskStatusInfo result = new TaskStatusInfo();
 
130
    result.setExecutionStatus(inf.getExecutionStatus());
 
131
    result.setStatusMessage(inf.getStatusMessage());
 
132
    result.setTaskResult(inf.getTaskResult());
 
133
 
 
134
    if (inf.getExecutionStatus() == TaskStatusInfo.FINISHED ||
 
135
        inf.getExecutionStatus() == TaskStatusInfo.FAILED) {
 
136
      System.err.println("Finished/failed Task id : " 
 
137
                         + taskId + " checked by client. Removing.");
 
138
      inf.setTaskResult(null);
 
139
      inf = null;
 
140
      m_TaskStatus.remove(taskId);
 
141
    }
 
142
    inf = null;
 
143
    return result;
 
144
  }
 
145
 
 
146
  /**
 
147
   * Adds a new task to the queue.
 
148
   *
 
149
   * @param t a <code>Task</code> value to be added
 
150
   * @param taskId the id of the task to be added
 
151
   */
 
152
  private synchronized void addTaskToQueue(Task t, String taskId) {
 
153
    TaskStatusInfo newTask = t.getTaskStatus();
 
154
    if (newTask == null) {
 
155
      newTask = new TaskStatusInfo();
 
156
    }
 
157
    m_TaskQueue.push(t);
 
158
    m_TaskIdQueue.push(taskId);
 
159
    newTask.setStatusMessage("RemoteEngine ("
 
160
                             +m_HostName
 
161
                             +") : task queued at postion: "
 
162
                             +m_TaskQueue.size());
 
163
    // add task status to HashTable
 
164
    m_TaskStatus.put(taskId, newTask);
 
165
    System.err.println("Task id : " + taskId + "Queued.");
 
166
    if (m_TaskRunning == false) {
 
167
      startTask();
 
168
    }
 
169
  }
 
170
 
 
171
  /**
 
172
   * Checks to see if there are any waiting tasks, and if no task is
 
173
   * currently running starts a waiting task.
 
174
   */
 
175
  private void startTask() {
 
176
 
 
177
    if (m_TaskRunning == false && m_TaskQueue.size() > 0) {
 
178
      Thread activeTaskThread;
 
179
      activeTaskThread = new Thread() {
 
180
          public void run() {
 
181
            m_TaskRunning = true;
 
182
            Task currentTask = (Task)m_TaskQueue.pop();
 
183
            String taskId = (String)m_TaskIdQueue.pop();
 
184
            TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId);
 
185
            tsi.setExecutionStatus(TaskStatusInfo.PROCESSING);
 
186
            tsi.setStatusMessage("RemoteEngine ("
 
187
                                 +m_HostName
 
188
                                 +") : task running...");
 
189
            try {
 
190
              System.err.println("Launching task id : "
 
191
                                 + taskId + "...");
 
192
              currentTask.execute();
 
193
              TaskStatusInfo runStatus = currentTask.getTaskStatus();
 
194
              tsi.setExecutionStatus(runStatus.getExecutionStatus());
 
195
              tsi.setStatusMessage("RemoteExperiment ("
 
196
                                   +m_HostName+") "
 
197
                                   +runStatus.getStatusMessage());
 
198
              tsi.setTaskResult(runStatus.getTaskResult());
 
199
            } catch (Exception ex) {
 
200
              tsi.setExecutionStatus(TaskStatusInfo.FAILED);
 
201
              tsi.setStatusMessage("RemoteEngine ("
 
202
                                   +m_HostName
 
203
                                   +") : task failed.");
 
204
              System.err.println("Task id " + taskId + "Failed!");
 
205
            } finally {
 
206
              if (m_TaskStatus.size() == 0) {
 
207
                purgeClasses();
 
208
              }
 
209
              m_TaskRunning = false;
 
210
              // start any waiting tasks
 
211
              startTask();
 
212
            }
 
213
          }
 
214
        };
 
215
      activeTaskThread.setPriority(Thread.MIN_PRIORITY);
 
216
      activeTaskThread.start();
 
217
    }
 
218
  }
 
219
 
 
220
  /**
 
221
   * Attempts to purge class types from the virtual machine. May take some
 
222
   * time as it relies on garbage collection
 
223
   */
 
224
  private void purgeClasses() {
 
225
    try {
 
226
      // see if we can purge classes
 
227
      ClassLoader prevCl = 
 
228
        Thread.currentThread().getContextClassLoader();
 
229
      ClassLoader urlCl = 
 
230
        URLClassLoader.newInstance(new URL[] {new URL("file:.")}, prevCl);
 
231
      Thread.currentThread().setContextClassLoader(urlCl);
 
232
    } catch (Exception ex) {
 
233
      ex.printStackTrace();
 
234
    }
 
235
  }
 
236
  
 
237
  /**
 
238
   * Checks the hash table for failed/finished tasks. Any that have been
 
239
   * around for an hour or more are removed. Clients are expected to check
 
240
   * on the status of their remote tasks. Checking on the status of a
 
241
   * finished/failed task will remove it from the hash table, therefore
 
242
   * any failed/finished tasks left lying around for more than an hour
 
243
   * suggest that their client has died..
 
244
   *
 
245
   */
 
246
  private void purge() {
 
247
    Enumeration keys = m_TaskStatus.keys();
 
248
    long currentTime = System.currentTimeMillis();
 
249
    System.err.println("RemoteEngine purge. Current time : " + currentTime);
 
250
    while (keys.hasMoreElements()) {
 
251
      String tk = (String)keys.nextElement();
 
252
      System.err.print("Examining task id : " + tk + "...");
 
253
      String timeString = tk.substring(0, tk.indexOf(':'));
 
254
      long ts = Long.valueOf(timeString).longValue();
 
255
      if (currentTime - ts > 3600000) {
 
256
        TaskStatusInfo tsi = null;
 
257
        tsi = (TaskStatusInfo)m_TaskStatus.get(tsi);
 
258
        if ((tsi != null) 
 
259
            && (tsi.getExecutionStatus() == TaskStatusInfo.FINISHED ||
 
260
            tsi.getExecutionStatus() == TaskStatusInfo.FAILED)) {
 
261
          System.err.println("\nTask id : " 
 
262
                             + tk + " has gone stale. Removing.");
 
263
          m_TaskStatus.remove(tk);
 
264
          tsi.setTaskResult(null);
 
265
          tsi = null;
 
266
        }
 
267
      } else {
 
268
        System.err.println("ok.");
 
269
      }
 
270
    }
 
271
    if (m_TaskStatus.size() == 0) {
 
272
      purgeClasses();
 
273
    }
 
274
  }
 
275
 
 
276
  /**
 
277
   * Main method. Gets address of the local host, creates a remote engine
 
278
   * object and binds it in the RMI registry. If there is no RMI registry,
 
279
   * then it tries to create one with default port 1099.
 
280
   *
 
281
   * @param args 
 
282
   */
 
283
  public static void main(String[] args) {
 
284
    if (System.getSecurityManager() == null) {
 
285
      System.setSecurityManager(new RMISecurityManager());
 
286
    }
 
287
    InetAddress localhost = null;
 
288
    try {
 
289
      localhost = InetAddress.getLocalHost();
 
290
      System.err.println("Host name : "+localhost.getHostName());
 
291
    } catch (Exception ex) {
 
292
      ex.printStackTrace();
 
293
    }
 
294
    String name;
 
295
    if (localhost != null) {
 
296
      name = "//"+localhost.getHostName()+"/RemoteEngine";
 
297
    } else {
 
298
      name = "//localhost/RemoteEngine";
 
299
    }
 
300
    
 
301
    try {
 
302
      Compute engine = new RemoteEngine(name);
 
303
      Naming.rebind(name, engine);
 
304
      System.out.println("RemoteEngine bound in RMI registry");
 
305
    } catch (Exception e) {
 
306
      System.err.println("RemoteEngine exception: " + 
 
307
                         e.getMessage());
 
308
      // try to bootstrap a new registry
 
309
      try {
 
310
        System.err.println("Attempting to start rmi registry...");
 
311
        java.rmi.registry.LocateRegistry.createRegistry(1099);
 
312
        Compute engine = new RemoteEngine(name);
 
313
        Naming.rebind(name, engine);
 
314
        System.out.println("RemoteEngine bound in RMI registry");
 
315
      } catch (Exception ex) {
 
316
        ex.printStackTrace();
 
317
      }
 
318
    }
 
319
  }
 
320
}