~lmirror/lmirror/master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
#
# LMirror is Copyright (C) 2010 Robert Collins <robertc@robertcollins.net>
# 
# LMirror is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# 
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
# 
# In the LMirror source tree the file COPYING.txt contains the GNU General Public
# License version 3.
# 

"""Journals are the individual items that l_mirror uses to synchronise changes.

When a mirror set is updated a new journal is written (see DiskUpdater for
instance), and when a node is receiving updates it receives all the journals it
is missing, and then the updates determined by combining all the journals
together.

The parse() function parses a journal bytestring to make a new Journal object.

The Combiner object can combine multiple journals together, and then either
generate the model of a tree on disk, or a new journal with redundant changes
eliminated.

DiskUpdater is a class to compare a memory 'tree' and a bzr transport and 
output a journal to update the tree to match the transport. DiskUpdater uses
a helper filter object to do path filtering. FilterCombiner and ProcessFilter
are useful helpers for such filtering.

Various Replay objects can replay a journal. TransportReplay replays a journal
by reading the file content from a transport object. ReplayGenerator is the
core workhorse for streaming the data needed to do a replay; it is serialised
when streaming from http and deserialised by FromFileGenerator.
"""

__all__ = ['parse', 'Combiner', 'Journal', 'DiskUpdater', 'TransportReplay',
    'FilterCombiner', 'ProcessFilter',
    ]

import errno
import os
from hashlib import sha1 as sha
import re

from bzrlib import errors, osutils


class PathContent(object):
    """Abstract description of the content recorded for a path.

    Concrete subclasses carry enough data to verify whether a given path has
    been updated correctly or not.

    :ivar kind: The kind of the path.
    """

    def __hash__(self):
        # Hash on the serialised token form so equal contents hash equally.
        return hash(tuple(self.as_tokens()))

    def __eq__(self, other):
        if type(self) != type(other):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)

    def as_tokens(self):
        """Serialise this content into journal tokens.

        :return: A list of tokens that would parse back into this PathContent.
        """
        raise NotImplementedError(self.as_tokens)

    def __repr__(self):
        return "PathContent: '%s'" % (self.as_tokens(),)


class FileContent(PathContent):
    """Content record for a regular file.

    :ivar sha1: The sha1 of the file content.
    :ivar length: The length of the file in bytes.
    :ivar mtime: The mtime of the file. None if it is not known.
    """

    def __init__(self, sha1, length, mtime):
        self.kind = 'file'
        self.sha1 = sha1
        self.length = length
        self.mtime = mtime

    def as_tokens(self):
        # Serialise the mtime with microsecond precision; unknown -> "None".
        mtime_token = "None" if self.mtime is None else "%0.6f" % self.mtime
        return [self.kind, self.sha1, str(self.length), mtime_token]


class SymlinkContent(PathContent):
    """Content record for a symbolic link.

    :ivar target: The path the link points at.
    """

    def __init__(self, target):
        self.kind = 'symlink'
        self.target = target

    def as_tokens(self):
        # kind token first, then the target, matching parse()'s expectations.
        return [self.kind, self.target]


class DirContent(PathContent):
    """Content for directories."""

    def __init__(self):
        self.kind = 'dir'

    def as_tokens(self):
        return [self.kind]


