# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import boto
from boto.compat import json
from boto.connection import AWSQueryConnection
from boto.datapipeline import exceptions
from boto.exception import JSONResponseError
from boto.regioninfo import RegionInfo

class DataPipelineConnection(AWSQueryConnection):
33
This is the AWS Data Pipeline API Reference. This guide provides
34
descriptions and samples of the AWS Data Pipeline API.
36
APIVersion = "2012-10-29"
37
DefaultRegionName = "us-east-1"
38
DefaultRegionEndpoint = "datapipeline.us-east-1.amazonaws.com"
39
ServiceName = "DataPipeline"
40
ResponseError = JSONResponseError
43
"PipelineDeletedException": exceptions.PipelineDeletedException,
44
"InvalidRequestException": exceptions.InvalidRequestException,
45
"TaskNotFoundException": exceptions.TaskNotFoundException,
46
"PipelineNotFoundException": exceptions.PipelineNotFoundException,
47
"InternalServiceError": exceptions.InternalServiceError,
51
def __init__(self, **kwargs):
52
region = kwargs.get('region')
54
region = RegionInfo(self, self.DefaultRegionName,
55
self.DefaultRegionEndpoint)
56
kwargs['host'] = region.endpoint
57
AWSQueryConnection.__init__(self, **kwargs)
61
def _required_auth_capability(self):
64
def activate_pipeline(self, pipeline_id):
66
Validates a pipeline and initiates processing. If the pipeline
67
does not pass validation, activation fails.
69
:type pipeline_id: string
70
:param pipeline_id: The identifier of the pipeline to activate.
73
params = {'pipelineId': pipeline_id, }
74
return self.make_request(action='ActivatePipeline',
75
body=json.dumps(params))
77
def create_pipeline(self, name, unique_id, description=None):
79
Creates a new empty pipeline. When this action succeeds, you can
80
then use the PutPipelineDefinition action to populate the
84
:param name: The name of the new pipeline. You can use the same name
85
for multiple pipelines associated with your AWS account, because
86
AWS Data Pipeline assigns each new pipeline a unique pipeline
89
:type unique_id: string
90
:param unique_id: A unique identifier that you specify. This identifier
91
is not the same as the pipeline identifier assigned by AWS Data
92
Pipeline. You are responsible for defining the format and ensuring
93
the uniqueness of this identifier. You use this parameter to ensure
94
idempotency during repeated calls to CreatePipeline. For example,
95
if the first call to CreatePipeline does not return a clear
96
success, you can pass in the same unique identifier and pipeline
97
name combination on a subsequent call to CreatePipeline.
98
CreatePipeline ensures that if a pipeline already exists with the
99
same name and unique identifier, a new pipeline will not be
100
created. Instead, you'll receive the pipeline identifier from the
101
previous attempt. The uniqueness of the name and unique identifier
102
combination is scoped to the AWS account or IAM user credentials.
104
:type description: string
105
:param description: The description of the new pipeline.
108
params = {'name': name, 'uniqueId': unique_id, }
109
if description is not None:
110
params['description'] = description
111
return self.make_request(action='CreatePipeline',
112
body=json.dumps(params))
114
def delete_pipeline(self, pipeline_id):
116
Permanently deletes a pipeline, its pipeline definition and its
117
run history. You cannot query or restore a deleted pipeline. AWS
118
Data Pipeline will attempt to cancel instances associated with
119
the pipeline that are currently being processed by task runners.
120
Deleting a pipeline cannot be undone.
122
:type pipeline_id: string
123
:param pipeline_id: The identifier of the pipeline to be deleted.
126
params = {'pipelineId': pipeline_id, }
127
return self.make_request(action='DeletePipeline',
128
body=json.dumps(params))
130
def describe_objects(self, object_ids, pipeline_id, marker=None,
131
evaluate_expressions=None):
133
Returns the object definitions for a set of objects associated
134
with the pipeline. Object definitions are composed of a set of
135
fields that define the properties of the object.
137
:type object_ids: list
138
:param object_ids: Identifiers of the pipeline objects that contain the
139
definitions to be described. You can pass as many as 25 identifiers
140
in a single call to DescribeObjects
143
:param marker: The starting point for the results to be returned. The
144
first time you call DescribeObjects, this value should be empty. As
145
long as the action returns HasMoreResults as True, you can call
146
DescribeObjects again and pass the marker value from the response
147
to retrieve the next set of results.
149
:type pipeline_id: string
150
:param pipeline_id: Identifier of the pipeline that contains the object
153
:type evaluate_expressions: boolean
154
:param evaluate_expressions:
158
'objectIds': object_ids,
159
'pipelineId': pipeline_id,
161
if marker is not None:
162
params['marker'] = marker
163
if evaluate_expressions is not None:
164
params['evaluateExpressions'] = evaluate_expressions
165
return self.make_request(action='DescribeObjects',
166
body=json.dumps(params))
168
def describe_pipelines(self, pipeline_ids):
170
Retrieve metadata about one or more pipelines. The information
171
retrieved includes the name of the pipeline, the pipeline
172
identifier, its current state, and the user account that owns
173
the pipeline. Using account credentials, you can retrieve
174
metadata about pipelines that you or your IAM users have
175
created. If you are using an IAM user account, you can retrieve
176
metadata about only those pipelines you have read permission
179
:type pipeline_ids: list
180
:param pipeline_ids: Identifiers of the pipelines to describe. You can
181
pass as many as 25 identifiers in a single call to
182
DescribePipelines. You can obtain pipeline identifiers by calling
186
params = {'pipelineIds': pipeline_ids, }
187
return self.make_request(action='DescribePipelines',
188
body=json.dumps(params))
190
def evaluate_expression(self, pipeline_id, expression, object_id):
192
Evaluates a string in the context of a specified object. A task
193
runner can use this action to evaluate SQL queries stored in
196
:type pipeline_id: string
197
:param pipeline_id: The identifier of the pipeline.
199
:type expression: string
200
:param expression: The expression to evaluate.
202
:type object_id: string
203
:param object_id: The identifier of the object.
207
'pipelineId': pipeline_id,
208
'expression': expression,
209
'objectId': object_id,
211
return self.make_request(action='EvaluateExpression',
212
body=json.dumps(params))
214
def get_pipeline_definition(self, pipeline_id, version=None):
216
Returns the definition of the specified pipeline. You can call
217
GetPipelineDefinition to retrieve the pipeline definition you
218
provided using PutPipelineDefinition.
220
:type pipeline_id: string
221
:param pipeline_id: The identifier of the pipeline.
223
:type version: string
224
:param version: The version of the pipeline definition to retrieve.
227
params = {'pipelineId': pipeline_id, }
228
if version is not None:
229
params['version'] = version
230
return self.make_request(action='GetPipelineDefinition',
231
body=json.dumps(params))
233
def list_pipelines(self, marker=None):
235
Returns a list of pipeline identifiers for all active pipelines.
236
Identifiers are returned only for pipelines you have permission
240
:param marker: The starting point for the results to be returned. The
241
first time you call ListPipelines, this value should be empty. As
242
long as the action returns HasMoreResults as True, you can call
243
ListPipelines again and pass the marker value from the response to
244
retrieve the next set of results.
248
if marker is not None:
249
params['marker'] = marker
250
return self.make_request(action='ListPipelines',
251
body=json.dumps(params))
253
def poll_for_task(self, worker_group, hostname=None,
254
instance_identity=None):
256
Task runners call this action to receive a task to perform from
257
AWS Data Pipeline. The task runner specifies which tasks it can
258
perform by setting a value for the workerGroup parameter of the
259
PollForTask call. The task returned by PollForTask may come from
260
any of the pipelines that match the workerGroup value passed in
261
by the task runner and that was launched using the IAM user
262
credentials specified by the task runner.
264
:type worker_group: string
265
:param worker_group: Indicates the type of task the task runner is
266
configured to accept and process. The worker group is set as a
267
field on objects in the pipeline when they are created. You can
268
only specify a single value for workerGroup in the call to
269
PollForTask. There are no wildcard values permitted in workerGroup,
270
the string must be an exact, case-sensitive, match.
272
:type hostname: string
273
:param hostname: The public DNS name of the calling task runner.
275
:type instance_identity: structure
276
:param instance_identity: Identity information for the Amazon EC2
277
instance that is hosting the task runner. You can get this value by
278
calling the URI, http://169.254.169.254/latest/meta-data/instance-
279
id, from the EC2 instance. For more information, go to Instance
280
Metadata in the Amazon Elastic Compute Cloud User Guide. Passing in
281
this value proves that your task runner is running on an EC2
282
instance, and ensures the proper AWS Data Pipeline service charges
283
are applied to your pipeline.
286
params = {'workerGroup': worker_group, }
287
if hostname is not None:
288
params['hostname'] = hostname
289
if instance_identity is not None:
290
params['instanceIdentity'] = instance_identity
291
return self.make_request(action='PollForTask',
292
body=json.dumps(params))
294
def put_pipeline_definition(self, pipeline_objects, pipeline_id):
296
Adds tasks, schedules, and preconditions that control the
297
behavior of the pipeline. You can use PutPipelineDefinition to
298
populate a new pipeline or to update an existing pipeline that
299
has not yet been activated.
301
:type pipeline_objects: list
302
:param pipeline_objects: The objects that define the pipeline. These
303
will overwrite the existing pipeline definition.
305
:type pipeline_id: string
306
:param pipeline_id: The identifier of the pipeline to be configured.
310
'pipelineObjects': pipeline_objects,
311
'pipelineId': pipeline_id,
313
return self.make_request(action='PutPipelineDefinition',
314
body=json.dumps(params))
316
def query_objects(self, pipeline_id, sphere, marker=None, query=None,
319
Queries a pipeline for the names of objects that match a
320
specified set of conditions.
323
:param marker: The starting point for the results to be returned. The
324
first time you call QueryObjects, this value should be empty. As
325
long as the action returns HasMoreResults as True, you can call
326
QueryObjects again and pass the marker value from the response to
327
retrieve the next set of results.
329
:type query: structure
330
:param query: Query that defines the objects to be returned. The Query
331
object can contain a maximum of ten selectors. The conditions in
332
the query are limited to top-level String fields in the object.
333
These filters can be applied to components, instances, and
336
:type pipeline_id: string
337
:param pipeline_id: Identifier of the pipeline to be queried for object
341
:param limit: Specifies the maximum number of object names that
342
QueryObjects will return in a single call. The default value is
346
:param sphere: Specifies whether the query applies to components or
347
instances. Allowable values: COMPONENT, INSTANCE, ATTEMPT.
350
params = {'pipelineId': pipeline_id, 'sphere': sphere, }
351
if marker is not None:
352
params['marker'] = marker
353
if query is not None:
354
params['query'] = query
355
if limit is not None:
356
params['limit'] = limit
357
return self.make_request(action='QueryObjects',
358
body=json.dumps(params))
360
def report_task_progress(self, task_id):
362
Updates the AWS Data Pipeline service on the progress of the
363
calling task runner. When the task runner is assigned a task, it
364
should call ReportTaskProgress to acknowledge that it has the
365
task within 2 minutes. If the web service does not recieve this
366
acknowledgement within the 2 minute window, it will assign the
367
task in a subsequent PollForTask call. After this initial
368
acknowledgement, the task runner only needs to report progress
369
every 15 minutes to maintain its ownership of the task. You can
370
change this reporting time from 15 minutes by specifying a
371
reportProgressTimeout field in your pipeline. If a task runner
372
does not report its status after 5 minutes, AWS Data Pipeline
373
will assume that the task runner is unable to process the task
374
and will reassign the task in a subsequent response to
375
PollForTask. task runners should call ReportTaskProgress every
378
:type task_id: string
379
:param task_id: Identifier of the task assigned to the task runner.
380
This value is provided in the TaskObject that the service returns
381
with the response for the PollForTask action.
384
params = {'taskId': task_id, }
385
return self.make_request(action='ReportTaskProgress',
386
body=json.dumps(params))
388
def report_task_runner_heartbeat(self, taskrunner_id, worker_group=None,
391
Task runners call ReportTaskRunnerHeartbeat to indicate that
392
they are operational. In the case of AWS Data Pipeline Task
393
Runner launched on a resource managed by AWS Data Pipeline, the
394
web service can use this call to detect when the task runner
395
application has failed and restart a new instance.
397
:type worker_group: string
398
:param worker_group: Indicates the type of task the task runner is
399
configured to accept and process. The worker group is set as a
400
field on objects in the pipeline when they are created. You can
401
only specify a single value for workerGroup in the call to
402
ReportTaskRunnerHeartbeat. There are no wildcard values permitted
403
in workerGroup, the string must be an exact, case-sensitive, match.
405
:type hostname: string
406
:param hostname: The public DNS name of the calling task runner.
408
:type taskrunner_id: string
409
:param taskrunner_id: The identifier of the task runner. This value
410
should be unique across your AWS account. In the case of AWS Data
411
Pipeline Task Runner launched on a resource managed by AWS Data
412
Pipeline, the web service provides a unique identifier when it
413
launches the application. If you have written a custom task runner,
414
you should assign a unique identifier for the task runner.
417
params = {'taskrunnerId': taskrunner_id, }
418
if worker_group is not None:
419
params['workerGroup'] = worker_group
420
if hostname is not None:
421
params['hostname'] = hostname
422
return self.make_request(action='ReportTaskRunnerHeartbeat',
423
body=json.dumps(params))
425
def set_status(self, object_ids, status, pipeline_id):
427
Requests that the status of an array of physical or logical
428
pipeline objects be updated in the pipeline. This update may not
429
occur immediately, but is eventually consistent. The status that
430
can be set depends on the type of object.
432
:type object_ids: list
433
:param object_ids: Identifies an array of objects. The corresponding
434
objects can be either physical or components, but not a mix of both
438
:param status: Specifies the status to be set on all the objects in
439
objectIds. For components, this can be either PAUSE or RESUME. For
440
instances, this can be either CANCEL, RERUN, or MARK\_FINISHED.
442
:type pipeline_id: string
443
:param pipeline_id: Identifies the pipeline that contains the objects.
447
'objectIds': object_ids,
449
'pipelineId': pipeline_id,
451
return self.make_request(action='SetStatus',
452
body=json.dumps(params))
454
def set_task_status(self, task_id, task_status, error_code=None,
455
error_message=None, error_stack_trace=None):
457
Notifies AWS Data Pipeline that a task is completed and provides
458
information about the final status. The task runner calls this
459
action regardless of whether the task was sucessful. The task
460
runner does not need to call SetTaskStatus for tasks that are
461
canceled by the web service during a call to ReportTaskProgress.
463
:type error_code: integer
464
:param error_code: If an error occurred during the task, specifies a
465
numerical value that represents the error. This value is set on the
466
physical attempt object. It is used to display error information to
467
the user. The web service does not parse this value.
469
:type error_message: string
470
:param error_message: If an error occurred during the task, specifies a
471
text description of the error. This value is set on the physical
472
attempt object. It is used to display error information to the
473
user. The web service does not parse this value.
475
:type error_stack_trace: string
476
:param error_stack_trace: If an error occurred during the task,
477
specifies the stack trace associated with the error. This value is
478
set on the physical attempt object. It is used to display error
479
information to the user. The web service does not parse this value.
481
:type task_id: string
482
:param task_id: Identifies the task assigned to the task runner. This
483
value is set in the TaskObject that is returned by the PollForTask
486
:type task_status: string
487
:param task_status: If FINISHED, the task successfully completed. If
488
FAILED the task ended unsuccessfully. The FALSE value is used by
492
params = {'taskId': task_id, 'taskStatus': task_status, }
493
if error_code is not None:
494
params['errorCode'] = error_code
495
if error_message is not None:
496
params['errorMessage'] = error_message
497
if error_stack_trace is not None:
498
params['errorStackTrace'] = error_stack_trace
499
return self.make_request(action='SetTaskStatus',
500
body=json.dumps(params))
502
def validate_pipeline_definition(self, pipeline_objects, pipeline_id):
504
Tests the pipeline definition with a set of validation checks to
505
ensure that it is well formed and can run without error.
507
:type pipeline_objects: list
508
:param pipeline_objects: A list of objects that define the pipeline
509
changes to validate against the pipeline.
511
:type pipeline_id: string
512
:param pipeline_id: Identifies the pipeline whose definition is to be
517
'pipelineObjects': pipeline_objects,
518
'pipelineId': pipeline_id,
520
return self.make_request(action='ValidatePipelineDefinition',
521
body=json.dumps(params))
523
def make_request(self, action, body):
525
'X-Amz-Target': '%s.%s' % (self.ServiceName, action),
526
'Host': self.region.endpoint,
527
'Content-Type': 'application/x-amz-json-1.1',
528
'Content-Length': str(len(body)),
530
http_request = self.build_base_http_request(
531
method='POST', path='/', auth_path='/', params={},
532
headers=headers, data=body)
533
response = self._mexe(http_request, sender=None,
534
override_num_retries=10)
535
response_body = response.read()
536
boto.log.debug(response_body)
537
if response.status == 200:
539
return json.loads(response_body)
541
json_body = json.loads(response_body)
542
fault_name = json_body.get('__type', None)
543
exception_class = self._faults.get(fault_name, self.ResponseError)
544
raise exception_class(response.status, response.reason,