~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to source4/scripting/python/samba/torture/spoolss.py

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import sys, string
 
2
import dcerpc
 
3
 
 
4
 
 
5
def ResizeBufferCall(fn, pipe, r):
 
6
 
 
7
    r['buffer'] = None
 
8
    r['buf_size'] = 0
 
9
    
 
10
    result = fn(pipe, r)
 
11
 
 
12
    if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \
 
13
       result['result'] == dcerpc.WERR_MORE_DATA:
 
14
        r['buffer'] = result['buf_size'] * '\x00'
 
15
        r['buf_size'] = result['buf_size']
 
16
 
 
17
    result = fn(pipe, r)
 
18
 
 
19
    return result
 
20
 
 
21
 
 
22
def test_OpenPrinterEx(pipe, printer):
 
23
 
 
24
    print 'spoolss_OpenPrinterEx(%s)' % printer
 
25
 
 
26
    printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe)
 
27
    
 
28
    if printer is not None:
 
29
        printername = printername + '\\%s' % printer
 
30
 
 
31
    r = {}
 
32
    r['printername'] = printername
 
33
    r['datatype'] = None
 
34
    r['devmode_ctr'] = {}
 
35
    r['devmode_ctr']['size'] = 0
 
36
    r['devmode_ctr']['devmode'] = None
 
37
    r['access_mask'] = 0x02000000
 
38
    r['level'] = 1
 
39
    r['userlevel'] = {}
 
40
    r['userlevel']['level1'] = {}
 
41
    r['userlevel']['level1']['size'] = 0
 
42
    r['userlevel']['level1']['client'] = None
 
43
    r['userlevel']['level1']['user'] = None
 
44
    r['userlevel']['level1']['build'] = 1381
 
45
    r['userlevel']['level1']['major'] = 2
 
46
    r['userlevel']['level1']['minor'] = 0
 
47
    r['userlevel']['level1']['processor'] = 0
 
48
 
 
49
    result = dcerpc.spoolss_OpenPrinterEx(pipe, r)
 
50
 
 
51
    return result['handle']
 
52
 
 
53
 
 
54
def test_ClosePrinter(pipe, handle):
 
55
 
 
56
    r = {}
 
57
    r['handle'] = handle
 
58
 
 
59
    dcerpc.spoolss_ClosePrinter(pipe, r)
 
60
 
 
61
 
 
62
def test_GetPrinter(pipe, handle):
 
63
 
 
64
    r = {}
 
65
    r['handle'] = handle
 
66
 
 
67
    for level in [0, 1, 2, 3, 4, 5, 6, 7]:
 
68
 
 
69
        print 'spoolss_GetPrinter(level = %d)' % level
 
70
 
 
71
        r['level'] = level
 
72
        r['buffer'] = None
 
73
        r['buf_size'] = 0
 
74
 
 
75
        result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r)
 
76
 
 
77
 
 
78
def test_EnumForms(pipe, handle):
 
79
 
 
80
    print 'spoolss_EnumForms()'
 
81
 
 
82
    r = {}
 
83
    r['handle'] = handle
 
84
    r['level'] = 1
 
85
    r['buffer'] = None
 
86
    r['buf_size'] = 0
 
87
 
 
88
    result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r)
 
89
 
 
90
    forms = dcerpc.unmarshall_spoolss_FormInfo_array(
 
91
        result['buffer'], r['level'], result['count'])
 
92
 
 
93
    for form in forms:
 
94
 
 
95
        r = {}
 
96
        r['handle'] = handle
 
97
        r['formname'] = form['info1']['formname']
 
98
        r['level'] = 1
 
99
 
 
100
        result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
 
101
 
 
102
 
 
103
def test_EnumPorts(pipe, handle):
 
104
 
 
105
    print 'spoolss_EnumPorts()'
 
106
 
 
107
    for level in [1, 2]:
 
108
 
 
109
        r = {}
 
110
        r['handle'] = handle
 
111
        r['servername'] = None
 
