~smaioli/azureus/ubuntu-experimental

« back to all changes in this revision

Viewing changes to com/aelitis/azureus/vivaldi/ver2/VivaldiV2PositionProvider.java

Merged Vuze 4.2.0.2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Created on 24-Apr-2006 Created by Paul Gardner Copyright (C) 2006 Aelitis,
 
3
 * All Rights Reserved.
 
4
 * 
 
5
 * This program is free software; you can redistribute it and/or modify it under
 
6
 * the terms of the GNU General Public License as published by the Free Software
 
7
 * Foundation; either version 2 of the License, or (at your option) any later
 
8
 * version. This program is distributed in the hope that it will be useful, but
 
9
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
10
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 
11
 * details. You should have received a copy of the GNU General Public License
 
12
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 
13
 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 
14
 * 
 
15
 * AELITIS, SAS au capital de 46,603.30 euros 8 Allee Lenotre, La Grille Royale,
 
16
 * 78600 Le Mesnil le Roi, France.
 
17
 * 
 
18
 */
 
19
 
 
20
package com.aelitis.azureus.vivaldi.ver2;
 
21
 
 
22
import java.io.DataInputStream;
 
23
import java.io.DataOutputStream;
 
24
import java.io.IOException;
 
25
import java.util.Iterator;
 
26
import java.util.LinkedList;
 
27
import java.util.SortedSet;
 
28
import java.util.TreeSet;
 
29
 
 
30
import org.gudy.azureus2.core3.util.Timer;
 
31
import org.gudy.azureus2.core3.util.TimerEvent;
 
32
import org.gudy.azureus2.core3.util.TimerEventPerformer;
 
33
 
 
34
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
 
35
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
 
36
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProvider;
 
37
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProviderInstance;
 
38
import com.aelitis.azureus.core.dht.router.DHTRouter;
 
39
import com.aelitis.azureus.core.dht.router.DHTRouterContact;
 
40
import com.aelitis.azureus.core.dht.router.DHTRouterFactory;
 
41
import com.aelitis.azureus.core.dht.router.DHTRouterFactoryObserver;
 
42
import com.aelitis.azureus.core.dht.router.DHTRouterObserver;
 
43
import com.aelitis.azureus.vivaldi.ver2.stats.SerializationController;
 
44
import com.aelitis.azureus.vivaldi.ver2.stats.V1Serializer;
 
45
 
 
46
import edu.harvard.syrah.nc.Coordinate;
 
47
import edu.harvard.syrah.nc.VivaldiClient;
 
48
 
 
49
public class VivaldiV2PositionProvider implements DHTNetworkPositionProvider,
 
