2
# vim:fileencoding=utf-8
3
from __future__ import (unicode_literals, division, absolute_import,
7
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
9
import re, sys, copy, json
10
from itertools import repeat
11
from collections import defaultdict
13
from lxml import etree
14
from lxml.builder import ElementMaker
16
from calibre import prints
17
from calibre.ebooks.metadata import check_isbn, check_doi
18
from calibre.ebooks.metadata.book.base import Metadata
19
from calibre.ebooks.metadata.opf2 import dump_dict
20
from calibre.utils.date import parse_date, isoformat, now
21
from calibre.utils.localization import canonicalize_lang, lang_as_iso639_1
23
_xml_declaration = re.compile(r'<\?xml[^<>]+encoding\s*=\s*[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE)
26
# Map of namespace prefix -> namespace URI for every namespace this module
# reads or writes. The prefixes are the ones usable in expand() names and in
# XPath expressions compiled with namespaces=NS_MAP.
NS_MAP = {
    'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
    'dc': 'http://purl.org/dc/elements/1.1/',
    'pdf': 'http://ns.adobe.com/pdf/1.3/',
    'pdfx': 'http://ns.adobe.com/pdfx/1.3/',
    'xmp': 'http://ns.adobe.com/xap/1.0/',
    'xmpidq': 'http://ns.adobe.com/xmp/Identifier/qual/1.0/',
    'xmpMM': 'http://ns.adobe.com/xap/1.0/mm/',
    'xmpRights': 'http://ns.adobe.com/xap/1.0/rights/',
    'xmpBJ': 'http://ns.adobe.com/xap/1.0/bj/',
    'xmpTPg': 'http://ns.adobe.com/xap/1.0/t/pg/',
    'xmpDM': 'http://ns.adobe.com/xmp/1.0/DynamicMedia/',
    'prism': 'http://prismstandard.org/namespaces/basic/2.0/',
    'crossmark': 'http://crossref.org/crossmark/1.0/',
    'xml': 'http://www.w3.org/XML/1998/namespace',
    'x': 'adobe:ns:meta/',
    'calibre': 'http://calibre-ebook.com/xmp-namespace',
    'calibreSI': 'http://calibre-ebook.com/xmp-namespace-series-index',
    'calibreCC': 'http://calibre-ebook.com/xmp-namespace-custom-columns',
}
# Identifier schemes that are looked up as <prefix>:<scheme> elements
KNOWN_ID_SCHEMES = {'isbn', 'url', 'doi'}
48
def expand(name):
    ''' Convert a prefixed name such as ``rdf:li`` into the ``{uri}li`` form
    used for lxml tag and attribute names. The prefix is resolved through
    NS_MAP, so an unknown prefix raises KeyError. '''
    prefix, _sep, local_name = name.partition(':')
    return '{%s}%s' % (NS_MAP[prefix], local_name)
54
# Compiled XPath expressions, keyed by their source text
xpath_cache = {}


def XPath(expr):
    ''' Return a compiled etree.XPath object for expr, with the prefixes of
    NS_MAP available in the expression. Compiled expressions are memoized in
    xpath_cache so each distinct expression is compiled only once. '''
    ans = xpath_cache.get(expr, None)
    if ans is None:
        xpath_cache[expr] = ans = etree.XPath(expr, namespaces=NS_MAP)
    return ans
59
def parse_xmp_packet(raw_bytes):
    ''' Parse the bytes of an XMP packet and return the root lxml element.

    The encoding is sniffed from the ``begin`` attribute of the leading
    xpacket processing instruction (which holds a byte order mark). If no
    such instruction is found the raw bytes are handed to lxml as is. '''
    raw_bytes = raw_bytes.strip()
    enc = None
    # NOTE(review): the leading "<?" makes the "<" optional in regex terms
    # instead of matching a literal "?"; the pattern still finds "xpacket".
    pat = r'''<?xpacket\s+[^>]*?begin\s*=\s*['"]([^'"]*)['"]'''
    encodings = ('8', '16-le', '16-be', '32-le', '32-be')
    # Only the start of the packet is searched for the xpacket header
    header = raw_bytes[:1024]
    # Encoded byte order mark -> name of the encoding that produced it
    emap = {'\ufeff'.encode('utf-'+x):'utf-'+x for x in encodings}
    # An empty begin attribute is treated as UTF-8
    emap[b''] = 'utf-8'
    for q in encodings:
        m = re.search(pat.encode('utf-'+q), header)
        if m is not None:
            enc = emap.get(m.group(1), enc)
            break
    if enc is None:
        return etree.fromstring(raw_bytes)
    raw = _xml_declaration.sub('', raw_bytes.decode(enc))  # lxml barfs if encoding declaration present in unicode string
    return etree.fromstring(raw)
77
def serialize_xmp_packet(root, encoding='utf-8'):
    ''' Serialize the element tree rooted at root into a complete XMP packet
    (bytes), wrapped in the xpacket begin/end processing instructions. Note
    that root.tail is overwritten with whitespace padding. '''
    # Adobe spec recommends inserting padding at the end of the packet
    padding_lines = [' '*100] * 30
    root.tail = '\n' + '\n'.join(padding_lines)
    body = etree.tostring(root, encoding=encoding, pretty_print=True, with_tail=True, method='xml')
    # The begin attribute carries the byte order mark in the packet encoding
    bom = '\ufeff'.encode(encoding)
    return b'<?xpacket begin="%s" id="W5M0MpCehiHzreSzNTczkc9d"?>\n%s\n<?xpacket end="w"?>' % (bom, body)
82
def read_simple_property(elem):
    ''' Return the value of a simple property: the text of elem if it has
    any, otherwise the value of its rdf:resource attribute, or the empty
    string when neither is present. '''
    if elem.text:
        return elem.text
    return elem.get(expand('rdf:resource'), '')
88
def read_lang_alt(parent):
    ''' Pick the rdf:li element to use from a text value with possible
    alternate values in different languages. The x-default entry is preferred,
    falling back to the first rdf:li of any language. Returns None when
    parent contains no rdf:li at all. '''
    items = XPath('descendant::rdf:li[@xml:lang="x-default"]')(parent)
    if items:
        return items[0]
    items = XPath('descendant::rdf:li')(parent)
    if items:
        return items[0]
    return None
97
def read_sequence(parent):
    ''' Generate the value of every rdf:li below parent, i.e. the members of
    a sequence or set of values (assumes simple properties in the sequence). '''
    list_items = XPath('descendant::rdf:li')(parent)
    for li in list_items:
        yield read_simple_property(li)
102
def uniq(vals, kmap=lambda x:x):
    ''' Remove all duplicates from vals, while preserving order. kmap must be a
    callable that returns a hashable value for every item in vals. Two items
    are duplicates when kmap maps them to the same key; the first one wins.
    Returns a tuple. vals may be None (treated as empty) or any iterable,
    including one-shot iterators. '''
    seen = set()
    ans = []
    # A single pass keeps each item paired with its own key, even when vals
    # can only be iterated once.
    for x in vals or ():
        k = kmap(x)
        if k not in seen:
            seen.add(k)
            ans.append(x)
    return tuple(ans)
111
def multiple_sequences(expr, root):
    ''' Get all values for sequence elements matching expr, ensuring the
    returned list contains distinct non-null elements preserving their
    order. '''
    ans = []
    for item in XPath(expr)(root):
        ans.extend(read_sequence(item))
    # A real list, so that callers can index and truth-test the result
    # regardless of whether filter() is lazy on the running interpreter.
    return [x for x in uniq(ans) if x]
119
def first_alt(expr, root):
    ''' The first non-empty value among the elements matching expr. Assumes
    that each element contains a language alternate array. Returns None when
    nothing usable is found. '''
    for item in XPath(expr)(root):
        li = read_lang_alt(item)
        if li is None:
            # No rdf:li inside this element, nothing to read from it
            continue
        q = read_simple_property(li)
        if q:
            return q
    return None
127
def first_simple(expr, root):
    ''' The value for the first occurrence of an element matching expr that
    has a non-empty value (assumes simple properties). Returns None when there
    is no such element. '''
    for item in XPath(expr)(root):
        q = read_simple_property(item)
        if q:
            return q
    return None
135
def first_sequence(expr, root):
    ''' The first item of the first non-empty sequence among the elements
    matching expr, or None if there is none. '''
    for item in XPath(expr)(root):
        for ans in read_sequence(item):
            return ans
    return None
141
def read_series(root):
    ''' Read the calibre series information from the packet.

    Returns (series, series_index) for the first calibre:series element that
    has a non-blank rdf:value, or (None, None) if there is none. The index
    defaults to 1.0 when no calibreSI:series_index child holds a number. '''
    for item in XPath('//calibre:series')(root):
        val = XPath('descendant::rdf:value')(item)
        if not val:
            continue
        series = val[0].text
        if series and series.strip():
            series_index = 1.0
            for si in XPath('descendant::calibreSI:series_index')(item):
                try:
                    series_index = float(si.text)
                except (TypeError, ValueError):
                    # Empty or non-numeric index, try the next one
                    continue
                else:
                    break
            return series, series_index
    return None, None
158
def read_user_metadata(mi, root):
    ''' Read the custom column metadata stored under calibre:custom_metadata
    and set it on mi via mi.set_user_metadata().

    Only entries whose name starts with '#' are used, and only the first entry
    for a given name. An entry that fails to decode is reported and skipped,
    it does not abort reading of the remaining entries. '''
    from calibre.utils.config import from_json
    from calibre.ebooks.metadata.book.json_codec import decode_is_multiple
    fields = set()
    for item in XPath('//calibre:custom_metadata')(root):
        for li in XPath('./rdf:Bag/rdf:li')(item):
            name = XPath('descendant::calibreCC:name')(li)
            if not name:
                continue
            name = name[0].text
            # The text of an empty name element is None
            if not name or not name.startswith('#') or name in fields:
                continue
            val = XPath('descendant::rdf:value')(li)
            if not val:
                continue
            fm = val[0].text
            try:
                fm = json.loads(fm, object_hook=from_json)
                decode_is_multiple(fm)
                mi.set_user_metadata(name, fm)
                fields.add(name)
            except Exception:
                prints('Failed to read user metadata:', name)
                import traceback
                traceback.print_exc()
181
def read_xmp_identifers(parent):
    ''' Generate (scheme, value) pairs for the entries of an xmp:Identifier
    bag. scheme is None when the entry has no xmpidq:Scheme qualifier.

    For example:
    <rdf:li rdf:parseType="Resource"><xmpidq:Scheme>URL</xmpidq:Scheme><rdf:value>http://foo.com</rdf:value></rdf:li>
    or the longer form:
    <rdf:li><rdf:Description><xmpidq:Scheme>URL</xmpidq:Scheme><rdf:value>http://foo.com</rdf:value></rdf:Description></rdf:li>
    '''
    for li in XPath('./rdf:Bag/rdf:li')(parent):
        is_resource = li.attrib.get(expand('rdf:parseType'), None) == 'Resource'
        is_resource = is_resource or (len(li) == 1 and li[0].tag == expand('rdf:Description'))
        if not is_resource:
            # A plain entry, the identifier is the text of the li itself
            yield None, li.text or ''
        value = XPath('descendant::rdf:value')(li)
        if not value:
            continue
        value = value[0].text or ''
        scheme = XPath('descendant::xmpidq:Scheme')(li)
        if not scheme:
            yield None, value
        else:
            yield scheme[0].text or '', value
202
def metadata_from_xmp_packet(raw_bytes):
    ''' Parse the XMP packet in raw_bytes and return a Metadata object filled
    in with whatever could be read from it. Fields that are absent from the
    packet or cannot be parsed are simply left unset. '''
    root = parse_xmp_packet(raw_bytes)
    mi = Metadata(_('Unknown'))
    title = first_alt('//dc:title', root)
    if title:
        mi.title = title
    authors = multiple_sequences('//dc:creator', root)
    if authors:
        mi.authors = authors
    tags = multiple_sequences('//dc:subject', root) or multiple_sequences('//pdf:Keywords', root)
    if tags:
        mi.tags = tags
    comments = first_alt('//dc:description', root)
    if comments:
        mi.comments = comments
    publishers = multiple_sequences('//dc:publisher', root)
    if publishers:
        mi.publisher = publishers[0]
    try:
        pubdate = parse_date(first_sequence('//dc:date', root) or first_simple('//xmp:CreateDate', root), assume_utc=False)
    except Exception:
        pass
    else:
        mi.pubdate = pubdate
    bkp = first_simple('//xmp:CreatorTool', root)
    if bkp:
        mi.book_producer = bkp
    md = first_simple('//xmp:MetadataDate', root)
    if md:
        try:
            mi.metadata_date = parse_date(md)
        except Exception:
            pass
    rating = first_simple('//calibre:rating', root)
    if rating is not None:
        try:
            rating = float(rating)
            if 0 <= rating <= 10:
                mi.rating = rating
        except (ValueError, TypeError):
            pass
    series, series_index = read_series(root)
    if series:
        mi.series, mi.series_index = series, series_index
    for x in ('title_sort', 'author_sort'):
        for elem in XPath('//calibre:' + x)(root):
            val = read_simple_property(elem)
            if val:
                setattr(mi, x, val)
                break
    for x in ('author_link_map', 'user_categories'):
        val = first_simple('//calibre:'+x, root)
        if val:
            try:
                setattr(mi, x, json.loads(val))
            except Exception:
                pass

    languages = multiple_sequences('//dc:language', root)
    if languages:
        # Drop languages that cannot be canonicalized
        languages = [lc for lc in map(canonicalize_lang, languages) if lc]
        if languages:
            mi.languages = languages

    identifiers = {}
    for xmpid in XPath('//xmp:Identifier')(root):
        for scheme, value in read_xmp_identifers(xmpid):
            if scheme and value:
                identifiers[scheme.lower()] = value

    for namespace in ('prism', 'pdfx'):
        for scheme in KNOWN_ID_SCHEMES:
            if scheme not in identifiers:
                val = first_simple('//%s:%s' % (namespace, scheme), root)
                scheme = scheme.lower()
                if scheme == 'isbn':
                    val = check_isbn(val)
                elif scheme == 'doi':
                    val = check_doi(val)
                if val:
                    identifiers[scheme] = val

    # Check Dublin Core for recognizable identifier types
    for scheme, check_func in {'doi':check_doi, 'isbn':check_isbn}.iteritems():
        if scheme not in identifiers:
            val = check_func(first_simple('//dc:identifier', root))
            if val:
                # Store under the scheme that recognized the value, so that
                # an ISBN is not recorded as a DOI
                identifiers[scheme] = val

    if identifiers:
        mi.set_identifiers(identifiers)

    read_user_metadata(mi, root)

    return mi
298
def consolidate_metadata(info_mi, info):
    ''' When both the PDF Info dict and XMP metadata are present, prefer the xmp
    metadata unless the Info ModDate is newer than the XMP MetadataDate. This
    is the algorithm recommended by the PDF spec.

    info_mi is the Metadata object built from the Info dict, it is updated in
    place and returned. info is the Info dict itself, with the raw XMP packet
    under the 'xmp_metadata' key. If the packet cannot be read, info_mi is
    returned unchanged. '''
    try:
        xmp_mi = metadata_from_xmp_packet(info['xmp_metadata'])
    except Exception:
        import traceback
        traceback.print_exc()
        return info_mi
    # Remember the Info values, smart_update() below overwrites them
    info_title, info_authors, info_tags = info_mi.title or _('Unknown'), list(info_mi.authors or ()), list(info_mi.tags or ())
    info_mi.smart_update(xmp_mi, replace_metadata=True)
    prefer_info = False
    if 'ModDate' in info and hasattr(xmp_mi, 'metadata_date'):
        try:
            info_date = parse_date(info['ModDate'])
            # The comparison itself can fail if the two dates are not
            # comparable, in which case the XMP data stays preferred
            prefer_info = info_date > xmp_mi.metadata_date
        except Exception:
            prefer_info = False
    if prefer_info:
        info_mi.title, info_mi.authors, info_mi.tags = info_title, info_authors, info_tags
    else:
        # We'll use the xmp tags/authors but fallback to the info ones if the
        # xmp does not have tags/authors. smart_update() should have taken care of
        # the rest
        info_mi.authors, info_mi.tags = xmp_mi.authors or info_mi.authors, xmp_mi.tags or info_mi.tags
    return info_mi
328
def nsmap(*args):
    ''' Return the sub-map of NS_MAP restricted to the given prefixes, in the
    prefix -> URI form that lxml accepts as an nsmap. Raises KeyError for a
    prefix that is not in NS_MAP. '''
    return {x:NS_MAP[x] for x in args}
330
def create_simple_property(parent, tag, value):
    ''' Append to parent a new element named by the prefixed name tag, with
    value as its text. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    e.text = value
335
def create_alt_property(parent, tag, value):
    ''' Append to parent a language alternative property named by tag:
    an rdf:Alt holding a single rdf:li, marked xml:lang="x-default", whose
    text is value. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    alt = e.makeelement(expand('rdf:Alt'))
    e.append(alt)
    li = alt.makeelement(expand('rdf:li'))
    alt.append(li)
    li.set(expand('xml:lang'), 'x-default')
    li.text = value
345
def create_sequence_property(parent, tag, val, ordered=True):
    ''' Append to parent an array property named by tag, with one rdf:li per
    item of the iterable val. The container is an rdf:Seq when ordered is
    true and an rdf:Bag otherwise. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    seq = e.makeelement(expand('rdf:' + ('Seq' if ordered else 'Bag')))
    e.append(seq)
    for x in val:
        li = seq.makeelement(expand('rdf:li'))
        li.text = x
        seq.append(li)
355
def create_identifiers(xmp, identifiers):
    ''' Append an xmp:Identifier bag to xmp, with one entry per item of the
    identifiers dict (scheme -> value). Every entry is written in the
    rdf:parseType="Resource" form, the scheme going into xmpidq:Scheme and
    the value into rdf:value. '''
    xmpid = xmp.makeelement(expand('xmp:Identifier'))
    xmp.append(xmpid)
    bag = xmpid.makeelement(expand('rdf:Bag'))
    xmpid.append(bag)
    for scheme, value in identifiers.iteritems():
        li = bag.makeelement(expand('rdf:li'))
        li.set(expand('rdf:parseType'), 'Resource')
        bag.append(li)
        s = li.makeelement(expand('xmpidq:Scheme'))
        s.text = scheme
        li.append(s)
        val = li.makeelement(expand('rdf:value'))
        li.append(val)
        val.text = value
371
def create_series(calibre, series, series_index):
    ''' Append a calibre:series element to calibre, holding the series name
    in rdf:value and the index, formatted with two decimals, in
    calibreSI:series_index. An index that is not a number is written
    as 1.00. '''
    s = calibre.makeelement(expand('calibre:series'))
    s.set(expand('rdf:parseType'), 'Resource')
    calibre.append(s)
    val = s.makeelement(expand('rdf:value'))
    s.append(val)
    val.text = series
    try:
        series_index = float(series_index)
    except (TypeError, ValueError):
        series_index = 1.0
    si = s.makeelement(expand('calibreSI:series_index'))
    si.text = '%.2f' % series_index
    s.append(si)
386
def create_user_metadata(calibre, all_user_metadata):
    ''' Append a calibre:custom_metadata bag to calibre, with one entry per
    item of all_user_metadata (field name -> field metadata). The field
    metadata is stored JSON encoded in rdf:value, the name in calibreCC:name.
    A field that fails to encode is reported and left out. '''
    from calibre.utils.config import to_json
    from calibre.ebooks.metadata.book.json_codec import object_to_unicode, encode_is_multiple

    s = calibre.makeelement(expand('calibre:custom_metadata'))
    calibre.append(s)
    bag = s.makeelement(expand('rdf:Bag'))
    s.append(bag)
    for name, fm in all_user_metadata.iteritems():
        try:
            # Work on a copy so the caller's metadata is not modified by the
            # encoding step
            fm = copy.copy(fm)
            encode_is_multiple(fm)
            fm = object_to_unicode(fm)
            fm = json.dumps(fm, default=to_json, ensure_ascii=False)
        except Exception:
            prints('Failed to write user metadata:', name)
            import traceback
            traceback.print_exc()
            continue
        li = bag.makeelement(expand('rdf:li'))
        li.set(expand('rdf:parseType'), 'Resource')
        bag.append(li)
        n = li.makeelement(expand('calibreCC:name'))
        li.append(n)
        n.text = name
        val = li.makeelement(expand('rdf:value'))
        val.text = fm
        li.append(val)
415
def metadata_to_xmp_packet(mi):
    ''' Build a complete XMP packet (bytes) from the Metadata object mi.

    The fields are spread over several top-level rdf:Description elements:
    one for Dublin Core, one for xmp (identifiers and MetadataDate), one each
    for prism and pdfx (isbn/doi only) and one for the calibre specific
    fields. '''
    A = ElementMaker(namespace=NS_MAP['x'], nsmap=nsmap('x'))
    R = ElementMaker(namespace=NS_MAP['rdf'], nsmap=nsmap('rdf'))
    root = A.xmpmeta(R.RDF)
    rdf = root[0]

    # Dublin Core
    dc = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('dc'))
    dc.set(expand('rdf:about'), '')
    rdf.append(dc)
    for prop, tag in {'title':'dc:title', 'comments':'dc:description'}.iteritems():
        val = mi.get(prop) or ''
        create_alt_property(dc, tag, val)
    for prop, (tag, ordered) in {
        'authors':('dc:creator', True), 'tags':('dc:subject', False), 'publisher':('dc:publisher', False),
    }.iteritems():
        val = mi.get(prop) or ()
        if isinstance(val, basestring):
            # A single string value (publisher) becomes a one item sequence
            val = [val]
        create_sequence_property(dc, tag, val, ordered)
    if not mi.is_null('pubdate'):
        create_sequence_property(dc, 'dc:date', [isoformat(mi.pubdate, as_utc=False)])  # Adobe spec recommends local time
    if not mi.is_null('languages'):
        langs = [lc for lc in (lang_as_iso639_1(x) or canonicalize_lang(x) for x in mi.languages) if lc]
        if langs:
            create_sequence_property(dc, 'dc:language', langs, ordered=False)

    # xmp, plus the containers for the prism/pdfx copies of isbn and doi
    xmp = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('xmp', 'xmpidq'))
    xmp.set(expand('rdf:about'), '')
    rdf.append(xmp)
    extra_ids = {}
    for x in ('prism', 'pdfx'):
        p = extra_ids[x] = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap(x))
        p.set(expand('rdf:about'), '')
        rdf.append(p)

    identifiers = mi.get_identifiers()
    if identifiers:
        create_identifiers(xmp, identifiers)
        for scheme, val in identifiers.iteritems():
            if scheme in {'isbn', 'doi'}:
                for prefix, parent in extra_ids.iteritems():
                    ie = parent.makeelement(expand('%s:%s'%(prefix, scheme)))
                    ie.text = val
                    parent.append(ie)

    d = xmp.makeelement(expand('xmp:MetadataDate'))
    d.text = isoformat(now(), as_utc=False)
    xmp.append(d)

    # calibre specific fields
    calibre = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('calibre', 'calibreSI', 'calibreCC'))
    calibre.set(expand('rdf:about'), '')
    rdf.append(calibre)
    if not mi.is_null('rating'):
        try:
            r = float(mi.rating)
        except (TypeError, ValueError):
            pass
        else:
            create_simple_property(calibre, 'calibre:rating', '%g' % r)
    if not mi.is_null('series'):
        create_series(calibre, mi.series, mi.series_index)
    if not mi.is_null('timestamp'):
        create_simple_property(calibre, 'calibre:timestamp', isoformat(mi.timestamp, as_utc=False))
    for x in ('author_link_map', 'user_categories'):
        val = getattr(mi, x, None)
        if val:
            create_simple_property(calibre, 'calibre:'+x, dump_dict(val))

    for x in ('title_sort', 'author_sort'):
        if not mi.is_null(x):
            create_simple_property(calibre, 'calibre:'+x, getattr(mi, x))

    all_user_metadata = mi.get_all_user_metadata(True)
    if all_user_metadata:
        create_user_metadata(calibre, all_user_metadata)
    return serialize_xmp_packet(root)
491
def find_used_namespaces(elem):
    ''' Return the set of namespace URIs used by the tag and attribute names
    of elem and of all its descendant elements. The set contains None if any
    of those names has no namespace. '''
    def getns(x):
        # Names look like {uri}local, or just local when there is no namespace
        return x.partition('}')[0][1:] if '}' in x else None

    ans = {getns(x) for x in list(elem.attrib) + [elem.tag]}
    for child in elem.iterchildren(etree.Element):
        ans |= find_used_namespaces(child)
    return ans
498
def find_preferred_prefix(namespace, elems):
    ''' Return a prefix that one of elems, or one of their descendant
    elements, has declared for namespace. Each element is checked before its
    children. Returns None if no element maps a prefix to namespace. '''
    for elem in elems:
        ans = {v:k for k, v in elem.nsmap.iteritems()}.get(namespace, None)
        if ans is not None:
            return ans
        # Look below this element, but keep going through the remaining
        # elements if nothing is found there
        ans = find_preferred_prefix(namespace, elem.iterchildren(etree.Element))
        if ans is not None:
            return ans
    return None
505
def find_nsmap(elems):
    ''' Build the prefix -> URI map needed to serialize elems.

    The xml, x and rdf namespaces are left out. Namespaces present in NS_MAP
    get the prefix defined there. Any other namespace gets the prefix found
    by find_preferred_prefix(), or a generated ns<N> prefix when there is
    none or it is already taken. '''
    used_namespaces = set()
    for elem in elems:
        used_namespaces |= find_used_namespaces(elem)
    ans = {}
    used_namespaces -= {NS_MAP['xml'], NS_MAP['x'], None, NS_MAP['rdf']}
    rmap = {v:k for k, v in NS_MAP.iteritems()}
    i = 0
    # Sorted, so that the numbering of generated prefixes does not depend on
    # set iteration order
    for ns in sorted(used_namespaces):
        if ns in rmap:
            ans[rmap[ns]] = ns
        else:
            pp = find_preferred_prefix(ns, elems)
            if pp and pp not in ans:
                ans[pp] = ns
            else:
                i += 1
                ans['ns%d' % i] = ns
    return ans
525
def clone_into(parent, elem):
    ''' Clone the element, assuming that all namespace declarations are
    present in parent. The clone is appended to parent, and the child
    elements of elem are cloned into it recursively. Text and tail that
    consist only of whitespace are not copied. '''
    clone = parent.makeelement(elem.tag)
    parent.append(clone)
    if elem.text and not elem.text.isspace():
        clone.text = elem.text
    if elem.tail and not elem.tail.isspace():
        clone.tail = elem.tail
    clone.attrib.update(elem.attrib)
    for child in elem.iterchildren(etree.Element):
        clone_into(clone, child)
537
def merge_xmp_packet(old, new):
    ''' Merge metadata present in the old packet that is not present in the new
    one into the new one. Assumes the new packet was generated by
    metadata_to_xmp_packet(). Both arguments are raw packets, the merged
    packet is returned serialized. '''
    old, new = parse_xmp_packet(old), parse_xmp_packet(new)
    # As per the adobe spec all metadata items have to be present inside top-level rdf:Description containers
    item_xpath = XPath('//rdf:RDF/rdf:Description/*')

    # First remove all data fields that metadata_to_xmp_packet() knows about,
    # since either they will have been set or if not present, imply they have
    # been cleared
    defined_tags = {expand(prefix + ':' + scheme) for prefix in ('prism', 'pdfx') for scheme in KNOWN_ID_SCHEMES}
    defined_tags |= {expand('dc:' + x) for x in ('identifier', 'title', 'creator', 'date', 'description', 'language', 'publisher', 'subject')}
    defined_tags |= {expand('xmp:' + x) for x in ('MetadataDate', 'Identifier')}
    # For redundancy also remove all fields explicitly set in the new packet
    defined_tags |= {x.tag for x in item_xpath(new)}
    calibrens = '{%s}' % NS_MAP['calibre']
    for elem in item_xpath(old):
        if elem.tag in defined_tags or (elem.tag and elem.tag.startswith(calibrens)):
            elem.getparent().remove(elem)

    # Group all items into groups based on their namespaces
    groups = defaultdict(list)
    for item in item_xpath(new):
        ns = item.nsmap[item.prefix]
        groups[ns].append(item)

    for item in item_xpath(old):
        ns = item.nsmap[item.prefix]
        groups[ns].append(item)

    A = ElementMaker(namespace=NS_MAP['x'], nsmap=nsmap('x'))
    R = ElementMaker(namespace=NS_MAP['rdf'], nsmap=nsmap('rdf'))
    root = A.xmpmeta(R.RDF)
    rdf = root[0]

    # One rdf:Description per namespace: dc first, then xmp, then calibre,
    # then everything else ordered by namespace URI
    for namespace in sorted(groups, key=lambda x:{NS_MAP['dc']:'a', NS_MAP['xmp']:'b', NS_MAP['calibre']:'c'}.get(x, 'z'+x)):
        items = groups[namespace]
        desc = rdf.makeelement(expand('rdf:Description'), nsmap=find_nsmap(items))
        desc.set(expand('rdf:about'), '')
        rdf.append(desc)
        for item in items:
            clone_into(desc, item)

    return serialize_xmp_packet(root)
583
if __name__ == '__main__':
    # Round trip check: read the XMP packet of the file named by the last
    # command line argument, convert it to metadata and back, then print the
    # result of merging the regenerated packet with the original one.
    from calibre.utils.podofo import get_xmp_metadata
    source_path = sys.argv[-1]
    original_packet = get_xmp_metadata(source_path)
    regenerated_packet = metadata_to_xmp_packet(metadata_from_xmp_packet(original_packet))
    print (merge_xmp_packet(original_packet, regenerated_packet))