class Combiner(object):
    """Combine multiple journals.

    Journals are folded in the order they are given to add(); entries that
    touch the same path are merged or cancelled pairwise.

    :ivar journal: The combined journal that is being created.
    """

    def __init__(self):
        """Create a Combiner object."""
        self.journal = Journal()

    def add(self, journal):
        """Add journal to the combined journal.

        :param journal: A Journal, treated as happening after all previously
            added journals.
        :raises ValueError: If the journal cannot be safely combined.
        """
        # paths to delete after iteration.
        pending_del_paths = []
        merged_content = {}
        for path, new_content in journal.paths.iteritems():
            old_content = self.journal.paths.get(path, None)
            if old_content is None:
                # Path not journalled so far - the bulk update below will
                # simply copy the new entry in.
                continue
            # resolve here
            old_action = old_content[0]
            new_action = new_content[0]
            if (old_action, new_action) == ('new', 'new'):
                raise ValueError('Attempt to add %r twice.' % path)
            elif (old_action, new_action) == ('new', 'del'):
                # add then delete cancel out entirely (if contents agree).
                if old_content[1] != new_content[1]:
                    raise ValueError('Attempt to delete wrong content %r, %r' %
                        (old_content, new_content))
                pending_del_paths.append(path)
            elif (old_action, new_action) == ('new', 'replace'):
                # add then replace collapses to adding the final content.
                if old_content[1] != new_content[1][0]:
                    raise ValueError('Attempt to replace wrong content %r, %r' %
                        (old_content, new_content))
                merged_content[path] = ('new', new_content[1][1])
            elif (old_action, new_action) == ('del', 'new'):
                # delete then add is a replacement of the original content.
                merged_content[path] = ('replace', (old_content[1], new_content[1]))
            elif (old_action, new_action) == ('del', 'del'):
                raise ValueError('Attempt to delete %r twice.' % path)
            elif (old_action, new_action) == ('del', 'replace'):
                raise ValueError('Attempt to replace deleted path %r.' % path)
            elif (old_action, new_action) == ('replace', 'new'):
                raise ValueError('Attempt to add %r twice.' % path)
            elif (old_action, new_action) == ('replace', 'del'):
                # replace then delete reduces to deleting the original content.
                if old_content[1][1] != new_content[1]:
                    raise ValueError('Attempt to delete wrong content %r, %r' %
                        (old_content, new_content))
                merged_content[path] = ('del', old_content[1][0])
            elif (old_action, new_action) == ('replace', 'replace'):
                # Chain the replaces: original old content -> newest content.
                if old_content[1][1] != new_content[1][0]:
                    raise ValueError('Attempt to replace wrong content %r, %r' %
                        (old_content, new_content))
                merged_content[path] = ('replace', (old_content[1][0], new_content[1][1]))
            else:
                raise ValueError("Unknown action pair %r" % (
                    (old_action, new_action),))
        # backdoor for speed - XXX may be premature. 
        self.journal.paths.update(journal.paths)
        self.journal.paths.update(merged_content)
        for path in pending_del_paths:
            del self.journal.paths[path]

    def as_tree(self):
        """Convert a from-null combined journal into a tree.

        The tree is represented as a simple dict (path -> content) where content
        is either another dict, for directories, or the kind_data for the path.
        
        :return: A dict representing the tree that the journal would create if
            replayed.
        :raises ValueError: If the journal contains any delete, replace actions
            or items with missing parents.
        """
        result = {}
        # Sorting makes a parent path appear before any of its children.
        for path, (action, kind_data) in sorted(self.journal.paths.iteritems()):
            if action in ('del', 'replace'):
                raise ValueError(
                    'cannot generate a tree representation for a partial '
                    ' journal, path %r is not new.' % path)
            segments = path.split('/')
            cwd = result
            # Walk down to the containing directory dict.
            for segment in segments[:-1]:
                try:
                    cwd = cwd[segment]
                except KeyError:
                    raise ValueError('Missing parent dir for path %r' % path)
            if kind_data.kind == 'dir':
                cwd[segments[-1]] = {}
            else:
                cwd[segments[-1]] = kind_data
        return result


