~josejuan-sanchez/esajpip/debian

« back to all changes in this revision

Viewing changes to src/client_manager.cc

  • Committer: José Juan Sánchez Hernández
  • Date: 2013-04-02 18:14:26 UTC
  • Revision ID: josejuan.sanchez@gmail.com-20130402181426-07xn3djblburck53
Version for Debian

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include "trace.h"
 
2
#include "client_manager.h"
 
3
#include "jpip/jpip.h"
 
4
#include "jpip/request.h"
 
5
#include "jpip/databin_server.h"
 
6
#include "http/header.h"
 
7
#include "http/response.h"
 
8
#include "net/socket_stream.h"
 
9
#include "jpeg2000/index_manager.h"
 
10
 
 
11
 
 
12
using namespace std;
 
13
using namespace net;
 
14
using namespace http;
 
15
using namespace jpip;
 
16
using namespace jpeg2000;
 
17
 
 
18
 
 
19
void ClientManager::Run(ClientInfo *client_info)
 
20
{
 
21
  SocketStream sock_stream(client_info->sock());
 
22
  string channel = base::to_string(client_info->base_id());
 
23
 
 
24
  int chunk_len = 0;
 
25
  int buf_len = cfg.max_chunk_size();
 
26
 
 
27
  char *buf = new char[buf_len];
 
28
 
 
29
  if(buf == NULL) {
 
30
    ERROR("Insufficient memory to manage a new client session");
 
31
    return;
 
32
  }
 
33
 
 
34
  bool com_error;
 
35
  string req_line;
 
36
  jpip::Request req;
 
37
  bool pclose = false;
 
38
  bool is_opened = false;
 
39
  bool send_data = false;
 
40
  ImageIndex::Ptr im_index;
 
41
  DataBinServer data_server;
 
42
 
 
43
  string backup_file = cfg.caching_folder() +
 
44
      base::to_string(client_info->father_sock()) + ".backup";
 
45
 
 
46
  if(File::Exists(backup_file.c_str())) {
 
47
    InputStream().Open(backup_file.c_str()).Serialize(req.cache_model);
 
48
    is_opened = true;
 
49
  }
 
50
 
 
51
  while(!pclose)
 
52
  {
 
53
    if(cfg.log_requests())
 
54
      LOGC(_BLUE, "Waiting for a request ...");
 
55
 
 
56
    if(cfg.com_time_out() > 0) {
 
57
      if(sock_stream->WaitForInput(cfg.com_time_out() * 1000) == 0) {
 
58
        LOG("Communication time-out");
 
59
        sock_stream->Close();
 
60
        break;
 
61
      }
 
62
    }
 
63
 
 
64
    com_error = true;
 
65
    if(getline(sock_stream, req_line).good())
 
66
      com_error = !req.Parse(req_line);
 
67
 
 
68
    if(com_error) {
 
69
      if(sock_stream->IsValid()) LOG("Incorrect request received");
 
70
      else LOG("Connection closed by the client");
 
71
      sock_stream->Close();
 
72
      break;
 
73
 
 
74
    } else {
 
75
      if(cfg.log_requests())
 
76
        LOGC(_BLUE, "Request: " << req_line);
 
77
 
 
78
      http::Header header;
 
79
      int content_length = 0;
 
80
 
 
81
      while((sock_stream >> header).good()) {
 
82
        if(header == http::Header::ContentLength())
 
83
          content_length = atoi(header.value.c_str());
 
84
      }
 
85
 
 
86
      if(req.type == http::Request::POST) {
 
87
        stringstream body;
 
88
        sock_stream.clear();
 
89
 
 
90
        while(content_length--)
 
91
          body.put((char)sock_stream.get());
 
92
 
 
93
        req.ParseParameters(body);
 
94
      }
 
95
 
 
96
      sock_stream.clear();
 
97
    }
 
98
 
 
99
    pclose = true;
 
100
    send_data = false;
 
101
 
 
102
    if (req.mask.items.cclose)
 
103
    {
 
104
      if(!is_opened)
 
105
        LOG("Close request received but there is not any channel opened");
 
106
      else if(req.parameters["cclose"] != channel)
 
107
        LOG("Close request received related to another channel");
 
108
      else {
 
109
        pclose = false;
 
110
        is_opened = false;
 
111
        req.cache_model.Clear();
 
112
        unlink(backup_file.c_str());
 
113
        index_manager.CloseImage(im_index);
 
114
        LOG("The channel " << channel << " has been closed");
 
115
        sock_stream << http::Response(200) << http::Header::ContentLength("0") << http::Protocol::CRLF << flush;
 
116
      }
 
117
    }
 
118
    else if (req.mask.items.cnew)
 
119
    {
 
120
      if(is_opened)
 
121
        LOG("There already is a channel opened. Only one channel per client is supported");
 
122
      else {
 
123
        string file_name = (req.mask.items.target ? req.parameters["target"] : req.object);
 
124
 
 
125
        if (!index_manager.OpenImage(file_name, &im_index))
 
126
          ERROR("The image file '" << file_name << "' can not be read");
 
127
        else if(!data_server.Reset(im_index))
 
128
          ERROR("The image file '" << file_name << "' can not be opened");
 
129
        else if(!data_server.SetRequest(req))
 
130
          ERROR("The server can not process the request");
 
131
        else {
 
132
          LOG("The channel " << channel << " has been opened for the image '" << file_name << "'");
 
133
 
 
134
          sock_stream
 
135
            << http::Response(200)
 
136
            << http::Header("JPIP-cnew", "cid=" + channel + ",path=jpip,transport=http")
 
137
            << http::Header("JPIP-tid", index_manager.file_manager().GetCacheFileName(file_name))
 
138
            << http::Header::CacheControl("no-cache")
 
139
            << http::Header::TransferEncoding("chunked")
 
140
            << http::Header::ContentType("image/jpp-stream")
 
141
            << http::Protocol::CRLF
 
142
            << flush;
 
143
 
 
144
          OutputStream().Open(backup_file.c_str()).Serialize(req.cache_model);
 
145
          is_opened = true;
 
146
          send_data = true;
 
147
        }
 
148
      }
 
149
 
 
150
    } else if (req.mask.items.cid) {
 
151
      if(!is_opened) LOG("Request received but no channel is opened");
 
152
      else {
 
153
        if(req.parameters["cid"] != channel)
 
154
          LOG("Request related to another channel");
 
155
        else if(!data_server.SetRequest(req))
 
156
          ERROR("The server can not process the request");
 
157
        else {
 
158
          sock_stream
 
159
            << http::Response(200)
 
160
            << http::Header::CacheControl("no-cache")
 
161
            << http::Header::TransferEncoding("chunked")
 
162
            << http::Header::ContentType("image/jpp-stream")
 
163
            << http::Protocol::CRLF
 
164
            << flush;
 
165
 
 
166
          send_data = true;
 
167
        }
 
168
      }
 
169
 
 
170
    } else {
 
171
      LOG("Invalid request (channel parameter not found)");
 
172
    }
 
173
 
 
174
    pclose = pclose && !send_data;
 
175
 
 
176
    if(pclose)
 
177
      sock_stream << http::Response(500) << http::Protocol::CRLF << flush;
 
178
    else if(send_data) {
 
179
      for(bool last = false; !last;) {
 
180
        chunk_len = buf_len;
 
181
 
 
182
        if(!data_server.GenerateChunk(buf, &chunk_len, &last)) {
 
183
          ERROR("A new data chunk could not be generated");
 
184
          pclose = true;
 
185
          break;
 
186
        }
 
187
 
 
188
        if(chunk_len > 0) {
 
189
          sock_stream << hex << chunk_len << dec << http::Protocol::CRLF << flush;
 
190
 
 
191
          //LOG("Chunk of " << chunk_len << " bytes sent");
 
192
          sock_stream->Send(buf, chunk_len);
 
193
 
 
194
          sock_stream << http::Protocol::CRLF << flush;
 
195
        }
 
196
      }
 
197
 
 
198
      sock_stream
 
199
        << "0" << http::Protocol::CRLF
 
200
        << http::Protocol::CRLF
 
201
        << flush;
 
202
 
 
203
      if(data_server.end_woi())
 
204
        OutputStream().Open(backup_file.c_str()).Serialize(req.cache_model);
 
205
    }
 
206
  }
 
207
 
 
208
  if(is_opened) {
 
209
    unlink(backup_file.c_str());
 
210
    index_manager.CloseImage(im_index);
 
211
  }
 
212
 
 
213
  delete [] buf;
 
214
}
 