112
        r['level'] = level
 
113
 
 
114
        result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r)
 
115
 
 
116
        ports = dcerpc.unmarshall_spoolss_PortInfo_array(
 
117
            result['buffer'], r['level'], result['count'])
 
118
 
 
119
        if level == 1:
 
120
            port_names = map(lambda x: x['info1']['port_name'], ports)
 
121
 
 
122
 
 
123
def test_DeleteForm(pipe, handle, formname):
 
124
 
 
125
    r = {}
 
126
    r['handle'] = handle
 
127
    r['formname'] = formname
 
128
 
 
129
    dcerpc.spoolss_DeleteForm(pipe, r)
 
130
 
 
131
 
 
132
def test_GetForm(pipe, handle, formname):
 
133
 
 
134
    r = {}
 
135
    r['handle'] = handle
 
136
    r['formname'] = formname
 
137
    r['level'] = 1
 
138
 
 
139
    result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
 
140
 
 
141
    return result['info']['info1']
 
142
    
 
143
 
 
144
def test_SetForm(pipe, handle, form):
 
145
 
 
146
    print 'spoolss_SetForm()'
 
147
 
 
148
    r = {}
 
149
    r['handle'] = handle
 
150
    r['level'] = 1
 
151
    r['formname'] = form['info1']['formname']
 
152
    r['info'] = form
 
153
 
 
154
    dcerpc.spoolss_SetForm(pipe, r)
 
155
 
 
156
    newform = test_GetForm(pipe, handle, r['formname'])
 
157
 
 
158
    if form['info1'] != newform:
 
159
        print 'SetForm: mismatch: %s != %s' % \
 
160
              (r['info']['info1'], f)
 
161
        sys.exit(1)
 
162
 
 
163
 
 
164
def test_AddForm(pipe, handle):
 
165
 
 
166
    print 'spoolss_AddForm()'
 
167
 
 
168
    formname = '__testform__'
 
169
 
 
170
    r = {}
 
171
    r['handle'] = handle
 
172
    r['level'] = 1
 
173
    r['info'] = {}
 
174
    r['info']['info1'] = {}
 
175
    r['info']['info1']['formname'] = formname
 
176
    r['info']['info1']['flags'] = 0x0002
 
177
    r['info']['info1']['width'] = 100
 
178
    r['info']['info1']['length'] = 100
 
179
    r['info']['info1']['left'] = 0
 
180
    r['info']['info1']['top'] = 1000
 
181
    r['info']['info1']['right'] = 2000
 
182
    r['info']['info1']['bottom'] = 3000
 
183
 
 
184
    try:
 
185
        result = dcerpc.spoolss_AddForm(pipe, r)
 
186
    except dcerpc.WERROR, arg:
 
187
        if arg[0] == dcerpc.WERR_ALREADY_EXISTS:
 
188
            test_DeleteForm(pipe, handle, formname)
 
189
        result = dcerpc.spoolss_AddForm(pipe, r)
 
190
 
 
191
    f = test_GetForm(pipe, handle, formname)
 
192
 
 
193
    if r['info']['info1'] != f:
 
194
        print 'AddForm: mismatch: %s != %s' % \
 
195
              (r['info']['info1'], f)
 
196
        sys.exit(1)
 
197
 
 
198
    r['formname'] = formname
 
199
 
 
200
    test_SetForm(pipe, handle, r['info'])
 
201
 
 
202
    test_DeleteForm(pipe, handle, formname)
 
203
 
 
204
 
 
205
def test_EnumJobs(pipe, handle):
 
206
 
 
207
    print 'spoolss_EnumJobs()'
 
208
 
 
209
    r = {}
 
210
    r['handle'] = handle
 
211
    r['firstjob'] = 0
 
212
    r['numjobs'] = 0xffffffff
 
213
    r['level'] = 1
 
214
 
 
215
    result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r)
 
216
 
 
217
    if result['buffer'] is None:
 
218
        return
 