class DiskUpdater(object):
    """Create a journal based on local disk and a tree representation.

    You can get a tree representation by using a Combiner to combine several
    journals (including a full snapshot, or starting from empty).

    :ivar tree: The tree to compare with.
    :ivar transport: The transport to read disk data from.
    :ivar last_timestamp: The timestamp of the most recent journal: all files
        modified more than 3 seconds before this timestamp are assumed to be
        unchanged.
    :ivar ui: A ui object to send output to.
    :ivar journal: The journal being built up.
    :ivar name: The name of the mirror set the journal will be updating. Used
        to include the mirror definition.
    :ivar include_re: The compiled re that, when it matches, indicates a path
        should be included.
    :ivar exclude_re: The compiled re that, when it matches, indicates a path
        should only be included if it also matches the include_re. Note that
        only the exact path is considered: if you have an exclude rule '^foo'
        and an include rule '^foo/bar', once the directory 'foo' is excluded,
        'foo/bar' will not be examined and thus won't end up included. To deal
        with cases like that, either use a negative lookahead instead - exclude
        '^foo/(?!bar(?:$|/))', or exclude paths starting with foo, and include
        both foo and bar explicitly: exclude '^foo(?:$|/)' and include '^foo$',
        '^foo/bar(?:$|/)'.
    :ivar filter_callback: A filter which is applied to all paths. If the filter
        returns False for a path, the path is considered to be excluded (as if
        it had matched the exclude_re); if the filter returns True, then the
        path is considered included as if it matched the include_re. Finally,
        if the filter returns None, the path is not influenced by the filter
        callback.
    """

    def __init__(self, tree, transport, name, last_timestamp, ui,
        excludes=(), includes=(), filter_callback=lambda path:None,
        known_changes=None):
        """Create a DiskUpdater.

        :param tree: The tree to compare with.
        :param transport: The transport to read disk data from.
        :param name: The mirror set name, used to include its config in the
            mirror definition.
        :param last_timestamp: The timestamp of the most recent journal: all
            files modified more than 3 seconds before this timestamp are
            assumed to be unchanged. 3 seconds is chosen because it is larger
            than the 2 second fuzz needed to deal with FAT file systems.
        :param ui: A ui object to send output to.
        :param excludes: An optional list of uncompiled regexes to include in
            the exclude_re.
        :param includes: An optional list of uncompiled regexes to include in
            the include_re.
        :param filter_callback: A path filter. See the class docstring for
            details.
        :param known_changes: Either None, or a list of abspaths for changes
            known within the content root being scanned. If not None, then
            the directory structure on disk is not walked; rather the known
            changes are used to detect changes. As a special case, the children
            of directories within known_changes that were not present in the
            last mirror update are scanned on disk.
        """
        self.tree = tree
        self.transport = transport
        self.name = name
        self.last_timestamp = last_timestamp
        self.ui = ui
        self.journal = Journal()
        # Always include this set's own .lmirror/sets/<name> metadata.
        includes = [r'(?:^|/)\.lmirror/sets(?:$|/%s(?:$|/))' % name
            ] + list(includes)
        self.include_re = re.compile(self._make_re_str(includes))
        # Exclude all other .lmirror administrative files by default.
        excludes = [r'(?:^|/)\.lmirror/'] + list(excludes)
        self.exclude_re = re.compile(self._make_re_str(excludes))
        self.filter_callback = filter_callback
        self.known_changes = known_changes

    def _make_re_str(self, re_strs):
        # Wrap each pattern in a non-capturing group so the alternation
        # cannot bleed between patterns.
        re_strs = ['(?:%s)' % re_str for re_str in re_strs]
        return '|'.join(re_strs)

    def _real_dir_contents(self, dirname):
        """Get the contents of dirname from disk."""
        return self.transport.list_dir(dirname)

    def _known_dir_contents(self, dirname):
        """Get the contents of dirname from self.known_changes."""
        # Walk the cached nested-dict structure built by
        # _cache_known_changes down to dirname.
        cwd = self._known_changes_dirs
        if dirname == '':
            return cwd.keys()
        elements = dirname.split('/')
        for element in elements:
            cwd = cwd[element][1]
        return cwd.keys()

    def finished(self):
        """Return the journal obtained by scanning the disk."""
        if self.known_changes is None:
            # Full walk of the content root.
            return self._finished_scan(self._real_dir_contents)
        else:
            # Only look at the paths we were told changed.
            self._cache_known_changes()
            return self._finished_scan(self._known_dir_contents,
                missing_is_unchanged=True)

    def _cache_known_changes(self):
        """Structure known_changes into a dict for lookups."""
        root_prefix = self.transport.local_abspath('.')
        root = {}
        for path in self.known_changes:
            # Strip the content-root prefix (and its trailing separator).
            path = path[len(root_prefix) + 1:]
            elements = path.split('/')
            cwd = root
            # Build nested {name: (relpath, children)} entries for each
            # path element.
            for pos, element in enumerate(elements):
                dirname = '/'.join(elements[:pos + 1])
                subdir = cwd.setdefault(element, (dirname, {}))
                cwd = subdir[1]
        self._known_changes_dirs = root

    def _finished_scan(self, dir_contents, missing_is_unchanged=False):
        """Perform finished() by scanning the disk.
        
        :param dir_contents: A callback to get the contents of a directory.
        :param missing_is_unchanged: If True, a path not listed in a directory
            is unchanged, rather than missing.
        """
        # Stack of directory relpaths still to scan; '' is the root.
        pending = ['']
        while pending:
            dirname = pending.pop(-1)
            names = dir_contents(dirname)
            # NB: quadratic in lookup here due to presence in inner loop:
            # consider tuning.
            segments = dirname.split('/')
            # Find the recorded tree node for dirname.
            cwd = self.tree
            for segment in segments:
                if not segment:
                    continue
                try:
                    cwd = cwd[segment]
                except KeyError:
                    # totally new directory - added to journal by the directory
                    # above.
                    cwd = {}
            # tree_names contains the last recorded set of names.
            tree_names = set(cwd)
            names = set(names)
            if not missing_is_unchanged:
                for name in tree_names - names:
                    # deletes
                    path = dirname and ('%s/%s' % (dirname, name)) or name
                    old_kind_details = cwd[name]
                    if type(old_kind_details) is dict:
                        # A dict node is a directory: journal its contents as
                        # deleted too.
                        self._gather_deleted_dir(path, old_kind_details)
                        old_kind_details = DirContent()
                    self.journal.add(path, 'del', old_kind_details)
            new_names = names - tree_names
            for name in names:
                path = dirname and ('%s/%s' % (dirname, name)) or name
                if self._skip_path(path):
                    if name in tree_names:
                        # Newly excluded.
                        old_kind_details = cwd[name]
                        if type(old_kind_details) is dict:
                            self._gather_deleted_dir(path, old_kind_details)
                            old_kind_details = DirContent()
                        self.journal.add(path, 'del', old_kind_details)
                    continue
                try:
                    statinfo = self.transport.stat(path)
                except errors.NoSuchFile:
                    # This file doesn't actually exist: may be concurrent
                    # delete, or a seen change from a changes list.
                    if name in tree_names:
                        # A delete.
                        old_kind_details = cwd[name]
                        if type(old_kind_details) is dict:
                            self._gather_deleted_dir(path, old_kind_details)
                            old_kind_details = DirContent()
                        self.journal.add(path, 'del', old_kind_details)
                    continue
                mtime = getattr(statinfo, 'st_mtime', 0)
                kind = osutils.file_kind_from_stat_mode(statinfo.st_mode)
                if (kind != 'directory' and
                    (self.last_timestamp - mtime > 3 and mtime)
                    and name not in new_names):
                    # We have to look inside directories always; things that
                    # are older than 3 seconds we can trust even FAT to not
                    # be lying about the last-modification (it has 2 second
                    # granularity) and finally its not new (new things have
                    # to be scanned always).
                    continue
                if kind == 'file':
                    f = self.transport.get(path)
                    try:
                        disk_size, disk_sha1 = osutils.size_sha_file(f)
                    finally:
                        f.close()
                    new_kind_details = FileContent(disk_sha1, disk_size, statinfo.st_mtime)
                elif kind == 'symlink':
                    new_kind_details = SymlinkContent(os.readlink(self.transport.local_abspath(path)))
                elif kind == 'directory':
                    new_kind_details = DirContent()
                    # Descend into the directory later.
                    pending.append(path)
                else:
                    raise ValueError('unknown kind %r for %r' % (kind, path))
                if name in new_names:
                    self.journal.add(path, 'new', new_kind_details)
                else:
                    old_kind_details = cwd[name]
                    if type(old_kind_details) is dict:
                        old_kind_details = DirContent()
                    if old_kind_details != new_kind_details:
                        self.journal.add(path, 'replace', (old_kind_details,
                            new_kind_details))
        for path, (action, details) in self.journal.paths.iteritems():
            self.ui.output_log(4, __name__, 'Journalling action %s for %r' % (
                action, path))
        return self.journal

    def _gather_deleted_dir(self, path, dirdict):
        # List what the tree thought it had as deletes.
        pending = [(path, dirdict)]
        while pending:
            dirname, cwd = pending.pop(-1)
            for name, old_kind_details in cwd.iteritems():
                path = dirname and ('%s/%s' % (dirname, name)) or name
                if type(old_kind_details) is dict:
                    # Subdirectory: queue its contents for deletion too.
                    pending.append((path, old_kind_details))
                    old_kind_details = DirContent()
                self.journal.add(path, 'del', old_kind_details)

    def _skip_path(self, path):
        """Should path be skipped?"""
        if (path.endswith('.lmirror/metadata') or
            path.endswith('.lmirrortemp')):
            # metadata is transmitted by the act of fetching the
            # journal.
            self.ui.output_log(1, __name__,
                "Skipping %r because it is lmirror metadata/temp file" % path)
            return True
        # The filter callback overrides the regexes on a True/False result.
        filter_result = self.filter_callback(path)
        excluded = filter_result is False
        included = filter_result is True
        if excluded or self.exclude_re.search(path):
            if not (included or self.include_re.search(path)):
                self.ui.output_log(1, __name__,
                    "Skipping %r because it is excluded." % path)
                return True
            else:
                self.ui.output_log(1, __name__,
                    "Included %r because it is included." % path)
                return False
        else:
            self.ui.output_log(1, __name__,
                "Included %r because it is not excluded." % path)
            return False