215
 
 
216
void ClientManager::RunBasic(ClientInfo *client_info)
 
217
{
 
218
  jpip::Request req;
 
219
  int buff_len = 5000;
 
220
  char *buff = new char[buff_len];
 
221
  SocketStream sock_stream(client_info->sock());
 
222
 
 
223
  for (;;)
 
224
  {
 
225
    LOG("Waiting for a request ...");
 
226
 
 
227
    if(cfg.com_time_out() > 0) {
 
228
      if(sock_stream->WaitForInput(cfg.com_time_out() * 1000) == 0) {
 
229
        LOG("Communication time-out");
 
230
        sock_stream->Close();
 
231
        break;
 
232
      }
 
233
    }
 
234
 
 
235
    if (!(sock_stream >> req).good())
 
236
    {
 
237
      if(sock_stream->IsValid()) LOG("Incorrect request received");
 
238
      else LOG("Connection closed by the client");
 
239
      sock_stream->Close();
 
240
      break;
 
241
 
 
242
    }
 
243
    else
 
244
    {
 
245
      http::Header header;
 
246
      while ((sock_stream >> header).good());
 
247
      sock_stream.clear();
 
248
    }
 
249
 
 
250
    sock_stream << http::Response(200)
 
251
      << http::Header("JPIP-cnew", "cid=C0,path=jpip,transport=http")
 
252
      << http::Header("JPIP-tid", "T0")
 
253
      << http::Header::CacheControl("no-cache")
 
254
      << http::Header::ContentLength(base::to_string(buff_len))
 
255
      << http::Header::ContentType("image/jpp-stream")
 
256
      << http::Protocol::CRLF
 
257
      << flush;
 
258
 
 
259
    sock_stream->Send(buff, buff_len);
 
260
  }
 
261
}
 
262