~ubuntu-branches/ubuntu/precise/weka/precise

« back to all changes in this revision

Viewing changes to weka/experiment/RemoteExperiment.java

  • Committer: Bazaar Package Importer
  • Author(s): Soeren Sonnenburg
  • Date: 2008-02-24 09:18:45 UTC
  • Revision ID: james.westby@ubuntu.com-20080224091845-1l8zy6fm6xipbzsr
Tags: upstream-3.5.7+tut1
ImportĀ upstreamĀ versionĀ 3.5.7+tut1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *    This program is free software; you can redistribute it and/or modify
 
3
 *    it under the terms of the GNU General Public License as published by
 
4
 *    the Free Software Foundation; either version 2 of the License, or
 
5
 *    (at your option) any later version.
 
6
 *
 
7
 *    This program is distributed in the hope that it will be useful,
 
8
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
 *    GNU General Public License for more details.
 
11
 *
 
12
 *    You should have received a copy of the GNU General Public License
 
13
 *    along with this program; if not, write to the Free Software
 
14
 *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 
15
 */
 
16
 
 
17
/*
 
18
 *    RemoteExperiment.java
 
19
 *    Copyright (C) 2000 University of Waikato, Hamilton, New Zealand
 
20
 *
 
21
 */
 
22
 
 
23
 
 
24
package weka.experiment;
 
25
 
 
26
import weka.core.FastVector;
 
27
import weka.core.Option;
 
28
import weka.core.OptionHandler;
 
29
import weka.core.Queue;
 
30
import weka.core.SerializedObject;
 
31
import weka.core.Utils;
 
32
import weka.core.xml.KOML;
 
33
import weka.core.xml.XMLOptions;
 
34
import weka.experiment.xml.XMLExperiment;
 
35
 
 
36
import java.io.BufferedInputStream;
 
37
import java.io.BufferedOutputStream;
 
38
import java.io.File;
 
39
import java.io.FileInputStream;
 
40
import java.io.FileOutputStream;
 
41
import java.io.ObjectInputStream;
 
42
import java.io.ObjectOutputStream;
 
43
import java.rmi.Naming;
 
44
import java.util.Enumeration;
 
45
 
 
46
import javax.swing.DefaultListModel;
 