class FilterCombiner(object):
    """An updater filter that combines other filters.

    Holds a list of filters to consult in order. A filter answering True
    short circuits further evaluation and the combined result is True. If
    none answer True, the combined result is False when at least one filter
    answered False, and None otherwise.

    :ivar filters: The filters.
    """

    def __init__(self, *filters):
        """Create a FilterCombiner.

        :param filters: The filters to combine.
        """
        self.filters = filters

    def __call__(self, path):
        saw_false = False
        for candidate in self.filters:
            verdict = candidate(path)
            if verdict:
                # True wins immediately.
                return True
            if verdict is False:
                saw_false = True
        return False if saw_false else None


class ProcessFilter(object):
    """A filter that delegates filtering to an external process.

    The protocol is line based: for each path to filter, the path followed
    by a newline is written to the process, and one response line is read
    back. The response must be one of ``True``, ``False`` or ``None``,
    newline terminated.

    :ivar proc: The process used to do the filtering. Must be a
        subprocess.Popen or similar object; in particular its stdin must
        support write(), and its stdout must support readline(). The process
        is not started, not closed by ProcessFilter - the caller should take
        care of that.
    :ivar ui: The UI for logging.
    :ivar description: The description of the process for logging.
    """

    def __init__(self, proc, ui, description):
        """Create a ProcessFilter using proc to do the filtering.

        :param proc: A subprocess.Popen or similar object with stdin and
            stdout file like objects supporting write() and readline().
        :param ui: UI to log actions to.
        :param description: How to describe the helper.
        """
        self.proc = proc
        self.ui = ui
        self.description = description
        # Map the helper's literal response lines onto filter results.
        self._results = {'True\n': True, 'False\n': False, 'None\n': None}

    def __call__(self, path):
        """Filter path. See FilterCombiner's docstring for details.
        
        :param path: The path to filter.
        """
        helper = self.proc
        helper.stdin.write("%s\n" % path)
        response = helper.stdout.readline()
        verdict = self._results[response]
        self.ui.output_log(1, __name__, "helper %s filtered %r with result %r"
            % (self.description, path, verdict))
        return verdict


