1
/**********************************************************************
2
* Copyright (c) 2014 Ericsson
4
* All rights reserved. This program and the accompanying materials are
5
* made available under the terms of the Eclipse Public License v1.0 which
6
* accompanies this distribution, and is available at
7
* http://www.eclipse.org/legal/epl-v10.html
10
* Matthew Khouzam - Initial implementation
11
**********************************************************************/
13
package org.eclipse.linuxtools.internal.lttng2.control.ui.relayd;
15
import java.io.IOException;
16
import java.io.UnsupportedEncodingException;
17
import java.net.Socket;
18
import java.util.List;
20
import org.eclipse.core.runtime.CoreException;
21
import org.eclipse.core.runtime.IProgressMonitor;
22
import org.eclipse.core.runtime.IStatus;
23
import org.eclipse.core.runtime.Status;
24
import org.eclipse.core.runtime.jobs.Job;
25
import org.eclipse.linuxtools.ctf.core.trace.CTFTrace;
26
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.ILttngRelaydConnector;
27
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.LttngRelaydConnectorFactory;
28
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachReturnCode;
29
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.AttachSessionResponse;
30
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionResponse;
31
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.CreateSessionReturnCode;
32
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.IndexResponse;
33
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.NextIndexReturnCode;
34
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.SessionResponse;
35
import org.eclipse.linuxtools.internal.lttng2.control.core.relayd.lttngviewerCommands.StreamResponse;
36
import org.eclipse.linuxtools.internal.lttng2.control.ui.Activator;
37
import org.eclipse.linuxtools.tmf.core.signal.TmfTraceRangeUpdatedSignal;
38
import org.eclipse.linuxtools.tmf.core.timestamp.TmfTimeRange;
39
import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTimestamp;
40
import org.eclipse.linuxtools.tmf.ctf.core.CtfTmfTrace;
43
/**
 * Consumer of the relayd (the LTTng relay daemon).
 *
 * @author Matthew Khouzam
 */