50
                DHTRouterFactoryObserver, DHTRouterObserver {
 
51
        public static final int NUM_DIMS = 5;
 
52
        public static final int TRANSIENT_TIME = 45 * 1000;             // in milliseconds
 
53
  public static final long MIN_NC_UPDATE_INTERVAL = 10*1000;
 
54
  
 
55
        private static final boolean LOGGING_ENABLED = false;
 
56
 
 
57
        private static boolean initialised = false;
 
58
        private static volatile DHTNetworkPositionProviderInstance provider = null;
 
59
 
 
60
        private final VivaldiClient<IDWrapper> vc;
 
61
        private final InitialPosition ip;
 
62
        
 
63
        protected final SortedSet<IDWrapper> transient_ids;
 
64
        protected final LinkedList<TransientTuple> transient_list;
 
65
        
 
66
        protected final SerializationController serializer;
 
67
        protected final SortedSet<IDWrapper> router_entries;
 
68
  protected DHTRouter router = null;
 
69
  protected long last_nc_update = 0;
 
70
  
 
71
  private boolean       started_up = false;
 
72
  
 
73
        public static synchronized void initialise() {
 
74
                if (!initialised) {
 
75
 
 
76
                        initialised = true;
 
77
 
 
78
                        provider = DHTNetworkPositionManager
 
79
                                        .registerProvider(new VivaldiV2PositionProvider());
 
80
 
 
81
                        doLog("Vivaldi V2 position provider created");
 
82
                }
 
83
        }
 
84
 
 
85
        protected VivaldiV2PositionProvider() {
 
86
                DHTRouterFactory.addObserver(this);
 
87
 
 
88
                vc = new VivaldiClient<IDWrapper>(NUM_DIMS);
 
89
                ip = new InitialPosition();
 
90
                
 
91
                transient_ids = new TreeSet<IDWrapper>();
 
92
                transient_list = new LinkedList<TransientTuple>();
 
93
                
 
94
                serializer = new SerializationController();
 
95
                router_entries = new TreeSet<IDWrapper>();
 
96
    
 
97
 
 
98
    Timer timer = new Timer("VivaldiV2PositionProvider:ping");
 
99
    
 
100
    timer.addPeriodicEvent(
 
101
      MIN_NC_UPDATE_INTERVAL,
 
102
      new TimerEventPerformer()
 
103
      {
 
104
        public void
 
105
        perform(
 
106
          TimerEvent  event )
 
107
        {
 
108
          ping();
 
109
        
 
110
        }
 
111
      });
 
112
 
 
113
        }
 
114
 
 
115
  protected void resetPingClock (long curr_time) {
 
116
    last_nc_update = curr_time;
 
117
  }
 
118
  
 
119
  protected void ping () {
 
120
    long curr_time = System.currentTimeMillis();
 
121
    if (curr_time > last_nc_update + MIN_NC_UPDATE_INTERVAL) {
 
122
        //System.out.println ("starting v2position ping");
 
123
      if (router != null) {
 
124
        IDWrapper id = vc.getNeighborToPing(curr_time);
 
125
        if (id != null) {
 
126
          byte[] raw_id = id.getRawId();
 
127
          router.requestPing(raw_id);
 
128
          //System.out.println ("pinging "+id);
 
129
        } else {
 
130
            //System.out.println ("vc neighbor to ping is null");
 
131
        }
 
132
      } else {
 
133
          //System.out.println ("router is null");  
 
134
      }
 
135
    } else {
 
136
        //System.out.println ("supressing v2position ping");
 
137
    }
 
138
  }
 
139
    
 
140
        public byte getPositionType() {
 
141
                return (DHTNetworkPosition.POSITION_TYPE_VIVALDI_V2 );
 
142
        }
 
143
 
 
144
        public DHTNetworkPosition create(byte[] ID, boolean is_local) {
 
145
                if (is_local) {
 
146
                        doLog("Returning position for local peer");
 
147
                        
 
148
                        return new LocalPosition(this);
 
149
                }
 
150
 
 
151
                purgeTransient(System.currentTimeMillis());
 
152
 
 
153
                // return the shared initial position of a remote peer
 
154
                return ip;
 
155
        }
 
156
 
 
157
        public DHTNetworkPosition deserialisePosition(DataInputStream is)
 
158
                        throws IOException {
 
159
                return new RemotePosition(is);
 
160
        }
 
161
 
 
162
        public void
 
163
        serialiseStats(
 
164
                DataOutputStream        os )
 
165
        
 
166
                throws IOException
 
167
        {
 
168
                if (!serializer.contains(V1Serializer.VER_01)) {
 
169
                        serializer.addSerializer(V1Serializer.getInstance());
 
170
                }
 
171
                serializer.toSerialized(V1Serializer.VER_01, os, vc);
 
172
        }
 
173
        
 
174
        public void routerCreated(DHTRouter _router) {
 
175
                doLog("Vivaldi notified of created router");
 
176
                
 
177
                _router.addObserver(this);
 
178
    router = _router;
 
179
        }
 
180
 
 
181
        public synchronized void added(DHTRouterContact contact) {
 
182
                purgeTransient(System.currentTimeMillis());
 
183
    
 
184
                // add to set of router entries
 
185
                IDWrapper id = new IDWrapper(contact.getID());
 
186
                if (!router_entries.contains(id)) {
 
187
                        router_entries.add(id);
 
188
                        if ( LOGGING_ENABLED ){
 
189
                                doLog("added router entry " + id + " " + getStats());
 
190
                        }
 
191
                }
 
192
                
 
193
                if (transient_ids.remove(id)) {
 
194
                        // was a transient entry, get its latest position and sample
 
195
                        TransientTuple tt = null;
 
196
                        for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext(); ) {
 
197
                                tt = i.next();
 
198
                                if (tt.id.equals(id)) {
 
199
                                        i.remove();
 
200
                                        break;
 
201
                                }
 
202
                        }
 
203
                        
 
204
                        // add state to the vivaldi client
 
205
                        long curr_time = System.currentTimeMillis();
 
206
                        // add the time this guy has been sitting as a TransientTuple to its age
 
207
                        long tt_age = curr_time - tt.create_time + tt.last_pos.getAge();
 
208
                        if (vc.processSample(id, tt.last_pos.getCoords(), tt.last_pos.getError(), tt.last_rtt, tt_age,
 
209
                                        curr_time, true)) {
 
210
                          resetPingClock(curr_time);
 
211
      }
 
212
 
 
213
                        if ( LOGGING_ENABLED ){
 
214
                                doLog(id + "added to router, promoting transient to host " + getStats());
 
215
                        }
 
216
                }
 
217
                else if (vc.getHosts().contains(id)) {
 
218
                        if ( LOGGING_ENABLED ){
 
219
                                doLog(id + " added to router, but was already found as a host");
 
220
                        }
 
221
                }
 
222
                else {
 
223
                        if ( LOGGING_ENABLED ){
 
224
                                doLog(id + " added to router, but was not found as a host or transient");
 
225
                        }
 
226
                }
 
227
        }
 