class Journal(object):
    """A journal of changes to a file system.
    
    :ivar paths: The paths that the journal alters. A dict from path to 
        action, kind_data.
    """

    def __init__(self):
        """Create a Journal."""
        self.paths = {}

    def add(self, relpath, action, kind_data):
        """Add a path to the journal.
        
        :param relpath: The path to journal.
        :param action: One of new, del, replace.
        :param kind_data: The data for the thing being added/deleted/replaced.
            In the special case of replacement this should be a two-tuple of
            the old data and the new data.
        :raises ValueError: If relpath is already journalled, or a replace
            action does not carry both old and new content.
        """
        if relpath in self.paths:
            raise ValueError('path %r is already in use.' % relpath)
        if action == 'replace':
            # Replacement must be an (old, new) pair of PathContent objects.
            if len(kind_data) != 2 or not isinstance(kind_data[0], PathContent):
                raise ValueError(
                    'looks like only one kind_data in replace action: %r' %
                    (kind_data,))
        self.paths[relpath] = (action, kind_data)

    def as_bytes(self):
        """Return a byte representation of this journal.

        The representation can be parsed by l_mirror.journals.parse. The
        structure is a header ('l-mirror-journal-2\\n') followed by '\\0'
        delimited tokens. These follow the sequence PATH, ACTION, KIND_DATA* and
        mirror the parameters to ``add``.

        :return: A bytesequence.
        """
        # Sort by path so the serialised form is deterministic.
        order = sorted(self.paths.items())
        output = []
        for path, (action, kind_data) in order:
            output.append(path)
            output.append(action)
            if action == 'replace':
                # Old content tokens first, then new content tokens.
                output.extend(kind_data[0].as_tokens())
                output.extend(kind_data[1].as_tokens())
            else:
                output.extend(kind_data.as_tokens())
        return 'l-mirror-journal-2\n' + '\0'.join(output)

    def as_groups(self):
        """Create a series of groups that can be acted on to apply the journal.
        
        :Return: An iterator of groups. Each group is a list of (action, path,
            content).
        """
        groups = []
        adds = []
        deletes = []
        replaces = []
        # Bucket every journal entry by its action.
        for path, (action, content) in self.paths.iteritems():
            if action == 'new':
                adds.append((action, path, content))
            elif action == 'del':
                deletes.append((action, path, content))
            elif action == 'replace':
                replaces.append((action, path, content))
            else:
                raise ValueError('unknown action %r for %r' % (action, path))
        # Ordering /can/ be more complex than just adds/replace/deletes:
        # might have an add blocked on a replace making a dir above it. Likewise
        # A delete might have to happen before a replace when a dir becomes a 
        # file. When we do smarter things, we'll have to just warn that we may
        # not honour general goals/policy if the user has given us such a
        # transform.
        # For now, simplest thing possible - want to get the concept performance
        # evaluated.
        adds.sort()
        replaces.sort(reverse=True)
        # Children go first when deleting a tree
        deletes.sort(reverse=True)
        return  [adds, replaces, deletes]


def parse(a_bytestring):
    """Parse a_bytestring into a journal.

    Both the version 1 and version 2 journal formats are supported; version
    2 adds an mtime field to file content entries.

    :return: A Journal.
    :raises ValueError: If the header is missing, or an unknown kind or
        action token is encountered.
    """
    header1 = 'l-mirror-journal-1\n'
    header2 = 'l-mirror-journal-2\n'
    if not a_bytestring.startswith(header1):
        if not a_bytestring.startswith(header2):
            raise ValueError('Not a journal: missing header %r' % (
                a_bytestring,))
        else:
            def parse_kind_data(tokens, pos):
                # Version 2 format: file entries are (sha1, length, mtime).
                kind = tokens[pos]
                pos += 1
                if kind == 'file':
                    mtime = tokens[pos+2]
                    # An unset mtime is serialised as the string "None"
                    # (see FromFileGenerator.parse_kind_data); float()
                    # would raise on it.
                    if mtime == "None":
                        mtime = None
                    else:
                        mtime = float(mtime)
                    return FileContent(tokens[pos], int(tokens[pos+1]), mtime), pos + 3
                elif kind == 'dir':
                    return DirContent(), pos
                elif kind == 'symlink':
                    return SymlinkContent(tokens[pos]), pos + 1
                else:
                    raise ValueError('unknown kind %r at token %d.' % (kind, pos))
            content = a_bytestring[len(header2):]
    else:
        def parse_kind_data(tokens, pos):
            # Version 1 format: file entries are (sha1, length); no mtime
            # was recorded.
            kind = tokens[pos]
            pos += 1
            if kind == 'file':
                return FileContent(tokens[pos], int(tokens[pos+1]), None), pos + 2
            elif kind == 'dir':
                return DirContent(), pos
            elif kind == 'symlink':
                return SymlinkContent(tokens[pos]), pos + 1
            else:
                raise ValueError('unknown kind %r at token %d.' % (kind, pos))
        content = a_bytestring[len(header1):]
    tokens = content.split('\x00')
    result = Journal()
    pos = 0
    # A trailing NUL yields one empty token at the end; discard it.
    if tokens[-1] == '':
        del tokens[-1]
    while pos < len(tokens):
        path = tokens[pos]
        pos += 1
        action = tokens[pos]
        pos += 1
        if action in ('new', 'del'):
            kind_data, pos = parse_kind_data(tokens, pos)
        elif action == 'replace':
            kind_data1, pos = parse_kind_data(tokens, pos)
            kind_data2, pos = parse_kind_data(tokens, pos)
            kind_data = kind_data1, kind_data2
        else:
            # Previously an unknown action fell through and raised a
            # confusing NameError on kind_data below.
            raise ValueError('unknown action %r for %r' % (action, path))
        result.add(path, action, kind_data)
    return result


