~cgb-cs/appscale/appscale-main

« back to all changes in this revision

Viewing changes to AppController/coverage/-home-cgb-clients-main-main-cgb-research-Neptune-mapreduce_helper_rb.html

  • Committer: Chris Bunch
  • Date: 2012-02-26 03:20:57 UTC
  • Revision ID: cgb@cs.ucsb.edu-20120226032057-ad0cy0zgx4we4exc
adding in repo over app engine support, and tests for most of datastore repo on appscale and s3

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
        <tbody>
29
29
          <tr>
30
30
            <td class="left_align"><a href="-home-cgb-clients-main-main-cgb-research-Neptune-mapreduce_helper_rb.html">/home/cgb/clients/main/main-cgb-research/Neptune/mapreduce_helper.rb</a></td>
31
 
            <td class='right_align'><tt>202</tt></td>
 
31
            <td class='right_align'><tt>203</tt></td>
32
32
            <td class='right_align'><tt>129</tt></td>
33
 
            <td class="left_align"><div class="percent_graph_legend"><tt class=''>20.79%</tt></div>
 
33
            <td class="left_align"><div class="percent_graph_legend"><tt class=''>23.15%</tt></div>
34
34
          <div class="percent_graph">
35
 
            <div class="covered" style="width:21px"></div>
36
 
            <div class="uncovered" style="width:79px"></div>
 
35
            <div class="covered" style="width:23px"></div>
 
36
            <div class="uncovered" style="width:77px"></div>
37
37
          </div></td>
38
 
            <td class="left_align"><div class="percent_graph_legend"><tt class=''>12.40%</tt></div>
 
38
            <td class="left_align"><div class="percent_graph_legend"><tt class=''>14.73%</tt></div>
39
39
          <div class="percent_graph">
40
 
            <div class="covered" style="width:12px"></div>
41
 
            <div class="uncovered" style="width:88px"></div>
 
40
            <div class="covered" style="width:15px"></div>
 
41
            <div class="uncovered" style="width:85px"></div>
42
42
          </div></td>
43
43
          </tr>
44
44
        </tbody>
81
81
          
82
82
          
83
83
          <tr class="marked">
84
 
            <td><pre><a name="line5">5</a> require 'djinn'</pre></td>
 
84
            <td><pre><a name="line5">5</a> $:.unshift File.join(File.dirname(__FILE__), &quot;..&quot;, &quot;AppController&quot;)</pre></td>
85
85
          </tr>
86
86
        
87
87
          
88
88
          
89
 
          <tr class="inferred">
90
 
            <td><pre><a name="line6">6</a> </pre></td>
 
89
          <tr class="marked">
 
90
            <td><pre><a name="line6">6</a> require 'djinn'</pre></td>
91
91
          </tr>
92
92
        
93
93
          
98
98
        
99
99
          
100
100
          
101
 
          <tr class="marked">
102
 
            <td><pre><a name="line8">8</a> BAD_TABLE_MSG = &quot;The currently running database isn't running Hadoop, so MapReduce jobs cannot be run.&quot;</pre></td>
103
 
          </tr>
104
 
        
105
 
          
106
 
          
107
 
          <tr class="inferred">
108
 
            <td><pre><a name="line9">9</a> </pre></td>
109
 
          </tr>
110
 
        
111
 
          
112
 
          
113
 
          <tr class="inferred">
114
 
            <td><pre><a name="line10">10</a> </pre></td>
115
 
          </tr>
116
 
        
117
 
          
118
 
          
119
 
          <tr class="marked">
120
 
            <td><pre><a name="line11">11</a> DBS_W_HADOOP = [&quot;hbase&quot;, &quot;hypertable&quot;]</pre></td>
 
101
          <tr class="inferred">
 
102
            <td><pre><a name="line8">8</a> </pre></td>
 
103
          </tr>
 
104
        
 
105
          
 
106
          
 
107
          <tr class="marked">
 
108
            <td><pre><a name="line9">9</a> $:.unshift File.join(File.dirname(__FILE__), &quot;..&quot;, &quot;AppController&quot;, &quot;lib&quot;)</pre></td>
 
109
          </tr>
 
110
        
 
111
          
 
112
          
 
113
          <tr class="marked">
 
114
            <td><pre><a name="line10">10</a> require 'datastore_factory'</pre></td>
 
115
          </tr>
 
116
        
 
117
          
 
118
          
 
119
          <tr class="inferred">
 
120
            <td><pre><a name="line11">11</a> </pre></td>
121
121
          </tr>
122
122
        
123
123
          
128
128
        
129
129
          
130
130
          
131
 
          <tr class="inferred">
132
 
            <td><pre><a name="line13">13</a> </pre></td>
133
 
          </tr>
134
 
        
135
 
          
136
 
          
137
131
          <tr class="marked">
138
 
            <td><pre><a name="line14">14</a> HADOOP = &quot;#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/bin/hadoop&quot;</pre></td>
 
132
            <td><pre><a name="line13">13</a> BAD_TABLE_MSG = &quot;The currently running database isn't running Hadoop, so MapReduce jobs cannot be run.&quot;</pre></td>
 
133
          </tr>
 
134
        
 
135
          
 
136
          
 
137
          <tr class="inferred">
 
138
            <td><pre><a name="line14">14</a> </pre></td>
139
139
          </tr>
140
140
        
141
141
          
146
146
        
147
147
          
148
148
          
149
 
          <tr class="inferred">
150
 
            <td><pre><a name="line16">16</a> </pre></td>
151
 
          </tr>
152
 
        
153
 
          
154
 
          
155
149
          <tr class="marked">
156
 
            <td><pre><a name="line17">17</a> STREAMING = &quot;#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar&quot;</pre></td>
 
150
            <td><pre><a name="line16">16</a> DBS_W_HADOOP = [&quot;hbase&quot;, &quot;hypertable&quot;]</pre></td>
 
151
          </tr>
 
152
        
 
153
          
 
154
          
 
155
          <tr class="inferred">
 
156
            <td><pre><a name="line17">17</a> </pre></td>
157
157
          </tr>
158
158
        
159
159
          
164
164
        
165
165
          
166
166
          
167
 
          <tr class="inferred">
168
 
            <td><pre><a name="line19">19</a> </pre></td>
169
 
          </tr>
170
 
        
171
 
          
172
 
          
173
167
          <tr class="marked">
174
 
            <td><pre><a name="line20">20</a> public </pre></td>
 
168
            <td><pre><a name="line19">19</a> HADOOP = &quot;#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/bin/hadoop&quot;</pre></td>
 
169
          </tr>
 
170
        
 
171
          
 
172
          
 
173
          <tr class="inferred">
 
174
            <td><pre><a name="line20">20</a> </pre></td>
175
175
          </tr>
176
176
        
177
177
          
182
182
        
183
183
          
184
184
          
185
 
          <tr class="inferred">
186
 
            <td><pre><a name="line22">22</a> </pre></td>
187
 
          </tr>
188
 
        
189
 
          
190
 
          
191
 
          <tr class="marked">
192
 
            <td><pre><a name="line23">23</a> def neptune_mapreduce_run_job(nodes, job_data, secret)</pre></td>
193
 
          </tr>
194
 
        
195
 
          
196
 
          
197
 
          <tr class="uncovered">
198
 
            <td><pre><a name="line24">24</a>   return BAD_SECRET_MSG unless valid_secret?(secret)</pre></td>
199
 
          </tr>
200
 
        
201
 
          
202
 
          
203
 
          <tr class="uncovered">
204
 
            <td><pre><a name="line25">25</a>   Djinn.log_debug(&quot;mapreduce - run&quot;)</pre></td>
205
 
          </tr>
206
 
        
207
 
          
208
 
          
209
 
          <tr class="uncovered">
 
185
          <tr class="marked">
 
186
            <td><pre><a name="line22">22</a> STREAMING = &quot;#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar&quot;</pre></td>
 
187
          </tr>
 
188
        
 
189
          
 
190
          
 
191
          <tr class="inferred">
 
192
            <td><pre><a name="line23">23</a> </pre></td>
 
193
          </tr>
 
194
        
 
195
          
 
196
          
 
197
          <tr class="inferred">
 
198
            <td><pre><a name="line24">24</a> </pre></td>
 
199
          </tr>
 
200
        
 
201
          
 
202
          
 
203
          <tr class="marked">
 
204
            <td><pre><a name="line25">25</a> public </pre></td>
 
205
          </tr>
 
206
        
 
207
          
 
208
          
 
209
          <tr class="inferred">
210
210
            <td><pre><a name="line26">26</a> </pre></td>
211
211
          </tr>
212
212
        
213
213
          
214
214
          
215
 
          <tr class="uncovered">