48
public final class LttngRelaydConsumer {
50
private static final int SIGNAL_THROTTLE_NANOSEC = 10_000_000;
51
private static final String ENCODING_UTF_8 = "UTF-8"; //$NON-NLS-1$
53
private Job fConsumerJob;
54
private CtfTmfTrace fCtfTmfTrace;
55
private CTFTrace fCtfTrace;
56
private long fTimestampEnd;
57
private AttachSessionResponse fSession;
58
private Socket fConnection;
59
private ILttngRelaydConnector fRelayd;
60
private String fTracePath;
61
private long fLastSignal = 0;
62
private final LttngRelaydConnectionInfo fConnectionInfo;
65
/**
 * Creates a consumer for the relayd session described by the given
 * connection information. The constructor only stores the information;
 * nothing is opened here.
 *
 * @param connectionInfo
 *            the host, port and session name of the relayd session to
 *            consume
 */
LttngRelaydConsumer(final LttngRelaydConnectionInfo connectionInfo) {
    fConnectionInfo = connectionInfo;
}
82
* Connects to the relayd at the given address and port then attaches to the
85
* @throws CoreException
86
* If something goes wrong during the connection
89
* Connection could not be established (Socket could not be
92
* Connection timeout</li>
94
* The session was not found</li>
96
* Could not create viewer session</li>
98
* Invalid trace (no metadata, no streams)</li>
101
public void connect() throws CoreException {
102
if (fConnection != null) {
107
fConnection = new Socket(fConnectionInfo.getHost(), fConnectionInfo.getPort());
108
fRelayd = LttngRelaydConnectorFactory.getNewConnector(fConnection);
109
List<SessionResponse> sessions = fRelayd.getSessions();
110
SessionResponse selectedSession = null;
111
for (SessionResponse session : sessions) {
112
String asessionName = nullTerminatedByteArrayToString(session.getSessionName().getBytes());
114
if (asessionName.equals(fConnectionInfo.getSessionName())) {
115
selectedSession = session;
120
if (selectedSession == null) {
121
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_SessionNotFound));
124
CreateSessionResponse createSession = fRelayd.createSession();
125
if (createSession.getStatus() != CreateSessionReturnCode.LTTNG_VIEWER_CREATE_SESSION_OK) {
126
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_CreateViewerSessionError + createSession.getStatus().toString()));
129
AttachSessionResponse attachedSession = fRelayd.attachToSession(selectedSession);
130
if (attachedSession.getStatus() != AttachReturnCode.VIEWER_ATTACH_OK) {
131
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_AttachSessionError + attachedSession.getStatus().toString()));
134
String metadata = fRelayd.getMetadata(attachedSession);
135
if (metadata == null) {
136
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoMetadata));
139
List<StreamResponse> attachedStreams = attachedSession.getStreamList();
140
if (attachedStreams.isEmpty()) {
141
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_NoStreams));
144
fTracePath = nullTerminatedByteArrayToString(attachedStreams.get(0).getPathName().getBytes());
146
fSession = attachedSession;
147
} catch (IOException e) {
148
throw new CoreException(new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorConnecting + (e.getMessage() != null ? e.getMessage() : ""))); //$NON-NLS-1$
153
* Run the consumer operation for a give trace.
158
public void run(final CtfTmfTrace trace) {
159
if (fSession == null) {
163
fCtfTmfTrace = trace;
164
fCtfTrace = trace.getCTFTrace();
165
fConsumerJob = new Job("RelayD consumer") { //$NON-NLS-1$
168
protected IStatus run(final IProgressMonitor monitor) {
170
while (!monitor.isCanceled()) {
171
List<StreamResponse> attachedStreams = fSession.getStreamList();
172
for (StreamResponse stream : attachedStreams) {
173
if (stream.getMetadataFlag() != 1) {
174
IndexResponse indexReply = fRelayd.getNextIndex(stream);
175
if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_OK) {
176
long nanoTimeStamp = fCtfTrace.timestampCyclesToNanos(indexReply.getTimestampEnd());
177
if (nanoTimeStamp > fTimestampEnd) {
178
CtfTmfTimestamp endTime = new CtfTmfTimestamp(nanoTimeStamp);
179
TmfTimeRange range = new TmfTimeRange(fCtfTmfTrace.getStartTime(), endTime);
181
long currentTime = System.nanoTime();
182
if (currentTime - fLastSignal > SIGNAL_THROTTLE_NANOSEC) {
183
TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, range);
184
fCtfTmfTrace.broadcastAsync(signal);
185
fLastSignal = currentTime;
187
fTimestampEnd = nanoTimeStamp;
189
} else if (indexReply.getStatus() == NextIndexReturnCode.VIEWER_INDEX_HUP) {
190
// The trace is now complete because the trace session was destroyed
191
fCtfTmfTrace.setComplete(true);
192
TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(LttngRelaydConsumer.this, fCtfTmfTrace, new TmfTimeRange(fCtfTmfTrace.getStartTime(), new CtfTmfTimestamp(fTimestampEnd)));
193
fCtfTmfTrace.broadcastAsync(signal);
194
return Status.OK_STATUS;
199
} catch (IOException e) {
200
Activator.getDefault().logError("Error during live trace reading", e); //$NON-NLS-1$
201
return new Status(IStatus.ERROR, Activator.PLUGIN_ID, Messages.LttngRelaydConsumer_ErrorLiveReading + (e.getMessage() != null ? e.getMessage() : "")); //$NON-NLS-1$
204
return Status.OK_STATUS;
207
fConsumerJob.setSystem(true);
208
fConsumerJob.schedule();
212
* Dispose the consumer and it's resources (sockets, etc).
214
public void dispose() {
216
if (fConsumerJob != null) {
217
fConsumerJob.cancel();
220
if (fConnection != null) {
223
if (fRelayd != null) {
226
} catch (IOException e) {
228
} catch (InterruptedException e) {
234
* Once the consumer is connected to the relayd session, it knows the trace
235
* path. This can be useful to know exactly where the trace is so that it
236
* can be imported into the workspace and it can be opened.
238
* @return the trace path
240
public String getTracePath() {
244
private static String nullTerminatedByteArrayToString(final byte[] byteArray) throws UnsupportedEncodingException {
245
// Find length of null terminated string
247
while (length < byteArray.length && byteArray[length] != 0) {
251
String asessionName = new String(byteArray, 0, length, ENCODING_UTF_8);