47
 
 
48
/**
 
49
 * Holds all the necessary configuration information for a distributed
 
50
 * experiment. This object is able to be serialized for storage on disk.<p>
 
51
 * 
 
52
 * This class is experimental at present. Has been tested using 
 
53
 * CSVResultListener (sending results to standard out) and 
 
54
 * DatabaseResultListener (InstantDB + RmiJdbc bridge). <p>
 
55
 *
 
56
 * Getting started:<p>
 
57
 *
 
58
 * Start InstantDB (with the RMI bridge) on some machine. If using java2
 
59
 * then specify -Djava.security.policy=db.policy to the
 
60
 * virtual machine. Where db.policy is as follows: <br>
 
61
 * <pre>
 
62
 * grant {
 
63
 *   permission java.security.AllPermission;
 
64
 * };
 
65
 * </pre><p>
 
66
 *
 
67
 * Start RemoteEngine servers on x machines as per the instructons in the
 
68
 * README_Experiment_Gui file. There must be a 
 
69
 * DatabaseUtils.props in either the HOME or current directory of each
 
70
 * machine, listing all necessary jdbc drivers.<p>
 
71
 *
 
72
 * The machine where a RemoteExperiment is started must also have a copy
 
73
 * of DatabaseUtils.props listing the URL to the machine where the 
 
74
 * database server is running (RmiJdbc + InstantDB). <p>
 
75
 *
 
76
 * Here is an example of starting a RemoteExperiment: <p>
 
77
 *
 
78
 * <pre>
 
79
 *
 
80
 * java -Djava.rmi.server.codebase=file:/path to weka classes/ \
 
81
 * weka.experiment.RemoteExperiment -L 1 -U 10 \
 
82
 * -T /home/ml/datasets/UCI/iris.arff \
 
83
 * -D "weka.experiment.DatabaseResultListener" \
 
84
 * -P "weka.experiment.RandomSplitResultProducer" \
 
85
 * -h rosebud.cs.waikato.ac.nz -h blackbird.cs.waikato.ac.nz -r -- \
 
86
 * -W weka.experiment.ClassifierSplitEvaluator -- \
 
87
 * -W weka.classifiers.bayes.NaiveBayes
 
88
 *
 
89
 * </pre> <p>
 
90
 * The "codebase" property tells rmi where to serve up weka classes from.
 
91
 * This can either be a file url (as long as a shared file system is being
 
92
 * used that is accessable by the remoteEngine servers), or http url (which
 
93
 * of course supposes that a web server is running and you have put your
 
94
 * weka classes somewhere that is web accessable). If using a file url the
 
95
 * trailing "/" is *most* important unless the weka classes are in a jar
 
96
 * file. <p>
 
97
 *
 
98
 <!-- options-start -->
 
99
 * Valid options are: <p/>
 
100
 * 
 
101
 * <pre> -L &lt;num&gt;
 
102
 *  The lower run number to start the experiment from.
 
103
 *  (default 1)</pre>
 
104
 * 
 
105
 * <pre> -U &lt;num&gt;
 
106
 *  The upper run number to end the experiment at (inclusive).
 
107
 *  (default 10)</pre>
 
108
 * 
 
109
 * <pre> -T &lt;arff file&gt;
 
110
 *  The dataset to run the experiment on.
 
111
 *  (required, may be specified multiple times)</pre>
 
112
 * 
 
113
 * <pre> -P &lt;class name&gt;
 
114
 *  The full class name of a ResultProducer (required).
 
115
 *  eg: weka.experiment.RandomSplitResultProducer</pre>
 
116
 * 
 
117
 * <pre> -D &lt;class name&gt;
 
118
 *  The full class name of a ResultListener (required).
 
119
 *  eg: weka.experiment.CSVResultListener</pre>
 
120
 * 
 
121
 * <pre> -N &lt;string&gt;
 
122
 *  A string containing any notes about the experiment.
 
123
 *  (default none)</pre>
 
124
 * 
 
125
 * <pre> 
 
126
 * Options specific to result producer weka.experiment.RandomSplitResultProducer:
 
127
 * </pre>
 
128
 * 
 
129
 * <pre> -P &lt;percent&gt;
 
130
 *  The percentage of instances to use for training.
 
131
 *  (default 66)</pre>
 
132
 * 
 
133
 * <pre> -D
 
134
 * Save raw split evaluator output.</pre>
 
135
 * 
 
136
 * <pre> -O &lt;file/directory name/path&gt;
 
137
 *  The filename where raw output will be stored.
 
138
 *  If a directory name is specified then then individual
 
139
 *  outputs will be gzipped, otherwise all output will be
 
140
 *  zipped to the named file. Use in conjuction with -D. (default splitEvalutorOut.zip)</pre>
 
141
 * 
 
142
 * <pre> -W &lt;class name&gt;
 
143
 *  The full class name of a SplitEvaluator.
 
144
 *  eg: weka.experiment.ClassifierSplitEvaluator</pre>
 
145
 * 
 
146
 * <pre> -R
 
147
 *  Set when data is not to be randomized and the data sets' size.
 
148
 *  Is not to be determined via probabilistic rounding.</pre>
 
149
 * 
 
150
 * <pre> 
 
151
 * Options specific to split evaluator weka.experiment.ClassifierSplitEvaluator:
 
152
 * </pre>
 
153
 * 
 
154
 * <pre> -W &lt;class name&gt;
 
155
 *  The full class name of the classifier.
 
156
 *  eg: weka.classifiers.bayes.NaiveBayes</pre>
 
157
 * 
 
158
 * <pre> -C &lt;index&gt;
 
159
 *  The index of the class for which IR statistics
 
160
 *  are to be output. (default 1)</pre>
 
161
 * 
 
162
 * <pre> -I &lt;index&gt;
 
163
 *  The index of an attribute to output in the
 
164
 *  results. This attribute should identify an
 
165
 *  instance in order to know which instances are
 
166
 *  in the test set of a cross validation. if 0
 
167
 *  no output (default 0).</pre>
 
168
 * 
 
169
 * <pre> -P
 
170
 *  Add target and prediction columns to the result
 
171
 *  for each fold.</pre>
 
172
 * 
 
173
 * <pre> 
 
174
 * Options specific to classifier weka.classifiers.rules.ZeroR:
 
175
 * </pre>
 
176
 * 
 
177
 * <pre> -D
 
178
 *  If set, classifier is run in debug mode and
 
179
 *  may output additional info to the console</pre>
 
180
 * 
 
181
 <!-- options-end -->
 
182
 *
 
183
 * @author Mark Hall (mhall@cs.waikato.ac.nz)
 
184
 * @version $Revision: 1.15 $
 
185
 */
 
186
public class RemoteExperiment 
 
