~ubuntu-branches/ubuntu/vivid/eclipse-linuxtools/vivid-proposed

« back to all changes in this revision

Viewing changes to lttng/org.eclipse.linuxtools.lttng2.control.ui/src/org/eclipse/linuxtools/internal/lttng2/control/ui/relayd/LttngRelaydConsumer.java

  • Committer: Package Import Robot
  • Author(s): Jakub Adam, Jakub Adam, tony mancill
  • Date: 2014-10-11 11:44:05 UTC
  • mfrom: (1.2.4)
  • Revision ID: package-import@ubuntu.com-20141011114405-yazjvxfzzhmi5sgj
Tags: 3.1.0-1
[ Jakub Adam ]
* New upstream release (Closes: #761524).
* Refreshed d/patches.
* Don't build removed feature org.eclipse.linuxtools.tools.launch
  - merged into org.eclipse.linuxtools.profiling.
* Use javac target 1.7.
* Build new feature org.eclipse.linuxtools.dataviewers.feature
  - required by Valgrind integration.
* Build-depend on eclipse-remote-services-api and eclipse-cdt-autotools.
* Bump Standards-Version to 3.9.6.
* Override incompatible-java-bytecode-format - linuxtools needs Java 7.
* Remove unused codeless-jar override.

[ tony mancill ]
* Tweak short package description to make lintian happy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**********************************************************************
 
2
 * Copyright (c) 2014 Ericsson
 
3
 *
 
4
 * All rights reserved. This program and the accompanying materials are
 
5
 * made available under the terms of the Eclipse Public License v1.0 which
 
6
 * accompanies this distribution, and is available at
 
7
 * http://www.eclipse.org/legal/epl-v10.html
 
8
 *
 
9
 * Contributors:
 
10
 *   Matthew Khouzam - Initial implementation
 
11
 **********************************************************************/
 
12
 
 
13
package org.eclipse.linuxtools.internal.lttng2.control.ui.relayd;
 
14
 
 
15
import java.io.IOException;
 
16
import java.io.UnsupportedEncodingException;
 
17
import java.net.Socket;
 
18
import java.util.List;
 
19
 
 
20
import org.eclipse.core.runtime.CoreException;
 
21
import org.eclipse.core.runtime.IProgressMonitor;
 
22
import org.eclipse.core.runtime.IStatus;
 
23
import org.eclipse.core.runtime.Status;
 
24
import org.eclipse.core.runtime.jobs.Job;
 
25
import org.eclipse.linuxtools.ctf.core.trace.CTFTrace;
 
26
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
 
27
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.LttngRelaydConnectorFactory;
 
28
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachReturnCode;
 
29
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse;
 
30
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionResponse;
 
31
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionReturnCode;
 
32
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.IndexResponse;
 
33
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.NextIndexReturnCode;
 
34
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse;
 
35
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse;
 
36
import org.eclipse.linuxtools.internal.lttng2.control.ui.Activator;
 
37
import org.eclipse.linuxtools.tmf.core.signal.TmfTraceRangeUpdatedSignal;
 
38
import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimeRange;
 
39
import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTimestamp;
 
40
import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTrace;
 
41
 
 
42
/**
 
43
 * Consumer of the LTTng relay daemon (relayd).
 
44
 *
 
45
 * @author Matthew Khouzam
 
46
 * @since 3.1
 
47
 */
 
48
public final class LttngRelaydConsumer {
 
49
 
 
50
    private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
 
51
    private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
 
52
 
 
53
    private Job fConsumerJob;
 
54
    private CtfTmfTrace fCtfTmfTrace;
 
55
    private CTFTrace fCtfTrace;
 
56
    private long fTimestampEnd;
 
57
    private AttachSessionResponse fSession;
 
58
    private Socket fConnection;
 
59
    private ILttngRelaydConnector fRelayd;
 
60
    private String fTracePath;
 
61
    private long fLastSignal = 0;
 
62
    private final LttngRelaydConnectionInfo fConnectionInfo;
 
63
 
 
64
    /**
 
65
     * Start a lttng consumer.
 
66
     *
 
67
     * @param address
 
68
     *            the ip address in string format
 
69
     * @param port
 
70
     *            the port, an integer
 
71
     * @param sessionName
 
72
     *            the session name
 
73
     * @param project
 
74
     *            the default project
 
75
     */
 
76
    LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
 
77
        fConnectionInfo = connectionInfo;
 
78
        fTimestampEnd = 0;
 
79
    }
 
80
 
 
81
    /**
 
82
     * Connects to the relayd at the given address and port then attaches to the
 
83
     * given session name.
 
84
     *
 
85
     * @throws CoreException
 
86
     *             If something goes wrong during the connection
 
87
     *             <ul>
 
88
     *             <li>
 
89
     *             Connection could not be established (Socket could not be
 
90
     *             opened, etc)</li>
 
91
     *             <li>
 
92
     *             Connection timeout</li>
 
93
     *             <li>
 
94
     *             The session was not found</li>
 
95
     *             <li>
 
96
     *             Could not create viewer session</li>
 
97
     *             <li>
 
98
     *             Invalid trace (no metadata, no streams)</li>
 
99
     *             </ul>
 
100
     */
 
101
    public void connect() throws CoreException {
 
102
        if (fConnection != null) {
 
103
            return;
 
104
        }
 
105
 
 
106
        try {
 
107
            fConnection = new Socket(fConnectionInfo.getHost(), fConnectionInfo.getPort());
 
108
            fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
 
109
            List<SessionResponse> sessions = fRelayd.getSessions();
 
110
            SessionResponse selectedSession = null;
 
111
            for (SessionResponse session : sessions) {
 
112
                String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
 
113
 
 
114
                if (asessionName.equals(fConnectionInfo.getSessionName())) {
 
115
                    selectedSession = session;
 
116
                    break;
 
117
                }
 
118
            }
 
119
 
 
120
            if (selectedSession == null) {
 
121
                throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
 
122
            }
 
123
 
 
124
            CreateSessionResponse createSession = fRelayd.createSession();
 
125
            if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
 
126
                throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
 
127
            }
 
128
 
 
129
            AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
 
130
            if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
 
131
                throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
 
132
            }
 
133
 
 
134
            String metadata = fRelayd.getMetadata(attachedSession);
 
135
            if (metadata == null) {
 
136
                throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
 
137
            }
 
138
 
 
139
            List<StreamResponse> attachedStreams = attachedSession.getStreamList();
 
140
            if (attachedStreams.isEmpty()) {
 
141
                throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
 
142
            }
 
143
 
 
144
            fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
 
145
 
 
146
            fSession = attachedSession;
 
147
        } catch (IOException e) {
 
148
            throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
 
149
        }
 