219
    
 
220
    jobs = dcerpc.unmarshall_spoolss_JobInfo_array(
 
221
        result['buffer'], r['level'], result['count'])
 
222
 
 
223
    for job in jobs:
 
224
 
 
225
        s = {}
 
226
        s['handle'] = handle
 
227
        s['job_id'] = job['info1']['job_id']
 
228
        s['level'] = 1
 
229
 
 
230
        result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s)
 
231
 
 
232
        if result['info'] != job:
 
233
            print 'EnumJobs: mismatch: %s != %s' % (result['info'], job)
 
234
            sys.exit(1)
 
235
    
 
236
 
 
237
    # TODO: AddJob, DeleteJob, ScheduleJob
 
238
 
 
239
 
 
240
def test_EnumPrinterData(pipe, handle):
 
241
 
 
242
    print 'test_EnumPrinterData()'
 
243
 
 
244
    enum_index = 0
 
245
 
 
246
    while 1:
 
247
 
 
248
        r = {}
 
249
        r['handle'] = handle
 
250
        r['enum_index'] = enum_index
 
251
 
 
252
        r['value_offered'] = 0
 
253
        r['data_size'] = 0
 
254
        
 
255
        result = dcerpc.spoolss_EnumPrinterData(pipe, r)
 
256
 
 
257
        r['value_offered'] = result['value_needed']
 
258
        r['data_size'] = result['data_size']
 
259
 
 
260
        result = dcerpc.spoolss_EnumPrinterData(pipe, r)
 
261
 
 
262
        if result['result'] == dcerpc.WERR_NO_MORE_ITEMS:
 
263
            break
 
264
 
 
265
        s = {}
 
266
        s['handle'] = handle
 
267
        s['value_name'] = result['value_name']
 
268
 
 
269
        result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s)
 
270
 
 
271
        if result['buffer'][:result2['buf_size']] != result2['buffer']:
 
272
            print 'EnumPrinterData/GetPrinterData mismatch'
 
273
            sys.exit(1)
 
274
 
 
275
        enum_index += 1
 
276
 
 
277
 
 
278
def test_SetPrinterDataEx(pipe, handle):
 
279
 
 
280
    valuename = '__printerdataextest__'
 
281
    data = '12345'
 
282
 
 
283
    r = {}
 
284
    r['handle'] = handle
 
285
    r['key_name'] = 'DsSpooler'
 
286
    r['value_name'] = valuename
 
287
    r['type'] = 3
 
288
    r['buffer'] = data
 
289
    r['buf_size'] = len(data)
 
290
    
 
291
    result = dcerpc.spoolss_SetPrinterDataEx(pipe, r)
 
292
 
 
293
 
 
294
def test_EnumPrinterDataEx(pipe, handle):
 
295
 
 
296
    r = {}
 
297
    r['handle'] = handle
 
298
    r['key_name'] = 'DsSpooler'
 
299
    r['buf_size'] = 0
 
300
 
 
301
    result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
 
302
 
 
303
    if result['result'] == dcerpc.WERR_MORE_DATA:
 
304
        r['buf_size'] = result['buf_size']
 
305
 
 
306
        result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
 
307
 
 
308
    # TODO: test spoolss_GetPrinterDataEx()
 
309
 
 
310
 
 
311
def test_SetPrinterData(pipe, handle):
 
312
 
 
313
    print 'testing spoolss_SetPrinterData()'
 
314
 
 
315
    valuename = '__printerdatatest__'
 
316
    data = '12345'
 
317
    
 
318
    r = {}
 
319
    r['handle'] = handle
 
320
    r['value_name'] = valuename
 
321
    r['type'] = 3                       # REG_BINARY
 
322
    r['buffer'] = data
 
323
    r['real_len'] = 5
 
324
 
 
325
    dcerpc.spoolss_SetPrinterData(pipe, r)
 
326
 
 
327
    s = {}
 
328
    s['handle'] = handle
 
329
    s['value_name'] = valuename
 
330
 
 
331
    result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r)
 
332
 
 
333
    if result['buffer'] != data:
 