228
 
 
229
        public synchronized void removed(DHTRouterContact contact) {
 
230
                purgeTransient(System.currentTimeMillis());
 
231
 
 
232
                // remove from router entries
 
233
                IDWrapper id = new IDWrapper(contact.getID());
 
234
                if (router_entries.remove(id)) {
 
235
                        if ( LOGGING_ENABLED ){
 
236
                                doLog("removed router entry " + id + " " + getStats());
 
237
                        }
 
238
                }
 
239
                
 
240
                if (vc.removeHost(id)) {
 
241
                        if ( LOGGING_ENABLED ){
 
242
                                doLog(id + " removed from router, removed as host " + getStats());
 
243
                        }
 
244
                }
 
245
                else if (transient_ids.remove(id)) {
 
246
                        // contact identifier is present in list, so find and remove it
 
247
                        for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext(); ) {
 
248
                                TransientTuple tt = i.next();
 
249
                                if (tt.id.equals(id)) {
 
250
                                        i.remove();
 
251
                                        break;
 
252
                                }
 
253
                        }
 
254
        
 
255
                        if ( LOGGING_ENABLED ){
 
256
                                doLog(id + " removed from router, removed as transient " + getStats());
 
257
                        }
 
258
                }
 
259
                else {
 
260
                        if ( LOGGING_ENABLED ){
 
261
                                doLog(id + " removed from router, but was not found as a host or transient");
 
262
                        }
 
263
                }
 
264
        }
 
265
 
 
266
        public void locationChanged(DHTRouterContact contact) {
 
267
                purgeTransient(System.currentTimeMillis());
 
268
                
 
269
                // only remove from VivaldiClient when removed from routing table
 
270
        }
 
271
 
 
272
        public void nowAlive(DHTRouterContact contact) {
 
273
                purgeTransient(System.currentTimeMillis());
 
274
                
 
275
                // only add to VivaldiClient when coordinates are updated
 
276
        }
 
277
 
 
278
        public void nowFailing(DHTRouterContact contact) {
 
279
                purgeTransient(System.currentTimeMillis());
 
280
                
 
281
                // only remove from VivaldiClient when removed from routing table
 
282
        }
 
283
 
 
284
        public void destroyed(DHTRouter router) {
 
285
                doLog("Vivaldi notified of destroyed router");
 
286
                
 
287
                router.removeObserver(this);
 
288
                
 
289
                vc.reset();
 
290
                router_entries.clear();
 
291
 
 
292
                transient_ids.clear();
 
293
                transient_list.clear();
 
294
        }
 
295
        
 
296
        public DHTNetworkPosition
 
297
        getLocalPosition()
 
298
        {
 
299
                if ( started_up ){
 
300
                        
 
301
                        return( new StableLocalPosition( this ));
 
302
                }
 
303
                
 
304
                        // we only have a stable local position if we've been started up - i.e things are running
 
305
                
 
306
                return( null );
 
307
        }
 
308
        
 
309
        /*
 
310
         * Pass-thru methods.
 
311
         */
 
312
        
 
313
        protected Coordinate getCoords() {
 
314
                return vc.getSystemCoords();
 
315
        }
 
316
 
 
317
        protected Coordinate getStableCoords() {
 
318
                return vc.getApplicationCoords();
 
319
        }
 
320
        
 
321
  protected float getError() {
 
322
    return (float) vc.getSystemError();
 
323
  }
 
324
 
 
325
  protected long getAge() {
 
326
    long age = vc.getAge(System.currentTimeMillis());
 
327
    //System.out.println("age "+age);
 
328
    return age;
 
329
  }
 
330
  
 
331
        protected InitialPosition getInitialPosition() {
 
332
                return ip;
 
333
        }
 
334
        
 
335
        protected VivaldiClient<IDWrapper> getVivaldiClient() {
 
336
                return vc;
 
337
        }
 
338
        
 
339
        /*
 
340
         * Methods for maintaining the potential adds to the VivaldiClient.
 
341
         */
 