150
    }
 
151
 
 
152
    /**
 
153
     * Run the consumer operation for a give trace.
 
154
     *
 
155
     * @param trace
 
156
     *            the trace
 
157
     */
 
158
    public void run(final CtfTmfTrace trace) {
 
159
        if (fSession == null) {
 
160
            return;
 
161
        }
 
162
 
 
163
        fCtfTmfTrace = trace;
 
164
        fCtfTrace = trace.getCTFTrace();
 
165
        fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
 
166
 
 
167
            @Override
 
168
            protected IStatus run(final IProgressMonitor monitor) {
 
169
                try {
 
170
                    while (!monitor.isCanceled()) {
 
171
                        List<StreamResponse> attachedStreams = fSession.getStreamList();
 
172
                        for (StreamResponse stream : attachedStreams) {
 
173
                            if (stream.getMetadataFlag() != 1) {
 
174
                                IndexResponse indexReply = fRelayd.getNextIndex(stream);
 
175
                                if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
 
176
                                    long nanoTimeStamp = fCtfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
 
177
                                    if (nanoTimeStamp > fTimestampEnd) {
 
178
                                        CtfTmfTimestamp endTime = new CtfTmfTimestamp(nanoTimeStamp);
 
179
                                        TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
 
180
 
 
181
                                        long currentTime = System.nanoTime();
 
182
                                        if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
 
183
                                            TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
 
184
                                            fCtfTmfTrace.broadcastAsync(signal);
 
185
                                            fLastSignal = currentTime;
 
186
                                        }
 
187
                                        fTimestampEnd = nanoTimeStamp;
 
188
                                    }
 
189
                                } else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
 
190
                                    // The trace is now complete because the trace session was destroyed
 
191
                                    fCtfTmfTrace.setComplete(true);
 
192
                                    TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), new CtfTmfTimestamp(fTimestampEnd)));
 
193
                                    fCtfTmfTrace.broadcastAsync(signal);
 
194
                                    return Status.OK_STATUS;
 
195
                                }
 
196
                            }
 
197
                        }
 
198
                    }
 
199
                } catch (IOException e) {
 
200
                    Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
 
201
                    return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
 
202
                }
 
203
 
 
204
                return Status.OK_STATUS;
 
205
            }
 
206
        };
 
207
        fConsumerJob.setSystem(true);
 
208
        fConsumerJob.schedule();
 
209
    }
 
210
 
 
211
    /**
 
212
     * Dispose the consumer and it's resources (sockets, etc).
 
213
     */
 
214
    public void dispose() {
 
215
        try {
 
216
            if (fConsumerJob != null) {
 
217
                fConsumerJob.cancel();
 
218
                fConsumerJob.join();
 
219
            }
 
220
            if (fConnection != null) {
 
221
                fConnection.close();
 
222
            }
 
223
            if (fRelayd != null) {
 
224
                fRelayd.close();
 
225
            }
 
226
        } catch (IOException e) {
 
227
            // Ignore
 
228
        } catch (InterruptedException e) {
 
229
            // Ignore
 
230
        }
 
231
    }
 
232
 
 
233
    /**
 
234
     * Once the consumer is connected to the relayd session, it knows the trace
 
235
     * path. This can be useful to know exactly where the trace is so that it
 
236
     * can be imported into the workspace and it can be opened.
 
237
     *
 
238
     * @return the trace path
 
239
     */
 
240
    public String getTracePath() {
 
241
        return fTracePath;
 
242
    }
 
243
 
 
244
    private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
 
245
        // Find length of null terminated string
 
246
        int length = 0;
 
247
        while (length < byteArray.length && byteArray[length] != 0) {
 
248
            length++;
 
249
        }
 
250
 
 
251
        String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);
 
252
        return asessionName;
 
253
    }
 
254
 
 
255
}