~gcrosswhite/charon/projects-ParallelIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//@+leo-ver=4-thin
//@+node:gcross.20081014125825.3:@thin controller.C
//@@language C++

#include "controller.h"

#include "controller.def.h"

//@+others
//@+node:gcross.20081103140450.10:Operation Class Definition
Operation::~Operation() { if(slave!=NULL) slave->retire_operation(this); }
//@-node:gcross.20081103140450.10:Operation Class Definition
//@+node:gcross.20081015194657.5:Master Class Definition
void Master::update_broadcast_ticket() {
    last_broadcast_ticket_id = next_ticket_id;
    next_ticket_id++;
    ticket_id_assignees.clear();
}

void Master::broadcast_operation(Operation& op) {
    op.ticket_id = next_ticket_id;
    chares.operate_broadcasted(last_broadcast_ticket_id,ticket_id_assignees,&op);
    update_broadcast_ticket();    
}

void Master::pointcast_operation(int index, Operation& op) {
    op.ticket_id = next_ticket_id;
    TicketID last_ticket_id = chare_previous_ticket_ids[index] < last_broadcast_ticket_id
        ? last_broadcast_ticket_id
        : chare_previous_ticket_ids[index];
    chares[index].operate_pointcasted(last_ticket_id,&op);
    chare_previous_ticket_ids[index] = next_ticket_id;
    next_ticket_id++;
    ticket_id_assignees.push_back(index);
}

//@-node:gcross.20081015194657.5:Master Class Definition
//@+node:gcross.20081013161056.10:Slave Class Declaration
void Slave::perform_operation(Operation* op) {
    op->slave = this;
    if(op->perform()) {
        retire_operation(op,false);
        delete op;
    }
}

void Slave::retire_operation(Operation* op, bool perform_ready) {
    CkAssert(op->ticket_id > last_seen_ticket_id);
    last_seen_ticket_id = op->ticket_id;
    while((ticket_jump_queue.size()) > 0 and (ticket_jump_queue.top().new_ticket_id == op->ticket_id))
        ticket_jump_queue.pop();
    op->slave = NULL;
    if(perform_ready)
        self.perform_ready_operations(); // Queues this call so that it doesn't add to the stack
}

void Slave::perform_ready_operations() { 
    while((ticket_jump_queue.size() > 0) and (operation_queue.size() > 0) and (ticket_jump_queue.top().old_ticket_id == last_seen_ticket_id) and (ticket_jump_queue.top().new_ticket_id == operation_queue.top()->ticket_id) ) {
        ticket_jump_queue.pop();
        Operation* op = operation_queue.top();
        operation_queue.pop();
        perform_operation(op);
    }
}

void Slave::operate_broadcasted(const TicketID last_broadcast_id, const std::vector<int>& ticket_id_assignees, Operation* op) {
    int previous_ticket_id = last_broadcast_id;
    for(int i = 0; i < ticket_id_assignees.size(); i++) {
        if(ticket_id_assignees[i]==thisIndex) {
            int next_ticket_id = last_broadcast_id+i+1;
            if(previous_ticket_id >= last_seen_ticket_id) {
                ticket_jump_queue.push(TicketJump(previous_ticket_id,next_ticket_id));
            }
            previous_ticket_id = next_ticket_id;
        }
    }
    int next_ticket_id = op->ticket_id;
    if(previous_ticket_id == last_seen_ticket_id) {
        perform_operation(op);
    } else {
        ticket_jump_queue.push(TicketJump(previous_ticket_id,next_ticket_id));
        operation_queue.push(op);
    }
    perform_ready_operations();
}

void Slave::operate_pointcasted(const TicketID previous_ticket_id, Operation* op) {
    if(last_seen_ticket_id == previous_ticket_id) {
        perform_operation(op);
    } else {
        ticket_jump_queue.push(TicketJump(previous_ticket_id,op->ticket_id));
        operation_queue.push(op); 
    }
    perform_ready_operations();
}
//@nonl
//@-node:gcross.20081013161056.10:Slave Class Declaration
//@-others
//@-node:gcross.20081014125825.3:@thin controller.C
//@-leo