~nchohan/appscale/zk3.3.4

« back to all changes in this revision

Viewing changes to Neptune/mapreduce_helper.rb

  • Committer: Chris Bunch
  • Date: 2012-02-26 03:20:57 UTC
  • Revision ID: cgb@cs.ucsb.edu-20120226032057-ad0cy0zgx4we4exc
adding in repo over app engine support, and tests for most of datastore repo on appscale and s3

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
# Programmer: Chris Bunch
3
3
 
4
4
 
 
5
$:.unshift File.join(File.dirname(__FILE__), "..", "AppController")
5
6
require 'djinn'
6
7
 
7
8
 
 
9
$:.unshift File.join(File.dirname(__FILE__), "..", "AppController", "lib")
 
10
require 'datastore_factory'
 
11
 
 
12
 
8
13
BAD_TABLE_MSG = "The currently running database isn't running Hadoop, so MapReduce jobs cannot be run."
9
14
 
10
15
 
29
34
    nodes = Djinn.convert_location_array_to_class(nodes, keyname)
30
35
 
31
36
    storage = job_data["@storage"]
32
 
    creds = Djinn.neptune_parse_creds(storage, job_data)
 
37
    datastore = DatastoreFactory.get_datastore(storage, job_data)
33
38
 
34
39
    mapreducejar = job_data["@mapreducejar"]
35
40
    main = job_data["@main"]
45
50
 
46
51
    if mapreducejar
47
52
      Djinn.log_debug("need to get mr jar located at #{mapreducejar}")
48
 
      mr_jar = Repo.get_output(mapreducejar, storage, creds)
49
53
      mr_file = mapreducejar.split('/')[-1]
50
54
      my_mrjar = "/tmp/#{mr_file}"
51
 
      HelperFunctions.write_file(my_mrjar, mr_jar)
 
55
      datastore.get_output_and_save_to_fs(mapreducejar, my_mrjar)
52
56
 
53
57
      nodes.each { |node|
54
58
        HelperFunctions.scp_file(my_mrjar, my_mrjar, node.private_ip, node.ssh_key)
61
65
 
62
66
      run_mr_command = "#{HADOOP} jar #{my_mrjar} #{main} #{input} #{output}"
63
67
    else
64
 
      Djinn.log_debug("need to get map code located at #{map}") 
65
 
      map_code = Repo.get_output(map, storage, creds)
66
 
 
67
 
      Djinn.log_debug("need to get reduce code located at #{reduce}")
68
 
      red_code = Repo.get_output(reduce, storage, creds)
 
68
      Djinn.log_debug("need to get map code located at #{map}, and reduce " +
 
69
        "code located at #{reduce}")
69
70
 
70
71
      map_file = map.split('/')[-1]
71
72
      red_file = reduce.split('/')[-1]
73
74
      my_map = "/tmp/#{map_file}"
74
75
      my_red = "/tmp/#{red_file}"
75
76
 
76
 
      HelperFunctions.write_file(my_map, map_code)
77
 
      HelperFunctions.write_file(my_red, red_code)
 
77
      datastore.get_output_and_save_to_fs(map, my_map)
 
78
      datastore.get_output_and_save_to_fs(reduce, my_red)
78
79
 
79
80
      # since the db master is the initiator of the mapreduce job, it needs
80
81
      # to have both the mapper and reducer files handy