187
  extends Experiment {
 
188
  
 
189
  /** for serialization */
 
190
  static final long serialVersionUID = -7357668825635314937L;
 
191
 
 
192
  /** The list of objects listening for remote experiment events */
 
193
  private FastVector m_listeners = new FastVector();
 
194
 
 
195
  /** Holds the names of machines with remoteEngine servers running */
 
196
  protected DefaultListModel m_remoteHosts = new DefaultListModel();
 
197
  
 
198
  /** The queue of available hosts */
 
199
  private Queue m_remoteHostsQueue = new Queue();
 
200
 
 
201
  /** The status of each of the remote hosts */
 
202
  private int [] m_remoteHostsStatus;
 
203
 
 
204
  /** The number of times tasks have failed on each remote host */
 
205
  private int [] m_remoteHostFailureCounts;
 
206
 
 
207
  /** status of the remote host: available */
 
208
  protected static final int AVAILABLE=0;
 
209
  /** status of the remote host: in use */
 
210
  protected static final int IN_USE=1;
 
211
  /** status of the remote host: connection failed */
 
212
  protected static final int CONNECTION_FAILED=2;
 
213
  /** status of the remote host: some other failure */
 
214
  protected static final int SOME_OTHER_FAILURE=3;
 
215
 
 
216
//    protected static final int TO_BE_RUN=0;
 
217
//    protected static final int PROCESSING=1;
 
218
//    protected static final int FAILED=2;
 
219
//    protected static final int FINISHED=3;
 
220
 
 
221
  /** allow at most 3 failures on a host before it is removed from the list
 
222
      of usable hosts */
 
223
  protected static final int MAX_FAILURES=3;
 
224
 
 
225
  /** Set to true if MAX_FAILURES exceeded on all hosts or connections fail 
 
226
      on all hosts or user aborts experiment (via gui) */
 
227
  private boolean m_experimentAborted = false;
 
228
 
 
229
  /** The number of hosts removed due to exceeding max failures */
 
230
  private int m_removedHosts;
 
231
 
 
232
  /** The count of failed sub-experiments */
 
233
  private int m_failedCount;
 
234
 
 
235
  /** The count of successfully completed sub-experiments */
 
236
  private int m_finishedCount;
 
237
 
 
238
  /** The base experiment to split up into sub experiments for remote
 
239
      execution */
 
240
  private Experiment m_baseExperiment = null;
 
241
 
 
242
  /** The sub experiments */
 
243
  protected Experiment [] m_subExperiments;
 
244
 
 
245
  /** The queue of sub experiments waiting to be processed */
 
246
  private Queue m_subExpQueue = new Queue();
 
247
 
 
248
  /** The status of each of the sub-experiments */
 
249
  protected int [] m_subExpComplete;
 
250
 
 
251
  /**
 
252
   * If true, then sub experiments are created on the basis of data sets
 
253
   * rather than run number.
 
254
   */
 
255
  protected boolean m_splitByDataSet = true;
 
256
 
 
257
 
 
258
  /**
 
259
   * Returns true if sub experiments are to be created on the basis of
 
260
   * data set..
 
261
   *
 
262
   * @return a <code>boolean</code> value indicating whether sub
 
263
   * experiments are to be created on the basis of data set (true) or
 
264
   * run number (false).
 
265
   */
 
266
  public boolean getSplitByDataSet() {
 
267
    return m_splitByDataSet;
 
268
  }
 
269
 
 
270
  /**
 
271
   * Set whether sub experiments are to be created on the basis of
 
272
   * data set.
 
273
   *
 
274
   * @param sd true if sub experiments are to be created on the basis
 
275
   * of data set. Otherwise sub experiments are created on the basis of
 
276
   * run number.
 
277
   */
 
278
  public void setSplitByDataSet(boolean sd) {
 
279
    m_splitByDataSet = sd;
 
280
  }
 
281
  
 
282
  /**
 
283
   * Construct a new RemoteExperiment using an empty Experiment as base 
 
284
   * Experiment
 
285
   * @throws Exception if the base experiment is null
 
286
   */
 
287
  public RemoteExperiment() throws Exception {
 
288
     this(new Experiment());
 
289
  }
 
290
  
 
291
  /**
 
292
   * Construct a new RemoteExperiment using a base Experiment
 
293
   * @param base the base experiment to use
 
294
   * @throws Exception if the base experiment is null
 
295
   */
 
296
  public RemoteExperiment(Experiment base) throws Exception {
 
297
    setBaseExperiment(base);
 
298
  }
 
299
 
 
300
  /**
 
301
   * Add an object to the list of those interested in recieving update
 
302
   * information from the RemoteExperiment
 
303
   * @param r a listener
 
304
   */
 
305
  public void addRemoteExperimentListener(RemoteExperimentListener r) {
 
306
    m_listeners.addElement(r);
 
307
  }
 
308
 
 
309
  /**
 
310
   * Get the base experiment used by this remote experiment
 
311
   * @return the base experiment
 
312
   */
 
313
  public Experiment getBaseExperiment() {
 
314
    return m_baseExperiment;
 
315
  }
 
316
 
 
317
  /**
 
318
   * Set the base experiment. A sub experiment will be created for each
 
319
   * run in the base experiment.
 
320
   * @param base the base experiment to use.
 
321
   * @throws Exception if supplied base experiment is null
 
322
   */
 
323
  public void setBaseExperiment(Experiment base) throws Exception {
 
324
    if (base == null) {
 
325
      throw new Exception("Base experiment is null!");
 
326
    }
 
327
    m_baseExperiment = base;
 
328
    setRunLower(m_baseExperiment.getRunLower());
 
329
    setRunUpper(m_baseExperiment.getRunUpper());
 
330
    setResultListener(m_baseExperiment.getResultListener());
 
331
    setResultProducer(m_baseExperiment.getResultProducer());
 
332
    setDatasets(m_baseExperiment.getDatasets());
 
333
    setUsePropertyIterator(m_baseExperiment.getUsePropertyIterator());
 
334
    setPropertyPath(m_baseExperiment.getPropertyPath());
 
335
    setPropertyArray(m_baseExperiment.getPropertyArray());
 
336
    setNotes(m_baseExperiment.getNotes());
 
337
    m_ClassFirst = m_baseExperiment.m_ClassFirst;
 
338
    m_AdvanceDataSetFirst = m_baseExperiment.m_AdvanceDataSetFirst;
 
339
  }
 
340
  
 
341
  /**
 
342
   * Set the user notes.
 
343
   *
 
344
   * @param newNotes New user notes.
 
345
   */
 
346
  public void setNotes(String newNotes) {
 
347
    
 
348
    super.setNotes(newNotes);
 
349
    m_baseExperiment.setNotes(newNotes);
 
350
  }
 
351
 
 
352
  /**
 
353
   * Set the lower run number for the experiment.
 
354
   *
 
355
   * @param newRunLower the lower run number for the experiment.
 
356
   */
 
357
  public void setRunLower(int newRunLower) {
 
358
    
 
359
    super.setRunLower(newRunLower);
 
360
    m_baseExperiment.setRunLower(newRunLower);
 
361
  }
 
362
 
 
363
  /**
 
364
   * Set the upper run number for the experiment.
 
365
   *
 
366
   * @param newRunUpper the upper run number for the experiment.
 
367
   */
 
368
  public void setRunUpper(int newRunUpper) {
 
369
    
 
370
    super.setRunUpper(newRunUpper);
 
371
    m_baseExperiment.setRunUpper(newRunUpper);
 
372
  }
 
373
 
 
374
  /**
 
375
   * Sets the result listener where results will be sent.
 
376
   *
 
377
   * @param newResultListener the result listener where results will be sent.
 
378
   */
 
379
  public void setResultListener(ResultListener newResultListener) {
 
380
    
 
381
    super.setResultListener(newResultListener);
 
382
    m_baseExperiment.setResultListener(newResultListener);
 
383
  }
 
384
 
 
385
  /**
 
386
   * Set the result producer used for the current experiment.
 
387
   *
 
388
   * @param newResultProducer result producer to use for the current 
 
389
   * experiment.
 
390
   */
 
391
  public void setResultProducer(ResultProducer newResultProducer) {
 
392
    
 
393
    super.setResultProducer(newResultProducer);
 
394
    m_baseExperiment.setResultProducer(newResultProducer);
 
395
  }
 
396
 
 
397
  /**
 
398
   * Set the datasets to use in the experiment
 
399
   * @param ds the list of datasets to use
 
400
   */
 
401
  public void setDatasets(DefaultListModel ds) {
 
402
    super.setDatasets(ds);
 
403
    m_baseExperiment.setDatasets(ds);
 
404
  }
 
405
 
 
406
  /**
 
407
   * Sets whether the custom property iterator should be used.
 
408
   *
 
409
   * @param newUsePropertyIterator true if so
 
410
   */
 
411
  public void setUsePropertyIterator(boolean newUsePropertyIterator) {
 
412
    
 
413
    super.setUsePropertyIterator(newUsePropertyIterator);
 
414
    m_baseExperiment.setUsePropertyIterator(newUsePropertyIterator);
 
415
  }
 
416
 
 
417
  /**
 
418
   * Sets the path of properties taken to get to the custom property
 
419
   * to iterate over.
 
420
   *
 
421
   * @param newPropertyPath an array of PropertyNodes
 
422
   */
 
423
  public void setPropertyPath(PropertyNode [] newPropertyPath) {
 
424
    
 
425
    super.setPropertyPath(newPropertyPath);
 
426
    m_baseExperiment.setPropertyPath(newPropertyPath);
 
427
  }
 
428
 
 
429
  /**
 
430
   * Sets the array of values to set the custom property to.
 
431
   *
 
432
   * @param newPropArray a value of type Object which should be an
 
433
   * array of the appropriate values.
 
434
   */
 
435
  public void setPropertyArray(Object newPropArray) {
 
436
    super.setPropertyArray(newPropArray);
 
437
    m_baseExperiment.setPropertyArray(newPropArray);
 
438
  }
 
439
 
 
440
    
 
441
  /**
 
442
   * Prepares a remote experiment for running, creates sub experiments
 
443
   *
 
444
   * @throws Exception if an error occurs
 
445
   */
 
446
  public void initialize() throws Exception {
 
447
    if (m_baseExperiment == null) {
 
448
      throw new Exception("No base experiment specified!");
 
449
    }
 
450
 
 
451
    m_experimentAborted = false;
 
452
    m_finishedCount = 0;
 
453
    m_failedCount = 0;
 
454
    m_RunNumber = getRunLower();
 
455
    m_DatasetNumber = 0;
 
456
    m_PropertyNumber = 0;
 
457
    m_CurrentProperty = -1;
 
458
    m_CurrentInstances = null;
 
459
    m_Finished = false;
 
460
 
 
461
    if (m_remoteHosts.size() == 0) {
 
462
      throw new Exception("No hosts specified!");
 
463
    }
 
464
    // initialize all remote hosts to available
 
465
    m_remoteHostsStatus = new int [m_remoteHosts.size()];    
 
466
    m_remoteHostFailureCounts = new int [m_remoteHosts.size()];
 
467
 
 
468
    m_remoteHostsQueue = new Queue();
 
469
    // prime the hosts queue
 
470
    for (int i=0;i<m_remoteHosts.size();i++) {
 
471
      m_remoteHostsQueue.push(new Integer(i));
 
472
    }
 
473
 
 
474
    // set up sub experiments
 
475
    m_subExpQueue = new Queue();
 
476
    int numExps;
 
477
    if (getSplitByDataSet()) {
 
478
      numExps = m_baseExperiment.getDatasets().size();
 
479
    } else {
 
480
      numExps = getRunUpper() - getRunLower() + 1;
 
481
    }
 
482
    m_subExperiments = new Experiment[numExps];
 
483
    m_subExpComplete = new int[numExps];
 
484
    // create copy of base experiment
 
485
    SerializedObject so = new SerializedObject(m_baseExperiment);
 
486
 
 
487
    if (getSplitByDataSet()) {
 
488
      for (int i = 0; i < m_baseExperiment.getDatasets().size(); i++) {
 
489
        m_subExperiments[i] = (Experiment)so.getObject();
 
490
        // one for each data set
 
491
        DefaultListModel temp = new DefaultListModel();
 
492
        temp.addElement(m_baseExperiment.getDatasets().elementAt(i));
 
493
        m_subExperiments[i].setDatasets(temp);
 
494
        m_subExpQueue.push(new Integer(i));
 
495
      }
 
496
    } else {
 
497
      for (int i = getRunLower(); i <= getRunUpper(); i++) {
 
498
        m_subExperiments[i-getRunLower()] = (Experiment)so.getObject();
 
499
        // one run for each sub experiment
 
500
        m_subExperiments[i-getRunLower()].setRunLower(i);
 
501
        m_subExperiments[i-getRunLower()].setRunUpper(i);
 
502
        
 
503
        m_subExpQueue.push(new Integer(i-getRunLower()));
 
504
      }    
 
505
    }
 
506
  }
 
507
 
 
508
  /**
 
509
   * Inform all listeners of progress
 
510
   * @param status true if this is a status type of message
 
511
   * @param log true if this is a log type of message
 
512
   * @param finished true if the remote experiment has finished
 
513
   * @param message the message.
 
514
   */
 
515
  private synchronized void notifyListeners(boolean status, 
 
516
                                            boolean log, 
 
517
                                            boolean finished,
 
518
                                            String message) {
 
519
    if (m_listeners.size() > 0) {
 
520
      for (int i=0;i<m_listeners.size();i++) {
 
521
        RemoteExperimentListener r = 
 
522
          (RemoteExperimentListener)(m_listeners.elementAt(i));
 
523
        r.remoteExperimentStatus(new RemoteExperimentEvent(status,
 
524
                                                           log,
 
525
                                                           finished,
 
526
                                                           message));
 
527
      }
 
528
    } else {
 
529
      System.err.println(message);
 
530
    }
 
531
  }
 
532
 
 
533
  /**
 
534
   * Set the abort flag
 
535
   */
 
536
  public void abortExperiment() {
 
537
    m_experimentAborted = true;
 
538
  }
 
539
 
 
540
  /**
 
541
   * Increment the number of successfully completed sub experiments
 
542
   */
 
543
  protected synchronized void incrementFinished() {
 
544
    m_finishedCount++;
 
545
  }
 
546
 
 
547
  /**
 
548
   * Increment the overall number of failures and the number of failures for
 
549
   * a particular host
 
550
   * @param hostNum the index of the host to increment failure count
 
551
   */
 
552
  protected synchronized void incrementFailed(int hostNum) {
 
553
    m_failedCount++;
 
554
    m_remoteHostFailureCounts[hostNum]++;
 
555
  }
 
556
 
 
557
  /**
 
558
   * Push an experiment back on the queue of waiting experiments
 
559
   * @param expNum the index of the experiment to push onto the queue
 
560
   */
 
561
  protected synchronized void waitingExperiment(int expNum) {
 
562
    m_subExpQueue.push(new Integer(expNum));
 
563
  }
 
564
 
 
565
  /**
 
566
   * Check to see if we have failed to connect to all hosts
 
567
   * 
 
568
   * @return true if failed to connect to all hosts
 
569
   */
 
570
  private boolean checkForAllFailedHosts() {
 
571
    boolean allbad = true;
 
572
    for (int i = 0; i < m_remoteHostsStatus.length; i++) {
 
573
      if (m_remoteHostsStatus[i] != CONNECTION_FAILED) {
 
574
        allbad = false;
 
575
        break;
 
576
      }
 
577
    }
 
578
    if (allbad) {
 
579
      abortExperiment();
 
580
      notifyListeners(false,true,true,"Experiment aborted! All connections "
 
581
                      +"to remote hosts failed.");
 
582
    }
 
583
    return allbad;
 
584
  }
 
585
 
 
586
  /**
 
587
   * Returns some post experiment information.
 
588
   * @return a String containing some post experiment info
 
589
   */
 
590
  private String postExperimentInfo() {
 
591
    StringBuffer text = new StringBuffer();
 
592
    text.append(m_finishedCount+(m_splitByDataSet 
 
593
                                 ? " data sets" 
 
594
                                 : " runs") + " completed successfully. "
 
595
                +m_failedCount+" failures during running.\n");
 
596
    System.err.print(text.toString());
 
597
    return text.toString();
 
598
  }
 
599
 
 
600
  /**
 
601
   * Pushes a host back onto the queue of available hosts and attempts to
 
602
   * launch a waiting experiment (if any).
 
603
   * @param hostNum the index of the host to push back onto the queue of
 
604
   * available hosts
 
605
   */
 
606
  protected synchronized void availableHost(int hostNum) {
 
607
    if (hostNum >= 0) { 
 
608
      if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) {
 
609
        m_remoteHostsQueue.push(new Integer(hostNum));
 
610
      } else {
 
611
        notifyListeners(false,true,false,"Max failures exceeded for host "
 
612
                        +((String)m_remoteHosts.elementAt(hostNum))
 
613
                        +". Removed from host list.");
 
614
        m_removedHosts++;
 
615
      }
 
616
    }
 
617
 
 
618
    // check for all sub exp complete or all hosts failed or failed count
 
619
    // exceeded
 
620
    if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) {
 
621
      abortExperiment();
 
622
      notifyListeners(false,true,true,"Experiment aborted! Max failures "
 
623
                      +"exceeded on all remote hosts.");
 
624
      return;
 
625
    }
 