216
 
            <td><pre><a name="line27">27</a>   Thread.new {</pre></td>
217
 
          </tr>
218
 
        
219
 
          
220
 
          
221
 
          <tr class="uncovered">
222
 
            <td><pre><a name="line28">28</a>     keyname = @creds['keyname']</pre></td>
223
 
          </tr>
224
 
        
225
 
          
226
 
          
227
 
          <tr class="uncovered">
228
 
            <td><pre><a name="line29">29</a>     nodes = Djinn.convert_location_array_to_class(nodes, keyname)</pre></td>
229
 
          </tr>
230
 
        
231
 
          
232
 
          
233
 
          <tr class="uncovered">
234
 
            <td><pre><a name="line30">30</a> </pre></td>
235
 
          </tr>
236
 
        
237
 
          
238
 
          
239
 
          <tr class="uncovered">
240
 
            <td><pre><a name="line31">31</a>     storage = job_data[&quot;@storage&quot;]</pre></td>
241
 
          </tr>
242
 
        
243
 
          
244
 
          
245
 
          <tr class="uncovered">
246
 
            <td><pre><a name="line32">32</a>     creds = Djinn.neptune_parse_creds(storage, job_data)</pre></td>
247
 
          </tr>
248
 
        
249
 
          
250
 
          
251
 
          <tr class="uncovered">
252
 
            <td><pre><a name="line33">33</a> </pre></td>
253
 
          </tr>
254
 
        
255
 
          
256
 
          
257
 
          <tr class="uncovered">
258
 
            <td><pre><a name="line34">34</a>     mapreducejar = job_data[&quot;@mapreducejar&quot;]</pre></td>
259
 
          </tr>
260
 
        
261
 
          
262
 
          
263
 
          <tr class="uncovered">
264
 
            <td><pre><a name="line35">35</a>     main = job_data[&quot;@main&quot;]</pre></td>
265
 
          </tr>
266
 
        
267
 
          
268
 
          
269
 
          <tr class="uncovered">
270
 
            <td><pre><a name="line36">36</a> </pre></td>
271
 
          </tr>
272
 
        
273
 
          
274
 
          
275
 
          <tr class="uncovered">
276
 
            <td><pre><a name="line37">37</a>     map = job_data[&quot;@map&quot;]</pre></td>
277
 
          </tr>
278
 
        
279
 
          
280
 
          
281
 
          <tr class="uncovered">
282
 
            <td><pre><a name="line38">38</a>     reduce = job_data[&quot;@reduce&quot;]</pre></td>
283
 
          </tr>
284
 
        
285
 
          
286
 
          
287
 
          <tr class="uncovered">
288
 
            <td><pre><a name="line39">39</a> </pre></td>
289
 
          </tr>
290
 
        
291
 
          
292
 
          
293
 
          <tr class="uncovered">
294
 
            <td><pre><a name="line40">40</a>     input = job_data[&quot;@input&quot;]</pre></td>
295
 
          </tr>
296
 
        
297
 
          
298
 
          
299
 
          <tr class="uncovered">
300
 
            <td><pre><a name="line41">41</a>     output = job_data[&quot;@output&quot;]</pre></td>
301
 
          </tr>
302
 
        
303
 
          
304
 
          
305
 
          <tr class="uncovered">
306
 
            <td><pre><a name="line42">42</a> </pre></td>
307
 
          </tr>
308
 
        
309
 
          
310
 
          
311
 
          <tr class="uncovered">
312
 
            <td><pre><a name="line43">43</a>     Djinn.log_debug(&quot;MR: Copying mapper and reducer to all boxes&quot;)</pre></td>
313
 
          </tr>
314
 
        
315
 
          
316
 
          
317
 
          <tr class="uncovered">
318
 
            <td><pre><a name="line44">44</a>     # TODO: get files from shadow first if in cloud</pre></td>
319
 
          </tr>
320
 
        
321
 
          
322
 
          
323
 
          <tr class="uncovered">
324
 
            <td><pre><a name="line45">45</a> </pre></td>
325
 
          </tr>
326
 
        
327
 
          
328
 
          
329
 
          <tr class="uncovered">
330
 
            <td><pre><a name="line46">46</a>     if mapreducejar</pre></td>
331
 
          </tr>
332
 
        
333
 
          
334
 
          
335
 
          <tr class="uncovered">
336
 
            <td><pre><a name="line47">47</a>       Djinn.log_debug(&quot;need to get mr jar located at #{mapreducejar}&quot;)</pre></td>
337
 
          </tr>
338
 
        
339
 
          
340
 
          
341
 
          <tr class="uncovered">
342
 
            <td><pre><a name="line48">48</a>       mr_jar = Repo.get_output(mapreducejar, storage, creds)</pre></td>
343
 
          </tr>
344
 
        
345
 
          
346
 
          
347
 
          <tr class="uncovered">
348
 
            <td><pre><a name="line49">49</a>       mr_file = mapreducejar.split('/')[-1]</pre></td>
349
 
          </tr>
350
 
        
351
 
          
352
 
          
353
 
          <tr class="uncovered">
354
 
            <td><pre><a name="line50">50</a>       my_mrjar = &quot;/tmp/#{mr_file}&quot;</pre></td>
355
 
          </tr>
356
 
        
357
 
          
358
 
          
359
 
          <tr class="uncovered">
360
 
            <td><pre><a name="line51">51</a>       HelperFunctions.write_file(my_mrjar, mr_jar)</pre></td>
361
 
          </tr>
362
 
        
363
 
          
364
 
          
365
 
          <tr class="uncovered">
366
 
            <td><pre><a name="line52">52</a> </pre></td>
367
 
          </tr>
368
 
        
369
 
          
370
 
          
371
 
          <tr class="uncovered">
372
 
            <td><pre><a name="line53">53</a>       nodes.each { |node|</pre></td>
373
 
          </tr>
374
 
        
375
 
          
376
 
          
377
 
          <tr class="uncovered">
378
 
            <td><pre><a name="line54">54</a>         HelperFunctions.scp_file(my_mrjar, my_mrjar, node.private_ip, node.ssh_key)</pre></td>
379
 
          </tr>
380
 
        
381
 
          
382
 
          
383
 
          <tr class="uncovered">
384
 
            <td><pre><a name="line55">55</a>       }</pre></td>
 
215
          <tr class="inferred">
 
216
            <td><pre><a name="line27">27</a> </pre></td>
 
217
          </tr>
 
218
        
 
219
          
 
220
          
 
221
          <tr class="marked">
 
222
            <td><pre><a name="line28">28</a> def neptune_mapreduce_run_job(nodes, job_data, secret)</pre></td>
 
223
          </tr>
 
224
        
 
225
          
 
226
          
 
227
          <tr class="uncovered">
 
228
            <td><pre><a name="line29">29</a>   return BAD_SECRET_MSG unless valid_secret?(secret)</pre></td>
 
229
          </tr>
 
230
        
 
231
          
 
232
          
 
233
          <tr class="uncovered">
 
234
            <td><pre><a name="line30">30</a>   Djinn.log_debug(&quot;mapreduce - run&quot;)</pre></td>
 
235
          </tr>
 
236
        
 
237
          
 
238
          
 
239
          <tr class="uncovered">
 
240
            <td><pre><a name="line31">31</a> </pre></td>
 
241
          </tr>
 
242
        
 
243
          
 
244
          
 
245
          <tr class="uncovered">
 
246
            <td><pre><a name="line32">32</a>   Thread.new {</pre></td>
 
247
          </tr>
 
248
        
 
249
          
 
250
          
 
251
          <tr class="uncovered">
 
252
            <td><pre><a name="line33">33</a>     keyname = @creds['keyname']</pre></td>
 
253
          </tr>
 
254
        
 
255
          
 
256
          
 
257
          <tr class="uncovered">
 
258
            <td><pre><a name="line34">34</a>     nodes = Djinn.convert_location_array_to_class(nodes, keyname)</pre></td>
 
259
          </tr>
 
260
        
 
261
          
 
262
          
 
263
          <tr class="uncovered">
 
264
            <td><pre><a name="line35">35</a> </pre></td>
 
265
          </tr>
 
266
        
 
267
          
 
268
          
 
269
          <tr class="uncovered">
 
270
            <td><pre><a name="line36">36</a>     storage = job_data[&quot;@storage&quot;]</pre></td>
 
271
          </tr>
 
272
        
 
273
          
 
274
          
 
275
          <tr class="uncovered">
 
276
            <td><pre><a name="line37">37</a>     datastore = DatastoreFactory.get_datastore(storage, job_data)</pre></td>
 
277
          </tr>
 
278
        
 
279
          
 
280
          
 
281
          <tr class="uncovered">
 
282
            <td><pre><a name="line38">38</a> </pre></td>
 
283
          </tr>
 
284
        
 
285
          
 
286
          
 
287
          <tr class="uncovered">
 
288
            <td><pre><a name="line39">39</a>     mapreducejar = job_data[&quot;@mapreducejar&quot;]</pre></td>
 
289
          </tr>
 
290
        
 
291
          
 
292
          
 
293
          <tr class="uncovered">
 
294
            <td><pre><a name="line40">40</a>     main = job_data[&quot;@main&quot;]</pre></td>
 
295
          </tr>
 
296
        
 
297
          
 
298
          
 
299
          <tr class="uncovered">
 
300
            <td><pre><a name="line41">41</a> </pre></td>
 
301
          </tr>
 
302
        
 
303
          
 
304
          
 
305
          <tr class="uncovered">
 
306
            <td><pre><a name="line42">42</a>     map = job_data[&quot;@map&quot;]</pre></td>
 
307
          </tr>
 
308
        
 
309
          
 
310
          
 
311
          <tr class="uncovered">
 
312
            <td><pre><a name="line43">43</a>     reduce = job_data[&quot;@reduce&quot;]</pre></td>
 
313
          </tr>
 
314
        
 
315
          
 
316
          
 
317
          <tr class="uncovered">
 
318
            <td><pre><a name="line44">44</a> </pre></td>
 
319
          </tr>
 
320
        
 
321
          
 
322
          
 
323
          <tr class="uncovered">
 
324
            <td><pre><a name="line45">45</a>     input = job_data[&quot;@input&quot;]</pre></td>
 
325
          </tr>
 
326
        
 
327
          
 
328
          
 
329
          <tr class="uncovered">
 
330
            <td><pre><a name="line46">46</a>     output = job_data[&quot;@output&quot;]</pre></td>
 
331
          </tr>
 
332
        
 
333
          
 
334
          
 
335
          <tr class="uncovered">
 
336
            <td><pre><a name="line47">47</a> </pre></td>
 
337
          </tr>
 
338
        
 
339
          
 
340
          
 
341
          <tr class="uncovered">
 
342
            <td><pre><a name="line48">48</a>     Djinn.log_debug(&quot;MR: Copying mapper and reducer to all boxes&quot;)</pre></td>
 
343
          </tr>
 
344
        
 
345
          
 
346
          
 
347
          <tr class="uncovered">
 
348
            <td><pre><a name="line49">49</a>     # TODO: get files from shadow first if in cloud</pre></td>
 
349
          </tr>
 
350
        
 
351
          
 
352
          
 
353
          <tr class="uncovered">
 
354
            <td><pre><a name="line50">50</a> </pre></td>
 
355
          </tr>
 
356
        
 
357
          
 
358
          
 
359
          <tr class="uncovered">
 
360
            <td><pre><a name="line51">51</a>     if mapreducejar</pre></td>
 
361
          </tr>
 
362
        
 
363
          
 
364
          
 
365
          <tr class="uncovered">
 
366
            <td><pre><a name="line52">52</a>       Djinn.log_debug(&quot;need to get mr jar located at #{mapreducejar}&quot;)</pre></td>
 
367
          </tr>
 
368
        
 
369
          
 
370
          
 
371
          <tr class="uncovered">
 
372
            <td><pre><a name="line53">53</a>       mr_file = mapreducejar.split('/')[-1]</pre></td>
 
373
          </tr>
 
374
        
 
375
          
 
376
          
 
377
          <tr class="uncovered">
 
378
            <td><pre><a name="line54">54</a>       my_mrjar = &quot;/tmp/#{mr_file}&quot;</pre></td>
 
379
          </tr>
 
380
        
 
381
          
 
382
          
 
383
          <tr class="uncovered">
 
384
            <td><pre><a name="line55">55</a>       datastore.get_output_and_save_to_fs(mapreducejar, my_mrjar)</pre></td>
385
385
          </tr>
386
386
        
387
387
          
393
393
          
394
394
          
395
395
          <tr class="uncovered">
396
 
            <td><pre><a name="line57">57</a>       db_master = get_db_master</pre></td>
397
 
          </tr>
398
 
        
399
 
          
400
 
          
401
 
          <tr class="uncovered">
402
 
            <td><pre><a name="line58">58</a>       ip = db_master.private_ip</pre></td>
403
 
          </tr>
404
 
        
405
 
          
406
 
          
407
 
          <tr class="uncovered">
408
 
            <td><pre><a name="line59">59</a>       ssh_key = db_master.ssh_key</pre></td>
409
 
          </tr>
410
 
        
411
 
          
412
 
          
413
 
          <tr class="uncovered">
414
 
            <td><pre><a name="line60">60</a>       HelperFunctions.scp_file(my_mrjar, my_mrjar, ip, ssh_key)</pre></td>
415
 
          </tr>
416
 
        
417
 
          
418
 
          
419
 
          <tr class="uncovered">
420
 
            <td><pre><a name="line61">61</a> </pre></td>
421
 
          </tr>
422
 
        
423
 
          
424
 
          
425
 
          <tr class="uncovered">
426
 
            <td><pre><a name="line62">62</a>       run_mr_command = &quot;#{HADOOP} jar #{my_mrjar} #{main} #{input} #{output}&quot;</pre></td>
427
 
          </tr>
428
 
        
429
 
          
430
 
          
431
 
          <tr class="uncovered">
432
 
            <td><pre><a name="line63">63</a>     else</pre></td>
433
 
          </tr>
434
 
        
435
 
          
436
 
          
437
 
          <tr class="uncovered">
438
 
            <td><pre><a name="line64">64</a>       Djinn.log_debug(&quot;need to get map code located at #{map}&quot;) </pre></td>
439
 
          </tr>
440
 
        
441
 
          
442
 
          
443
 
          <tr class="uncovered">
444
 
            <td><pre><a name="line65">65</a>       map_code = Repo.get_output(map, storage, creds)</pre></td>
445
 
          </tr>
446
 
        
447
 
          
448
 
          
449
 
          <tr class="uncovered">
450
 
            <td><pre><a name="line66">66</a> </pre></td>
451
 
          </tr>
452
 
        
453
 
          
454
 
          
455
 
          <tr class="uncovered">
456
 
            <td><pre><a name="line67">67</a>       Djinn.log_debug(&quot;need to get reduce code located at #{reduce}&quot;)</pre></td>
457
 
          </tr>
458
 
        
459
 
          
460
 
          
461
 
          <tr class="uncovered">
462
 
            <td><pre><a name="line68">68</a>       red_code = Repo.get_output(reduce, storage, creds)</pre></td>
463
 
          </tr>
464
 
        
465
 
          
466
 
          
467
 
          <tr class="uncovered">
468
 
            <td><pre><a name="line69">69</a> </pre></td>
469
 
          </tr>
470
 
        
471
 
          
472
 
          
473
 
          <tr class="uncovered">
474
 
            <td><pre><a name="line70">70</a>       map_file = map.split('/')[-1]</pre></td>
475
 
          </tr>
476
 
        
477
 
          
478
 
          
479
 
          <tr class="uncovered">
480
 
            <td><pre><a name="line71">71</a>       red_file = reduce.split('/')[-1]</pre></td>
481
 
          </tr>
482
 
        
483
 
          
484
 
          
485
 
          <tr class="uncovered">
486
 
            <td><pre><a name="line72">72</a> </pre></td>
487
 
          </tr>
488
 
        
489
 
          
490
 
          
491
 
          <tr class="uncovered">
492
 
            <td><pre><a name="line73">73</a>       my_map = &quot;/tmp/#{map_file}&quot;</pre></td>
493
 
          </tr>
494
 
        
495
 
          
496
 
          
497
 
          <tr class="uncovered">
498
 
            <td><pre><a name="line74">74</a>       my_red = &quot;/tmp/#{red_file}&quot;</pre></td>
499
 
          </tr>
500
 
        
501
 
          
502
 
          
503
 
          <tr class="uncovered">
504
 
            <td><pre><a name="line75">75</a> </pre></td>
505
 
          </tr>
506
 
        
507
 
          
508
 
          
509
 
          <tr class="uncovered">
510
 
            <td><pre><a name="line76">76</a>       HelperFunctions.write_file(my_map, map_code)</pre></td>
511
 
          </tr>
512
 
        
513
 
          
514
 
          
515
 
          <tr class="uncovered">
516
 
            <td><pre><a name="line77">77</a>       HelperFunctions.write_file(my_red, red_code)</pre></td>
517
 
          </tr>
518
 
        
519
 
          
520
 
          
521
 
          <tr class="uncovered">
522
 
            <td><pre><a name="line78">78</a> </pre></td>
523
 
          </tr>
524
 
        
525
 
          
526
 
          
527
 
          <tr class="uncovered">
528
 
            <td><pre><a name="line79">79</a>       # since the db master is the initiator of the mapreduce job, it needs</pre></td>
529
 
          </tr>
530
 
        
531
 
          
532
 
          
533
 
          <tr class="uncovered">
534
 
            <td><pre><a name="line80">80</a>       # to have both the mapper and reducer files handy</pre></td>
535
 
          </tr>
536
 
        
537
 
          
538
 
          
539
 
          <tr class="uncovered">
540
 
            <td><pre><a name="line81">81</a> </pre></td>
541
 
          </tr>
542
 
        
543
 
          
544
 
          
545
 
          <tr class="uncovered">
546
 
            <td><pre><a name="line82">82</a>       db_master = get_db_master</pre></td>
547
 
          </tr>
548
 
        
549
 
          
550
 
          
551
 
          <tr class="uncovered">
552
 
            <td><pre><a name="line83">83</a>       ip = db_master.private_ip</pre></td>
553
 
          </tr>
554
 
        
555
 
          
556
 
          
557
 
          <tr class="uncovered">
558
 
            <td><pre><a name="line84">84</a>       ssh_key = db_master.ssh_key</pre></td>
559
 
          </tr>
560
 
        
561
 
          
562
 
          
563
 
          <tr class="uncovered">
564
 
            <td><pre><a name="line85">85</a>       HelperFunctions.scp_file(my_map, my_map, ip, ssh_key)</pre></td>
565
 
          </tr>
566
 
        
567
 
          
568
 
          
569
 
          <tr class="uncovered">
570
 
            <td><pre><a name="line86">86</a>       HelperFunctions.scp_file(my_red, my_red, ip, ssh_key)</pre></td>
571
 
          </tr>
572
 
        
573
 
          
574
 
          
575
 
          <tr class="uncovered">
576
 
            <td><pre><a name="line87">87</a> </pre></td>
577
 
          </tr>
578
 
        
579
 
          
580
 
          
581
 
          <tr class="uncovered">
582
 
            <td><pre><a name="line88">88</a>       nodes.each { |node|</pre></td>
583
 
          </tr>
584
 
        
585
 
          
586
 
          
587
 
          <tr class="uncovered">
588
 
            <td><pre><a name="line89">89</a>         HelperFunctions.scp_file(my_map, my_map, node.private_ip, node.ssh_key)</pre></td>
589
 
          </tr>
590
 
        
591
 
          
592
 
          
593
 
          <tr class="uncovered">
594
 
            <td><pre><a name="line90">90</a>         HelperFunctions.scp_file(my_red, my_red, node.private_ip, node.ssh_key)</pre></td>
595
 
          </tr>
596
 
        
597
 
          
598
 
          
599
 
          <tr class="uncovered">
600
 
            <td><pre><a name="line91">91</a>       }</pre></td>
601
 
          </tr>
602
 
        
603
 
          
604
 
          
605
 
          <tr class="uncovered">
606
 
            <td><pre><a name="line92">92</a> </pre></td>
607
 
          </tr>
608
 
        
609
 
          
610
 
          
611
 
          <tr class="uncovered">
612
 
            <td><pre><a name="line93">93</a>       map_cmd = &quot;\&quot;&quot; + get_language(my_map) + &quot; &quot; + my_map + &quot;\&quot;&quot;</pre></td>
613
 
          </tr>
614
 
        
615
 
          
616
 
          
617
 
          <tr class="uncovered">
618
 
            <td><pre><a name="line94">94</a>       reduce_cmd = &quot;\&quot;&quot; + get_language(my_red) + &quot; &quot; + my_red + &quot;\&quot;&quot;</pre></td>
619
 
          </tr>
620
 
        
621
 
          
622
 
          
623
 
          <tr class="uncovered">
624
 
            <td><pre><a name="line95">95</a> </pre></td>
625
 
          </tr>
626
 
        
627
 
          
628
 
          
629
 
          <tr class="uncovered">
630
 
            <td><pre><a name="line96">96</a>       run_mr_command = &quot;#{HADOOP} jar #{STREAMING} -input #{input} &quot; +</pre></td>
631
 
          </tr>
632
 
        
633
 
          
634
 
          
635
 
          <tr class="uncovered">
636
 
            <td><pre><a name="line97">97</a>         &quot;-output #{output} -mapper #{map_cmd} -reducer #{reduce_cmd}&quot;</pre></td>
637
 
          </tr>
638
 
        
639
 
          
640
 
          
641
 
          <tr class="uncovered">
642
 
            <td><pre><a name="line98">98</a>     end</pre></td>
643
 
          </tr>
644
 
        
645
 
          
646
 
          
647
 
          <tr class="uncovered">
648
 
            <td><pre><a name="line99">99</a> </pre></td>
649
 
          </tr>
650
 
        
651
 
          
652
 
          
653
 
          <tr class="uncovered">
654
 
            <td><pre><a name="line100">100</a>     Djinn.log_debug(&quot;waiting for input file #{input} to exist in HDFS&quot;)</pre></td>
655
 
          </tr>
656
 
        
657
 
          
658
 
          
659
 
          <tr class="uncovered">
660
 
            <td><pre><a name="line101">101</a>     wait_for_hdfs_file(input)</pre></td>
661
 
          </tr>
662
 
        
663
 
          
664
 
          
665
 
          <tr class="uncovered">
666
 
            <td><pre><a name="line102">102</a> </pre></td>
667
 
          </tr>
668
 
        
669
 
          
670
 
          
671
 
          <tr class="uncovered">
672
 
            <td><pre><a name="line103">103</a>     # run mr job</pre></td>
673
 
          </tr>
674
 
        
675
 
          
676
 
          
677
 
          <tr class="uncovered">
678
 
            <td><pre><a name="line104">104</a>     start = Time.now</pre></td>
679
 
          </tr>
680
 
        
681
 
          
682
 
          
683
 
          <tr class="uncovered">
684
 
            <td><pre><a name="line105">105</a> </pre></td>
685
 
          </tr>
686
 
        
687
 
          
688
 
          
689
 
          <tr class="uncovered">
690
 
            <td><pre><a name="line106">106</a>     Djinn.log_debug(&quot;MR: Running job&quot;)</pre></td>
691
 
          </tr>
692
 
        
693
 
          
694
 
          
695
 
          <tr class="uncovered">
696
 
            <td><pre><a name="line107">107</a>     Djinn.log_debug(&quot;MR: Command is #{run_mr_command}&quot;)</pre></td>
697
 
          </tr>
698
 
        
699
 
          
700
 
          
701
 
          <tr class="uncovered">
702
 
            <td><pre><a name="line108">108</a>     Djinn.log_run(run_mr_command)</pre></td>
703
 
          </tr>
704
 
        
705
 
          
706
 
          
707
 
          <tr class="uncovered">
708
 
            <td><pre><a name="line109">109</a> </pre></td>
709
 
          </tr>
710
 
        
711
 
          
712
 
          
713
 
          <tr class="uncovered">
714
 
            <td><pre><a name="line110">110</a>     wait_for_hdfs_file(output)</pre></td>
715
 
          </tr>
716
 
        
717
 
          
718
 
          
719
 
          <tr class="uncovered">
720
 
            <td><pre><a name="line111">111</a>     Djinn.log_debug(&quot;MR: Done running job!&quot;)</pre></td>
721
 
          </tr>
722
 
        
723
 
          
724
 
          
725
 
          <tr class="uncovered">
726
 
            <td><pre><a name="line112">112</a> </pre></td>
727
 
          </tr>
728
 
        
729
 
          
730
 
          
731
 
          <tr class="uncovered">
732
 
            <td><pre><a name="line113">113</a>     fin = Time.now</pre></td>
733
 
          </tr>
734
 
        
735
 
          
736
 
          
737
 
          <tr class="uncovered">
738
 
            <td><pre><a name="line114">114</a>     Djinn.log_debug(&quot;TIMING: Total time is #{fin - start} seconds&quot;)</pre></td>
739
 
          </tr>
740
 
        
741
 
          
742
 
          
743
 
          <tr class="uncovered">
744
 
            <td><pre><a name="line115">115</a> </pre></td>
745
 
          </tr>
746
 
        
747
 
          
748
 
          
749
 
          <tr class="uncovered">
750
 
            <td><pre><a name="line116">116</a>     # TODO: check if no part-* files exist - if so, there's an error</pre></td>
751
 
          </tr>
752
 
        
753
 
          
754
 
          
755
 
          <tr class="uncovered">
756
 
            <td><pre><a name="line117">117</a>     # that we should funnel to the user somehow</pre></td>
757
 
          </tr>
758
 
        
759
 
          
760
 
          
761
 
          <tr class="uncovered">
762
 
            <td><pre><a name="line118">118</a> </pre></td>
763
 
          </tr>
764
 
        
765
 
          
766
 
          
767
 
          <tr class="uncovered">
768
 
            <td><pre><a name="line119">119</a>     output_cmd = &quot;#{HADOOP} fs -cat #{output}/part-*&quot;</pre></td>
769
 
          </tr>
770
 
        
771
 
          
772
 
          
773
 
          <tr class="uncovered">
774
 
            <td><pre><a name="line120">120</a>     Djinn.log_debug(&quot;MR: Retrieving job output with command #{output_cmd}&quot;)</pre></td>
775
 
          </tr>
776
 
        
777
 
          
778
 
          
779
 
          <tr class="uncovered">
780
 
            <td><pre><a name="line121">121</a>     output_str = `#{output_cmd}`</pre></td>
781
 
          </tr>
782
 
        
783
 
          
784
 
          
785
 
          <tr class="uncovered">
786
 
            <td><pre><a name="line122">122</a> </pre></td>
787
 
          </tr>
788
 
        
789
 
          
790
 
          
791
 
          <tr class="uncovered">
792
 
            <td><pre><a name="line123">123</a>     neptune_write_job_output_str(job_data, output_str)</pre></td>
793
 
          </tr>
794
 
        
795
 
          
796
 
          
797
 
          <tr class="uncovered">
798
 
            <td><pre><a name="line124">124</a> </pre></td>
799
 
          </tr>
800
 
        
801
 
          
802
 
          
803
 
          <tr class="uncovered">
804
 
            <td><pre><a name="line125">125</a>     remove_lock_file(job_data)</pre></td>
805
 
          </tr>
806
 
        
807
 
          
808
 
          
809
 
          <tr class="uncovered">
810
 
            <td><pre><a name="line126">126</a>   }</pre></td>
811
 
          </tr>
812
 
        
813
 
          
814
 
          
815
 
          <tr class="uncovered">
816
 
            <td><pre><a name="line127">127</a> </pre></td>
817
 
          </tr>
818
 
        
819
 
          
820
 
          
821
 
          <tr class="uncovered">
822
 
            <td><pre><a name="line128">128</a>   return &quot;OK&quot;</pre></td>
823
 
          </tr>
824
 
        
825
 
          
826
 
          
827
 
          <tr class="uncovered">
828
 
            <td><pre><a name="line129">129</a> end</pre></td>
829
 
          </tr>
830
 
        
831
 
          
832
 
          
833
 
          <tr class="inferred">
834
 
            <td><pre><a name="line130">130</a> </pre></td>
835
 
          </tr>
836
 
        
837
 
          
838
 
          
839
 
          <tr class="marked">
840
 
            <td><pre><a name="line131">131</a> private</pre></td>
841
 
          </tr>
842
 
        
843
 
          
844
 
          
845
 
          <tr class="inferred">
846
 
            <td><pre><a name="line132">132</a> </pre></td>
847
 
          </tr>
848
 
        
849
 
          
850
 
          
851
 
          <tr class="marked">
852
 
            <td><pre><a name="line133">133</a> def neptune_mapreduce_get_output(job_data)</pre></td>
853
 
          </tr>
854
 
        
855
 
          
856
 
          
857
 
          <tr class="uncovered">
858
 
            <td><pre><a name="line134">134</a>   output = job_data[&quot;@output&quot;]</pre></td>
859
 
          </tr>
860
 
        
861
 
          
862
 
          
863
 
          <tr class="uncovered">
864
 
            <td><pre><a name="line135">135</a>   output_location = &quot;/tmp/#{output}&quot;</pre></td>
865
 
          </tr>
866
 
        
867
 
          
868
 
          
869
 
          <tr class="uncovered">
870
 
            <td><pre><a name="line136">136</a> </pre></td>
871
 
          </tr>
872
 
        
873
 
          
874
 
          
875
 
          <tr class="uncovered">
876
 
            <td><pre><a name="line137">137</a>   `rm -rf #{output_location}`</pre></td>
877
 
          </tr>
878
 
        
879
 
          
880
 
          
881
 
          <tr class="uncovered">
882
 
            <td><pre><a name="line138">138</a>   run_on_db_master(&quot;rm -rf #{output_location}&quot;, NO_OUTPUT) </pre></td>
883
 
          </tr>
884
 
        
885
 
          
886
 
          
887
 
          <tr class="uncovered">
888
 
            <td><pre><a name="line139">139</a>   run_on_db_master(&quot;#{HADOOP} fs -get #{output} #{output_location}&quot;, NO_OUTPUT)</pre></td>
889
 
          </tr>
890
 
        
891
 
          
892
 
          
893
 
          <tr class="uncovered">
894
 
            <td><pre><a name="line140">140</a>   unless my_node.is_db_master?</pre></td>
895
 
          </tr>
896
 
        
897
 
          
898
 
          
899
 
          <tr class="uncovered">
900
 
            <td><pre><a name="line141">141</a>     Djinn.log_debug(&quot;hey by the way output is [#{output}]&quot;)</pre></td>
901
 
          </tr>
902
 
        
903
 
          
904
 
          
905
 
          <tr class="uncovered">
906
 
            <td><pre><a name="line142">142</a> </pre></td>
907
 
          </tr>
908
 
        
909
 
          
910
 
          
911
 
          <tr class="uncovered">
912
 
            <td><pre><a name="line143">143</a>     db_master = get_db_master</pre></td>
913
 
          </tr>
914
 
        
915
 
          
916
 
          
917
 
          <tr class="uncovered">
918
 
            <td><pre><a name="line144">144</a>     ip = db_master.public_ip</pre></td>
919
 
          </tr>
920
 
        
921
 
          
922
 
          
923
 
          <tr class="uncovered">
924
 
            <td><pre><a name="line145">145</a>     ssh_key = db_master.ssh_key</pre></td>
925
 
          </tr>
926
 
        
927
 
          
928
 
          
929
 
          <tr class="uncovered">
930
 
            <td><pre><a name="line146">146</a> </pre></td>
931
 
          </tr>
932
 
        
933
 
          
934
 
          
935
 
          <tr class="uncovered">
936
 
            <td><pre><a name="line147">147</a>     Djinn.log_run(&quot;scp -i #{ssh_key} -o StrictHostkeyChecking=no -r #{ip}:#{output_location} #{output_location}&quot;)</pre></td>
937
 
          </tr>
938
 
        
939
 
          
940
 
          
941
 
          <tr class="uncovered">
942
 
            <td><pre><a name="line148">148</a>   end</pre></td>
943
 
          </tr>
944
 
        
945
 
          
946
 
          
947
 
          <tr class="uncovered">
948
 
            <td><pre><a name="line149">149</a> </pre></td>
949
 
          </tr>
950
 
        
951
 
          
952
 
          
953
 
          <tr class="uncovered">
954
 
            <td><pre><a name="line150">150</a>   return output_location</pre></td>
955
 
          </tr>
956
 
        
957
 
          
958
 
          
959
 
          <tr class="uncovered">
960
 
            <td><pre><a name="line151">151</a> end</pre></td>
961
 
          </tr>
962
 
        
963
 
          
964
 
          
965
 
          <tr class="inferred">
966
 
            <td><pre><a name="line152">152</a> </pre></td>
967
 
          </tr>
968
 
        
969
 
          
970
 
          
971
 
          <tr class="marked">
972
 
            <td><pre><a name="line153">153</a> def start_mapreduce_master()</pre></td>
973
 
          </tr>
974
 
        
975
 
          
976
 
          
977
 
          <tr class="uncovered">
978
 
            <td><pre><a name="line154">154</a>   Djinn.log_debug(&quot;start mapreduce master - starting up hadoop first&quot;)</pre></td>
979
 
          </tr>
980
 
        
981
 
          
982
 
          
983
 
          <tr class="uncovered">
984
 
            <td><pre><a name="line155">155</a>   #start_db_master</pre></td>
985
 
          </tr>
986
 
        
987
 
          
988
 
          
989
 
          <tr class="uncovered">
990
 
            <td><pre><a name="line156">156</a>   #start_hadoop_slave</pre></td>
991
 
          </tr>
992
 
        
993
 
          
994
 
          
995
 
          <tr class="uncovered">
996
 
            <td><pre><a name="line157">157</a> end</pre></td>
997
 
          </tr>
998
 
        
999
 
          
1000
 
          
1001
 
          <tr class="inferred">
1002
 
            <td><pre><a name="line158">158</a> </pre></td>
1003
 
          </tr>
1004
 
        
1005
 
          
1006
 
          
1007
 
          <tr class="marked">
1008
 
            <td><pre><a name="line159">159</a> def start_mapreduce_slave()</pre></td>
1009
 
          </tr>
1010
 
        
1011
 
          
1012
 
          
1013
 
          <tr class="uncovered">
1014
 
            <td><pre><a name="line160">160</a>   Djinn.log_debug(&quot;start mapreduce slave - starting up hadoop first&quot;)</pre></td>
1015
 
          </tr>
1016
 
        
1017
 
          
1018
 
          
1019
 
          <tr class="uncovered">
1020
 
            <td><pre><a name="line161">161</a>   #start_db_slave</pre></td>
1021
 
          </tr>
1022
 
        
1023
 
          
1024
 
          
1025
 
          <tr class="uncovered">
1026
 
            <td><pre><a name="line162">162</a>   #start_hadoop_slave</pre></td>
1027
 
          </tr>
1028
 
        
1029
 
          
1030
 
          
1031
 
          <tr class="uncovered">
1032
 
            <td><pre><a name="line163">163</a> end</pre></td>
1033
 
          </tr>
1034
 
        
1035
 
          
1036
 
          
1037
 
          <tr class="inferred">
1038
 
            <td><pre><a name="line164">164</a> </pre></td>
1039
 
          </tr>
1040
 
        
1041
 
          
1042
 
          
1043
 
          <tr class="marked">
1044
 
            <td><pre><a name="line165">165</a> def stop_mapreduce_master()</pre></td>
1045
 
          </tr>
1046
 
        
1047
 
          
1048
 
          
1049
 
          <tr class="uncovered">
1050
 
            <td><pre><a name="line166">166</a>   Djinn.log_debug(&quot;stop mapreduce master - stopping hadoop&quot;)</pre></td>
1051
 
          </tr>
1052
 
        
1053
 
          
1054
 
          
1055
 
          <tr class="uncovered">
1056
 
            <td><pre><a name="line167">167</a>   #stop_db_master</pre></td>
1057
 
          </tr>
1058
 
        
1059
 
          
1060
 
          
1061
 
          <tr class="uncovered">
1062
 
            <td><pre><a name="line168">168</a>   #stop_hadoop_slave</pre></td>
1063
 
          </tr>
1064
 
        
1065
 
          
1066
 
          
1067
 
          <tr class="uncovered">
1068
 
            <td><pre><a name="line169">169</a> end</pre></td>
1069
 
          </tr>
1070
 
        
1071
 
          
1072
 
          
1073
 
          <tr class="inferred">
1074
 
            <td><pre><a name="line170">170</a> </pre></td>
1075
 
          </tr>
1076
 
        
1077
 
          
1078
 
          
1079
 
          <tr class="marked">
1080
 
            <td><pre><a name="line171">171</a> def stop_mapreduce_slave()</pre></td>
1081
 
          </tr>
1082
 
        
1083
 
          
1084
 
          
1085
 
          <tr class="uncovered">
1086
 
            <td><pre><a name="line172">172</a>   Djinn.log_debug(&quot;stop mapreduce slave - stopping hadoop&quot;)</pre></td>
1087
 
          </tr>
1088
 
        
1089
 
          
1090
 
          
1091
 
          <tr class="uncovered">
1092
 
            <td><pre><a name="line173">173</a>   #stop_db_slave</pre></td>
1093
 
          </tr>
1094
 
        
1095
 
          
1096
 
          
1097
 
          <tr class="uncovered">
1098
 
            <td><pre><a name="line174">174</a>   #stop_hadoop_slave</pre></td>
1099
 
          </tr>
1100
 
        
1101
 
          
1102
 
          
1103
 
          <tr class="uncovered">
1104
 
            <td><pre><a name="line175">175</a> end</pre></td>
1105
 
          </tr>
1106
 
        
1107
 
          
1108
 
          
1109
 
          <tr class="inferred">
1110
 
            <td><pre><a name="line176">176</a> </pre></td>
1111
 
          </tr>
1112
 
        
1113
 
          
1114
 
          
1115
 
          <tr class="marked">
1116
 
            <td><pre><a name="line177">177</a> def wait_for_hdfs_file(location)</pre></td>
1117
 
          </tr>
1118
 
        
1119
 
          
1120
 
          
1121
 
          <tr class="uncovered">
1122
 
            <td><pre><a name="line178">178</a>   command = &quot;#{HADOOP} fs -ls #{location}&quot;</pre></td>
1123
 
          </tr>
1124
 
        
1125
 
          
1126
 
          
1127
 
          <tr class="uncovered">
1128
 
            <td><pre><a name="line179">179</a>   db_master = get_db_master</pre></td>
1129
 
          </tr>
1130
 
        
1131
 
          
1132
 
          
1133
 
          <tr class="uncovered">
1134
 
            <td><pre><a name="line180">180</a>   ip = db_master.public_ip</pre></td>
1135
 
          </tr>
1136
 
        
1137
 
          
1138
 
          
1139
 
          <tr class="uncovered">
1140
 
            <td><pre><a name="line181">181</a>   ssh_key = db_master.ssh_key</pre></td>
1141
 
          </tr>
1142
 
        
1143
 
          
1144
 
          
1145
 
          <tr class="uncovered">
1146
 
            <td><pre><a name="line182">182</a>   loop {</pre></td>
1147
 
          </tr>
1148
 
        
1149
 
          
1150
 
          
1151
 
          <tr class="uncovered">
1152
 
            <td><pre><a name="line183">183</a>     cmd = &quot;ssh -o StrictHostkeyChecking=no -i #{ssh_key} #{ip} '#{command}'&quot;</pre></td>
1153
 
          </tr>
1154
 
        
1155
 
          
1156
 
          
1157
 
          <tr class="uncovered">
1158
 
            <td><pre><a name="line184">184</a>     Djinn.log_debug(cmd)</pre></td>
1159
 
          </tr>
1160
 
        
1161
 
          
1162
 
          
1163
 
          <tr class="uncovered">
1164
 
            <td><pre><a name="line185">185</a>     result = `#{cmd}`</pre></td>
1165
 
          </tr>
1166
 
        
1167
 
          
1168
 
          
1169
 
          <tr class="uncovered">
1170
 
            <td><pre><a name="line186">186</a>     Djinn.log_debug(&quot;oi: result was [#{result}]&quot;)</pre></td>
1171
 
          </tr>
1172
 
        
1173
 
          
1174
 
          
1175
 
          <tr class="uncovered">
1176
 
            <td><pre><a name="line187">187</a>     break if result.match(/Found/) # this shows up when ls returns files</pre></td>
1177
 
          </tr>
1178
 
        
1179
 
          
1180
 
          
1181
 
          <tr class="uncovered">
1182
 
            <td><pre><a name="line188">188</a>     sleep(5)</pre></td>
1183
 
          </tr>
1184
 
        
1185
 
          
1186
 
          
1187
 
          <tr class="uncovered">
1188
 
            <td><pre><a name="line189">189</a>   }</pre></td>
1189
 
          </tr>
1190
 
        
1191
 
          
1192
 
          
1193
 
          <tr class="uncovered">
1194
 
            <td><pre><a name="line190">190</a> end</pre></td>
1195
 
          </tr>
1196
 
        
1197
 
          
1198
 
          
1199
 
          <tr class="inferred">
1200
 
            <td><pre><a name="line191">191</a> </pre></td>
1201
 
          </tr>
1202
 
        
1203
 
          
1204
 
          
1205
 
          <tr class="marked">
1206
 
            <td><pre><a name="line192">192</a> def get_language(filename)</pre></td>
1207
 
          </tr>
1208
 
        
1209
 
          
1210
 
          
1211
 
          <tr class="uncovered">
1212
 
            <td><pre><a name="line193">193</a>   return &quot;ruby&quot;</pre></td>
1213
 
          </tr>
1214
 
        
1215
 
          
1216
 
          
1217
 
          <tr class="uncovered">
1218
 
            <td><pre><a name="line194">194</a> end</pre></td>
1219
 
          </tr>
1220
 
        
1221
 
          
1222
 
          
1223
 
          <tr class="inferred">
1224
 
            <td><pre><a name="line195">195</a> </pre></td>
1225
 
          </tr>
1226
 
        
1227
 
          
1228
 
          
1229
 
          <tr class="marked">
1230
 
            <td><pre><a name="line196">196</a> def run_on_db_master(command, output=WANT_OUTPUT)</pre></td>
1231
 
          </tr>
1232
 
        
1233
 
          
1234
 
          
1235
 
          <tr class="uncovered">
1236
 
            <td><pre><a name="line197">197</a>   db_master = get_db_master</pre></td>
1237
 
          </tr>
1238
 
        
1239
 
          
1240
 
          
1241
 
          <tr class="uncovered">
1242
 
            <td><pre><a name="line198">198</a>   ip = db_master.public_ip</pre></td>
1243
 
          </tr>
1244
 
        
1245
 
          
1246
 
          
1247
 
          <tr class="uncovered">
1248
 
            <td><pre><a name="line199">199</a>   ssh_key = db_master.ssh_key  </pre></td>
1249
 
          </tr>
1250
 
        
1251
 
          
1252
 
          
1253
 
          <tr class="uncovered">
1254
 
            <td><pre><a name="line200">200</a>   HelperFunctions.run_remote_command(ip, command, ssh_key, NO_OUTPUT) </pre></td>
1255
 
          </tr>
1256
 
        
1257
 
          
1258
 
          
1259
 
          <tr class="uncovered">
1260
 
            <td><pre><a name="line201">201</a> end</pre></td>
1261
 
          </tr>
1262
 
        
1263
 
          
1264
 
          
1265
 
          <tr class="inferred">
1266
 
            <td><pre><a name="line202">202</a> </pre></td>
 
396
            <td><pre><a name="line57">57</a>       nodes.each { |node|</pre></td>
 
397
          </tr>
 
398
        
 
399
          
 
400
          
 
401
          <tr class="uncovered">
 
402
            <td><pre><a name="line58">58</a>         HelperFunctions.scp_file(my_mrjar, my_mrjar, node.private_ip, node.ssh_key)</pre></td>
 
403
          </tr>
 
404
        
 
405
          
 
406
          
 
407
          <tr class="uncovered">
 
408
            <td><pre><a name="line59">59</a>       }</pre></td>
 
409
          </tr>
 
410
        
 
411
          
 
412
          
 
413
          <tr class="uncovered">
 
414
            <td><pre><a name="line60">60</a> </pre></td>
 
415
          </tr>
 
416
        
 
417
          
 
418
          
 
419
          <tr class="uncovered">
 
420
            <td><pre><a name="line61">61</a>       db_master = get_db_master</pre></td>
 
421
          </tr>
 
422
        
 
423
          
 
424
          
 
425
          <tr class="uncovered">
 
426
            <td><pre><a name="line62">62</a>       ip = db_master.private_ip</pre></td>
 
427
          </tr>
 
428
        
 
429
          
 
430
          
 
431
          <tr class="uncovered">
 
432
            <td><pre><a name="line63">63</a>       ssh_key = db_master.ssh_key</pre></td>
 
433
          </tr>
 
434
        
 
435
          
 
436
          
 
437
          <tr class="uncovered">
 
438
            <td><pre><a name="line64">64</a>       HelperFunctions.scp_file(my_mrjar, my_mrjar, ip, ssh_key)</pre></td>
 
439
          </tr>
 
440
        
 
441
          
 
442
          
 
443
          <tr class="uncovered">
 
444
            <td><pre><a name="line65">65</a> </pre></td>
 
445
          </tr>
 
446
        
 
447
          
 
448
          
 
449
          <tr class="uncovered">
 
450
            <td><pre><a name="line66">66</a>       run_mr_command = &quot;#{HADOOP} jar #{my_mrjar} #{main} #{input} #{output}&quot;</pre></td>
 
451
          </tr>
 
452
        
 
453
          
 
454
          
 
455
          <tr class="uncovered">
 
456
            <td><pre><a name="line67">67</a>     else</pre></td>
 
457
          </tr>
 
458
        
 
459
          
 
460
          
 
461
          <tr class="uncovered">
 
462
            <td><pre><a name="line68">68</a>       Djinn.log_debug(&quot;need to get map code located at #{map}, and reduce &quot; +</pre></td>
 
463
          </tr>
 
464
        
 
465
          
 
466
          
 
467
          <tr class="uncovered">
 
468
            <td><pre><a name="line69">69</a>         &quot;code located at #{reduce}&quot;)</pre></td>
 
469
          </tr>
 
470
        
 
471
          
 
472
          
 
473
          <tr class="uncovered">
 
474
            <td><pre><a name="line70">70</a> </pre></td>
 
475
          </tr>
 
476
        
 
477
          
 
478
          
 
479
          <tr class="uncovered">
 
480
            <td><pre><a name="line71">71</a>       map_file = map.split('/')[-1]</pre></td>
 
481
          </tr>
 
482
        
 
483
          
 
484
          
 
485
          <tr class="uncovered">
 
486
            <td><pre><a name="line72">72</a>       red_file = reduce.split('/')[-1]</pre></td>
 
487
          </tr>
 
488
        
 
489
          
 
490
          
 
491
          <tr class="uncovered">
 
492
            <td><pre><a name="line73">73</a> </pre></td>
 
493
          </tr>
 
494
        
 
495
          
 
496
          
 
497
          <tr class="uncovered">
 
498
            <td><pre><a name="line74">74</a>       my_map = &quot;/tmp/#{map_file}&quot;</pre></td>
 
499
          </tr>
 
500
        
 
501
          
 
502
          
 
503
          <tr class="uncovered">
 
504
            <td><pre><a name="line75">75</a>       my_red = &quot;/tmp/#{red_file}&quot;</pre></td>
 
505
          </tr>
 
506
        
 
507
          
 
508
          
 
509
          <tr class="uncovered">
 
510
            <td><pre><a name="line76">76</a> </pre></td>
 
511
          </tr>
 
512
        
 
513
          
 
514
          
 
515
          <tr class="uncovered">
 
516
            <td><pre><a name="line77">77</a>       datastore.get_output_and_save_to_fs(map, my_map)</pre></td>
 
517
          </tr>
 
518
        
 
519
          
 
520
          
 
521
          <tr class="uncovered">
 
522
            <td><pre><a name="line78">78</a>       datastore.get_output_and_save_to_fs(reduce, my_red)</pre></td>
 
523
          </tr>
 
524
        
 
525
          
 
526
          
 
527
          <tr class="uncovered">
 
528
            <td><pre><a name="line79">79</a> </pre></td>
 
529
          </tr>
 
530
        
 
531
          
 
532
          
 
533
          <tr class="uncovered">
 
534
            <td><pre><a name="line80">80</a>       # since the db master is the initiator of the mapreduce job, it needs</pre></td>
 
535
          </tr>
 
536
        
 
537
          
 
538
          
 
539
          <tr class="uncovered">
 
540
            <td><pre><a name="line81">81</a>       # to have both the mapper and reducer files handy</pre></td>
 
541
          </tr>
 
542
        
 
543
          
 
544
          
 
545
          <tr class="uncovered">
 
546
            <td><pre><a name="line82">82</a> </pre></td>
 
547
          </tr>
 
548
        
 
549
          
 
550
          
 
551
          <tr class="uncovered">
 
552
            <td><pre><a name="line83">83</a>       db_master = get_db_master</pre></td>
 
553
          </tr>
 
554
        
 
555
          
 
556
          
 
557
          <tr class="uncovered">
 
558
            <td><pre><a name="line84">84</a>       ip = db_master.private_ip</pre></td>
 
559
          </tr>
 
560
        
 
561
          
 
562
          
 
563
          <tr class="uncovered">
 
564
            <td><pre><a name="line85">85</a>       ssh_key = db_master.ssh_key</pre></td>
 
565
          </tr>
 
566
        
 
567
          
 
568
          
 
569
          <tr class="uncovered">
 
570
            <td><pre><a name="line86">86</a>       HelperFunctions.scp_file(my_map, my_map, ip, ssh_key)</pre></td>
 
571
          </tr>
 
572
        
 
573
          
 
574
          
 
575
          <tr class="uncovered">
 
576
            <td><pre><a name="line87">87</a>       HelperFunctions.scp_file(my_red, my_red, ip, ssh_key)</pre></td>
 
577
          </tr>
 
578
        
 
579
          
 
580
          
 
581
          <tr class="uncovered">
 
582
            <td><pre><a name="line88">88</a> </pre></td>
 
583
          </tr>
 
584
        
 
585
          
 
586
          
 
587
          <tr class="uncovered">
 
588
            <td><pre><a name="line89">89</a>       nodes.each { |node|</pre></td>
 
589
          </tr>
 
590
        
 
591
          
 
592
          
 
593
          <tr class="uncovered">
 
594
            <td><pre><a name="line90">90</a>         HelperFunctions.scp_file(my_map, my_map, node.private_ip, node.ssh_key)</pre></td>
 
595
          </tr>
 
596
        
 
597
          
 
598
          
 
599
          <tr class="uncovered">
 
600
            <td><pre><a name="line91">91</a>         HelperFunctions.scp_file(my_red, my_red, node.private_ip, node.ssh_key)</pre></td>
 
601
          </tr>
 
602
        
 
603
          
 
604
          
 
605
          <tr class="uncovered">
 
606
            <td><pre><a name="line92">92</a>       }</pre></td>
 
607
          </tr>
 
608
        
 
609
          
 
610
          
 
611
          <tr class="uncovered">
 
612
            <td><pre><a name="line93">93</a> </pre></td>
 
613
          </tr>
 
614
        
 
615
          
 
616
          
 
617
          <tr class="uncovered">
 
618
            <td><pre><a name="line94">94</a>       map_cmd = &quot;\&quot;&quot; + get_language(my_map) + &quot; &quot; + my_map + &quot;\&quot;&quot;</pre></td>
 
619
          </tr>
 
620
        
 
621
          
 
622
          
 
623
          <tr class="uncovered">
 
624
            <td><pre><a name="line95">95</a>       reduce_cmd = &quot;\&quot;&quot; + get_language(my_red) + &quot; &quot; + my_red + &quot;\&quot;&quot;</pre></td>
 
625
          </tr>
 
626
        
 
627
          
 
628
          
 
629
          <tr class="uncovered">
 
630
            <td><pre><a name="line96">96</a> </pre></td>
 
631
          </tr>
 
632
        
 
633
          
 
634
          
 
635
          <tr class="uncovered">
 
636
            <td><pre><a name="line97">97</a>       run_mr_command = &quot;#{HADOOP} jar #{STREAMING} -input #{input} &quot; +</pre></td>
 
637
          </tr>
 
638
        
 
639
          
 
640
          
 
641
          <tr class="uncovered">
 
642
            <td><pre><a name="line98">98</a>         &quot;-output #{output} -mapper #{map_cmd} -reducer #{reduce_cmd}&quot;</pre></td>
 
643
          </tr>
 
644
        
 
645
          
 
646
          
 
647
          <tr class="uncovered">
 
648
            <td><pre><a name="line99">99</a>     end</pre></td>
 
649
          </tr>
 
650
        
 
651
          
 
652
          
 
653
          <tr class="uncovered">
 
654
            <td><pre><a name="line100">100</a> </pre></td>
 
655
          </tr>
 
656
        
 
657
          
 
658
          
 
659
          <tr class="uncovered">
 
660
            <td><pre><a name="line101">101</a>     Djinn.log_debug(&quot;waiting for input file #{input} to exist in HDFS&quot;)</pre></td>
 
661
          </tr>
 
662
        
 
663
          
 
664
          
 
665
          <tr class="uncovered">
 
666
            <td><pre><a name="line102">102</a>     wait_for_hdfs_file(input)</pre></td>
 
667
          </tr>
 
668
        
 
669
          
 
670
          
 
671
          <tr class="uncovered">
 
672
            <td><pre><a name="line103">103</a> </pre></td>
 
673
          </tr>
 
674
        
 
675
          
 
676
          
 
677
          <tr class="uncovered">
 
678
            <td><pre><a name="line104">104</a>     # run mr job</pre></td>
 
679
          </tr>
 
680
        
 
681
          
 
682
          
 
683
          <tr class="uncovered">
 
684
            <td><pre><a name="line105">105</a>     start = Time.now</pre></td>
 
685
          </tr>
 
686
        
 
687
          
 
688
          
 
689
          <tr class="uncovered">
 
690
            <td><pre><a name="line106">106</a> </pre></td>
 
691
          </tr>
 
692
        
 
693
          
 
694
          
 
695
          <tr class="uncovered">
 
696
            <td><pre><a name="line107">107</a>     Djinn.log_debug(&quot;MR: Running job&quot;)</pre></td>
 
697
          </tr>
 
698
        
 
699
          
 
700
          
 
701
          <tr class="uncovered">
 
702
            <td><pre><a name="line108">108</a>     Djinn.log_debug(&quot;MR: Command is #{run_mr_command}&quot;)</pre></td>
 
703
          </tr>
 
704
        
 
705
          
 
706
          
 
707
          <tr class="uncovered">
 
708
            <td><pre><a name="line109">109</a>     Djinn.log_run(run_mr_command)</pre></td>
 
709
          </tr>
 
710
        
 
711
          
 
712
          
 
713
          <tr class="uncovered">
 
714
            <td><pre><a name="line110">110</a> </pre></td>
 
715
          </tr>
 
716
        
 
717
          
 
718
          
 
719
          <tr class="uncovered">
 
720
            <td><pre><a name="line111">111</a>     wait_for_hdfs_file(output)</pre></td>
 
721
          </tr>
 
722
        
 
723
          
 
724
          
 
725
          <tr class="uncovered">
 
726
            <td><pre><a name="line112">112</a>     Djinn.log_debug(&quot;MR: Done running job!&quot;)</pre></td>
 
727
          </tr>
 
728
        
 
729
          
 
730
          
 
731
          <tr class="uncovered">
 
732
            <td><pre><a name="line113">113</a> </pre></td>
 
733
          </tr>
 
734
        
 
735
          
 
736
          
 
737
          <tr class="uncovered">
 
738
            <td><pre><a name="line114">114</a>     fin = Time.now</pre></td>
 
739
          </tr>
 
740
        
 
741
          
 
742
          
 
743
          <tr class="uncovered">
 
744
            <td><pre><a name="line115">115</a>     Djinn.log_debug(&quot;TIMING: Total time is #{fin - start} seconds&quot;)</pre></td>
 
745
          </tr>
 
746
        
 
747
          
 
748
          
 
749
          <tr class="uncovered">
 
750
            <td><pre><a name="line116">116</a> </pre></td>
 
751
          </tr>
 
752
        
 
753
          
 
754
          
 
755
          <tr class="uncovered">
 
756
            <td><pre><a name="line117">117</a>     # TODO: check if no part-* files exist - if so, there's an error</pre></td>
 
757
          </tr>
 
758
        
 
759
          
 
760
          
 
761
          <tr class="uncovered">
 
762
            <td><pre><a name="line118">118</a>     # that we should funnel to the user somehow</pre></td>
 
763
          </tr>
 
764
        
 
765
          
 
766
          
 
767
          <tr class="uncovered">
 
768
            <td><pre><a name="line119">119</a> </pre></td>
 
769
          </tr>
 
770
        
 
771
          
 
772
          
 
773
          <tr class="uncovered">
 
774
            <td><pre><a name="line120">120</a>     output_cmd = &quot;#{HADOOP} fs -cat #{output}/part-*&quot;</pre></td>
 
775
          </tr>
 
776
        
 
777
          
 
778
          
 
779
          <tr class="uncovered">
 
780
            <td><pre><a name="line121">121</a>     Djinn.log_debug(&quot;MR: Retrieving job output with command #{output_cmd}&quot;)</pre></td>
 
781
          </tr>
 
782
        
 
783
          
 
784
          
 
785
          <tr class="uncovered">
 
786
            <td><pre><a name="line122">122</a>     output_str = `#{output_cmd}`</pre></td>
 
787
          </tr>
 
788
        
 
789
          
 
790
          
 
791
          <tr class="uncovered">
 
792
            <td><pre><a name="line123">123</a> </pre></td>
 
793
          </tr>
 
794
        
 
795
          
 
796
          
 
797
          <tr class="uncovered">
 
798
            <td><pre><a name="line124">124</a>     neptune_write_job_output_str(job_data, output_str)</pre></td>
 
799
          </tr>
 
800
        
 
801
          
 
802
          
 
803
          <tr class="uncovered">
 
804
            <td><pre><a name="line125">125</a> </pre></td>
 
805
          </tr>
 
806
        
 
807
          
 
808
          
 
809
          <tr class="uncovered">
 
810
            <td><pre><a name="line126">126</a>     remove_lock_file(job_data)</pre></td>
 
811
          </tr>
 
812
        
 
813
          
 
814
          
 
815
          <tr class="uncovered">
 
816
            <td><pre><a name="line127">127</a>   }</pre></td>
 
817
          </tr>
 
818
        
 
819
          
 
820
          
 
821
          <tr class="uncovered">
 
822
            <td><pre><a name="line128">128</a> </pre></td>
 
823
          </tr>
 
824
        
 
825
          
 
826
          
 
827
          <tr class="uncovered">
 
828
            <td><pre><a name="line129">129</a>   return &quot;OK&quot;</pre></td>
 
829
          </tr>
 
830
        
 
831
          
 
832
          
 
833
          <tr class="uncovered">
 
834
            <td><pre><a name="line130">130</a> end</pre></td>
 
835
          </tr>
 
836
        
 
837
          
 
838
          
 
839
          <tr class="inferred">
 
840
            <td><pre><a name="line131">131</a> </pre></td>
 
841
          </tr>
 
842
        
 
843
          
 
844
          
 
845
          <tr class="marked">
 
846
            <td><pre><a name="line132">132</a> private</pre></td>
 
847
          </tr>
 
848
        
 
849
          
 
850
          
 
851
          <tr class="inferred">
 
852
            <td><pre><a name="line133">133</a> </pre></td>
 
853
          </tr>
 
854
        
 
855
          
 
856
          
 
857
          <tr class="marked">
 
858
            <td><pre><a name="line134">134</a> def neptune_mapreduce_get_output(job_data)</pre></td>
 
859
          </tr>
 
860
        
 
861
          
 
862
          
 
863
          <tr class="uncovered">
 
864
            <td><pre><a name="line135">135</a>   output = job_data[&quot;@output&quot;]</pre></td>
 
865
          </tr>
 
866
        
 
867
          
 
868
          
 
869
          <tr class="uncovered">
 
870
            <td><pre><a name="line136">136</a>   output_location = &quot;/tmp/#{output}&quot;</pre></td>
 
871
          </tr>
 
872
        
 
873
          
 
874
          
 
875
          <tr class="uncovered">
 
876
            <td><pre><a name="line137">137</a> </pre></td>
 
877
          </tr>
 
878
        
 
879
          
 
880
          
 
881
          <tr class="uncovered">
 
882
            <td><pre><a name="line138">138</a>   `rm -rf #{output_location}`</pre></td>
 
883
          </tr>
 
884
        
 
885
          
 
886
          
 
887
          <tr class="uncovered">
 
888
            <td><pre><a name="line139">139</a>   run_on_db_master(&quot;rm -rf #{output_location}&quot;, NO_OUTPUT) </pre></td>
 
889
          </tr>
 
890
        
 
891
          
 
892
          
 
893
          <tr class="uncovered">
 
894
            <td><pre><a name="line140">140</a>   run_on_db_master(&quot;#{HADOOP} fs -get #{output} #{output_location}&quot;, NO_OUTPUT)</pre></td>
 
895
          </tr>
 
896
        
 
897
          
 
898
          
 
899
          <tr class="uncovered">
 
900
            <td><pre><a name="line141">141</a>   unless my_node.is_db_master?</pre></td>
 
901
          </tr>
 
902
        
 
903
          
 
904
          
 
905
          <tr class="uncovered">
 
906
            <td><pre><a name="line142">142</a>     Djinn.log_debug(&quot;hey by the way output is [#{output}]&quot;)</pre></td>
 
907
          </tr>
 
908
        
 
909
          
 
910
          
 
911
          <tr class="uncovered">
 
912
            <td><pre><a name="line143">143</a> </pre></td>
 
913
          </tr>
 
914
        
 
915
          
 
916
          
 
917
          <tr class="uncovered">
 
918
            <td><pre><a name="line144">144</a>     db_master = get_db_master</pre></td>
 
919
          </tr>
 
920
        
 
921
          
 
922
          
 
923
          <tr class="uncovered">
 
924
            <td><pre><a name="line145">145</a>     ip = db_master.public_ip</pre></td>
 
925
          </tr>
 
926
        
 
927
          
 
928
          
 
929
          <tr class="uncovered">
 
930
            <td><pre><a name="line146">146</a>     ssh_key = db_master.ssh_key</pre></td>
 
931
          </tr>
 
932
        
 
933
          
 
934
          
 
935
          <tr class="uncovered">
 
936
            <td><pre><a name="line147">147</a> </pre></td>
 
937
          </tr>
 
938
        
 
939
          
 
940
          
 
941
          <tr class="uncovered">
 
942
            <td><pre><a name="line148">148</a>     Djinn.log_run(&quot;scp -i #{ssh_key} -o StrictHostkeyChecking=no -r #{ip}:#{output_location} #{output_location}&quot;)</pre></td>
 
943
          </tr>
 
944
        
 
945
          
 
946
          
 
947
          <tr class="uncovered">
 
948
            <td><pre><a name="line149">149</a>   end</pre></td>
 
949
          </tr>
 
950
        
 
951
          
 
952
          
 
953
          <tr class="uncovered">
 
954
            <td><pre><a name="line150">150</a> </pre></td>
 
955
          </tr>
 
956
        
 
957
          
 
958
          
 
959
          <tr class="uncovered">
 
960
            <td><pre><a name="line151">151</a>   return output_location</pre></td>
 
961
          </tr>
 
962
        
 
963
          
 
964
          
 
965
          <tr class="uncovered">
 
966
            <td><pre><a name="line152">152</a> end</pre></td>
 
967
          </tr>
 
968
        
 
969
          
 
970
          
 
971
          <tr class="inferred">
 
972
            <td><pre><a name="line153">153</a> </pre></td>
 
973
          </tr>
 
974
        
 
975
          
 
976
          
 
977
          <tr class="marked">
 
978
            <td><pre><a name="line154">154</a> def start_mapreduce_master()</pre></td>
 
979
          </tr>
 
980
        
 
981
          
 
982
          
 
983
          <tr class="uncovered">
 
984
            <td><pre><a name="line155">155</a>   Djinn.log_debug(&quot;start mapreduce master - starting up hadoop first&quot;)</pre></td>
 
985
          </tr>
 
986
        
 
987
          
 
988
          
 
989
          <tr class="uncovered">
 
990
            <td><pre><a name="line156">156</a>   #start_db_master</pre></td>
 
991
          </tr>
 
992
        
 
993
          
 
994
          
 
995
          <tr class="uncovered">
 
996
            <td><pre><a name="line157">157</a>   #start_hadoop_slave</pre></td>
 
997
          </tr>
 
998
        
 
999
          
 
1000
          
 
1001
          <tr class="uncovered">
 
1002
            <td><pre><a name="line158">158</a> end</pre></td>
 
1003
          </tr>
 
1004
        
 
1005
          
 
1006
          
 
1007
          <tr class="inferred">
 
1008
            <td><pre><a name="line159">159</a> </pre></td>
 
1009
          </tr>
 
1010
        
 
1011
          
 
1012
          
 
1013
          <tr class="marked">
 
1014
            <td><pre><a name="line160">160</a> def start_mapreduce_slave()</pre></td>
 
1015
          </tr>
 
1016
        
 
1017
          
 
1018
          
 
1019
          <tr class="uncovered">
 
1020
            <td><pre><a name="line161">161</a>   Djinn.log_debug(&quot;start mapreduce slave - starting up hadoop first&quot;)</pre></td>
 
1021
          </tr>
 
1022
        
 
1023
          
 
1024
          
 
1025
          <tr class="uncovered">
 
1026
            <td><pre><a name="line162">162</a>   #start_db_slave</pre></td>
 
1027
          </tr>
 
1028
        
 
1029
          
 
1030
          
 
1031
          <tr class="uncovered">
 
1032
            <td><pre><a name="line163">163</a>   #start_hadoop_slave</pre></td>
 
1033
          </tr>
 
1034
        
 
1035
          
 
1036
          
 
1037
          <tr class="uncovered">
 
1038
            <td><pre><a name="line164">164</a> end</pre></td>
 
1039
          </tr>
 
1040
        
 
1041
          
 
1042
          
 
1043
          <tr class="inferred">
 
1044
            <td><pre><a name="line165">165</a> </pre></td>
 
1045
          </tr>
 
1046
        
 
1047
          
 
1048
          
 
1049
          <tr class="marked">
 
1050
            <td><pre><a name="line166">166</a> def stop_mapreduce_master()</pre></td>
 
1051
          </tr>
 
1052
        
 
1053
          
 
1054
          
 
1055
          <tr class="uncovered">
 
1056
            <td><pre><a name="line167">167</a>   Djinn.log_debug(&quot;stop mapreduce master - stopping hadoop&quot;)</pre></td>
 
1057
          </tr>
 
1058
        
 
1059
          
 
1060
          
 
1061
          <tr class="uncovered">
 
1062
            <td><pre><a name="line168">168</a>   #stop_db_master</pre></td>
 
1063
          </tr>
 
1064
        
 
1065
          
 
1066
          
 
1067
          <tr class="uncovered">
 
1068
            <td><pre><a name="line169">169</a>   #stop_hadoop_slave</pre></td>
 
1069
          </tr>
 
1070
        
 
1071
          
 
1072
          
 
1073
          <tr class="uncovered">
 
1074
            <td><pre><a name="line170">170</a> end</pre></td>
 
1075
          </tr>
 
1076
        
 
1077
          
 
1078
          
 
1079
          <tr class="inferred">
 
1080
            <td><pre><a name="line171">171</a> </pre></td>
 
1081
          </tr>
 
1082
        
 
1083
          
 
1084
          
 
1085
          <tr class="marked">
 
1086
            <td><pre><a name="line172">172</a> def stop_mapreduce_slave()</pre></td>
 
1087
          </tr>
 
1088
        
 
1089
          
 
1090
          
 
1091
          <tr class="uncovered">
 
1092
            <td><pre><a name="line173">173</a>   Djinn.log_debug(&quot;stop mapreduce slave - stopping hadoop&quot;)</pre></td>
 
1093
          </tr>
 
1094
        
 
1095
          
 
1096
          
 
1097
          <tr class="uncovered">
 
1098
            <td><pre><a name="line174">174</a>   #stop_db_slave</pre></td>
 
1099
          </tr>
 
1100
        
 
1101
          
 
1102
          
 
1103
          <tr class="uncovered">
 
1104
            <td><pre><a name="line175">175</a>   #stop_hadoop_slave</pre></td>
 
1105
          </tr>
 
1106
        
 
1107
          
 
1108
          
 
1109
          <tr class="uncovered">
 
1110
            <td><pre><a name="line176">176</a> end</pre></td>
 
1111
          </tr>
 
1112
        
 
1113
          
 
1114
          
 
1115
          <tr class="inferred">
 
1116
            <td><pre><a name="line177">177</a> </pre></td>
 
1117
          </tr>
 
1118
        
 
1119
          
 
1120
          
 
1121
          <tr class="marked">
 
1122
            <td><pre><a name="line178">178</a> def wait_for_hdfs_file(location)</pre></td>
 
1123
          </tr>
 
1124
        
 
1125
          
 
1126
          
 
1127
          <tr class="uncovered">
 
1128
            <td><pre><a name="line179">179</a>   command = &quot;#{HADOOP} fs -ls #{location}&quot;</pre></td>
 
1129
          </tr>
 
1130
        
 
1131
          
 
1132
          
 
1133
          <tr class="uncovered">
 
1134
            <td><pre><a name="line180">180</a>   db_master = get_db_master</pre></td>
 
1135
          </tr>
 
1136
        
 
1137
          
 
1138
          
 
1139
          <tr class="uncovered">
 
1140
            <td><pre><a name="line181">181</a>   ip = db_master.public_ip</pre></td>
 
1141
          </tr>
 
1142
        
 
1143
          
 
1144
          
 
1145
          <tr class="uncovered">
 
1146
            <td><pre><a name="line182">182</a>   ssh_key = db_master.ssh_key</pre></td>
 
1147
          </tr>
 
1148
        
 
1149
          
 
1150
          
 
1151
          <tr class="uncovered">
 
1152
            <td><pre><a name="line183">183</a>   loop {</pre></td>
 
1153
          </tr>
 
1154
        
 
1155
          
 
1156
          
 
1157
          <tr class="uncovered">
 
1158
            <td><pre><a name="line184">184</a>     cmd = &quot;ssh -o StrictHostkeyChecking=no -i #{ssh_key} #{ip} '#{command}'&quot;</pre></td>
 
1159
          </tr>
 
1160
        
 
1161
          
 
1162
          
 
1163
          <tr class="uncovered">
 
1164
            <td><pre><a name="line185">185</a>     Djinn.log_debug(cmd)</pre></td>
 
1165
          </tr>
 
1166
        
 
1167
          
 
1168
          
 
1169
          <tr class="uncovered">
 
1170
            <td><pre><a name="line186">186</a>     result = `#{cmd}`</pre></td>
 
1171
          </tr>
 
1172
        
 
1173
          
 
1174
          
 
1175
          <tr class="uncovered">
 
1176
            <td><pre><a name="line187">187</a>     Djinn.log_debug(&quot;oi: result was [#{result}]&quot;)</pre></td>
 
1177
          </tr>
 
1178
        
 
1179
          
 
1180
          
 
1181
          <tr class="uncovered">
 
1182
            <td><pre><a name="line188">188</a>     break if result.match(/Found/) # this shows up when ls returns files</pre></td>
 
1183
          </tr>
 
1184
        
 
1185
          
 
1186
          
 
1187
          <tr class="uncovered">
 
1188
            <td><pre><a name="line189">189</a>     sleep(5)</pre></td>
 
1189
          </tr>
 
1190
        
 
1191
          
 
1192
          
 
1193
          <tr class="uncovered">
 
1194
            <td><pre><a name="line190">190</a>   }</pre></td>
 
1195
          </tr>
 
1196
        
 
1197
          
 
1198
          
 
1199
          <tr class="uncovered">
 
1200
            <td><pre><a name="line191">191</a> end</pre></td>
 
1201
          </tr>
 
1202
        
 
1203
          
 
1204
          
 
1205
          <tr class="inferred">
 
1206
            <td><pre><a name="line192">192</a> </pre></td>
 
1207
          </tr>
 
1208
        
 
1209
          
 
1210
          
 
1211
          <tr class="marked">
 
1212
            <td><pre><a name="line193">193</a> def get_language(filename)</pre></td>
 
1213
          </tr>
 
1214
        
 
1215
          
 
1216
          
 
1217
          <tr class="uncovered">
 
1218
            <td><pre><a name="line194">194</a>   return &quot;ruby&quot;</pre></td>
 
1219
          </tr>
 
1220
        
 
1221
          
 
1222
          
 
1223
          <tr class="uncovered">
 
1224
            <td><pre><a name="line195">195</a> end</pre></td>
 
1225
          </tr>
 
1226
        
 
1227
          
 
1228
          
 
1229
          <tr class="inferred">
 
1230
            <td><pre><a name="line196">196</a> </pre></td>
 
1231
          </tr>
 
1232
        
 
1233
          
 
1234
          
 
1235
          <tr class="marked">
 
1236
            <td><pre><a name="line197">197</a> def run_on_db_master(command, output=WANT_OUTPUT)</pre></td>
 
1237
          </tr>
 
1238
        
 
1239
          
 
1240
          
 
1241
          <tr class="uncovered">
 
1242
            <td><pre><a name="line198">198</a>   db_master = get_db_master</pre></td>
 
1243
          </tr>
 
1244
        
 
1245
          
 
1246
          
 
1247
          <tr class="uncovered">
 
1248
            <td><pre><a name="line199">199</a>   ip = db_master.public_ip</pre></td>
 
1249
          </tr>
 
1250
        
 
1251
          
 
1252
          
 
1253
          <tr class="uncovered">
 
1254
            <td><pre><a name="line200">200</a>   ssh_key = db_master.ssh_key  </pre></td>
 
1255
          </tr>
 
1256
        
 
1257
          
 
1258
          
 
1259
          <tr class="uncovered">
 
1260
            <td><pre><a name="line201">201</a>   HelperFunctions.run_remote_command(ip, command, ssh_key, NO_OUTPUT) </pre></td>
 
1261
          </tr>
 
1262
        
 
1263
          
 
1264
          
 
1265
          <tr class="uncovered">
 
1266
            <td><pre><a name="line202">202</a> end</pre></td>
 
1267
          </tr>
 
1268
        
 
1269
          
 
1270
          
 
1271
          <tr class="inferred">
 
1272
            <td><pre><a name="line203">203</a> </pre></td>
1267
1273
          </tr>
1268
1274
        
1269
1275
      </tbody>
1270
1276
    </table>
1271
1277
 
1272
 
    <p>Generated on Mon Feb 20 14:30:58 -0800 2012 with <a href="http://github.com/relevance/rcov">rcov 0.9.8</a></p>
 
1278
    <p>Generated on Sat Feb 25 19:19:09 -0800 2012 with <a href="http://github.com/relevance/rcov">rcov 0.9.8</a></p>
1273
1279
 
1274
1280
  </body>
1275
1281
</html>