334
        print 'SetPrinterData: mismatch'
 
335
        sys.exit(1)
 
336
 
 
337
    dcerpc.spoolss_DeletePrinterData(pipe, r)
 
338
 
 
339
 
 
340
def test_EnumPrinters(pipe):
 
341
 
 
342
    print 'testing spoolss_EnumPrinters()'
 
343
 
 
344
    printer_names = None
 
345
 
 
346
    r = {}
 
347
    r['flags'] = 0x02
 
348
    r['server'] = None
 
349
 
 
350
    for level in [0, 1, 2, 4, 5]:
 
351
 
 
352
        print 'test_EnumPrinters(level = %d)' % level
 
353
 
 
354
        r['level'] = level
 
355
 
 
356
        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
 
357
 
 
358
        printers = dcerpc.unmarshall_spoolss_PrinterInfo_array(
 
359
            result['buffer'], r['level'], result['count'])
 
360
 
 
361
        if level == 2:
 
362
            for p in printers:
 
363
 
 
364
                # A nice check is for the specversion in the
 
365
                # devicemode.  This has always been observed to be
 
366
                # 1025.
 
367
 
 
368
                if p['info2']['devmode']['specversion'] != 1025:
 
369
                    print 'test_EnumPrinters: specversion != 1025'
 
370
                    sys.exit(1)
 
371
 
 
372
    r['level'] = 1
 
373
    result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
 
374
    
 
375
    for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array(
 
376
        result['buffer'], r['level'], result['count']):
 
377
 
 
378
        if string.find(printer['info1']['name'], '\\\\') == 0:
 
379
            print 'Skipping remote printer %s' % printer['info1']['name']
 
380
            continue
 
381
 
 
382
        printername = string.split(printer['info1']['name'], ',')[0]
 
383
 
 
384
        handle = test_OpenPrinterEx(pipe, printername)
 
385
 
 
386
        test_GetPrinter(pipe, handle)
 
387
        test_EnumPorts(pipe, handle)
 
388
        test_EnumForms(pipe, handle)
 
389
        test_AddForm(pipe, handle)
 
390
        test_EnumJobs(pipe, handle)
 
391
        test_EnumPrinterData(pipe, handle)
 
392
        test_EnumPrinterDataEx(pipe, handle)
 
393
        test_SetPrinterData(pipe, handle)
 
394
#       test_SetPrinterDataEx(pipe, handle)
 
395
        test_ClosePrinter(pipe, handle)
 
396
 
 
397
 
 
398
def test_EnumPrinterDrivers(pipe):
 
399
 
 
400
    print 'test spoolss_EnumPrinterDrivers()'
 
401
 
 
402
    for level in [1, 2, 3]:
 
403
 
 
404
        r = {}
 
405
        r['server'] = None
 
406
        r['environment'] = None
 
407
        r['level'] = level
 
408
 
 
409
        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r)
 
410
 
 
411
        drivers = dcerpc.unmarshall_spoolss_DriverInfo_array(
 
412
            result['buffer'], r['level'], result['count'])
 
413
 
 
414
        if level == 1:
 
415
            driver_names = map(lambda x: x['info1']['driver_name'], drivers)
 
416
            
 
417
 
 
418
def test_PrintServer(pipe):
 
419
    
 
420
    handle = test_OpenPrinterEx(pipe, None)
 
421
 
 
422
    # EnumForms and AddForm tests return WERR_BADFID here (??)
 
423
 
 
424
    test_ClosePrinter(pipe, handle)
 
425
    
 
426
 
 
427
def runtests(binding, domain, username, password):
 
428
    
 
429
    print 'Testing SPOOLSS pipe'
 
430
 
 
431
    pipe = dcerpc.pipe_connect(binding,
 
432
            dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION,
 
433
            domain, username, password)
 
434
 
 
435
    test_EnumPrinters(pipe)
 
436
    test_EnumPrinterDrivers(pipe)
 
437
    test_PrintServer(pipe)