626
 
 
627
    if ((getSplitByDataSet() && 
 
628
         (m_baseExperiment.getDatasets().size() == m_finishedCount)) ||
 
629
        (!getSplitByDataSet() && 
 
630
         ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) {
 
631
      notifyListeners(false,true,false,"Experiment completed successfully.");
 
632
      notifyListeners(false,true,true,postExperimentInfo());
 
633
      return;
 
634
    }
 
635
    
 
636
    if (checkForAllFailedHosts()) {
 
637
      return;
 
638
    }
 
639
 
 
640
    if (m_experimentAborted && 
 
641
        (m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) {
 
642
      notifyListeners(false,true,true,"Experiment aborted. All remote tasks "
 
643
                      +"finished.");
 
644
    }
 
645
        
 
646
    if (!m_subExpQueue.empty() && !m_experimentAborted) {
 
647
      if (!m_remoteHostsQueue.empty()) {
 
648
        int availHost, waitingExp;
 
649
        try {
 
650
          availHost = ((Integer)m_remoteHostsQueue.pop()).intValue();
 
651
          waitingExp = ((Integer)m_subExpQueue.pop()).intValue();
 
652
          launchNext(waitingExp, availHost);
 
653
        } catch (Exception ex) {
 
654
          ex.printStackTrace();
 
655
        }
 
656
      }
 
657
    }    
 
658
  }
 
659
 
 
660
  /**
 
661
   * Launch a sub experiment on a remote host
 
662
   * @param wexp the index of the sub experiment to launch
 
663
   * @param ah the index of the available host to launch on
 
664
   */
 
665
  public void launchNext(final int wexp, final int ah) {
 
666
    
 
667
    Thread subExpThread;
 
668
    subExpThread = new Thread() {
 
669
        public void run() {           
 
670
          m_remoteHostsStatus[ah] = IN_USE;
 
671
          m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING;
 
672
          RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask();
 
673
          expSubTsk.setExperiment(m_subExperiments[wexp]);
 
674
          String subTaskType = (getSplitByDataSet())
 
675
            ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets().
 
676
                             elementAt(0)).getName()
 
677
            : "run :" + m_subExperiments[wexp].getRunLower();
 
678
          try {
 
679
            String name = "//"
 
680
              +((String)m_remoteHosts.elementAt(ah))
 
681
              +"/RemoteEngine";
 
682
            Compute comp = (Compute) Naming.lookup(name);
 
683
            // assess the status of the sub-exp
 
684
            notifyListeners(false,true,false,"Starting "
 
685
                            +subTaskType
 
686
                            +" on host "
 
687
                            +((String)m_remoteHosts.elementAt(ah)));
 
688
            Object subTaskId = comp.executeTask(expSubTsk);
 
689
            boolean finished = false;
 
690
            TaskStatusInfo is = null;
 
691
            while (!finished) {
 
692
              try {
 
693
                Thread.sleep(2000);
 
694
                
 
695
                TaskStatusInfo cs = (TaskStatusInfo)comp.
 
696
                  checkStatus(subTaskId);
 
697
                if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) {
 
698
                  // push host back onto queue and try launching any waiting 
 
699
                  // sub-experiments
 
700
                  notifyListeners(false, true, false,  cs.getStatusMessage());
 
701
                  m_remoteHostsStatus[ah] = AVAILABLE;
 
702
                  incrementFinished();
 
703
                  availableHost(ah);
 
704
                  finished = true;
 
705
                } else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) {
 
706
                  // a non connection related error---possibly host doesn't have
 
707
                  // access to data sets or security policy is not set up
 
708
                  // correctly or classifier(s) failed for some reason
 
709
                  notifyListeners(false, true, false,  cs.getStatusMessage());
 
710
                  m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE;
 
711
                  m_subExpComplete[wexp] = TaskStatusInfo.FAILED;
 
712
                  notifyListeners(false,true,false,subTaskType
 
713
                                  +" "+cs.getStatusMessage()
 
714
                                  +". Scheduling for execution on another host.");
 
715
                  incrementFailed(ah);
 
716
                  // push experiment back onto queue
 
717
                  waitingExperiment(wexp);      
 
718
                  // push host back onto queue and try launching any waiting 
 
719
                  // sub-experiments. Host is pushed back on the queue as the
 
720
                  // failure may be temporary---eg. with InstantDB using the
 
721
                  // RMI bridge, two or more threads may try to create the
 
722
                  // experiment index or results table simultaneously; all but
 
723
                  // one will throw an exception. These hosts are still usable
 
724
                  // however.
 
725
                  availableHost(ah);
 
726
                  finished = true;
 
727
                } else {
 
728
                  if (is == null) {
 
729
                    is = cs;
 
730
                    notifyListeners(false, true, false, cs.getStatusMessage());
 
731
                  } else {
 
732
                    if (cs.getStatusMessage().
 
733
                        compareTo(is.getStatusMessage()) != 0) {
 
734
                     
 
735
                      notifyListeners(false, true, false,  
 
736
                                      cs.getStatusMessage());
 
737
                    }
 
738
                    is = cs;
 
739
                  }  
 
740
                }
 
741
              } catch (InterruptedException ie) {
 
742
              }
 
743
            }         
 
744
 
 
745
          } catch (Exception ce) {
 
746
            m_remoteHostsStatus[ah] = CONNECTION_FAILED;
 
747
            m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN;
 
748
            System.err.println(ce);
 
749
            ce.printStackTrace();
 
750
            notifyListeners(false,true,false,"Connection to "
 
751
                            +((String)m_remoteHosts.elementAt(ah))
 
752
                            +" failed. Scheduling "
 
753
                            +subTaskType
 
754
                            +" for execution on another host.");
 
755
            checkForAllFailedHosts();
 
756
            waitingExperiment(wexp);
 
757
          } finally {
 
758
            if (isInterrupted()) {
 
759
              System.err.println("Sub exp Interupted!");
 
760
            }
 
761
          }
 
762
        }          
 