class Action(object):
    """An action that can be taken.

    :ivar type: new/replace/del - the action type.
    :ivar path: The path being acted on.
    :ivar content: The content being acted on. For deletes the old content, for
        new the content, and for replaces the old, new content.
    """

    def __init__(self, action_type, path, content):
        """Create an Action.

        :param action_type: One of 'new', 'replace' or 'del'.
        :param path: The relpath the action applies to.
        :param content: A content description, or for 'replace' a tuple of
            (old content, new content).
        """
        self.type = action_type
        self.path = path
        self.content = content

    def __repr__(self):
        return "Action: %r %s %s" % (self.path, self.type, self.content)

    def as_bytes(self):
        """Return a generator of bytes for this action.

        The first chunk is the NUL-delimited header: path, type and the
        content tokens. When new file content is being carried, the raw
        file bytes follow in chunks of up to 64KiB.

        :raises ValueError: If the file source yields no bytes while more
            content is still expected.
        """
        if self.type == 'replace':
            # Serialise both the old and the new content descriptions; only
            # the new content's file bytes follow the header.
            content_list = []
            content_list.extend(self.content[0].as_tokens())
            content_list.extend(self.content[1].as_tokens())
            content_bytes = "\x00".join(content_list)
            content = self.content[1]
        else:
            content_bytes = '\x00'.join(self.content.as_tokens())
            if self.type == 'new':
                content = self.content
            else:
                # Deletes carry no file content.
                content = None
        yield "%s\x00%s\x00%s\x00" % (self.path, self.type, content_bytes)
        if content and content.kind == 'file':
            source = self.get_file()
            remaining = content.length
            while remaining:
                read_size = min(remaining, 65536)
                read_content = source.read(read_size)
                remaining -= len(read_content)
                if not read_content:
                    raise ValueError('0 byte read, expected %d' % read_size)
                yield read_content

    def get_file(self):
        """Get a file like object for file content for this action."""
        raise NotImplementedError(self)

    def ignore_file(self):
        """Tell the action that its file content is being skipped."""


class TransportAction(Action):
    """An Action whose file content is fetched from a transport.

    :ivar sourcedir: Transport to read file content from.
    """

    def __init__(self, action_type, path, content, sourcedir, ui):
        Action.__init__(self, action_type, path, content)
        self.sourcedir = sourcedir
        self.ui = ui

    def get_file(self):
        """Return a file-like object for this action's path on sourcedir."""
        return self.sourcedir.get(self.path)

    def ignore_file(self):
        """Log that the content for this action is being skipped."""
        content = self.content[1] if type(self.content) is tuple else self.content
        self.ui.output_log(
            4, __name__, 'Ignoring %s %r' % (content.kind, self.path))


class StreamedAction(Action):
    """An Action whose file content comes from a FromFileGenerator stream."""

    def __init__(self, action_type, path, content, generator, ui):
        Action.__init__(self, action_type, path, content)
        self.generator = generator
        self.ui = ui

    def get_file(self):
        """Return a BufferedFile positioned over this action's file bytes.

        :raises ValueError: If this action's content is not file content.
        """
        content = self.content[1] if self.type == 'replace' else self.content
        if content.kind != 'file':
            raise ValueError('invalid call to get_file: kind is %r' %
                content.kind)
        return BufferedFile(self.generator, content.length, self.ui)

    def ignore_file(self):
        """Log the skip and drain the streamed bytes to keep the stream in sync."""
        content = self.content[1] if type(self.content) is tuple else self.content
        self.ui.output_log(
            4, __name__, 'Ignoring %s %r' % (content.kind, self.path))
        if self.type != 'del':
            self.get_file().close()


class BufferedFile(object):
    """File-like reader over a FromFileGenerator's byte stream.

    Reads are capped at the bytes remaining for the current entry so the
    underlying stream stays positioned for the next entry.
    """

    def __init__(self, generator, remaining, ui):
        self.generator = generator
        self.remaining = remaining
        self.ui = ui

    def read(self, count=None):
        """Read up to count bytes; with count None, read everything remaining."""
        limit = self.remaining if count is None else count
        read_size = min(self.remaining, limit)
        self.ui.output_log(1, __name__, "Reading from stream, read_size=%d remaining=%d" % (read_size, self.remaining))
        if not read_size:
            return ''
        data = self.generator._next_bytes(read_size)
        self.remaining -= len(data)
        return data

    def close(self):
        """Drain any unread bytes so the stream position is correct."""
        while self.remaining:
            self.read()


class ReplayGenerator(object):
    """Generate the data needed to perform a replay of a journal.

    The stream method returns a generator of objects which can be either
    converted to an action, or to bytes.

    :ivar journal: The journal being generated from.
    :ivar sourcedir: The transport content is read from.
    :ivar ui: A UI for reporting with.
    """

    def __init__(self, journal, sourcedir, ui):
        """Create a ReplayGenerator.

        :param journal: The journal to replay.
        :param sourcedir: The transport to read from.
        :param ui: The ui to use for reporting.
        """
        self.journal = journal
        self.sourcedir = sourcedir
        self.ui = ui

    def stream(self):
        """Yield one TransportAction per journal entry, group by group."""
        for group in self.journal.as_groups():
            for entry in group:
                action, path, content = entry
                yield TransportAction(
                    action, path, content, self.sourcedir, self.ui)

    def as_bytes(self):
        """Yield the serialised bytes of every action in stream() order."""
        for action_obj in self.stream():
            for chunk in action_obj.as_bytes():
                yield chunk