342
        
 
343
        protected synchronized void update(LocalPosition local_pos, IDWrapper id, SyrahPosition sp, float sample_rtt) {
 
344
                if (sp == local_pos) {
 
345
                        doLog("update invoked on LocalPosition with itself, ID = " + id);
 
346
                        return;
 
347
                }
 
348
                else if (sp == ip) {
 
349
                        doLog("update invoked with the InitialPosition singleton, ID = " + id);
 
350
                        return;
 
351
                }
 
352
                
 
353
                long curr_time = System.currentTimeMillis();
 
354
                if (vc.getHosts().contains(id)) {
 
355
                        // already maintain state for this peer, so process sample now
 
356
                        if (vc.processSample(id, sp.getCoords(), sp.getError(), sample_rtt, sp.getAge(), curr_time, false)) {
 
357
        resetPingClock(curr_time);
 
358
      }
 
359
                        
 
360
                        doLog("update called on host, ID = " + id);
 
361
                        return;
 
362
                }
 
363
                else if (router_entries.contains(id)) {
 
364
                        if (transient_ids.remove(id)) {
 
365
                                // contact identifier is present in list, so find and remove it
 
366
                                for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext(); ) {
 
367
                                        TransientTuple tt = i.next();
 
368
                                        if (tt.id.equals(id)) {
 
369
                                                i.remove();
 
370
                                                break;
 
371
                                        }
 
372
                                }
 
373
                        }
 
374
                        
 
375
                        // peer is already in the router, so add and process sample now
 
376
                        if (vc.processSample(id, sp.getCoords(), sp.getError(), sample_rtt, sp.getAge(), curr_time, true)) {
 
377
                          resetPingClock (curr_time);
 
378
      }
 
379
                        
 
380
                        doLog("update called on host already in router, ID = " + id);
 
381
                        return;
 
382
                }
 
383
                
 
384
                final long new_remove_time = curr_time + TRANSIENT_TIME;
 
385
                if (!transient_ids.contains(id)) {
 
386
                        // identifier is not in list yet, so add it and return
 
387
                        transient_ids.add(id);
 
388
                        transient_list.addLast(new TransientTuple(new_remove_time, id, sp, sample_rtt, curr_time));
 
389
                        doLog("added transient " + id + " " + getStats());
 
390
                        return;
 
391
                }
 
392
                
 
393
                doLog("updating transient " + id + " " + getStats());
 
394
                
 
395
                // find the entry belonging to the identifier in the list
 
396
                TransientTuple tt = null;
 
397
                for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext(); ) {
 
398
                        tt = i.next();
 
399
                        if (tt.id.equals(id)) {
 
400
                                // once found, remove it
 
401
                                i.remove();
 
402
                                break;
 
403
                        }
 
404
                }
 
405
                
 
406
                // update the entry and add it to the end
 
407
                tt.remove_time = new_remove_time;
 
408
                tt.last_pos = sp;
 
409
                tt.last_rtt = sample_rtt;
 
410
                transient_list.addLast(tt);
 
411
        }
 
412
 
 
413
        protected synchronized void purgeTransient(long curr_time) {
 
414
                for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext(); ) {
 
415
                        TransientTuple tt = i.next();
 
416
                        if (tt.remove_time > curr_time) {
 
417
                                // entry expires in the future, so end now
 
418
                                return;
 
419
                        }
 
420
                        
 
421
                        i.remove();
 
422
                        transient_ids.remove(tt.id);
 
423
                        doLog("removed transient " + tt.id + " " + getStats());
 
424
                }
 
425
        }
 
426
        
 
427
        protected String getStats() {
 
428
                return "[h:" + vc.getHosts().size() + ", t:" + transient_list.size() + " re:" + router_entries.size() + "]";
 
429
        }
 
430
        
 
431
        protected static void doLog(String str) {
 
432
    //System.out.println (str);
 
433
                if (LOGGING_ENABLED && (provider != null)) {
 
434
                        provider.log(str);
 
435
                }
 
436
        }
 
437
        
 
438
        public void
 
439
        startUp(
 
440
                DataInputStream         is ) //throws IOException
 
441
        {
 
442
                started_up      = true;
 
443
                
 
444
                try{
 
445
                        vc.startUp (is);
 
446
                        
 
447
                }catch( IOException e ){
 
448
                        
 
449
                        doLog("startUp failed:" + e.toString());
 
450
                }
 
451
        }
 
452
        
 
453
        public void
 
454
        shutDown(
 
455
                DataOutputStream        os )// throws IOException
 
456
        {       
 
457
                try{
 
458
                        vc.shutDown (os);
 
459
                        
 
460
                }catch( IOException e ){
 
461
                        
 
462
                        doLog("shutDown failed:" + e.toString());
 
463
                }
 
464
        }
 
465
}