763
      };
 
764
    subExpThread.setPriority(Thread.MIN_PRIORITY);
 
765
    subExpThread.start();
 
766
  }
 
767
 
 
768
  /**
 
769
   * Overides the one in Experiment
 
770
   * @throws Exception never throws an exception
 
771
   */
 
772
  public void nextIteration() throws Exception {
 
773
 
 
774
  }
 
775
 
 
776
  /** 
 
777
   * overides the one in Experiment
 
778
   */
 
779
  public void advanceCounters() {
 
780
 
 
781
  }
 
782
 
 
783
  /** 
 
784
   * overides the one in Experiment
 
785
   */
 
786
  public void postProcess() {
 
787
   
 
788
  }
 
789
 
 
790
  /**
 
791
   * Add a host name to the list of remote hosts
 
792
   * @param hostname the host name to add to the list
 
793
   */
 
794
  public void addRemoteHost(String hostname) {
 
795
    m_remoteHosts.addElement(hostname);
 
796
  }
 
797
 
 
798
  /**
 
799
   * Get the list of remote host names
 
800
   * @return the list of remote host names
 
801
   */
 
802
  public DefaultListModel getRemoteHosts() {
 
803
    return m_remoteHosts;
 
804
  }
 
805
 
 
806
  /**
 
807
   * Set the list of remote host names
 
808
   * @param list the list of remote host names
 
809
   */
 
