~smaioli/azureus/ubuntu-experimental

« back to all changes in this revision

Viewing changes to com/aelitis/azureus/core/messenger/PlatformMessenger.java

Merged Vuze 4.2.0.2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
import java.text.NumberFormat;
28
28
import java.util.*;
29
29
 
 
30
import org.gudy.azureus2.core3.config.COConfigurationManager;
30
31
import org.gudy.azureus2.core3.util.*;
31
32
import org.gudy.azureus2.core3.util.Timer;
32
33
import org.json.simple.JSONArray;
34
35
 
35
36
import com.aelitis.azureus.core.AzureusCore;
36
37
import com.aelitis.azureus.core.AzureusCoreFactory;
 
38
import com.aelitis.azureus.core.cnetwork.ContentNetwork;
 
39
import com.aelitis.azureus.core.cnetwork.ContentNetworkManagerFactory;
37
40
import com.aelitis.azureus.core.messenger.browser.BrowserMessage;
38
41
import com.aelitis.azureus.core.messenger.browser.BrowserMessageDispatcher;
39
42
import com.aelitis.azureus.core.messenger.browser.listeners.MessageCompletionListener;
40
43
import com.aelitis.azureus.core.messenger.config.PlatformRelayMessenger;
41
 
import com.aelitis.azureus.util.ConstantsV3;
42
 
import com.aelitis.azureus.util.JSONUtils;
43
 
import com.aelitis.azureus.util.MapUtils;
 
44
import com.aelitis.azureus.util.*;
44
45
 
45
46
import org.gudy.azureus2.plugins.PluginInterface;
46
47
import org.gudy.azureus2.plugins.utils.resourcedownloader.ResourceDownloader;
54
55
 */
