1
/////////////////////////////////////////////////////////////////////////////
2
// Name: samples/sockbase/client.cpp
3
// Purpose: Sockets sample for wxBase
4
// Author: Lukasz Michalski
7
// RCS-ID: $Id: baseclient.cpp 65680 2010-09-30 11:44:45Z VZ $
8
// Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
9
// Licence: wxWindows licence
10
/////////////////////////////////////////////////////////////////////////////
12
// ============================================================================
14
// ============================================================================
16
// ----------------------------------------------------------------------------
18
// ----------------------------------------------------------------------------
21
#include "wx/socket.h"
24
#include "wx/cmdline.h"
26
#include "wx/datetime.h"
28
#include "wx/thread.h"
30
const wxEventType wxEVT_WORKER = wxNewEventType();
31
#define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
33
const int timeout_val = 1000;
35
class WorkerEvent : public wxEvent {
44
WorkerEvent(void* pSender, evt_type type)
47
SetEventType(wxEVT_WORKER);
53
void setFailed() { m_isFailed = true; }
54
bool isFailed() const { return m_isFailed; }
56
virtual wxEvent* Clone() const
58
return new WorkerEvent(*this);
62
wxString m_workerIdent;
66
typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
71
WX_DECLARE_LIST(ThreadWorker, TList);
72
WX_DECLARE_LIST(EventWorker, EList);
74
class Client : public wxApp {
77
void RemoveEventWorker(EventWorker* p_worker);
98
virtual bool OnInit();
100
virtual int OnExit();
101
void OnInitCmdLine(wxCmdLineParser& pParser);
102
bool OnCmdLineParsed(wxCmdLineParser& pParser);
103
void OnWorkerEvent(WorkerEvent& pEvent);
104
void OnTimerEvent(wxTimerEvent& pEvent);
106
void StartWorker(workMode pMode, const wxString& pMessage);
107
void StartWorker(workMode pMode);
108
char* CreateBuffer(int *msgsize);
110
void dumpStatistics();
112
TList m_threadWorkers;
113
EList m_eventWorkers;
115
unsigned m_statConnecting;
116
unsigned m_statSending;
117
unsigned m_statReceiving;
118
unsigned m_statDisconnecting;
120
unsigned m_statFailed;
127
class ThreadWorker : public wxThread
130
ThreadWorker(const wxString& p_host, char* p_buf, int p_size);
131
virtual ExitCode Entry();
134
wxSocketClient* m_clientSocket;
139
wxString m_workerIdent;
142
class EventWorker : public wxEvtHandler
144
DECLARE_EVENT_TABLE()
146
EventWorker(const wxString& p_host, char* p_buf, int p_size);
148
virtual ~EventWorker();
151
wxSocketClient* m_clientSocket;
159
WorkerEvent::evt_type m_currentType;
161
wxIPV4address m_localaddr;
163
void OnSocketEvent(wxSocketEvent& pEvent);
164
void SendEvent(bool failed);
167
/******************* Implementation ******************/
168
IMPLEMENT_APP_CONSOLE(Client);
170
#include <wx/listimpl.cpp>
171
WX_DEFINE_LIST(TList);
172
WX_DEFINE_LIST(EList);
175
CreateIdent(const wxIPV4address& addr)
177
return wxString::Format(wxT("%s:%d"),addr.IPAddress().c_str(),addr.Service());
181
Client::OnInitCmdLine(wxCmdLineParser& pParser)
183
wxApp::OnInitCmdLine(pParser);
184
pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL);
185
pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL);
186
pParser.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL);
187
pParser.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
188
pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
189
pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
190
pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
195
Client::OnCmdLineParsed(wxCmdLineParser& pParser)
199
m_stressWorkers = 50;
201
if (pParser.Found(_("verbose")))
203
wxLog::AddTraceMask(wxT("wxSocket"));
204
wxLog::AddTraceMask(wxT("epolldispatcher"));
205
wxLog::AddTraceMask(wxT("selectdispatcher"));
206
wxLog::AddTraceMask(wxT("thread"));
207
wxLog::AddTraceMask(wxT("events"));
210
if (pParser.Found(wxT("t")))
211
m_workMode = THREADS;
212
m_sendType = SEND_RANDOM;
214
if (pParser.Found(wxT("m"),&m_message))
215
m_sendType = SEND_MESSAGE;
216
else if (pParser.Found(wxT("f"),&fname))
219
if (!file.IsOpened()) {
220
wxLogError(wxT("Cannot open file %s"),fname.c_str());
223
if (!file.ReadAll(&m_message)) {
224
wxLogError(wxT("Cannot read conten of file %s"),fname.c_str());
227
m_sendType = SEND_MESSAGE;
230
if (pParser.Found(wxT("s"),&m_stressWorkers))
231
m_sendType = STRESS_TEST;
233
m_host = wxT("127.0.0.1");
234
pParser.Found(wxT("H"),&m_host);
235
return wxApp::OnCmdLineParsed(pParser);
241
if (!wxApp::OnInit())
243
srand(wxDateTime::Now().GetTicks());
244
mTimer.SetOwner(this);
245
m_statConnecting = 0;
248
m_statDisconnecting = 0;
264
for (i = 0; i < m_stressWorkers; i++) {
265
if (m_message.empty())
266
StartWorker(THREADS);
268
StartWorker(THREADS, m_message);
272
for (i = 0; i < m_stressWorkers; i++) {
273
if (m_message.empty())
276
StartWorker(EVENTS, m_message);
280
for (i = 0; i < m_stressWorkers; i++) {
281
if (m_message.empty())
282
StartWorker(i % 5 == 0 ? THREADS : EVENTS);
284
StartWorker(i % 5 == 0 ? THREADS : EVENTS, m_message);
290
StartWorker(m_workMode,m_message);
293
StartWorker(m_workMode);
296
mTimer.Start(timeout_val,true);
297
return wxApp::OnRun();
303
for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it->GetNext()) {
304
delete it->GetData();
309
// Create buffer to be sent by client. Buffer contains test indicator
310
// message size and place for data
311
// msgsize parameter contains size of data in bytes and
312
// if input value does not fit into 250 bytes then
313
// on exit is updated to new value that is multiply of 1024 bytes
315
Client::CreateBuffer(int* msgsize)
319
//if message should have more than 256 bytes then set it as
320
//test3 for compatibility with GUI server sample
321
if ((*msgsize) > 250)
323
//send at least one kb of data
324
int size = (*msgsize)/1024 + 1;
325
//returned buffer will contain test indicator, message size in kb and data
326
bufsize = size*1024+2;
327
buf = new char[bufsize];
328
buf[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
329
buf[1] = (char)(size);
330
*msgsize = size*1024;
334
//returned buffer will contain test indicator, message size in kb and data
335
bufsize = (*msgsize)+2;
336
buf = new char[bufsize];
337
buf[0] = (unsigned char)0xBE; //second byte contains size in bytes
338
buf[1] = (char)(*msgsize);
344
Client::StartWorker(workMode pMode) {
345
int msgsize = 1 + (int) (250000.0 * (rand() / (RAND_MAX + 1.0)));
346
char* buf = CreateBuffer(&msgsize);
348
//fill data part of buffer with random bytes
349
for (int i = 2; i < (msgsize); i++) {
353
if (pMode == THREADS) {
354
ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
355
if (c->Create() != wxTHREAD_NO_ERROR) {
356
wxLogError(wxT("Cannot create more threads"));
359
m_threadWorkers.Append(c);
362
EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
364
m_eventWorkers.Append(e);
370
Client::StartWorker(workMode pMode, const wxString& pMessage) {
371
char* tmpbuf = wxStrdup(pMessage.mb_str());
372
int msgsize = strlen(tmpbuf);
373
char* buf = CreateBuffer(&msgsize);
374
memset(buf+2,0x0,msgsize);
375
memcpy(buf+2,tmpbuf,msgsize);
378
if (pMode == THREADS) {
379
ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
380
if (c->Create() != wxTHREAD_NO_ERROR) {
381
wxLogError(wxT("Cannot create more threads"));
384
m_threadWorkers.Append(c);
387
EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
389
m_eventWorkers.Append(e);
395
Client::OnWorkerEvent(WorkerEvent& pEvent) {
396
switch (pEvent.m_eventType) {
397
case WorkerEvent::CONNECTING:
398
if (pEvent.isFailed())
404
case WorkerEvent::SENDING:
405
if (pEvent.isFailed())
416
case WorkerEvent::RECEIVING:
417
if (pEvent.isFailed())
428
case WorkerEvent::DISCONNECTING:
429
if (pEvent.isFailed())
431
m_statDisconnecting--;
437
m_statDisconnecting++;
440
case WorkerEvent::DONE:
442
m_statDisconnecting--;
446
if (pEvent.isFailed() || pEvent.m_eventType == WorkerEvent::DONE)
448
for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext()) {
449
if (it->GetData() == pEvent.m_sender) {
450
m_threadWorkers.DeleteNode(it);
454
for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
456
if (it2->GetData() == pEvent.m_sender) {
457
delete it2->GetData();
458
m_eventWorkers.DeleteNode(it2);
462
if ((m_threadWorkers.GetCount() == 0) && (m_eventWorkers.GetCount() == 0))
471
mTimer.Start(timeout_val,true);
477
Client::RemoveEventWorker(EventWorker* p_worker) {
478
for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) {
479
if (it->GetData() == p_worker) {
480
//wxLogDebug(wxT("Deleting event worker"));
481
delete it->GetData();
482
m_eventWorkers.DeleteNode(it);
489
Client::dumpStatistics() {
491
wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
500
wxLogMessage(wxT("Current status:\n%s\n"),msg.c_str());
504
Client::OnTimerEvent(wxTimerEvent&) {
508
BEGIN_EVENT_TABLE(Client,wxEvtHandler)
509
EVT_WORKER(Client::OnWorkerEvent)
510
EVT_TIMER(wxID_ANY,Client::OnTimerEvent)
515
EventWorker::EventWorker(const wxString& p_host, char* p_buf, int p_size)
522
m_clientSocket = new wxSocketClient(wxSOCKET_NOWAIT);
523
m_clientSocket->SetEventHandler(*this);
524
m_insize = m_outsize - 2;
525
m_inbuf = new char[m_insize];
533
m_clientSocket->SetNotify(wxSOCKET_CONNECTION_FLAG|wxSOCKET_LOST_FLAG|wxSOCKET_OUTPUT_FLAG|wxSOCKET_INPUT_FLAG);
534
m_clientSocket->Notify(true);
535
m_currentType = WorkerEvent::CONNECTING;
537
//wxLogMessage(wxT("EventWorker: Connecting....."));
538
m_clientSocket->Connect(ca,false);
542
EventWorker::OnSocketEvent(wxSocketEvent& pEvent) {
543
switch(pEvent.GetSocketEvent()) {
545
//wxLogDebug(wxT("EventWorker: INPUT"));
547
if (m_readed == m_insize)
548
return; //event already posted
549
m_clientSocket->Read(m_inbuf + m_readed, m_insize - m_readed);
550
if (m_clientSocket->Error())
552
if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK)
554
wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr).c_str());
559
m_readed += m_clientSocket->LastCount();
560
//wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
561
if (m_readed == m_insize)
563
if (!memcmp(m_inbuf,m_outbuf,m_insize)) {
564
wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str());
567
m_currentType = WorkerEvent::DISCONNECTING;
568
wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr).c_str());
571
//wxLogDebug(wxT("EventWorker %p closing"),this);
572
m_clientSocket->Close();
574
m_currentType = WorkerEvent::DONE;
575
wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr).c_str());
578
} while (!m_clientSocket->Error());
580
case wxSOCKET_OUTPUT:
581
//wxLogDebug(wxT("EventWorker: OUTPUT"));
583
if (m_written == m_outsize)
587
m_currentType = WorkerEvent::SENDING;
588
wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr).c_str());
590
m_clientSocket->Write(m_outbuf + m_written, m_outsize - m_written);
591
if (m_clientSocket->Error())
593
if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) {
594
wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr).c_str());
598
m_written += m_clientSocket->LastCount();
599
if (m_written != m_outsize)
601
//wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
605
//wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
606
m_currentType = WorkerEvent::RECEIVING;
607
wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr).c_str());
610
} while(!m_clientSocket->Error());
612
case wxSOCKET_CONNECTION:
614
//wxLogMessage(wxT("EventWorker: got connection"));
615
wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2);
616
if (!m_clientSocket->GetLocal(m_localaddr))
618
wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket);
620
m_currentType = WorkerEvent::SENDING;
621
wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str());
627
wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr).c_str());
635
EventWorker::SendEvent(bool failed) {
638
WorkerEvent e(this,m_currentType);
639
if (failed) e.setFailed();
640
wxGetApp().AddPendingEvent(e);
641
m_doneSent = failed || m_currentType == WorkerEvent::DONE;
644
EventWorker::~EventWorker() {
645
m_clientSocket->Destroy();
650
BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
651
EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
655
ThreadWorker::ThreadWorker(const wxString& p_host, char* p_buf, int p_size)
656
: wxThread(wxTHREAD_DETACHED),
661
m_clientSocket = new wxSocketClient(wxSOCKET_BLOCK|wxSOCKET_WAITALL);
662
m_insize = m_outsize - 2;
663
m_inbuf = new char[m_insize];
666
wxThread::ExitCode ThreadWorker::Entry()
671
//wxLogDebug(wxT("ThreadWorker: Connecting....."));
672
m_clientSocket->SetTimeout(60);
674
WorkerEvent::evt_type etype = WorkerEvent::CONNECTING;
675
if (!m_clientSocket->Connect(ca)) {
676
wxLogError(wxT("Cannot connect to %s:%d"),ca.IPAddress().c_str(), ca.Service());
679
//wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
680
etype = WorkerEvent::SENDING;
681
WorkerEvent e(this,etype);
682
wxGetApp().AddPendingEvent(e);
683
int to_process = m_outsize;
685
m_clientSocket->Write(m_outbuf,m_outsize);
686
if (m_clientSocket->Error()) {
687
wxLogError(wxT("ThreadWorker: Write error"));
690
to_process -= m_clientSocket->LastCount();
691
//wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
692
} while(!m_clientSocket->Error() && to_process != 0);
695
etype = WorkerEvent::RECEIVING;
696
WorkerEvent e(this,etype);
697
wxGetApp().AddPendingEvent(e);
698
to_process = m_insize;
700
m_clientSocket->Read(m_inbuf,m_insize);
701
if (m_clientSocket->Error()) {
702
wxLogError(wxT("ThreadWorker: Read error"));
706
to_process -= m_clientSocket->LastCount();
707
//wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
708
} while(!m_clientSocket->Error() && to_process != 0);
711
char* outdat = (char*)m_outbuf+2;
712
if (!failed && (memcmp(m_inbuf,outdat,m_insize) != 0))
714
wxLogError(wxT("Data mismatch"));
718
//wxLogDebug(wxT("ThreadWorker: Finished"));
720
etype = WorkerEvent::DISCONNECTING;
721
WorkerEvent e(this,etype);
722
wxGetApp().AddPendingEvent(e);
724
m_clientSocket->Close();
725
m_clientSocket->Destroy();
726
m_clientSocket = NULL;
730
etype = WorkerEvent::DONE;
731
WorkerEvent e(this,etype);
732
if (failed) e.setFailed();
733
wxGetApp().AddPendingEvent(e);