810
  public void setRemoteHosts(DefaultListModel list) {
 
811
    m_remoteHosts = list;
 
812
  }
 
813
 
 
814
  /**
 
815
   * Overides toString in Experiment
 
816
   * @return a description of this remote experiment
 
817
   */
 
818
  public String toString() {
 
819
    String result = m_baseExperiment.toString();
 
820
 
 
821
    result += "\nRemote Hosts:\n";
 
822
    for (int i=0;i<m_remoteHosts.size();i++) {
 
823
      result += ((String)m_remoteHosts.elementAt(i)) +'\n';
 
824
    }
 
825
    return result;
 
826
  }
 
827
 
 
828
  /**
 
829
   * Overides runExperiment in Experiment
 
830
   */
 
831
  public void runExperiment() {
 
832
    int totalHosts = m_remoteHostsQueue.size();
 
833
    // Try to launch sub experiments on all available hosts
 
834
    for (int i = 0; i < totalHosts; i++) {
 
835
      availableHost(-1);
 
836
    }
 
837
  }
 
838
 
 
839
  /**
 
840
   * Configures/Runs the Experiment from the command line.
 
841
   *
 
842
   * @param args command line arguments to the Experiment.
 
843
   */
 
844
  public static void main(String[] args) {
 
845
 
 
846
    try {
 
847
      RemoteExperiment exp = null;
 
848
 
 
849
      // get options from XML?
 
850
      String xmlOption = Utils.getOption("xml", args);
 
851
      if (!xmlOption.equals(""))
 
852
         args = new XMLOptions(xmlOption).toArray();
 
853
      
 
854
      Experiment base = null;
 
855
      String expFile = Utils.getOption('l', args);
 
856
      String saveFile = Utils.getOption('s', args);
 
857
      boolean runExp = Utils.getFlag('r', args);
 
858
      FastVector remoteHosts = new FastVector();
 
859
      String runHost = " ";
 
860
      while (runHost.length() != 0) {
 
861
        runHost = Utils.getOption('h', args);
 
862
        if (runHost.length() != 0) {
 
863
          remoteHosts.addElement(runHost);
 
864
        }
 
865
      }
 
866
      if (expFile.length() == 0) {
 
867
        base = new Experiment();
 
868
        try {
 
869
          base.setOptions(args);
 
870
          Utils.checkForRemainingOptions(args);
 
871
        } catch (Exception ex) {
 
872
          ex.printStackTrace();
 
873
          String result = "Usage:\n\n"
 
874
            + "-l <exp file>\n"
 
875
            + "\tLoad experiment from file (default use cli options)\n"
 
876
            + "-s <exp file>\n"
 
877
            + "\tSave experiment to file after setting other options\n"
 
878
            + "\t(default don't save)\n"
 
879
            + "-h <remote host name>\n"
 
880
            + "\tHost to run experiment on (may be specified more than once\n"
 
881
            + "\tfor multiple remote hosts)\n"
 
882
            + "-r \n"
 
883
            + "\tRun experiment on (default don't run)\n"
 
884
       + "-xml <filename | xml-string>\n"
 
885
       + "\tget options from XML-Data instead from parameters\n"
 
886
       + "\n";
 
887
          Enumeration enm = ((OptionHandler)base).listOptions();
 
888
          while (enm.hasMoreElements()) {
 
889
            Option option = (Option) enm.nextElement();
 
890
            result += option.synopsis() + "\n";
 
891
            result += option.description() + "\n";
 
892
          }
 
893
          throw new Exception(result + "\n" + ex.getMessage());
 
894
        }
 
895
      } else {
 
896
         Object tmp;
 
897
         
 
898
         // KOML?
 
899
         if ( (KOML.isPresent()) && (expFile.toLowerCase().endsWith(KOML.FILE_EXTENSION)) ) {
 
900
            tmp = KOML.read(expFile);
 
901
         }
 
902
         else
 
903
         // XML?
 
904
         if (expFile.toLowerCase().endsWith(".xml")) {
 
905
            XMLExperiment xml = new XMLExperiment(); 
 
906
            tmp = xml.read(expFile);
 
907
         }
 
908
         // binary
 
909
         else {
 
910
            FileInputStream fi = new FileInputStream(expFile);
 
911
            ObjectInputStream oi = new ObjectInputStream(
 
912
                                   new BufferedInputStream(fi));
 
913
            tmp = oi.readObject();
 
914
            oi.close();
 
915
         }
 
916
        if (tmp instanceof RemoteExperiment) {
 
917
          exp = (RemoteExperiment)tmp;
 
918
        } else {
 
919
          base = (Experiment)tmp;
 
920
        }
 
921
      }
 
922
      if (base != null) {
 
923
        exp = new RemoteExperiment(base);
 
924
      }
 
925
      for (int i=0;i<remoteHosts.size();i++) {
 
926
        exp.addRemoteHost((String)remoteHosts.elementAt(i));
 
927
      }
 
928
      System.err.println("Experiment:\n" + exp.toString());
 
929
 
 
930
      if (saveFile.length() != 0) {
 
931
         // KOML?
 
932
         if ( (KOML.isPresent()) && (saveFile.toLowerCase().endsWith(KOML.FILE_EXTENSION)) ) {
 
933
            KOML.write(saveFile, exp);
 
934
         }
 
935
         else
 
936
         // XML?
 
937
         if (saveFile.toLowerCase().endsWith(".xml")) {
 
938
            XMLExperiment xml = new XMLExperiment(); 
 
939
            xml.write(saveFile, exp);
 
940
         }
 
941
         // binary
 
942
         else {
 
943
            FileOutputStream fo = new FileOutputStream(saveFile);
 
944
            ObjectOutputStream oo = new ObjectOutputStream(
 
945
                                    new BufferedOutputStream(fo));
 
946
            oo.writeObject(exp);
 
947
            oo.close();
 
948
         }
 
949
      }
 
950
      
 
951
      if (runExp) {
 
952
        System.err.println("Initializing...");
 
953
        exp.initialize();
 
954
        System.err.println("Iterating...");
 
955
        exp.runExperiment();
 
956
        System.err.println("Postprocessing...");
 
957
        exp.postProcess();
 
958
      }      
 
959
    } catch (Exception ex) {
 
960
      ex.printStackTrace();
 
961
      System.err.println(ex.getMessage());
 
962
    }
 
963
  }
 
964
}