class FromFileGenerator(object):
    """A ReplayGenerator that pulls from a file in read-once, no-seeking mode.

    This is used for streaming from HTTP servers.

    :ivar buffered_bytes: Chunks read from the stream but not yet consumed;
        _push returns unparsed bytes here so _next_bytes sees them first.
    """

    def __init__(self, stream, ui):
        self._stream = stream
        self.buffered_bytes = []
        self.ui = ui

    def parse_kind_data(self, tokens, pos):
        # Decode one kind-data sequence starting at tokens[pos].
        # Returns (content object, index of first unconsumed token).
        kind = tokens[pos]
        pos += 1
        if kind == 'file':
            mtime = tokens[pos+2]
            # An unset mtime is serialised as the string "None".
            if mtime == "None":
                mtime = None
            else:
                mtime = float(mtime)
            return FileContent(tokens[pos], int(tokens[pos+1]), mtime), pos + 3
        elif kind == 'dir':
            return DirContent(), pos
        elif kind == 'symlink':
            return SymlinkContent(tokens[pos]), pos + 1
        else:
            raise ValueError('unknown kind %r at token %d.' % (kind, pos))

    def stream(self):
        """Generate an object-level stream."""
        while True:
            # TODO: make this more efficient: but wait till its shown to be a
            # key issue to address.
            some_bytes = self._next_bytes(4096)
            if not some_bytes:
                return
            # NOTE(review): this assumes 4096 bytes always spans one complete
            # action header (path, action, kind data); a header split across
            # the read boundary would mis-parse - confirm upstream framing.
            tokens = some_bytes.split('\x00')
            path = tokens[0]
            action = tokens[1]
            kind = tokens[2]
            if action in ('new', 'del'):
                kind_data, pos = self.parse_kind_data(tokens, 2)
            elif action == 'replace':
                kind_data1, pos = self.parse_kind_data(tokens, 2)
                kind_data2, pos = self.parse_kind_data(tokens, pos)
                kind_data = kind_data1, kind_data2
            else:
                raise ValueError('unknown action %r' % action)
            # Return the unparsed tail (e.g. streamed file bytes) to the
            # buffer so subsequent reads resume exactly after the header.
            self._push('\x00'.join(tokens[pos:]))
            yield StreamedAction(action, path, kind_data, self, self.ui)

    def _next_bytes(self, count):
        """Return up to count bytes.

        Buffered (pushed-back) bytes are served before new stream reads.

        :return some bytes: An empty string indicates end of file.
        """
        if count <= 0:
            raise ValueError('attempt to read 0 bytes!')
        if not self.buffered_bytes:
            # Nothing pushed back - read straight from the stream.
            return self._stream.read(count)
        if count >= len(self.buffered_bytes[0]):
            # The first buffered chunk is consumed entirely; if it was
            # short, top up recursively from the buffer/stream.
            partial = self.buffered_bytes.pop(0)
            if len(partial) < count:
                return partial + self._next_bytes(count - len(partial))
            else:
                return partial
        # Only part of the first buffered chunk is needed; split it.
        result = self.buffered_bytes[0][:count]
        self.buffered_bytes[0] = self.buffered_bytes[0][count:]
        return result

    def _push(self, unused_bytes):
        # Return bytes to the front of the buffer so the next read sees
        # them before anything else.
        self.buffered_bytes.insert(0, unused_bytes)

    def as_bytes(self):
        # Pass the raw stream through in 64KiB chunks.
        content = self._stream.read(65536)
        while content:
            yield content
            content = self._stream.read(65536)


class CancellableDelete:
    """A deferred delete of a single path that can be called off before it runs."""

    def __init__(self, content, contentdir, path, ui):
        self.cancelled = False
        self.content = content
        self.contentdir = contentdir
        self.path = path
        self.ui = ui

    def delete(self):
        """Perform the delete unless cancelled; already-missing paths are ignored."""
        # TODO: we may want to warn or perhaps have a strict mode here.
        # e.g. handle already deleted things. This should become clear
        # when recovery mode is done.
        if self.cancelled:
            return
        self.ui.output_log(4, __name__, 'Deleting %s %r' %
            (self.content.kind, self.path))
        try:
            if self.content.kind == 'dir':
                self.contentdir.rmdir(self.path)
            else:
                self.contentdir.delete(self.path)
        except errors.NoSuchFile:
            # Already gone - nothing to do.
            pass


