69
69
def update_input_data(context, entity_id, current_traversal,
70
70
is_update, atomic_key, input_data):
71
sync_point_object.SyncPoint.update_input_data(
71
rows_updated = sync_point_object.SyncPoint.update_input_data(
72
72
context, entity_id, current_traversal, is_update, atomic_key,
76
78
def deserialize_input_data(db_input_data):
77
79
db_input_data = db_input_data.get('input_data')
88
90
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
89
91
predecessors, new_data):
90
sync_point = get(cnxt, entity_id, current_traversal,
92
input_data = dict(deserialize_input_data(sync_point.input_data))
93
input_data.update(new_data)
95
while not rows_updated:
96
# TODO(sirushtim): Add a conf option to add no. of retries
97
sync_point = get(cnxt, entity_id, current_traversal, is_update)
98
input_data = dict(deserialize_input_data(sync_point.input_data))
99
input_data.update(new_data)
100
rows_updated = update_input_data(
101
cnxt, entity_id, current_traversal, is_update,
102
sync_point.atomic_key, serialize_input_data(input_data))
94
104
waiting = predecessors - set(input_data)
96
# Note: update must be atomic
97
update_input_data(cnxt, entity_id, current_traversal,
98
is_update, sync_point.atomic_key,
99
serialize_input_data(input_data))
101
105
key = make_key(entity_id, current_traversal, is_update)
103
107
LOG.debug('[%s] Waiting %s: Got %s; still need %s',