<tr class="uncovered">
<td><pre><a name="line22">22</a> STREAMING = "#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar"</pre></td>
<tr class="inferred">
<td><pre><a name="line23">23</a> </pre></td>
<tr class="inferred">
<td><pre><a name="line24">24</a> </pre></td>
<td><pre><a name="line25">25</a> public </pre></td>
<tr class="inferred">
<td><pre><a name="line26">26</a> </pre></td>
<tr class="inferred">
<td><pre><a name="line27">27</a> </pre></td>
<td><pre><a name="line28">28</a> def neptune_mapreduce_run_job(nodes, job_data, secret)</pre></td>
<tr class="uncovered">
<td><pre><a name="line29">29</a> return BAD_SECRET_MSG unless valid_secret?(secret)</pre></td>
<tr class="uncovered">
<td><pre><a name="line30">30</a> Djinn.log_debug("mapreduce - run")</pre></td>
<tr class="uncovered">
<td><pre><a name="line31">31</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line32">32</a> Thread.new {</pre></td>
<tr class="uncovered">
<td><pre><a name="line33">33</a> keyname = @creds['keyname']</pre></td>
<tr class="uncovered">
<td><pre><a name="line34">34</a> nodes = Djinn.convert_location_array_to_class(nodes, keyname)</pre></td>
<tr class="uncovered">
<td><pre><a name="line35">35</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line36">36</a> storage = job_data["@storage"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line37">37</a> datastore = DatastoreFactory.get_datastore(storage, job_data)</pre></td>
<tr class="uncovered">
<td><pre><a name="line38">38</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line39">39</a> mapreducejar = job_data["@mapreducejar"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line40">40</a> main = job_data["@main"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line41">41</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line42">42</a> map = job_data["@map"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line43">43</a> reduce = job_data["@reduce"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line44">44</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line45">45</a> input = job_data["@input"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line46">46</a> output = job_data["@output"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line47">47</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line48">48</a> Djinn.log_debug("MR: Copying mapper and reducer to all boxes")</pre></td>
<tr class="uncovered">
<td><pre><a name="line49">49</a> # TODO: get files from shadow first if in cloud</pre></td>
<tr class="uncovered">
<td><pre><a name="line50">50</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line51">51</a> if mapreducejar</pre></td>
<tr class="uncovered">
<td><pre><a name="line52">52</a> Djinn.log_debug("need to get mr jar located at #{mapreducejar}")</pre></td>
<tr class="uncovered">
<td><pre><a name="line53">53</a> mr_file = mapreducejar.split('/')[-1]</pre></td>
<tr class="uncovered">
<td><pre><a name="line54">54</a> my_mrjar = "/tmp/#{mr_file}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line55">55</a> datastore.get_output_and_save_to_fs(mapreducejar, my_mrjar)</pre></td>
<tr class="uncovered">
<td><pre><a name="line56">56</a> </pre></td>
<td><pre><a name="line57">57</a> nodes.each { |node|</pre></td>
<tr class="uncovered">
<td><pre><a name="line58">58</a> HelperFunctions.scp_file(my_mrjar, my_mrjar, node.private_ip, node.ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line59">59</a> }</pre></td>
<tr class="uncovered">
<td><pre><a name="line60">60</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line61">61</a> db_master = get_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line62">62</a> ip = db_master.private_ip</pre></td>
<tr class="uncovered">
<td><pre><a name="line63">63</a> ssh_key = db_master.ssh_key</pre></td>
<tr class="uncovered">
<td><pre><a name="line64">64</a> HelperFunctions.scp_file(my_mrjar, my_mrjar, ip, ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line65">65</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line66">66</a> run_mr_command = "#{HADOOP} jar #{my_mrjar} #{main} #{input} #{output}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line67">67</a> else</pre></td>
<tr class="uncovered">
<td><pre><a name="line68">68</a> Djinn.log_debug("need to get map code located at #{map}, and reduce " +</pre></td>
<tr class="uncovered">
<td><pre><a name="line69">69</a> "code located at #{reduce}")</pre></td>
<tr class="uncovered">
<td><pre><a name="line70">70</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line71">71</a> map_file = map.split('/')[-1]</pre></td>
<tr class="uncovered">
<td><pre><a name="line72">72</a> red_file = reduce.split('/')[-1]</pre></td>
<tr class="uncovered">
<td><pre><a name="line73">73</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line74">74</a> my_map = "/tmp/#{map_file}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line75">75</a> my_red = "/tmp/#{red_file}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line76">76</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line77">77</a> datastore.get_output_and_save_to_fs(map, my_map)</pre></td>
<tr class="uncovered">
<td><pre><a name="line78">78</a> datastore.get_output_and_save_to_fs(reduce, my_red)</pre></td>
<tr class="uncovered">
<td><pre><a name="line79">79</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line80">80</a> # since the db master is the initiator of the mapreduce job, it needs</pre></td>
<tr class="uncovered">
<td><pre><a name="line81">81</a> # to have both the mapper and reducer files handy</pre></td>
<tr class="uncovered">
<td><pre><a name="line82">82</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line83">83</a> db_master = get_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line84">84</a> ip = db_master.private_ip</pre></td>
<tr class="uncovered">
<td><pre><a name="line85">85</a> ssh_key = db_master.ssh_key</pre></td>
<tr class="uncovered">
<td><pre><a name="line86">86</a> HelperFunctions.scp_file(my_map, my_map, ip, ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line87">87</a> HelperFunctions.scp_file(my_red, my_red, ip, ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line88">88</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line89">89</a> nodes.each { |node|</pre></td>
<tr class="uncovered">
<td><pre><a name="line90">90</a> HelperFunctions.scp_file(my_map, my_map, node.private_ip, node.ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line91">91</a> HelperFunctions.scp_file(my_red, my_red, node.private_ip, node.ssh_key)</pre></td>
<tr class="uncovered">
<td><pre><a name="line92">92</a> }</pre></td>
<tr class="uncovered">
<td><pre><a name="line93">93</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line94">94</a> map_cmd = "\"" + get_language(my_map) + " " + my_map + "\""</pre></td>
<tr class="uncovered">
<td><pre><a name="line95">95</a> reduce_cmd = "\"" + get_language(my_red) + " " + my_red + "\""</pre></td>
<tr class="uncovered">
<td><pre><a name="line96">96</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line97">97</a> run_mr_command = "#{HADOOP} jar #{STREAMING} -input #{input} " +</pre></td>
<tr class="uncovered">
<td><pre><a name="line98">98</a> "-output #{output} -mapper #{map_cmd} -reducer #{reduce_cmd}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line99">99</a> end</pre></td>
<tr class="uncovered">
<td><pre><a name="line100">100</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line101">101</a> Djinn.log_debug("waiting for input file #{input} to exist in HDFS")</pre></td>
<tr class="uncovered">
<td><pre><a name="line102">102</a> wait_for_hdfs_file(input)</pre></td>
<tr class="uncovered">
<td><pre><a name="line103">103</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line104">104</a> # run mr job</pre></td>
<tr class="uncovered">
<td><pre><a name="line105">105</a> start = Time.now</pre></td>
<tr class="uncovered">
<td><pre><a name="line106">106</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line107">107</a> Djinn.log_debug("MR: Running job")</pre></td>
<tr class="uncovered">
<td><pre><a name="line108">108</a> Djinn.log_debug("MR: Command is #{run_mr_command}")</pre></td>
<tr class="uncovered">
<td><pre><a name="line109">109</a> Djinn.log_run(run_mr_command)</pre></td>
<tr class="uncovered">
<td><pre><a name="line110">110</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line111">111</a> wait_for_hdfs_file(output)</pre></td>
<tr class="uncovered">
<td><pre><a name="line112">112</a> Djinn.log_debug("MR: Done running job!")</pre></td>
<tr class="uncovered">
<td><pre><a name="line113">113</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line114">114</a> fin = Time.now</pre></td>
<tr class="uncovered">
<td><pre><a name="line115">115</a> Djinn.log_debug("TIMING: Total time is #{fin - start} seconds")</pre></td>
<tr class="uncovered">
<td><pre><a name="line116">116</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line117">117</a> # TODO: check if no part-* files exist - if so, there's an error</pre></td>
<tr class="uncovered">
<td><pre><a name="line118">118</a> # that we should funnel to the user somehow</pre></td>
<tr class="uncovered">
<td><pre><a name="line119">119</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line120">120</a> output_cmd = "#{HADOOP} fs -cat #{output}/part-*"</pre></td>
<tr class="uncovered">
<td><pre><a name="line121">121</a> Djinn.log_debug("MR: Retrieving job output with command #{output_cmd}")</pre></td>
<tr class="uncovered">
<td><pre><a name="line122">122</a> output_str = `#{output_cmd}`</pre></td>
<tr class="uncovered">
<td><pre><a name="line123">123</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line124">124</a> neptune_write_job_output_str(job_data, output_str)</pre></td>
<tr class="uncovered">
<td><pre><a name="line125">125</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line126">126</a> remove_lock_file(job_data)</pre></td>
<tr class="uncovered">
<td><pre><a name="line127">127</a> }</pre></td>
<tr class="uncovered">
<td><pre><a name="line128">128</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line129">129</a> return "OK"</pre></td>
<tr class="uncovered">
<td><pre><a name="line130">130</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line131">131</a> </pre></td>
<td><pre><a name="line132">132</a> private</pre></td>
<tr class="inferred">
<td><pre><a name="line133">133</a> </pre></td>
<td><pre><a name="line134">134</a> def neptune_mapreduce_get_output(job_data)</pre></td>
<tr class="uncovered">
<td><pre><a name="line135">135</a> output = job_data["@output"]</pre></td>
<tr class="uncovered">
<td><pre><a name="line136">136</a> output_location = "/tmp/#{output}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line137">137</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line138">138</a> `rm -rf #{output_location}`</pre></td>
<tr class="uncovered">
<td><pre><a name="line139">139</a> run_on_db_master("rm -rf #{output_location}", NO_OUTPUT) </pre></td>
<tr class="uncovered">
<td><pre><a name="line140">140</a> run_on_db_master("#{HADOOP} fs -get #{output} #{output_location}", NO_OUTPUT)</pre></td>
<tr class="uncovered">
<td><pre><a name="line141">141</a> unless my_node.is_db_master?</pre></td>
<tr class="uncovered">
<td><pre><a name="line142">142</a> Djinn.log_debug("hey by the way output is [#{output}]")</pre></td>
<tr class="uncovered">
<td><pre><a name="line143">143</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line144">144</a> db_master = get_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line145">145</a> ip = db_master.public_ip</pre></td>
<tr class="uncovered">
<td><pre><a name="line146">146</a> ssh_key = db_master.ssh_key</pre></td>
<tr class="uncovered">
<td><pre><a name="line147">147</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line148">148</a> Djinn.log_run("scp -i #{ssh_key} -o StrictHostkeyChecking=no -r #{ip}:#{output_location} #{output_location}")</pre></td>
<tr class="uncovered">
<td><pre><a name="line149">149</a> end</pre></td>
<tr class="uncovered">
<td><pre><a name="line150">150</a> </pre></td>
<tr class="uncovered">
<td><pre><a name="line151">151</a> return output_location</pre></td>
<tr class="uncovered">
<td><pre><a name="line152">152</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line153">153</a> </pre></td>
<td><pre><a name="line154">154</a> def start_mapreduce_master()</pre></td>
<tr class="uncovered">
<td><pre><a name="line155">155</a> Djinn.log_debug("start mapreduce master - starting up hadoop first")</pre></td>
<tr class="uncovered">
<td><pre><a name="line156">156</a> #start_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line157">157</a> #start_hadoop_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line158">158</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line159">159</a> </pre></td>
<td><pre><a name="line160">160</a> def start_mapreduce_slave()</pre></td>
<tr class="uncovered">
<td><pre><a name="line161">161</a> Djinn.log_debug("start mapreduce slave - starting up hadoop first")</pre></td>
<tr class="uncovered">
<td><pre><a name="line162">162</a> #start_db_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line163">163</a> #start_hadoop_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line164">164</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line165">165</a> </pre></td>
<td><pre><a name="line166">166</a> def stop_mapreduce_master()</pre></td>
<tr class="uncovered">
<td><pre><a name="line167">167</a> Djinn.log_debug("stop mapreduce master - stopping hadoop")</pre></td>
<tr class="uncovered">
<td><pre><a name="line168">168</a> #stop_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line169">169</a> #stop_hadoop_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line170">170</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line171">171</a> </pre></td>
<td><pre><a name="line172">172</a> def stop_mapreduce_slave()</pre></td>
<tr class="uncovered">
<td><pre><a name="line173">173</a> Djinn.log_debug("stop mapreduce slave - stopping hadoop")</pre></td>
<tr class="uncovered">
<td><pre><a name="line174">174</a> #stop_db_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line175">175</a> #stop_hadoop_slave</pre></td>
<tr class="uncovered">
<td><pre><a name="line176">176</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line177">177</a> </pre></td>
<td><pre><a name="line178">178</a> def wait_for_hdfs_file(location)</pre></td>
<tr class="uncovered">
<td><pre><a name="line179">179</a> command = "#{HADOOP} fs -ls #{location}"</pre></td>
<tr class="uncovered">
<td><pre><a name="line180">180</a> db_master = get_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line181">181</a> ip = db_master.public_ip</pre></td>
<tr class="uncovered">
<td><pre><a name="line182">182</a> ssh_key = db_master.ssh_key</pre></td>
<tr class="uncovered">
<td><pre><a name="line183">183</a> loop {</pre></td>
<tr class="uncovered">
<td><pre><a name="line184">184</a> cmd = "ssh -o StrictHostkeyChecking=no -i #{ssh_key} #{ip} '#{command}'"</pre></td>
<tr class="uncovered">
<td><pre><a name="line185">185</a> Djinn.log_debug(cmd)</pre></td>
<tr class="uncovered">
<td><pre><a name="line186">186</a> result = `#{cmd}`</pre></td>
<tr class="uncovered">
<td><pre><a name="line187">187</a> Djinn.log_debug("oi: result was [#{result}]")</pre></td>
<tr class="uncovered">
<td><pre><a name="line188">188</a> break if result.match(/Found/) # this shows up when ls returns files</pre></td>
<tr class="uncovered">
<td><pre><a name="line189">189</a> sleep(5)</pre></td>
<tr class="uncovered">
<td><pre><a name="line190">190</a> }</pre></td>
<tr class="uncovered">
<td><pre><a name="line191">191</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line192">192</a> </pre></td>
<td><pre><a name="line193">193</a> def get_language(filename)</pre></td>
<tr class="uncovered">
<td><pre><a name="line194">194</a> return "ruby"</pre></td>
<tr class="uncovered">
<td><pre><a name="line195">195</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line196">196</a> </pre></td>
<td><pre><a name="line197">197</a> def run_on_db_master(command, output=WANT_OUTPUT)</pre></td>
<tr class="uncovered">
<td><pre><a name="line198">198</a> db_master = get_db_master</pre></td>
<tr class="uncovered">
<td><pre><a name="line199">199</a> ip = db_master.public_ip</pre></td>
<tr class="uncovered">
<td><pre><a name="line200">200</a> ssh_key = db_master.ssh_key </pre></td>
<tr class="uncovered">
<td><pre><a name="line201">201</a> HelperFunctions.run_remote_command(ip, command, ssh_key, NO_OUTPUT) </pre></td>
<tr class="uncovered">
<td><pre><a name="line202">202</a> end</pre></td>
<tr class="inferred">
<td><pre><a name="line203">203</a> </pre></td>
<p>Generated on Sat Feb 25 19:19:09 -0800 2012 with <a href="http://github.com/relevance/rcov">rcov 0.9.8</a></p>