55
56
public class PlatformMessenger
56
57
{
57
 
        private static final boolean DEBUG_URL = System.getProperty("platform.messenger.debug.url", "0").equals("1");
 
58
        private static final boolean DEBUG_URL = System.getProperty(
 
59
                        "platform.messenger.debug.url", "0").equals("1");
 
60
 
 
61
        private static final String URL_PLATFORM_MESSAGE = "?service=rpc";
 
62
 
 
63
        private static final String URL_POST_PLATFORM_DATA = "service=rpc";
58
64
 
59
65
        private static final int MAX_POST_LENGTH = 1024 * 512 * 3; // 1.5M
60
66
 
66
72
 
67
73
        public static String REPLY_RESULT = "response";
68
74
 
69
 
        static private Map mapQueueAuthorized = new LinkedHashMap();
70
 
 
71
 
        static private Map mapQueueNoAZID = new LinkedHashMap();
72
 
 
73
 
        static private Map mapQueue = new LinkedHashMap();
 
75
        /** Key: id of queue;  Value: Map of queued messages & listeners */
 
76
        static private Map<String, Map> mapQueues = new HashMap();
 
77
 
 
78
        private static final String QUEUE_AUTH = "Auth.";
 
79
 
 
80
        private static final String QUEUE_NOAZID = "noazid.";
 
81
 
 
82
        private static final String QUEUE_NORMAL = "msg.";
 
83
 
 
84
        private static final String QUEUE_RELAY = "relay.";
74
85
 
75
86
        static private AEMonitor queue_mon = new AEMonitor(
76
87
                        "v3.PlatformMessenger.queue");
77
88
 
78
89
        static private Timer timerProcess = new Timer("v3.PlatformMessenger.queue");
79
90
 
80
 
        static private TimerEvent timerEvent = null;
 
91
        //static private TimerEvent timerEvent = null;
 
92
 
 
93
        static private Map<String, TimerEvent> mapTimerEvents = new HashMap<String, TimerEvent>();
 
94
        
 
95
        static private AEMonitor mon_mapTimerEvents = new AEMonitor("mapTimerEvents");
81
96
 
82
97
        private static boolean initialized;
83
98
 
86
101
        private static PlatformAuthorizedSender authorizedSender;
87
102
 
88
103
        private static boolean authorizedDelayed;
89
 
 
 
104
        
90
105
        public static synchronized void init() {
91
106
                if (initialized) {
92
107
                        return;
103
118
                PlatformMessenger.authorizedSender = authorizedSender;
104
119
        }
105
120
 
 
121
        public static PlatformAuthorizedSender getAuthorizedTransferListener() {
 
122
                return PlatformMessenger.authorizedSender;
 
123
        }
 
124
 
106
125
        public static ClientMessageContext getClientMessageContext() {
107
126
                if (!initialized) {
108
127
                        init();
117
136
 
118
137
        public static void queueMessage(PlatformMessage message,
119
138
                        PlatformMessengerListener listener, boolean addToBottom) {
 
139
                
 
140
                if ( COConfigurationManager.getStringParameter( "ui", "az3" ).equals( "az2" )){
 
141
                        
 
142
                        Debug.out( "**** PlatformMessenger shouldn't be used with az2 UI ****" );
 
143
                }
 
144
                
120
145
                if (!initialized) {
121
146
                        init();
122
147
                }
123
148
 
124
 
                if (message != null) {
125
 
                        debug("q " + message.toShortString() + ": " + message + " for "
126
 
                                        + new Date(message.getFireBefore()) + "; in "
127
 
                                        + (message.getFireBefore() - SystemTime.getCurrentTime()) + "ms");
128
 
                        if (message.requiresAuthorization() && authorizedDelayed) {
129
 
                                debug("   authorized msg is delayed");
130
 
                        }
131
 
                } else {
 
149
                if (message == null) {
132
150
                        debug("fire timerevent");
133
151
                }
134
152
                queue_mon.enter();
135
153
                try {
136
154
                        long fireBefore;
 
155
                        String queueID = null;
137
156
                        if (message != null) {
138
 
                        if (message.requiresAuthorization()) {
139
 
                                mapQueueAuthorized.put(message, listener);
140
 
                        } else if (message.sendAZID()) {
141
 
                                mapQueue.put(message, listener);
142
 
                        } else {
143
 
                                mapQueueNoAZID.put(message, listener);
144
 
                        }
145
 
                        fireBefore = message.getFireBefore();
 
157
                                long networkID = message.getContentNetworkID();
 
158
                                if (networkID <= 0) {
 
159
                                        debug("Content Network invalid for " + message);
 
160
                                        return;
 
161
                                }
 
162
                                if (message.requiresAuthorization()) {
 
163
                                        queueID = QUEUE_AUTH;
 
164
                                } else if (!message.sendAZID()) {
 
165
                                        queueID = QUEUE_NOAZID;
 
166
                                } else {
 
167
                                        boolean isRelayServer = PlatformRelayMessenger.LISTENER_ID.equals(message.getListenerID());
 
168
                                        if (isRelayServer) {
 
169
                                                queueID = QUEUE_RELAY;
 
170
                                        } else {
 
171
                                                queueID = QUEUE_NORMAL;
 
172
                                        }
 
173
                                }
 
174
                                queueID += networkID;
 
175
                                
 
176
                                Map<PlatformMessage, PlatformMessengerListener> mapQueue = (Map) mapQueues.get(queueID);
 
177
                                if (mapQueue == null) {
 
178
                                        mapQueue = new LinkedHashMap<PlatformMessage, PlatformMessengerListener>();
 
179
                                        mapQueues.put(queueID, mapQueue);
 
180
                                }
 
181
                                mapQueue.put(message, listener);
 
182
 
 
183
                                debug("q " + queueID + "(" + mapQueue.size() + ") "
 
184
                                                + message.toShortString() + ": " + message + " @ "
 
185
                                                + new Date(message.getFireBefore()) + "; in "
 
186
                                                + (message.getFireBefore() - SystemTime.getCurrentTime()) + "ms");
 
187
                                if (message.requiresAuthorization() && authorizedDelayed) {
 
188
                                        debug("   authorized msg is delayed");
 
189
                                }
 
190
 
 
191
                                fireBefore = message.getFireBefore();
146
192
                        } else {
147
193
                                fireBefore = SystemTime.getCurrentTime();
148
194
                        }
149
195
 
150
 
                        if (timerEvent == null || timerEvent.hasRun()) {
151
 
                                timerEvent = timerProcess.addEvent(fireBefore,
152
 
                                                new TimerEventPerformer() {
153
 
                                                        public void perform(TimerEvent event) {
154
 
                                                                timerEvent = null;
155
 
                                                                while (mapQueue.size() > 0) {
156
 
                                                                        processQueue(mapQueue, false, true);
157
 
                                                                }
158
 
                                                                if (!authorizedDelayed) {
159
 
                                                                while (mapQueueAuthorized.size() > 0) {
160
 
                                                                        processQueue(mapQueueAuthorized, true, true);
161
 
                                                                }
162
 
                                                                }
163
 
                                                                while (mapQueueNoAZID.size() > 0) {
164
 
                                                                        processQueue(mapQueueNoAZID, false, false);
165
 
                                                                }
166
 
                                                        }
167
 
                                                });
168
 
                        } else {
169
 
                                // Move the time up if we have to
 
196
                        if (queueID != null) {
170
197
                                try {
171
 
                                if (fireBefore < timerEvent.getWhen()) {
172
 
                                        timerProcess.adjustAllBy(fireBefore - timerEvent.getWhen());
 
198
                                        mon_mapTimerEvents.enter();
 
199
 
 
200
                                TimerEvent timerEvent = mapTimerEvents.get(queueID);
 
201
                                
 
202
                        if (timerEvent == null || timerEvent.hasRun() || fireBefore < timerEvent.getWhen()) {
 
203
                                if (timerEvent != null) {
 
204
                                                mapTimerEvents.remove(timerEvent);
 
205
                                        timerEvent.cancel();
 
206
                                }
 
207
                                
 
208
                                final String fQueueID = queueID;
 
209
                                timerEvent = timerProcess.addEvent(fireBefore,
 
210
                                                new TimerEventPerformer() {
 
211
                                                        public void perform(TimerEvent event) {
 
212
                                                                try {
 
213
                                                                        mon_mapTimerEvents.enter();
 
214
                                                                        
 
215
                                                                        mapTimerEvents.remove(event);
 
216
                                                                } finally {
 
217
                                                                                mon_mapTimerEvents.exit();
 
218
                                                                }
 
219
 
 
220
                                                                        Map mapQueue = mapQueues.get(fQueueID);
 
221
                                                                        while (mapQueue != null && mapQueue.size() > 0) {
 
222
                                                                                processQueue(fQueueID, mapQueue);
 
223
                                                                        }
 
224
                                                                /*
 
225
                                                                Object[] keys = mapQueues.keySet().toArray();
 
226
                                                                for (int i = 0; i < keys.length; i++) {
 
227
                                                                        Map mapQueue = mapQueues.get(keys[i]);
 
228
                                                                        while (mapQueue != null && mapQueue.size() > 0) {
 
229
                                                                                processQueue(mapQueue);
 
230
                                                                        }
 
231
                                                                }
 
232
                                                                */
 
233
                                                        }
 
234
                                                });
 
235
                                mapTimerEvents.put(queueID, timerEvent);
 
236
                        }
 
237
                                if (timerEvent != null) {
 
238
                                debug(" next q process for  " + queueID + " in " + (timerEvent.getWhen() - SystemTime.getCurrentTime()));
173
239
                                }
174
 
                                } catch (Exception e) {
175
 
                                        
 
240
                                } finally {
 
241
                                        mon_mapTimerEvents.exit();                                      
176
242
                                }
177
243
                        }
178
244
                } finally {
186
252
        public static void debug(String string) {
187
253
                AEDiagnosticsLogger diag_logger = AEDiagnostics.getLogger("v3.PMsgr");
188
254
                diag_logger.log(string);
189
 
                if (ConstantsV3.DIAG_TO_STDOUT) {
 
255
                if (ConstantsVuze.DIAG_TO_STDOUT) {
190
256
                        System.out.println(Thread.currentThread().getName() + "|"
191
257
                                        + System.currentTimeMillis() + "] " + string);
192
258
                }
193
259
        }
194
260
 
195
261
        protected static void debug(String string, Throwable e) {
196
 
                debug(string + "\n\t" + Debug.getCompressedStackTrace(e, 1, 80));
 
262
                debug(string + "\n\t" + e.getClass().getName() + ": " + e.getMessage()
 
263
                                + ", " + Debug.getCompressedStackTrace(e, 1, 80));
197
264
        }
198
265
 
199
 
        
200
266
        /**
201
267
         * Sends the message almost immediately, skipping delayauthorization check 
202
268
         * @param message
204
270
         *
205
271
         * @since 3.0.5.3
206
272
         */
207
 
        public static void pushMessageNow(PlatformMessage message, PlatformMessengerListener listener) {
 
273
        public static void pushMessageNow(PlatformMessage message,
 
274
                        PlatformMessengerListener listener) {
208
275
                debug("push " + message.toShortString() + ": " + message);
209
276
 
210
277
                Map map = new HashMap(1);
211
278
                map.put(message, listener);
212
 
                processQueue(map, message.requiresAuthorization(), message.sendAZID());
 
279
                processQueue(null, map);
213
280
        }
214
281
 
215
282
        /**
216
283
         * @param requiresAuthorization 
217
284
         * 
218
285
         */
219
 
        protected static void processQueue(Map mapQueue,
220
 
                        final boolean requiresAuthorization, final boolean sendAZID) {
 
286
        protected static void processQueue(String queueID, Map mapQueue) {
221
287
                if (!initialized) {
222
288
                        init();
223
289
                }
224
290
 
 
291
                //debug("pq " + queueID + " via " + Debug.getCompressedStackTrace());
225
292
                final Map mapProcessing = new HashMap();
226
293
 
227
294
                boolean loginAndRetry = false;
 
295
                boolean requiresAuthorization = false;
 
296
                boolean sendAZID = true;
 
297
                long contentNetworkID = ContentNetwork.CONTENT_NETWORK_VUZE;
228
298
 
229
299
                // Create urlStem (or post data)
230
300
                // determine which server to use
236
306
                try {
237
307
                        // add one at a time, ensure relay server messages are seperate
238
308
                        boolean first = true;
239
 
                        boolean isRelayServerBatch = false;
240
309
                        for (Iterator iter = mapQueue.keySet().iterator(); iter.hasNext();) {
241
310
                                PlatformMessage message = (PlatformMessage) iter.next();
242
311
                                Object value = mapQueue.get(message);
243
312
 
244
 
                                boolean isRelayServer = PlatformRelayMessenger.LISTENER_ID.equals(message.getListenerID());
245
313
                                if (first) {
246
 
                                        isRelayServerBatch = isRelayServer;
 
314
                                        requiresAuthorization = message.requiresAuthorization();
 
315
                                        sendAZID = message.sendAZID();
 
316
                                        contentNetworkID = message.getContentNetworkID();
247
317
                                        first = false;
248
 
                                } else if (isRelayServerBatch != isRelayServer) {
249
 
                                        break;
250
318
                                }
251
319
 
252
320
                                // build urlStem
325
393
                }
326
394
 
327
395
                // Build base RPC url based on listener and server
 
396
 
 
397
                // one day all this URL hacking should be moved into the ContentNetwork...
 
398
 
 
399
                ContentNetwork cn = ContentNetworkManagerFactory.getSingleton().getContentNetwork(
 
400
                                contentNetworkID);
 
401
                if (cn == null) {
 
402
                        cn = ConstantsVuze.getDefaultContentNetwork();
 
403
                }
 
404
 
328
405
                String sURL_RPC;
329
406
                boolean isRelayServer = (PlatformRelayMessenger.MSG_ID + "-" + PlatformRelayMessenger.LISTENER_ID).equals(server);
330
407
                if (isRelayServer) {
331
 
                        sURL_RPC = ConstantsV3.URL_RELAY_RPC;
 
408
 
 
409
                        sURL_RPC = ContentNetworkUtils.getUrl(cn, ContentNetwork.SERVICE_RELAY_RPC);
 
410
 
332
411
                } else {
333
 
                        sURL_RPC = ConstantsV3.URL_PREFIX + ConstantsV3.URL_RPC + server;
334
 
                }
335
 
                
336
 
                String suffix = ConstantsV3.URL_SUFFIX;
337
 
                if (!sendAZID) {
338
 
                        suffix = suffix.replaceAll("azid=.*&", "");
 
412
                        sURL_RPC = ContentNetworkUtils.getUrl(cn, ContentNetwork.SERVICE_RPC)
 
413
                                        + server;
339
414
                }
340
415
 
341
416
                // Build full url and data to send
342
417
                String sURL;
343
418
                String sPostData = null;
344
 
                if (USE_HTTP_POST) {
 
419
                if (USE_HTTP_POST || requiresAuthorization) {
345
420
                        sURL = sURL_RPC;
346
 
                        sPostData = ConstantsV3.URL_POST_PLATFORM_DATA + "&" + urlStem.toString()
347
 
                                        + "&" + suffix;
 
421
                        if (requiresAuthorization) {
 
422
                                String sAuthUrl = ContentNetworkUtils.getUrl(cn,
 
423
                                                ContentNetwork.SERVICE_AUTH_RPC);
 
424
                                if (sAuthUrl != null) {
 
425
                                        sURL = sAuthUrl;
 
426
                                }
 
427
                        }
 
428
 
 
429
                        sPostData = URL_POST_PLATFORM_DATA + "&" + urlStem.toString();
 
430
                        sPostData = cn.appendURLSuffix(sPostData, true, sendAZID);
 
431
 
348
432
                        if (!requiresAuthorization) {
349
433
                                if (DEBUG_URL) {
350
434
                                        debug("POST for " + mapProcessing.size() + ": " + sURL + "?"
354
438
                                }
355
439
                        }
356
440
                } else {
357
 
                        sURL = sURL_RPC + ConstantsV3.URL_PLATFORM_MESSAGE + "&"
358
 
                                        + urlStem.toString() + "&" + suffix;
 
441
                        sURL = sURL_RPC + URL_PLATFORM_MESSAGE + "&" + urlStem.toString();
 
442
 
 
443
                        sURL = cn.appendURLSuffix(sURL, false, sendAZID);
 
444
 
359
445
                        if (DEBUG_URL) {
360
446
                                debug("GET: " + sURL);
361
447
                        } else {
362
 
                                debug("GET: " + sURL_RPC + ConstantsV3.URL_PLATFORM_MESSAGE);
 
448
                                debug("GET: " + sURL_RPC + URL_PLATFORM_MESSAGE);
363
449
                        }
364
450
                }
365
451
 
366
452
                final String fURL = sURL;
367
453
                final String fPostData = sPostData;
368
454
                final boolean fLoginAndRetry = loginAndRetry;
 
455
                final boolean fReqAuth = requiresAuthorization;
369
456
 
370
457
                // proccess queue on a new thread
371
458
                AEThread2 thread = new AEThread2("v3.PlatformMessenger", true) {
372
459
                        public void run() {
373
460
                                try {
374
 
                                        processQueueAsync(fURL, fPostData, mapProcessing,
375
 
                                                        requiresAuthorization, fLoginAndRetry);
 
461
                                        processQueueAsync(fURL, fPostData, mapProcessing, fReqAuth,
 
462
                                                        fLoginAndRetry);
376
463
                                } catch (Throwable e) {
377
464
                                        if (e instanceof ResourceDownloaderException) {
378
465
                                                debug("Error while sending message(s) to Platform: " + e.toString());
405
492
         * @throws Exception 
406
493
         */
407
494
        protected static void processQueueAsync(String sURL, String sData,
408
 
                        Map mapProcessing, boolean requiresAuthorization,
409
 
                        boolean loginAndRetry) throws Exception {
 
495
                        Map mapProcessing, boolean requiresAuthorization, boolean loginAndRetry)
 
496
                        throws Exception {
410
497
                URL url;
411
498
                url = new URL(sURL);
412
499
                AzureusCore core = AzureusCoreFactory.getSingleton();
418
505
                        authorizedSender.startDownload(url, sData, sem_waitDL, loginAndRetry);
419
506
                        sem_waitDL.reserve();
420
507
                        s = authorizedSender.getResults();
 
508
                        authorizedSender.clearResults();
421
509
                } else {
422
510
                        if (requiresAuthorization) {
423
511
                                debug("No Authorized Sender.. using non-auth request");
489
577
                                continue;
490
578
                        }
491
579
 
492
 
                        debug("Got a reply for " + message.toShortString() + "\n\t\t"
 
580
                        debug("Got a " + reply.length() + " byte reply for "
 
581
                                        + message.toShortString() + "\n\t\t"
493
582
                                        + reply.substring(0, Math.min(8192, reply.length())));
494
583
 
495
584
                        final PlatformMessage fMessage = message;
646
735
        private static class fakeContext
647
736
                extends ClientMessageContextImpl
648
737
        {
649
 
                private void
650
 
                log(
651
 
                        String str )
652
 
                {
653
 
                        if ( System.getProperty( "browser.route.all.external.stimuli.for.testing", "false" ).equalsIgnoreCase( "true" )){
654
 
                                
655
 
                                System.err.println( str );
 
738
                private long contentNetworkID = ConstantsVuze.DEFAULT_CONTENT_NETWORK_ID;
 
739
 
 
740
                private void log(String str) {
 
741
                        if (System.getProperty("browser.route.all.external.stimuli.for.testing",
 
742
                                        "false").equalsIgnoreCase("true")) {
 
743
 
 
744
                                System.err.println(str);
656
745
                        }
657
 
                        debug( str );
 
746
                        debug(str);
658
747
                }
659
748
 
660
749
                public fakeContext() {
666
755
                }
667
756
 
668
757
                public void displayBrowserMessage(String message) {
669
 
                        log("displayBrowserMessage - " + message );
 
758
                        log("displayBrowserMessage - " + message);
670
759
                }
671
760
 
672
761
                public boolean executeInBrowser(String javascript) {
673
 
                        log("executeInBrowser - " + javascript );
 
762
                        log("executeInBrowser - " + javascript);
674
763
                        return false;
675
764
                }
676
765
 
677
766
                public Object getBrowserData(String key) {
678
 
                        log("getBrowserData - " + key );
 
767
                        log("getBrowserData - " + key);
679
768
                        return null;
680
769
                }
681
770
 
682
771
                public boolean sendBrowserMessage(String key, String op) {
683
 
                        log("sendBrowserMessage - " + key + "/" + op );
 
772
                        log("sendBrowserMessage - " + key + "/" + op);
684
773
                        return false;
685
774
                }
686
775
 
687
776
                public boolean sendBrowserMessage(String key, String op, Map params) {
688
 
                        log("sendBrowserMessage - " + key + "/" + op + "/" + params );
 
777
                        log("sendBrowserMessage - " + key + "/" + op + "/" + params);
689
778
                        return false;
690
779
                }
691
780
 
692
781
                public void setBrowserData(String key, Object value) {
693
 
                        log("setBrowserData - " + key + "/" + value );
 
782
                        log("setBrowserData - " + key + "/" + value);
694
783
                }
695
784
 
696
785
                public boolean sendBrowserMessage(String key, String op, Collection params) {
697
 
                        log("sendBrowserMessage - " + key + "/" + op + "/" + params );
 
786
                        log("sendBrowserMessage - " + key + "/" + op + "/" + params);
698
787
                        return false;
699
788
                }
 
789
 
700
790
                public void setTorrentURLHandler(torrentURLHandler handler) {
701
 
                        log("setTorrentURLHandler - " + handler );
 
791
                        log("setTorrentURLHandler - " + handler);
 
792
                }
 
793
 
 
794
                public long getContentNetworkID() {
 
795
                        return contentNetworkID;
 
796
                }
 
797
 
 
798
                public void setContentNetworkID(long contentNetwork) {
 
799
                        this.contentNetworkID = contentNetwork;
702
800
                }
703
801
        }
704
802
 
710
808
        public static void setAuthorizedDelayed(boolean authorizedDelayed) {
711
809
                debug("setDelayAuthorized " + authorizedDelayed);
712
810
                PlatformMessenger.authorizedDelayed = authorizedDelayed;
713
 
                if (!authorizedDelayed && mapQueueAuthorized.size() > 0) {
714
 
                        queueMessage(null, null);
 
811
                if (!authorizedDelayed) {
 
812
                        boolean fireQueue = false;
 
813
                        queue_mon.enter();
 
814
                        try {
 
815
                                for (String key : mapQueues.keySet()) {
 
816
                                        if (key.startsWith(QUEUE_AUTH)) {
 
817
                                                Map map = mapQueues.get(key);
 
818
                                                if (map != null && map.size() > 0) {
 
819
                                                        fireQueue = true;
 
820
                                                        break;
 
821
                                                }
 
822
                                        }
 
823
                                }
 
824
                        } finally {
 
825
                                queue_mon.exit();
 
826
                        }
 
827
                        if (fireQueue) {
 
828
                                queueMessage(null, null);
 
829
                        }
715
830
                }
716
831
        }
717
832