30
30
#include "protocols/protocolfactorymanager.h"
31
31
#include "application/clientapplicationmanager.h"
32
32
#include "netio/netio.h"
33
#include "jobstimerappprotocolhandler.h"
34
#include "jobstimerprotocol.h"
35
#include "protocols/protocolmanager.h"
33
36
using namespace app_proxypublish;
35
38
ProxyPublishApplication::ProxyPublishApplication(Variant &configuration)
75
80
_pRTSPHandler = NULL;
77
82
#endif /* HAS_PROTOCOL_RTP */
84
BaseProtocol *pProtocol = ProtocolManager::GetProtocol(_jobsTimerProtocolId);
85
if (pProtocol != NULL) {
86
pProtocol->EnqueueForDelete();
89
UnRegisterAppProtocolHandler(PT_TIMER);
90
if (_pJobsHandler != NULL) {
80
96
bool ProxyPublishApplication::Initialize() {
128
144
FATAL("Invalid uri: %s", STR(target["targetUri"]));
131
if (uri.scheme.find("rtmp") != 0) {
147
if (uri.scheme().find("rtmp") != 0) {
132
148
FATAL("Supported target scheme is rtmp for now....");
137
target["targetUri"] = uri.ToVariant();
139
152
_targetServers = _configuration["targetServers"];
140
153
_abortOnConnectError = (bool)_configuration["abortOnConnectError"];
160
173
RegisterAppProtocolHandler(PT_RTSP, _pRTSPHandler);
161
174
#endif /* HAS_PROTOCOL_RTP */
176
_pJobsHandler = new JobsTimerAppProtocolHandler(_configuration);
177
RegisterAppProtocolHandler(PT_TIMER, _pJobsHandler);
179
//2. Initialize the jobs timer
180
BaseTimerProtocol *pProtocol = new JobsTimerProtocol();
181
_jobsTimerProtocolId = pProtocol->GetId();
182
pProtocol->SetApplication(this);
183
pProtocol->EnqueueForTimeEvent(1);
163
185
return PullExternalStreams();
188
/**
 * Invoked when a protocol is being unregistered from this application.
 *
 * Before delegating to the base class, inspects the custom parameters
 * attached to the protocol and, when they describe a push or a pull marked
 * with keepAlive, re-enqueues that operation via EnqueuePush()/EnqueuePull().
 *
 * @param pProtocol the protocol being unregistered; it is dereferenced
 *                  unconditionally, so it must not be NULL.
 */
void ProxyPublishApplication::UnRegisterProtocol(BaseProtocol *pProtocol) {
	//1. Get the parameters assigned to this connection
	Variant &parameters = pProtocol->GetCustomParameters();

	//FINEST("parameters:\n%s", STR(parameters.ToString()));
	//2. Depending on the pull/push config, we retry only if keepAlive is true
	//and the source stream is still there
	if ((parameters.HasKeyChain(V_BOOL, true, 3, "customParameters", "localStreamConfig", "keepAlive"))
			&& ((bool) parameters["customParameters"]["localStreamConfig"]["keepAlive"])
			&& (parameters.HasKeyChain(_V_NUMERIC, true, 3, "customParameters", "localStreamConfig", "localUniqueStreamId"))) {
		//3. This is a push. First, fix the uri: replace the structured
		//targetUri node with its "fullUriWithAuth" string before re-enqueueing
		string uri = parameters["customParameters"]["localStreamConfig"]["targetUri"]["fullUriWithAuth"];
		parameters["customParameters"]["localStreamConfig"]["targetUri"] = uri;
		EnqueuePush(parameters["customParameters"]["localStreamConfig"]);
	} else if ((parameters.HasKeyChain(V_BOOL, true, 3, "customParameters", "externalStreamConfig", "keepAlive"))
			&& ((bool) parameters["customParameters"]["externalStreamConfig"]["keepAlive"])) {
		//4. This is a pull with keep alive. Just do it again. First, fix the
		//uri, which is currently "resolved"
		string uri = parameters["customParameters"]["externalStreamConfig"]["uri"]["fullUriWithAuth"];
		parameters["customParameters"]["externalStreamConfig"]["uri"] = uri;
		EnqueuePull(parameters["customParameters"]["externalStreamConfig"]);
	}

	//5. Finally, call the base class
	BaseClientApplication::UnRegisterProtocol(pProtocol);
}
166
215
void ProxyPublishApplication::SignalStreamRegistered(BaseStream *pStream) {
167
216
//1. Call the base class
168
217
BaseClientApplication::SignalStreamRegistered(pStream);
233
void ProxyPublishApplication::EnqueuePush(Variant ¶meters) {
234
//1. get the timer protocol
235
JobsTimerProtocol *pProtocol = (JobsTimerProtocol *) ProtocolManager::GetProtocol(
236
_jobsTimerProtocolId);
237
if (pProtocol == NULL) {
238
FATAL("Jobs protocol died. Aborting ...");
242
//2. Enqueue the operation
243
pProtocol->EnqueuePush(parameters);
246
void ProxyPublishApplication::EnqueuePull(Variant ¶meters) {
247
//1. get the timer protocol
248
JobsTimerProtocol *pProtocol = (JobsTimerProtocol *) ProtocolManager::GetProtocol(
249
_jobsTimerProtocolId);
250
if (pProtocol == NULL) {
251
FATAL("Jobs protocol died. Aborting ...");
255
//2. Enqueue the operation
256
pProtocol->EnqueuePull(parameters);
184
259
bool ProxyPublishApplication::InitiateForwardingStream(BaseInStream *pStream) {
186
261
FOR_MAP(_targetServers, string, Variant, i) {
229
304
STR(tagToString(pStream->GetType())),
230
305
STR(pStream->GetName()),
232
STR((string) target["targetUri"]["fullUri"]),
307
STR((string) target["targetUri"]),
233
308
STR(parameters["targetStreamName"]));
235
//3. Since we only accept RTMP targets, we will just fetch the RTMP handler
236
//and push the stream
237
return _pRTMPHandler->PushLocalStream(parameters);
310
//4. Enqueue the push
311
EnqueuePush(parameters);
238
313
#endif /* HAS_PROTOCOL_RTMP */