class TransportReplay(object):
    """Replay a journal reading content from a transport.

    The replay() method is the main interface.

    :ivar generator: An iterator of Action objects (from a ReplayGenerator's
        stream()). The generator is not trusted - its actions are cross
        checked against journal.
    :ivar contentdir: The transport to apply changes to.
    :ivar journal: The journal to apply.
    :ivar ui: A UI for reporting with.
    """

    def __init__(self, journal, generator, contentdir, ui):
        """Create a TransportReplay for journal from generator to contentdir.

        :param journal: The journal to replay.
        :param generator: The ReplayGenerator to get new content from. All the
            actions it supplies are cross checked against journal.
        :param contentdir: The transport to apply changes to.
        :param ui: The ui to use for reporting.
        """
        self.journal = journal
        # Keep the action iterator, not the generator object itself.
        self.generator = generator.stream()
        self.contentdir = contentdir
        self.ui = ui

    def replay(self):
        """Replay the journal.

        Groups (adds, replaces, deletes) are processed in order. Within a
        group, all content is fetched and checked first; staged renames run
        in the finally block and deletes are performed as late as possible.

        :raises KeyError: If the generator supplies an action not present
            in the journal group (the elements.remove cross-check).
        """
        groups = self.journal.as_groups()
        for pos, group in enumerate(groups):
            self.ui.output_log(4, __name__, "Processing group %d of %d with %d elements" % (pos, len(groups), len(group)))
            elements = set(group)
            # A journal has at most one entry per path, so no duplicates.
            assert len(elements) == len(group)
            to_rename = []
            to_delete = []
            try:
                while elements:
                    self.ui.output_log(3, __name__, "Waiting for element, %d remaining" % (len(elements),))
                    action_obj = self.generator.next()
                    # If this fails, generator has sent us some garbage.
                    elements.remove((action_obj.type, action_obj.path,
                        action_obj.content))
                    action = action_obj.type
                    path = action_obj.path
                    content = action_obj.content
                    if action == 'new':
                        # New content can be fetched, checked and renamed
                        # into place immediately.
                        self.put_with_check(path, content, action_obj)()
                    if action == 'replace':
                        # TODO: (again, to do with replacing files with dirs:)
                        #       do not delay creating dirs needed for files
                        #       below them, or create the files in the temp
                        #       dir.
                        cancellable = CancellableDelete(
                            content[0], self.contentdir, path, self.ui)
                        to_rename.append(
                            self.put_with_check(path, content[1], action_obj,
                                cancellable))
                        to_delete.append(cancellable)
                    if action == 'del':
                        cancellable = CancellableDelete(
                            content, self.contentdir, path, self.ui)
                        to_delete.append(cancellable)
                for cancellable in to_delete:
                    # Second pass on the group to handle deletes as late as possible
                    cancellable.delete()
            finally:
                # Always move staged content into place, even if the group
                # aborted partway through.
                for doit in to_rename:
                    doit()

    def ensure_dir(self, path):
        """Ensure that path is a dir.

        If the path exists and is not a dir, an error is raised.

        :raises ValueError: If a non-directory exists at path.
        """
        try:
            self.contentdir.mkdir(path)
        except errors.FileExists:
            st = self.contentdir.stat(path)
            if osutils.file_kind_from_stat_mode(st.st_mode) != 'directory':
                raise ValueError('unexpected non-directory at %r' % path)

    def check_file(self, path, content):
        """Check if there is a file at path with content.

        :raises: ValueError if there a non-file at path.
        :return: True if there is a file present with the right content
            (matching sha1 and length).
        """
        try:
            st = self.contentdir.stat(path)
            if osutils.file_kind_from_stat_mode(st.st_mode) != 'file':
                raise ValueError('unexpected non-file at %r' % path)
            f = self.contentdir.get(path)
            try:
                self.ui.output_log(4, __name__, 'Hashing %s %r' % (content.kind, path))
                size, sha1 = osutils.size_sha_file(f)
            finally:
                f.close()
            return sha1 == content.sha1 and size == content.length
        except errors.NoSuchFile:
            # Missing entirely counts as "wrong content".
            return False

    def ensure_file(self, tempname, path, content):
        """Ensure that there is a file with content content at path.

        Renames the already-checked temp file into place, removing any
        existing file first. (content is unused here; the temp file was
        verified against it in put_with_check.)

        :param tempname: The name of a temporary file with the needed content.
        """
        if self.contentdir.has(path):
            self.contentdir.delete(path)
        self.contentdir.rename(tempname, path)

    def ensure_link(self, realpath, target):
        """Ensure that realpath is a link to target.

        An existing symlink at realpath is replaced.

        :raises ValueError: If a non-symlink is in the way.
        """
        try:
            os.symlink(target, realpath)
        except OSError, e:
            if e.errno == errno.EEXIST:
                st = os.lstat(realpath)
                if osutils.file_kind_from_stat_mode(st.st_mode) != 'symlink':
                    raise ValueError('unexpected non-symlink at %r' % realpath)
                os.unlink(realpath)
                os.symlink(target, realpath)
            else:
                raise


class _ShaFile(object):
    """Pretend to be a file, calculating the sha of everything read.

    After reading from this file, access the sha1 attribute to get the
    sha of the bytes read so far. (No size attribute is maintained,
    despite what the original docstring claimed; callers measure size
    separately, e.g. via osutils.pumpfile.)

    XXX: I'm sure this is a dup with something bzrlib or somewhere else. Find
    and reuse.

    :ivar sha1: A sha1 object, updated with every byte read.
    """

    def __init__(self, a_file):
        """Wrap a_file; all content read through the wrapper is hashed."""
        self.a_file = a_file
        self.sha1 = sha()

    def read(self, amount=None):
        """Read from the wrapped file, folding the bytes into the sha."""
        result = self.a_file.read(amount)
        self.sha1.update(result)
        return result