~ubuntu-branches/ubuntu/trusty/erlang/trusty

« back to all changes in this revision

Viewing changes to erts/emulator/beam/erl_db.c

  • Committer: Bazaar Package Importer
  • Author(s): Sergei Golovan
  • Date: 2009-06-11 12:18:07 UTC
  • mfrom: (1.2.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20090611121807-ks7eb4xrt7dsysgx
Tags: 1:13.b.1-dfsg-1
* New upstream release.
* Removed unnecessary dependency of erlang-os-mon on erlang-observer and
  erlang-tools and added missing dependency of erlang-nox on erlang-os-mon
  (closes: #529512).
* Removed a patch to eunit application because the bug was fixed upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
53
53
/* Get a key from any table structure and a tagged object */
54
54
#define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj)) 
55
55
 
56
 
/* Utility macros for determining need of auto fixtable */
 
56
 
 
57
/* How safe are we from double-hits or missed objects
 
58
** when iterating without fixation? */ 
 
59
enum DbIterSafety {
 
60
    ITER_UNSAFE,      /* Must fixate to be safe */
 
61
    ITER_SAFE_LOCKED, /* Safe while table is locked, not between trap calls */
 
62
    ITER_SAFE         /* No need to fixate at all */
 
63
};
 
64
#ifdef ERTS_SMP
 
65
#  define ITERATION_SAFETY(Proc,Tab) \
 
66
    ((IS_TREE_TABLE((Tab)->common.status) || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \
 
67
     : (((Tab)->common.status & DB_FINE_LOCKED) ? ITER_UNSAFE : ITER_SAFE_LOCKED))
 
68
#else
 
69
#  define ITERATION_SAFETY(Proc,Tab) \
 
70
    ((IS_TREE_TABLE((Tab)->common.status) || ONLY_WRITER(Proc,Tab)) \
 
71
     ? ITER_SAFE : ITER_SAFE_LOCKED)
 
72
#endif
57
73
 
58
 
#define ONLY_WRITER(P,T) (((T)->common.status & DB_PRIVATE) || \
59
 
(((T)->common.status & DB_PROTECTED) && (T)->common.owner == (P)->id))
60
 
#define ONLY_READER(P,T) (((T)->common.status & DB_PRIVATE) && \
61
 
(T)->common.owner == (P)->id)
62
74
#define DID_TRAP(P,Ret) (!is_value(Ret) && ((P)->freason == TRAP))
63
 
#define SOLE_LOCKER(P,Fixations) ((Fixations) != NULL && \
64
 
(Fixations)->next == NULL && (Fixations)->pid == (P)->id && \
65
 
(Fixations)->counter == 1)
66
 
 
67
75
 
68
76
 
69
77
/* 
155
163
}    
156
164
 
157
165
 
158
 
typedef enum { LCK_READ=1, LCK_WRITE=2 } db_lock_kind_t;
 
166
typedef enum {
 
167
    LCK_READ=1,     /* read only access */
 
168
    LCK_WRITE=2,    /* exclusive table write access */
 
169
    LCK_WRITE_REC=3 /* record write access */
 
170
} db_lock_kind_t;
159
171
 
160
172
extern DbTableMethod db_hash;
161
173
extern DbTableMethod db_tree;
175
187
 
176
188
static void fix_table_locked(Process* p, DbTable* tb);
177
189
static void unfix_table_locked(Process* p,  DbTable* tb);
 
190
static void set_heir(Process* me, DbTable* tb, Eterm heir, Eterm heir_data);
 
191
static void free_heir_data(DbTable*);
178
192
static void free_fixations_locked(DbTable *tb);
179
193
 
180
194
static int free_table_cont(Process *p,
202
216
 
203
217
static ERTS_INLINE DbTable* db_ref(DbTable* tb)
204
218
{
205
 
    if (tb != NULL)
 
219
    if (tb != NULL) {
206
220
        erts_refc_inc(&tb->common.ref, 2);
 
221
    }
207
222
    return tb;
208
223
}
209
224
 
231
246
#endif
232
247
#ifdef ERTS_SMP
233
248
        erts_smp_rwmtx_destroy(&tb->common.rwlock);
 
249
        erts_smp_mtx_destroy(&tb->common.fixlock);
234
250
#endif
235
 
        erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));
 
251
        ASSERT(is_immed(tb->common.heir_data));
 
252
        erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));                 
236
253
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbTable));
237
254
        return NULL;
238
255
    }
239
256
    return tb;
240
257
}
241
258
 
242
 
static ERTS_INLINE void db_init_lock(DbTable* tb, char *name)
 
259
static ERTS_INLINE void db_init_lock(DbTable* tb, char *rwname, char* fixname)
243
260
{
244
261
    erts_refc_init(&tb->common.ref, 1);
 
262
    erts_refc_init(&tb->common.fixref, 0);
245
263
#ifdef ERTS_SMP
246
 
#ifdef ERTS_ENABLE_LOCK_COUNT
247
 
    erts_smp_rwmtx_init_x(&tb->common.rwlock, name, tb->common.the_name);
248
 
#else
249
 
    erts_smp_rwmtx_init(&tb->common.rwlock, name);
250
 
#endif
 
264
# ifdef ERTS_ENABLE_LOCK_COUNT
 
265
    erts_smp_rwmtx_init_x(&tb->common.rwlock, rwname, tb->common.the_name);
 
266
    erts_smp_mtx_init_x(&tb->common.fixlock, fixname, tb->common.the_name);
 
267
# else
 
268
    erts_smp_rwmtx_init(&tb->common.rwlock, rwname);
 
269
    erts_smp_mtx_init(&tb->common.fixlock, fixname);
 
270
# endif
 
271
    tb->common.is_thread_safe = !(tb->common.status & DB_FINE_LOCKED);
251
272
#endif
252
273
}
253
274
 
254
275
static ERTS_INLINE void db_lock_take_over_ref(DbTable* tb, db_lock_kind_t kind)
255
276
{
256
277
#ifdef ERTS_SMP
257
 
    if (kind == LCK_WRITE)
258
 
        erts_smp_rwmtx_rwlock(&tb->common.rwlock);
 
278
    ASSERT(tb != meta_pid_to_tab && tb != meta_pid_to_fixed_tab);
 
279
    if (tb->common.type & DB_FINE_LOCKED) {
 
280
        if (kind == LCK_WRITE) {           
 
281
            erts_smp_rwmtx_rwlock(&tb->common.rwlock);
 
282
            tb->common.is_thread_safe = 1;
 
283
        } else {        
 
284
            erts_smp_rwmtx_rlock(&tb->common.rwlock);
 
285
            ASSERT(!tb->common.is_thread_safe);
 
286
        }
 
287
    }
259
288
    else
260
 
        erts_smp_rwmtx_rlock(&tb->common.rwlock);
 
289
    { 
 
290
        switch (kind) {
 
291
        case LCK_WRITE:
 
292
        case LCK_WRITE_REC:
 
293
            erts_smp_rwmtx_rwlock(&tb->common.rwlock);
 
294
            break;
 
295
        default:
 
296
            erts_smp_rwmtx_rlock(&tb->common.rwlock);
 
297
        }
 
298
        ASSERT(tb->common.is_thread_safe);
 
299
    }
261
300
#endif
262
301
}
263
302
 
272
311
static ERTS_INLINE void db_unlock(DbTable* tb, db_lock_kind_t kind)
273
312
{
274
313
#ifdef ERTS_SMP
275
 
    if (kind == LCK_WRITE)
276
 
        erts_smp_rwmtx_rwunlock(&tb->common.rwlock);
277
 
    else
278
 
        erts_smp_rwmtx_runlock(&tb->common.rwlock);
 
314
    ASSERT(tb != meta_pid_to_tab && tb != meta_pid_to_fixed_tab);
 
315
 
 
316
    if (tb->common.type & DB_FINE_LOCKED) {
 
317
        if (tb->common.is_thread_safe) {
 
318
            ASSERT(kind == LCK_WRITE);
 
319
            tb->common.is_thread_safe = 0;
 
320
            erts_smp_rwmtx_rwunlock(&tb->common.rwlock);
 
321
        }
 
322
        else {
 
323
            ASSERT(kind != LCK_WRITE);
 
324
            erts_smp_rwmtx_runlock(&tb->common.rwlock);
 
325
        }
 
326
    }
 
327
    else {
 
328
        ASSERT(tb->common.is_thread_safe);
 
329
        switch (kind) {
 
330
        case LCK_WRITE:
 
331
        case LCK_WRITE_REC:
 
332
            erts_smp_rwmtx_rwunlock(&tb->common.rwlock);
 
333
            break;
 
334
        default:
 
335
            erts_smp_rwmtx_runlock(&tb->common.rwlock);
 
336
        }
 
337
    }
279
338
#endif
280
339
    (void) db_unref(tb); /* May delete table... */
281
340
}
282
 
    
 
341
 
 
342
 
 
343
static ERTS_INLINE void db_meta_lock(DbTable* tb, db_lock_kind_t kind)
 
344
{
 
345
    ASSERT(tb == meta_pid_to_tab || tb == meta_pid_to_fixed_tab);
 
346
    ASSERT(kind != LCK_WRITE);
 
347
    /* As long as we only lock for READ we don't have to lock at all. */
 
348
}
 
349
 
 
350
static ERTS_INLINE void db_meta_unlock(DbTable* tb, db_lock_kind_t kind)
 
351
{
 
352
    ASSERT(tb == meta_pid_to_tab || tb == meta_pid_to_fixed_tab);
 
353
    ASSERT(kind != LCK_WRITE);
 
354
}
 
355
 
283
356
static ERTS_INLINE
284
 
DbTable* db_get_table_aux(Process *p,
285
 
                          Eterm id,
286
 
                          int what,
287
 
                          db_lock_kind_t kind,
288
 
                          db_lock_kind_t (*get_kind)(DbTable *))
 
357
DbTable* db_get_table(Process *p,
 
358
                      Eterm id,
 
359
                      int what,
 
360
                      db_lock_kind_t kind)
289
361
{
290
362
    DbTable *tb = NULL;
291
363
 
324
396
        erts_smp_rwmtx_runlock(rwlock);
325
397
    }
326
398
    if (tb) {
327
 
#ifdef ERTS_SMP
328
 
            if (get_kind)
329
 
                kind = (*get_kind)(tb);
330
 
#endif
331
 
            db_lock_take_over_ref(tb, kind);
 
399
        db_lock_take_over_ref(tb, kind);
332
400
        if (tb->common.id == id && ((tb->common.status & what) != 0 || 
333
401
                                    p->id == tb->common.owner)) {
334
 
                return tb;
335
 
            }
336
 
            db_unlock(tb, kind);
 
402
            return tb;
 
403
        }
 
404
        db_unlock(tb, kind);
337
405
    }
338
 
            return NULL;
339
 
        }
340
 
 
341
 
static ERTS_INLINE DbTable* db_get_table(Process *p,
342
 
                             Eterm id,
343
 
                             int what,
344
 
                             db_lock_kind_t kind)
345
 
{
346
 
    return db_get_table_aux(p, id, what, kind, NULL);
347
 
}
348
 
 
349
 
static ERTS_INLINE DbTable* db_get_table2(Process *p,
350
 
                              Eterm id,
351
 
                              int what,
352
 
                              db_lock_kind_t (*get_kind)(DbTable *))
353
 
{
354
 
    return db_get_table_aux(p, id, what, LCK_WRITE, get_kind);
 
406
    return NULL;
355
407
}
356
408
 
357
409
/* Requires meta_main_tab_locks[slot] locked.
486
538
    return ret;
487
539
}
488
540
 
 
541
/* Do a fast fixation of a hash table.
 
542
** Must be matched by a local unfix before releasing table lock.
 
543
*/
 
544
static ERTS_INLINE void local_fix_table(DbTable* tb)
 
545
{
 
546
    erts_refc_inc(&tb->common.fixref, 1);
 
547
}           
 
548
static ERTS_INLINE void local_unfix_table(DbTable* tb)
 
549
{       
 
550
    if (erts_refc_dectest(&tb->common.fixref, 0) == 0) {
 
551
        ASSERT(IS_HASH_TABLE(tb->common.status));
 
552
        db_unfix_table_hash(&(tb->hash));
 
553
    }
 
554
}
 
555
 
489
556
 
490
557
/*
491
558
 * BIFs.
494
561
BIF_RETTYPE ets_safe_fixtable_2(BIF_ALIST_2)
495
562
{
496
563
    DbTable *tb;
497
 
 
 
564
    db_lock_kind_t kind;
498
565
#ifdef HARDDEBUG
499
566
    erts_fprintf(stderr,
500
567
                "ets:safe_fixtable(%T,%T); Process: %T, initial: %T:%T/%bpu\n",
501
568
                BIF_ARG_1, BIF_ARG_2, BIF_P->id,
502
569
                BIF_P->initial[0], BIF_P->initial[1], BIF_P->initial[2]);
503
570
#endif
504
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
505
 
        BIF_ERROR(BIF_P, BADARG);
506
 
    }
 
571
    kind = (BIF_ARG_2 == am_true) ? LCK_READ : LCK_WRITE_REC; 
 
572
 
 
573
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, kind)) == NULL) {
 
574
            BIF_ERROR(BIF_P, BADARG);
 
575
        }
507
576
 
508
577
    if (BIF_ARG_2 == am_true) {
509
578
        fix_table_locked(BIF_P, tb);
510
579
    }
511
580
    else if (BIF_ARG_2 == am_false) {
512
 
        if (tb->common.status & DB_FIXED) {
 
581
        if (IS_FIXED(tb)) {
513
582
            unfix_table_locked(BIF_P, tb);
514
583
        }
515
584
    }
516
585
    else {
517
 
        db_unlock(tb, LCK_WRITE);
 
586
        db_unlock(tb, kind);
518
587
        BIF_ERROR(BIF_P, BADARG);
519
588
    }
520
 
    db_unlock(tb, LCK_WRITE);
 
589
    db_unlock(tb, kind);
521
590
    BIF_RET(am_true);
522
591
}
523
592
 
524
 
#ifdef ERTS_SMP
525
 
#define STEP_LOCK_TYPE(T) \
526
 
  (IS_TREE_TABLE((T)->common.type) ? LCK_WRITE : LCK_READ)
527
 
#else
528
 
#define STEP_LOCK_TYPE(T) LCK_WRITE
529
 
#endif
530
 
 
531
 
static db_lock_kind_t
532
 
step_lock_type(DbTable *tb)
533
 
{
534
 
    return STEP_LOCK_TYPE(tb);
535
 
}
536
593
 
537
594
/* 
538
595
** Returns the first Key in a table 
545
602
 
546
603
    CHECK_TABLES();
547
604
 
548
 
    tb = db_get_table2(BIF_P, BIF_ARG_1, DB_READ, step_lock_type);
 
605
    tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
549
606
 
550
607
    if (!tb) {
551
608
        BIF_ERROR(BIF_P, BADARG);
553
610
 
554
611
    cret = tb->common.meth->db_first(BIF_P, tb, &ret);
555
612
 
556
 
    db_unlock(tb, STEP_LOCK_TYPE(tb));
 
613
    db_unlock(tb, LCK_READ);
557
614
 
558
615
    if (cret != DB_ERROR_NONE) {
559
616
        BIF_ERROR(BIF_P, BADARG);
572
629
 
573
630
    CHECK_TABLES();
574
631
 
575
 
    tb = db_get_table2(BIF_P, BIF_ARG_1, DB_READ, step_lock_type);
 
632
    tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
576
633
 
577
634
    if (!tb) {
578
635
        BIF_ERROR(BIF_P, BADARG);
580
637
 
581
638
    cret = tb->common.meth->db_next(BIF_P, tb, BIF_ARG_2, &ret);
582
639
 
583
 
    db_unlock(tb, STEP_LOCK_TYPE(tb));
 
640
    db_unlock(tb, LCK_READ);
584
641
 
585
642
    if (cret != DB_ERROR_NONE) {
586
643
        BIF_ERROR(BIF_P, BADARG);
599
656
 
600
657
    CHECK_TABLES();
601
658
 
602
 
    tb = db_get_table2(BIF_P, BIF_ARG_1, DB_READ, step_lock_type);
 
659
    tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
603
660
 
604
661
    if (!tb) {
605
662
        BIF_ERROR(BIF_P, BADARG);
607
664
 
608
665
    cret = tb->common.meth->db_last(BIF_P, tb, &ret);
609
666
 
610
 
    db_unlock(tb, STEP_LOCK_TYPE(tb));
 
667
    db_unlock(tb, LCK_READ);
611
668
 
612
669
    if (cret != DB_ERROR_NONE) {
613
670
        BIF_ERROR(BIF_P, BADARG);
626
683
 
627
684
    CHECK_TABLES();
628
685
 
629
 
    tb = db_get_table2(BIF_P, BIF_ARG_1, DB_READ, step_lock_type);
 
686
    tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
630
687
 
631
688
    if (!tb) {
632
689
        BIF_ERROR(BIF_P, BADARG);
634
691
 
635
692
    cret = tb->common.meth->db_prev(BIF_P,tb,BIF_ARG_2,&ret);
636
693
 
637
 
    db_unlock(tb, STEP_LOCK_TYPE(tb));
 
694
    db_unlock(tb, LCK_READ);
638
695
 
639
696
    if (cret != DB_ERROR_NONE) {
640
697
        BIF_ERROR(BIF_P, BADARG);
655
712
    Eterm cell[2];
656
713
    DbUpdateHandle handle;
657
714
 
658
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
715
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
659
716
        BIF_ERROR(BIF_P, BADARG);
660
717
    }
661
718
    if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
681
738
        Sint position;
682
739
 
683
740
        if (is_not_list(iter)) {
684
 
            goto bail_out;
 
741
            goto finalize;
685
742
        }
686
743
        pv = CAR(list_val(iter));    /* {Pos,Value} */
687
744
        if (is_not_tuple(pv)) {
688
 
            goto bail_out;
 
745
            goto finalize;
689
746
        }
690
747
        pvp = tuple_val(pv);
691
748
        if (arityval(*pvp) != 2 || !is_small(pvp[1])) {
692
 
            goto bail_out;
 
749
            goto finalize;
693
750
        }
694
751
        position = signed_val(pvp[1]);
695
752
        if (position < 1 || position == tb->common.keypos || 
696
753
            position > arityval(handle.dbterm->tpl[0])) {
697
 
            goto bail_out;
 
754
            goto finalize;
698
755
        }       
699
756
    }
700
757
    /* The point of no return, no failures from here on.
706
763
        db_do_update_element(&handle, signed_val(pvp[1]), pvp[2]);
707
764
    }
708
765
 
709
 
    if (handle.mustFinalize) {
710
 
        db_finalize_update_element(&handle);
711
 
    }
 
766
finalize:
 
767
    tb->common.meth->db_finalize_dbterm(&handle);
712
768
 
713
769
bail_out:
714
 
    db_unlock(tb, LCK_WRITE);
 
770
    db_unlock(tb, LCK_WRITE_REC);
715
771
 
716
772
    switch (cret) {
717
773
    case DB_ERROR_NONE:
751
807
    Eterm* hstart;
752
808
    Eterm* hend;
753
809
 
754
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
810
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
755
811
        BIF_ERROR(BIF_P, BADARG);
756
812
    }
757
813
    if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
784
840
        Eterm incr, warp, oldcnt;
785
841
 
786
842
        if (is_not_list(iter)) {
787
 
            goto bail_out;
 
843
            goto finalize;
788
844
        }
789
845
        upop = CAR(list_val(iter));
790
846
        if (is_not_tuple(upop)) {
791
 
            goto bail_out;
 
847
            goto finalize;
792
848
        }
793
849
        tpl = tuple_val(upop);
794
850
        switch (arityval(*tpl)) {
795
851
        case 4: /* threshold specified */
796
852
            if (is_not_integer(tpl[3])) {
797
 
                goto bail_out;
 
853
                goto finalize;
798
854
            }
799
855
            warp = tpl[4];
800
856
            if (is_big(warp)) {
801
857
                halloc_size += BIG_NEED_SIZE(big_arity(warp));
802
858
            }
803
859
            else if (is_not_small(warp)) {
804
 
                goto bail_out;
 
860
                goto finalize;
805
861
            }
806
862
            /* Fall through */
807
863
        case 2:
808
864
            if (!is_small(tpl[1])) {
809
 
                goto bail_out;
 
865
                goto finalize;
810
866
            }
811
867
            incr = tpl[2];
812
868
            if (is_big(incr)) {
813
869
                halloc_size += BIG_NEED_SIZE(big_arity(incr));
814
870
            }
815
871
            else if (is_not_small(incr)) {
816
 
                goto bail_out;
 
872
                goto finalize;
817
873
            }
818
874
            position = signed_val(tpl[1]);
819
875
            if (position < 1 || position == tb->common.keypos ||
820
876
                position > arityval(handle.dbterm->tpl[0])) {
821
 
                goto bail_out;
 
877
                goto finalize;
822
878
            }
823
879
            oldcnt = handle.dbterm->tpl[position];
824
880
            if (is_big(oldcnt)) {
825
881
                halloc_size += BIG_NEED_SIZE(big_arity(oldcnt));
826
882
            }
827
883
            else if (is_not_small(oldcnt)) {
828
 
                goto bail_out;
 
884
                goto finalize;
829
885
            }
830
886
            break;
831
887
        default:
832
 
            goto bail_out;
 
888
            goto finalize;
833
889
        }
834
890
        halloc_size += 2;  /* worst growth case: small(0)+small(0)=big(2) */
835
891
    }
889
945
            break;          
890
946
        }
891
947
    }
892
 
    if (handle.mustFinalize) {
893
 
        db_finalize_update_element(&handle);
894
 
    }
895
948
 
896
949
    ASSERT(is_integer(ret) || is_nil(ret) || 
897
950
           (is_list(ret) && (list_val(ret)+list_size)==ret_list_currp));
899
952
 
900
953
    HRelease(BIF_P,hend,htop);
901
954
 
 
955
finalize:
 
956
    tb->common.meth->db_finalize_dbterm(&handle);
 
957
 
902
958
bail_out:
903
 
    db_unlock(tb, LCK_WRITE);
 
959
    db_unlock(tb, LCK_WRITE_REC);
904
960
 
905
961
    switch (cret) {
906
962
    case DB_ERROR_NONE:
920
976
{
921
977
    DbTable* tb;
922
978
    int cret = DB_ERROR_NONE;
923
 
    Eterm ret = am_true;
924
979
    Eterm lst;
925
980
    DbTableMethod* meth;
 
981
    db_lock_kind_t kind;
926
982
 
927
983
    CHECK_TABLES();
928
984
 
929
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
985
    /* Write lock table if more than one object to keep atomicy */
 
986
    kind = ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL)
 
987
            ? LCK_WRITE : LCK_WRITE_REC);
 
988
 
 
989
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) {
930
990
        BIF_ERROR(BIF_P, BADARG);
931
991
    }
932
992
    if (BIF_ARG_2 == NIL) {
933
 
        db_unlock(tb, LCK_WRITE);
 
993
        db_unlock(tb, kind);
934
994
        BIF_RET(am_true);
935
995
    }
936
996
    meth = tb->common.meth;
945
1005
            goto badarg;
946
1006
        }
947
1007
        for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
948
 
            cret = meth->db_put(BIF_P, tb, CAR(list_val(lst)), &ret);
 
1008
            cret = meth->db_put(tb, CAR(list_val(lst)), 0);
949
1009
            if (cret != DB_ERROR_NONE)
950
1010
                break;
951
1011
        }
954
1014
            (arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
955
1015
            goto badarg;
956
1016
        }
957
 
        cret = meth->db_put(BIF_P, tb, BIF_ARG_2, &ret);
 
1017
        cret = meth->db_put(tb, BIF_ARG_2, 0);
958
1018
    }
959
1019
 
960
 
    db_unlock(tb, LCK_WRITE);
 
1020
    db_unlock(tb, kind);
961
1021
    
962
1022
    switch (cret) {
963
1023
    case DB_ERROR_NONE:
964
 
        BIF_RET(ret);
 
1024
        BIF_RET(am_true);
965
1025
    case DB_ERROR_SYSRES:
966
1026
        BIF_ERROR(BIF_P, SYSTEM_LIMIT);
967
1027
    default:
968
1028
        BIF_ERROR(BIF_P, BADARG);
969
1029
    }
970
1030
 badarg:
971
 
    db_unlock(tb, LCK_WRITE);
 
1031
    db_unlock(tb, kind);
972
1032
    BIF_ERROR(BIF_P, BADARG);    
973
1033
}
974
1034
 
981
1041
    DbTable* tb;
982
1042
    int cret = DB_ERROR_NONE;
983
1043
    Eterm ret = am_true;
984
 
    Eterm lst;
985
 
    Eterm lookup_ret;
986
 
    DbTableMethod* meth;
 
1044
    Eterm obj;
 
1045
    db_lock_kind_t kind;
987
1046
 
988
1047
    CHECK_TABLES();
989
1048
 
990
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
1049
    if (is_list(BIF_ARG_2)) {
 
1050
        if (CDR(list_val(BIF_ARG_2)) != NIL) {
 
1051
            Eterm lst;
 
1052
            Eterm lookup_ret;
 
1053
            DbTableMethod* meth;
 
1054
 
 
1055
            /* More than one object, use LCK_WRITE to keep atomicy */
 
1056
            kind = LCK_WRITE;
 
1057
            tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind);
 
1058
            if (tb == NULL) {
 
1059
                BIF_ERROR(BIF_P, BADARG);
 
1060
            }
 
1061
            meth = tb->common.meth;
 
1062
            for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
 
1063
                if (is_not_tuple(CAR(list_val(lst)))
 
1064
                    || (arityval(*tuple_val(CAR(list_val(lst))))
 
1065
                        < tb->common.keypos)) {
 
1066
                    goto badarg;
 
1067
                }
 
1068
            }
 
1069
            if (lst != NIL) {
 
1070
                goto badarg;
 
1071
            }    
 
1072
            for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
 
1073
                cret = meth->db_member(tb, TERM_GETKEY(tb,CAR(list_val(lst))),
 
1074
                                       &lookup_ret);
 
1075
                if ((cret != DB_ERROR_NONE) || (lookup_ret != am_false)) {
 
1076
                    ret = am_false;
 
1077
                    goto done;
 
1078
                }
 
1079
            }
 
1080
    
 
1081
            for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
 
1082
                cret = meth->db_put(tb,CAR(list_val(lst)), 0);
 
1083
                if (cret != DB_ERROR_NONE)
 
1084
                    break;
 
1085
            }
 
1086
            goto done;
 
1087
        }
 
1088
        obj = CAR(list_val(BIF_ARG_2));
 
1089
    }
 
1090
    else {
 
1091
        obj = BIF_ARG_2;
 
1092
    }
 
1093
    /* Only one object (or NIL) 
 
1094
    */
 
1095
    kind = LCK_WRITE_REC;
 
1096
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) {
991
1097
        BIF_ERROR(BIF_P, BADARG);
992
1098
    }
993
1099
    if (BIF_ARG_2 == NIL) {
994
 
        db_unlock(tb, LCK_WRITE);
 
1100
        db_unlock(tb, kind);
995
1101
        BIF_RET(am_true);
996
1102
    }
997
 
    meth = tb->common.meth;
998
 
    if (is_list(BIF_ARG_2)) {
999
 
        for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
1000
 
            if (is_not_tuple(CAR(list_val(lst))) || 
1001
 
                (arityval(*tuple_val(CAR(list_val(lst)))) < tb->common.keypos)) {
1002
 
                goto badarg;
1003
 
            }
1004
 
        }
1005
 
        if (lst != NIL) {
1006
 
            goto badarg;
1007
 
        }
1008
 
 
1009
 
        for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
1010
 
            cret = meth->db_member(BIF_P, tb,
1011
 
                                   TERM_GETKEY(tb,CAR(list_val(lst))),
1012
 
                                   &lookup_ret);
1013
 
            if ((cret != DB_ERROR_NONE) || (lookup_ret != am_false)) {
1014
 
                ret = am_false;
1015
 
                goto done;
1016
 
            }
1017
 
        }
1018
 
 
1019
 
        for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
1020
 
            cret = meth->db_put(BIF_P, tb,CAR(list_val(lst)),&ret);
1021
 
            if (cret != DB_ERROR_NONE)
1022
 
                break;
1023
 
        }
1024
 
    } else {
1025
 
        if (is_not_tuple(BIF_ARG_2) || 
1026
 
            (arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
1027
 
            goto badarg;
1028
 
        }
1029
 
        cret = meth->db_member(BIF_P, tb,TERM_GETKEY(tb,BIF_ARG_2),
1030
 
                               &lookup_ret);
1031
 
        if ((cret != DB_ERROR_NONE) || (lookup_ret != am_false)) 
1032
 
            ret = am_false;
1033
 
        else {
1034
 
            cret = meth->db_put(BIF_P,tb,BIF_ARG_2, &ret);
1035
 
        }
 
1103
    if (is_not_tuple(obj)
 
1104
        || (arityval(*tuple_val(obj)) < tb->common.keypos)) {
 
1105
        goto badarg;
1036
1106
    }
 
1107
    cret = tb->common.meth->db_put(tb, obj,
 
1108
                                   1); /* key_clash_fail */
1037
1109
 
1038
1110
done:
1039
 
    db_unlock(tb, LCK_WRITE);
 
1111
    db_unlock(tb, kind);
1040
1112
    switch (cret) {
1041
1113
    case DB_ERROR_NONE:
1042
1114
        BIF_RET(ret);
 
1115
    case DB_ERROR_BADKEY:
 
1116
        BIF_RET(am_false);
1043
1117
    case DB_ERROR_SYSRES:
1044
1118
        BIF_ERROR(BIF_P, SYSTEM_LIMIT);
1045
1119
    default:
1046
1120
        BIF_ERROR(BIF_P, BADARG);
1047
1121
    }
1048
1122
 badarg:
1049
 
    db_unlock(tb, LCK_WRITE);
 
1123
    db_unlock(tb, kind);
1050
1124
    BIF_ERROR(BIF_P, BADARG);    
1051
1125
}
1052
1126
 
1110
1184
    Eterm list;
1111
1185
    Eterm val;
1112
1186
    Eterm ret;
 
1187
    Eterm heir;
 
1188
    Eterm heir_data;
1113
1189
    Uint32 status;
1114
1190
    Sint keypos;
1115
 
    int is_named;
 
1191
    int is_named, is_fine_locked;
1116
1192
    int cret;
1117
 
    Eterm dummy;
1118
1193
    Eterm meta_tuple[3];
1119
1194
    DbTableMethod* meth;
1120
1195
 
1125
1200
        BIF_ERROR(BIF_P, BADARG);
1126
1201
    }
1127
1202
 
1128
 
    status = DB_NORMAL | DB_SET | DB_LHASH | DB_PROTECTED;
 
1203
    status = DB_NORMAL | DB_SET | DB_PROTECTED;
1129
1204
    keypos = 1;
1130
1205
    is_named = 0;
 
1206
    is_fine_locked = 0;
 
1207
    heir = am_none;
 
1208
    heir_data = am_undefined;
1131
1209
 
1132
1210
    list = BIF_ARG_2;
1133
1211
    while(is_list(list)) {
1147
1225
        /*TT*/
1148
1226
        else if (is_tuple(val)) {
1149
1227
            Eterm *tp = tuple_val(val);
1150
 
 
1151
 
            if ((arityval(tp[0]) == 2) && (tp[1] == am_keypos) &&
1152
 
                is_small(tp[2]) && (signed_val(tp[2]) > 0)) {
1153
 
                keypos = signed_val(tp[2]);
1154
 
            }
1155
 
            else {
1156
 
                BIF_ERROR(BIF_P, BADARG);
1157
 
            }
 
1228
            if (arityval(tp[0]) == 2) {
 
1229
                if (tp[1] == am_keypos
 
1230
                    && is_small(tp[2]) && (signed_val(tp[2]) > 0)) {
 
1231
                    keypos = signed_val(tp[2]);
 
1232
                }               
 
1233
                else if (tp[1] == am_write_concurrency) {
 
1234
                    if (tp[2] == am_true) {
 
1235
                        is_fine_locked = 1;
 
1236
                    } else if (tp[2] == am_false) {
 
1237
                        is_fine_locked = 0;
 
1238
                    } else break;
 
1239
                }
 
1240
                else if (tp[1] == am_heir && tp[2] == am_none) {
 
1241
                    heir = am_none;
 
1242
                    heir_data = am_undefined;
 
1243
                }
 
1244
                else break;
 
1245
            }
 
1246
            else if (arityval(tp[0]) == 3 && tp[1] == am_heir
 
1247
                     && is_internal_pid(tp[2])) {
 
1248
                heir = tp[2];
 
1249
                heir_data = tp[3];
 
1250
            }
 
1251
            else break;
1158
1252
        }
1159
1253
        else if (val == am_public) {
1160
1254
            status |= DB_PUBLIC;
1161
 
            status &= ~DB_PROTECTED;
 
1255
            status &= ~(DB_PROTECTED|DB_PRIVATE);
1162
1256
        }
1163
1257
        else if (val == am_private) {
1164
1258
            status |= DB_PRIVATE;
1165
 
            status &= ~DB_PROTECTED;
 
1259
            status &= ~(DB_PROTECTED|DB_PUBLIC);
1166
1260
        }
1167
1261
        else if (val == am_named_table) {
1168
1262
            is_named = 1;
1169
1263
        }
1170
1264
        else if (val == am_set || val == am_protected)
1171
1265
            ;
1172
 
        else {
1173
 
            BIF_ERROR(BIF_P, BADARG);
1174
 
        }
 
1266
        else break;
 
1267
 
1175
1268
        list = CDR(list_val(list));
1176
1269
    }
1177
 
    if (is_not_nil(list)) /* it must be a well formed list */
1178
 
        BIF_ERROR(BIF_P, BADARG);
 
1270
    if (is_not_nil(list)) { /* bad opt or not a well formed list */
 
1271
        BIF_ERROR(BIF_P, BADARG);
 
1272
    }
 
1273
    if (IS_HASH_TABLE(status)) {
 
1274
        meth = &db_hash;
 
1275
        #ifdef ERTS_SMP
 
1276
        if (is_fine_locked && !(status & DB_PRIVATE)) {
 
1277
            status |= DB_FINE_LOCKED;
 
1278
        }
 
1279
        #endif
 
1280
    }
 
1281
    else if (IS_TREE_TABLE(status)) {
 
1282
        meth = &db_tree;
 
1283
    }
 
1284
    else {
 
1285
        BIF_ERROR(BIF_P, BADARG);
 
1286
    }
1179
1287
 
1180
1288
    /* we create table outside any table lock
1181
1289
     * and take the unusal cost of destroy table if it
1186
1294
 
1187
1295
        erts_smp_atomic_init(&init_tb.common.memory_size, 0);
1188
1296
        tb = (DbTable*) erts_db_alloc(ERTS_ALC_T_DB_TABLE,
1189
 
                                      &init_tb,
1190
 
                                      sizeof(DbTable));
 
1297
                                      &init_tb, sizeof(DbTable));
1191
1298
        ERTS_ETS_MISC_MEM_ADD(sizeof(DbTable));
1192
1299
        erts_smp_atomic_init(&tb->common.memory_size,
1193
1300
                             erts_smp_atomic_read(&init_tb.common.memory_size));
1194
1301
    }
1195
1302
 
1196
 
    if (IS_HASH_TABLE(status))
1197
 
        meth = &db_hash;
1198
 
    else if (IS_TREE_TABLE(status))
1199
 
        meth = &db_tree;
1200
 
    else
1201
 
        meth = NULL;
1202
 
 
1203
1303
    tb->common.meth = meth;
1204
 
 
1205
 
 
1206
1304
    tb->common.the_name = BIF_ARG_1;
1207
 
    tb->common.status = status;
1208
 
    
1209
 
    db_init_lock(tb, "db_tab");
 
1305
    tb->common.status = status;    
1210
1306
#ifdef ERTS_SMP
1211
1307
    tb->common.type = status & ERTS_ETS_TABLE_TYPES;
1212
1308
    /* Note, 'type' is *read only* from now on... */
1213
1309
#endif
 
1310
    db_init_lock(tb, "db_tab", "db_tab_fix");
1214
1311
    tb->common.keypos = keypos;
1215
1312
    tb->common.owner = BIF_P->id;
 
1313
    set_heir(BIF_P, tb, heir, heir_data);
1216
1314
 
1217
 
    tb->common.nitems = 0;
1218
 
    tb->common.kept_items = 0;
 
1315
    erts_smp_atomic_init(&tb->common.nitems, 0);
1219
1316
 
1220
1317
    tb->common.fixations = NULL;
1221
1318
 
1222
 
    if (meth == NULL) 
1223
 
        cret = DB_ERROR_UNSPEC;
1224
 
    else
1225
 
        cret = meth->db_create(BIF_P, tb);
1226
 
    if (cret != DB_ERROR_NONE) {
1227
 
        erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));
1228
 
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbTable));
1229
 
        BIF_ERROR(BIF_P, BADARG);
1230
 
    }
 
1319
    cret = meth->db_create(BIF_P, tb);
 
1320
    ASSERT(cret == DB_ERROR_NONE);
1231
1321
 
1232
1322
    erts_smp_spin_lock(&meta_main_tab_main_lock);
1233
1323
 
1235
1325
        erts_smp_spin_unlock(&meta_main_tab_main_lock);
1236
1326
        erts_send_error_to_logger_str(BIF_P->group_leader,
1237
1327
                                      "** Too many db tables **\n");
 
1328
        free_heir_data(tb);
1238
1329
        tb->common.meth->db_free_table(tb);
1239
1330
        erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));
1240
1331
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbTable));
1247
1338
    meta_main_tab_cnt++;
1248
1339
 
1249
1340
    if (is_named) {
1250
 
                ret = BIF_ARG_1;
 
1341
        ret = BIF_ARG_1;
1251
1342
    }
1252
1343
    else {
1253
1344
        ret = make_small(slot | meta_main_tab_seq_cnt);
1270
1361
        meta_main_tab_unlock(slot);
1271
1362
 
1272
1363
        db_lock_take_over_ref(tb,LCK_WRITE);
 
1364
        free_heir_data(tb);
1273
1365
        tb->common.meth->db_free_table(tb);
1274
1366
        db_unlock(tb,LCK_WRITE);
1275
1367
        BIF_ERROR(BIF_P, BADARG);
1288
1380
                     erts_smp_atomic_read(&meta_pid_to_fixed_tab->common.memory_size));
1289
1381
#endif
1290
1382
 
1291
 
    db_lock(meta_pid_to_tab, LCK_WRITE);
1292
 
    if (db_put_hash(NULL, meta_pid_to_tab,
1293
 
                    TUPLE2(meta_tuple, BIF_P->id, 
1294
 
                           make_small(slot)),&dummy) != DB_ERROR_NONE) {
 
1383
    db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
 
1384
    if (db_put_hash(meta_pid_to_tab,
 
1385
                    TUPLE2(meta_tuple, BIF_P->id, make_small(slot)),
 
1386
                    0) != DB_ERROR_NONE) {
1295
1387
        erl_exit(1,"Could not update ets metadata.");
1296
1388
    }
1297
 
    db_unlock(meta_pid_to_tab, LCK_WRITE);
 
1389
    db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
1298
1390
 
1299
1391
    BIF_RET(ret);
1300
1392
}
1344
1436
        BIF_ERROR(BIF_P, BADARG);
1345
1437
    }
1346
1438
 
1347
 
    cret = tb->common.meth->db_member(BIF_P, tb, BIF_ARG_2, &ret);
 
1439
    cret = tb->common.meth->db_member(tb, BIF_ARG_2, &ret);
1348
1440
 
1349
1441
    db_unlock(tb, LCK_READ);
1350
1442
 
1432
1524
    }
1433
1525
    
1434
1526
    if (tb->common.owner != BIF_P->id) {
1435
 
        Eterm dummy;
1436
1527
        Eterm meta_tuple[3];
1437
1528
 
1438
1529
        /*
1439
 
         * The process is being deleted by a process other than its owner.
 
1530
         * The table is being deleted by a process other than its owner.
1440
1531
         * To make sure that the table will be completely deleted if the
1441
1532
         * current process will be killed (e.g. by an EXIT signal), we will
1442
1533
         * now transfer the ownership to the current process.
1443
1534
         */
1444
 
        db_lock(meta_pid_to_tab, LCK_WRITE);
 
1535
        db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
1445
1536
        db_erase_bag_exact2(meta_pid_to_tab, tb->common.owner,
1446
1537
                            make_small(tb->common.slot));
1447
1538
 
1448
1539
        BIF_P->flags |= F_USING_DB;
1449
1540
        tb->common.owner = BIF_P->id;
1450
1541
 
1451
 
        db_put_hash(NULL,
1452
 
                    meta_pid_to_tab,
 
1542
        db_put_hash(meta_pid_to_tab,
1453
1543
                    TUPLE2(meta_tuple,BIF_P->id,make_small(tb->common.slot)),
1454
 
                    &dummy);
1455
 
        db_unlock(meta_pid_to_tab, LCK_WRITE);
1456
 
    }
 
1544
                    0);
 
1545
        db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
 
1546
    }    
 
1547
    /* disable inheritance */
 
1548
    free_heir_data(tb);
 
1549
    tb->common.heir = am_none;
1457
1550
 
1458
1551
    free_fixations_locked(tb);
1459
1552
 
1477
1570
}
1478
1571
 
1479
1572
/* 
 
1573
** BIF ets:give_away(Tab, Pid, GiftData)
 
1574
*/
 
1575
BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
 
1576
{
 
1577
    Process* to_proc = NULL;
 
1578
    ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
 
1579
    Eterm buf[5];
 
1580
    Eterm to_pid = BIF_ARG_2;
 
1581
    Eterm from_pid;
 
1582
    DbTable* tb = NULL;
 
1583
 
 
1584
    if (!is_internal_pid(to_pid)) {
 
1585
        goto badarg;
 
1586
    }
 
1587
    to_proc = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, to_pid, to_locks);
 
1588
    if (to_proc == NULL) {
 
1589
        goto badarg;
 
1590
    }
 
1591
 
 
1592
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL
 
1593
        || tb->common.owner != BIF_P->id) {
 
1594
        goto badarg;
 
1595
    }
 
1596
    from_pid = tb->common.owner;
 
1597
    if (to_pid == from_pid) {
 
1598
        goto badarg;  /* or should we be idempotent? return false maybe */
 
1599
    }
 
1600
 
 
1601
    db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
 
1602
    db_erase_bag_exact2(meta_pid_to_tab, tb->common.owner,
 
1603
                        make_small(tb->common.slot));
 
1604
 
 
1605
    to_proc->flags |= F_USING_DB;
 
1606
    tb->common.owner = to_pid;
 
1607
 
 
1608
    db_put_hash(meta_pid_to_tab,
 
1609
                TUPLE2(buf,to_pid,make_small(tb->common.slot)),
 
1610
                0);
 
1611
    db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
 
1612
 
 
1613
    db_unlock(tb,LCK_WRITE);
 
1614
    erts_send_message(BIF_P, to_proc, &to_locks,
 
1615
                      TUPLE4(buf, am_ETS_TRANSFER, tb->common.id, from_pid, BIF_ARG_3), 
 
1616
                      0);
 
1617
    erts_smp_proc_unlock(to_proc, to_locks);
 
1618
    BIF_RET(am_true);
 
1619
 
 
1620
badarg:
 
1621
    if (to_proc != NULL && to_proc != BIF_P) erts_smp_proc_unlock(to_proc, to_locks);
 
1622
    if (tb != NULL) db_unlock(tb, LCK_WRITE);
 
1623
    BIF_ERROR(BIF_P, BADARG);
 
1624
}
 
1625
 
 
1626
BIF_RETTYPE ets_setopts_2(BIF_ALIST_2)
 
1627
{
 
1628
    DbTable* tb = NULL;
 
1629
    Eterm* tp;
 
1630
    Eterm opt;
 
1631
    Eterm heir_data;
 
1632
 
 
1633
    if (is_list(BIF_ARG_2)) {
 
1634
        if (CDR(list_val(BIF_ARG_2)) != NIL) {
 
1635
            goto badarg;
 
1636
        }
 
1637
        opt = CAR(list_val(BIF_ARG_2));
 
1638
    } else {
 
1639
        opt = BIF_ARG_2;
 
1640
    }
 
1641
 
 
1642
    if (!is_tuple(opt)) {
 
1643
        goto badarg;
 
1644
    }
 
1645
    tp = tuple_val(opt);
 
1646
 
 
1647
 
 
1648
    if (arityval(tp[0]) == 3 && tp[1] == am_heir && is_internal_pid(tp[2])) {
 
1649
        heir_data = tp[3];
 
1650
    }
 
1651
    else if (arityval(tp[0]) == 2 && tp[1] == am_heir && tp[2] == am_none) {
 
1652
        heir_data = am_undefined;
 
1653
    }
 
1654
    else goto badarg;
 
1655
 
 
1656
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL
 
1657
        || tb->common.owner != BIF_P->id) {
 
1658
        goto badarg;
 
1659
    }
 
1660
 
 
1661
    free_heir_data(tb);
 
1662
    set_heir(BIF_P, tb, tp[2], heir_data);
 
1663
 
 
1664
    db_unlock (tb,LCK_WRITE);
 
1665
    BIF_RET(am_true);
 
1666
 
 
1667
badarg:
 
1668
    if (tb != NULL) {
 
1669
        db_unlock(tb,LCK_WRITE);
 
1670
    }
 
1671
    BIF_ERROR(BIF_P, BADARG);
 
1672
}
 
1673
 
 
1674
/* 
1480
1675
** BIF to erase a whole table and release all memory it holds 
1481
1676
*/
1482
1677
BIF_RETTYPE ets_delete_all_objects_1(BIF_ALIST_1)
1509
1704
 
1510
1705
    CHECK_TABLES();
1511
1706
 
1512
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
1707
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
1513
1708
        BIF_ERROR(BIF_P, BADARG);
1514
1709
    }
1515
1710
 
1516
 
    cret = tb->common.meth->db_erase(BIF_P,tb,BIF_ARG_2,&ret);
 
1711
    cret = tb->common.meth->db_erase(tb,BIF_ARG_2,&ret);
1517
1712
 
1518
 
    db_unlock(tb, LCK_WRITE);
 
1713
    db_unlock(tb, LCK_WRITE_REC);
1519
1714
 
1520
1715
    switch (cret) {
1521
1716
    case DB_ERROR_NONE:
1538
1733
 
1539
1734
    CHECK_TABLES();
1540
1735
 
1541
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
1736
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
1542
1737
        BIF_ERROR(BIF_P, BADARG);
1543
1738
    }
1544
1739
    if (is_not_tuple(BIF_ARG_2) || 
1545
1740
        (arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
1546
 
        db_unlock(tb, LCK_WRITE);
 
1741
        db_unlock(tb, LCK_WRITE_REC);
1547
1742
        BIF_ERROR(BIF_P, BADARG);
1548
1743
    }
1549
1744
 
1550
 
    cret = tb->common.meth->db_erase_object(BIF_P, tb,
1551
 
                                            BIF_ARG_2, &ret);
1552
 
    db_unlock(tb, LCK_WRITE);
 
1745
    cret = tb->common.meth->db_erase_object(tb, BIF_ARG_2, &ret);
 
1746
    db_unlock(tb, LCK_WRITE_REC);
1553
1747
 
1554
1748
    switch (cret) {
1555
1749
    case DB_ERROR_NONE:
1573
1767
    Eterm *tptr;
1574
1768
    
1575
1769
    CHECK_TABLES();
1576
 
#ifdef DEBUG
1577
 
    /*
1578
 
     * Make sure that the table exists.
1579
 
     */
1580
 
    if (!is_tuple(a1)) {
1581
 
        /* "Cannot" happen, this is not a real BIF, its trapped
1582
 
           to under controlled conditions */
1583
 
        erl_exit(1,"Internal error in ets:select_delete");
1584
 
    }
1585
 
#endif
 
1770
    ASSERT(is_tuple(a1));
1586
1771
    tptr = tuple_val(a1);
1587
 
#ifdef DEBUG
1588
 
    if (arityval(*tptr) < 1) {
1589
 
        erl_exit(1,"Internal error in ets:select_delete");
1590
 
    }
1591
 
#endif
 
1772
    ASSERT(arityval(*tptr) >= 1);
1592
1773
    
1593
 
    if ((tb = db_get_table(p, tptr[1], DB_WRITE, LCK_WRITE)) == NULL) {
1594
 
        /* This should be OK, the emulator handles errors in BIF's that aren't
1595
 
           exported nowdays... */
 
1774
    if ((tb = db_get_table(p, tptr[1], DB_WRITE, LCK_WRITE_REC)) == NULL) {
1596
1775
        BIF_ERROR(p,BADARG);
1597
1776
    }
1598
1777
 
1599
1778
    cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret);
1600
1779
 
1601
 
    db_unlock(tb, LCK_WRITE);
1602
 
    /* SMP fixme table may be deleted !!! */
 
1780
    if(!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) {  
 
1781
        unfix_table_locked(p, tb);
 
1782
    }
 
1783
 
 
1784
    db_unlock(tb, LCK_WRITE_REC);
1603
1785
 
1604
1786
    switch (cret) {
1605
 
    case DB_ERROR_NONE:
1606
 
        if(IS_HASH_TABLE(tb->common.status) && !DID_TRAP(p,ret) &&
1607
 
           !ONLY_READER(p,tb)) {
1608
 
            ets_safe_fixtable_2(p, tb->common.id, am_false);
1609
 
        } 
 
1787
    case DB_ERROR_NONE:      
1610
1788
        ERTS_BIF_PREP_RET(result, ret);
1611
1789
        break;
1612
1790
    default:
1613
 
        if(IS_HASH_TABLE(tb->common.status) && !ONLY_READER(p,tb)) {
1614
 
            ets_safe_fixtable_2(p, tb->common.id, am_false);
1615
 
        } 
1616
1791
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
1617
1792
        break;
1618
1793
    }
1619
 
 
1620
1794
    erts_match_set_release_result(p);
1621
1795
 
1622
1796
    return result;
1629
1803
    DbTable* tb;
1630
1804
    int cret;
1631
1805
    Eterm ret;
 
1806
    enum DbIterSafety safety;
1632
1807
 
1633
1808
    CHECK_TABLES();
1634
 
    /*
1635
 
     * Make sure that the table exists.
1636
 
     */
1637
1809
 
1638
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
1639
 
        BIF_ERROR(BIF_P, BADARG);
1640
 
    }
1641
 
    if(eq(BIF_ARG_2, ms_delete_all)){
1642
 
        int nitems = tb->common.nitems;
 
1810
    if(eq(BIF_ARG_2, ms_delete_all)) {
 
1811
        int nitems;
 
1812
        if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
 
1813
            BIF_ERROR(BIF_P, BADARG);
 
1814
        }
 
1815
        nitems = erts_smp_atomic_read(&tb->common.nitems);
1643
1816
        tb->common.meth->db_delete_all_objects(BIF_P, tb);
1644
1817
        db_unlock(tb, LCK_WRITE);
1645
1818
        BIF_RET(erts_make_integer(nitems,BIF_P));
1646
1819
    }
 
1820
 
 
1821
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
 
1822
        BIF_ERROR(BIF_P, BADARG);
 
1823
    }
 
1824
    safety = ITERATION_SAFETY(BIF_P,tb);
 
1825
    if (safety == ITER_UNSAFE) {
 
1826
        local_fix_table(tb);
 
1827
    }
1647
1828
    cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_2, &ret);
1648
1829
 
 
1830
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
1831
        fix_table_locked(BIF_P,tb);
 
1832
    }
 
1833
    if (safety == ITER_UNSAFE) {
 
1834
        local_unfix_table(tb);
 
1835
    }
 
1836
    db_unlock(tb, LCK_WRITE_REC);
 
1837
 
1649
1838
    switch (cret) {
1650
 
    case DB_ERROR_NONE:
1651
 
        if(IS_HASH_TABLE(tb->common.status) && DID_TRAP(BIF_P,ret) && 
1652
 
           !ONLY_READER(BIF_P,tb)) {
1653
 
            /* We will trap and as we're here this call wasn't a trap... */
1654
 
            fix_table_locked(BIF_P, tb);
1655
 
        }
1656
 
        db_unlock(tb, LCK_WRITE);
 
1839
    case DB_ERROR_NONE: 
1657
1840
        ERTS_BIF_PREP_RET(result, ret);
1658
1841
        break;
1659
1842
    case DB_ERROR_SYSRES:
1660
 
        db_unlock(tb, LCK_WRITE);
1661
1843
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
1662
1844
        break;
1663
1845
    default:
1664
 
        db_unlock(tb, LCK_WRITE);
1665
1846
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
1666
1847
        break;
1667
1848
    }
1720
1901
 
1721
1902
    CHECK_TABLES();
1722
1903
 
1723
 
    if ((tb = db_get_table2(BIF_P, BIF_ARG_1, DB_READ, 
1724
 
                            step_lock_type)) == NULL) {
 
1904
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
1725
1905
        BIF_ERROR(BIF_P, BADARG);
1726
1906
    }
1727
1907
    /* The slot number is checked in table specific code. */
1728
1908
    cret = tb->common.meth->db_slot(BIF_P, tb, BIF_ARG_2, &ret);
1729
 
    db_unlock(tb, STEP_LOCK_TYPE(tb));
 
1909
    db_unlock(tb, LCK_READ);
1730
1910
    switch (cret) {
1731
1911
    case DB_ERROR_NONE:
1732
1912
        BIF_RET(ret);
1782
1962
    int cret;
1783
1963
    Eterm ret;
1784
1964
    Sint chunk_size;
 
1965
    enum DbIterSafety safety;
1785
1966
 
1786
1967
    CHECK_TABLES();
1787
 
    /*
1788
 
     * Make sure that the table exists.
1789
 
     */
1790
1968
 
1791
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
1792
 
        BIF_ERROR(BIF_P, BADARG);
1793
 
    }
1794
 
    
1795
1969
    /* Chunk size strictly greater than 0 */
1796
1970
    if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
1797
 
        db_unlock(tb, LCK_WRITE);
1798
 
        BIF_ERROR(BIF_P, BADARG);
1799
 
    }
1800
 
 
 
1971
        BIF_ERROR(BIF_P, BADARG);
 
1972
    }
 
1973
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
 
1974
        BIF_ERROR(BIF_P, BADARG);
 
1975
    }
 
1976
    safety = ITERATION_SAFETY(BIF_P,tb);
 
1977
    if (safety == ITER_UNSAFE) {
 
1978
        local_fix_table(tb);
 
1979
    }
1801
1980
    cret = tb->common.meth->db_select_chunk(BIF_P, tb,
1802
1981
                                            BIF_ARG_2, chunk_size, 
1803
 
                                            0 /* not reversed */, &ret);
 
1982
                                            0 /* not reversed */,
 
1983
                                            &ret);
 
1984
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
1985
        fix_table_locked(BIF_P, tb);
 
1986
    }
 
1987
    if (safety == ITER_UNSAFE) {
 
1988
        local_unfix_table(tb);
 
1989
    }
 
1990
    db_unlock(tb, LCK_READ);
 
1991
 
1804
1992
    switch (cret) {
1805
1993
    case DB_ERROR_NONE:
1806
 
        if(IS_HASH_TABLE(tb->common.status) && DID_TRAP(BIF_P,ret) &&
1807
 
           !ONLY_WRITER(BIF_P,tb)) {
1808
 
            /* We will trap and as we're here this call wasn't a trap... */
1809
 
            fix_table_locked(BIF_P, tb);
1810
 
        } /* Otherwise keep it as is */
1811
 
        db_unlock(tb, LCK_WRITE);
1812
1994
        ERTS_BIF_PREP_RET(result, ret);
1813
1995
        break;
1814
1996
    case DB_ERROR_SYSRES:
1815
 
        db_unlock(tb, LCK_WRITE);
1816
1997
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
1817
1998
        break;
1818
1999
    default:
1819
 
        db_unlock(tb, LCK_WRITE);
1820
2000
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
1821
2001
        break;
1822
2002
    }
1841
2021
    tptr = tuple_val(a1);
1842
2022
    ASSERT(arityval(*tptr) >= 1)
1843
2023
 
1844
 
    if ((tb = db_get_table(p, tptr[1], DB_READ, LCK_WRITE)) == NULL) {
 
2024
    if ((tb = db_get_table(p, tptr[1], DB_READ, LCK_READ)) == NULL) {
1845
2025
        BIF_ERROR(p, BADARG);
1846
2026
    }
1847
2027
 
1848
 
    cret = tb->common.meth->db_select_continue(p, tb,a1,&ret);
 
2028
    cret = tb->common.meth->db_select_continue(p, tb, a1,
 
2029
                                               &ret);
 
2030
 
 
2031
    if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) {
 
2032
        unfix_table_locked(p, tb);
 
2033
    }
 
2034
    db_unlock(tb, LCK_READ);
1849
2035
 
1850
2036
    switch (cret) {
1851
2037
    case DB_ERROR_NONE:
1852
 
        if(IS_HASH_TABLE(tb->common.status) && !DID_TRAP(p,ret) &&
1853
 
           !ONLY_WRITER(p,tb)) {
1854
 
            /* We did trap, but no more... */
1855
 
            unfix_table_locked(p, tb);
1856
 
        } /* Otherwise keep it fixed */
1857
2038
        ERTS_BIF_PREP_RET(result, ret);
1858
2039
        break;
1859
2040
    case DB_ERROR_SYSRES:
1860
 
        if(IS_HASH_TABLE(tb->common.status) && !ONLY_WRITER(p,tb)) {
1861
 
            unfix_table_locked(p, tb);
1862
 
        }
1863
2041
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
1864
2042
        break;
1865
2043
    default:
1866
 
        if(IS_HASH_TABLE(tb->common.status) && !ONLY_WRITER(p,tb)) {
1867
 
            unfix_table_locked(p, tb);
1868
 
        }
1869
2044
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
1870
2045
        break;
1871
2046
    }
1872
2047
 
1873
 
    db_unlock(tb, LCK_WRITE);
1874
 
 
1875
2048
    erts_match_set_release_result(p);
1876
2049
 
1877
2050
    return result;
1885
2058
    int cret;
1886
2059
    Eterm ret;
1887
2060
    Eterm *tptr;
 
2061
    enum DbIterSafety safety;
1888
2062
 
1889
2063
    CHECK_TABLES();
1890
2064
 
1899
2073
        BIF_ERROR(BIF_P, BADARG);
1900
2074
    }
1901
2075
    tptr = tuple_val(BIF_ARG_1);
1902
 
    if (arityval(*tptr) < 1 || 
1903
 
        (tb = db_get_table(BIF_P, tptr[1], DB_READ, LCK_WRITE)) == NULL) {
 
2076
    if (arityval(*tptr) < 1 ||
 
2077
        (tb = db_get_table(BIF_P, tptr[1], DB_READ, LCK_READ)) == NULL) {
1904
2078
        BIF_ERROR(BIF_P, BADARG);
1905
2079
    }
1906
2080
 
 
2081
    safety = ITERATION_SAFETY(BIF_P,tb);
 
2082
    if (safety == ITER_UNSAFE) {
 
2083
        local_fix_table(tb);
 
2084
    }
 
2085
 
1907
2086
    cret = tb->common.meth->db_select_continue(BIF_P,tb,
1908
2087
                                               BIF_ARG_1, &ret);
1909
2088
 
 
2089
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
2090
        fix_table_locked(BIF_P, tb);
 
2091
    }
 
2092
    if (safety == ITER_UNSAFE) {
 
2093
        local_unfix_table(tb);
 
2094
    }
 
2095
    db_unlock(tb, LCK_READ);
 
2096
 
1910
2097
    switch (cret) {
1911
2098
    case DB_ERROR_NONE:
1912
 
        if(IS_HASH_TABLE(tb->common.status) &&  DID_TRAP(BIF_P,ret) &&
1913
 
           !ONLY_WRITER(BIF_P,tb)) {
1914
 
            /* We will trap and as we're here this call wasn't a trap... */
1915
 
            fix_table_locked(BIF_P, tb);
1916
 
        } /* Otherwise keep it as is */
1917
2099
        ERTS_BIF_PREP_RET(result, ret);
1918
2100
        break;
1919
2101
    case DB_ERROR_SYSRES:
1924
2106
        break;
1925
2107
    }
1926
2108
 
1927
 
    db_unlock(tb, LCK_WRITE);
1928
 
 
1929
2109
    erts_match_set_release_result(BIF_P);
1930
2110
 
1931
2111
    return result;
1936
2116
    BIF_RETTYPE result;
1937
2117
    DbTable* tb;
1938
2118
    int cret;
 
2119
    enum DbIterSafety safety;
1939
2120
    Eterm ret;
1940
2121
 
1941
2122
    CHECK_TABLES();
 
2123
 
1942
2124
    /*
1943
2125
     * Make sure that the table exists.
1944
2126
     */
1945
2127
 
1946
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
 
2128
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
1947
2129
        BIF_ERROR(BIF_P, BADARG);
1948
2130
    }
 
2131
    safety = ITERATION_SAFETY(BIF_P,tb);
 
2132
    if (safety == ITER_UNSAFE) {
 
2133
        local_fix_table(tb);
 
2134
    }
 
2135
 
1949
2136
    cret = tb->common.meth->db_select(BIF_P, tb, BIF_ARG_2,
1950
2137
                                      0, &ret);
1951
2138
 
1952
 
    /* SMP fixme table may be deleted !!! */
 
2139
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
2140
        fix_table_locked(BIF_P, tb);
 
2141
    }    
 
2142
    if (safety == ITER_UNSAFE) {
 
2143
        local_unfix_table(tb);
 
2144
    }
 
2145
    db_unlock(tb, LCK_READ);
 
2146
 
1953
2147
    switch (cret) {
1954
2148
    case DB_ERROR_NONE:
1955
 
        if(IS_HASH_TABLE(tb->common.status) && DID_TRAP(BIF_P,ret) &&
1956
 
           !ONLY_WRITER(BIF_P,tb)) {
1957
 
            /* We will trap and as we're here this call wasn't a trap... */
1958
 
            fix_table_locked(BIF_P, tb);
1959
 
        } /* Otherwise keep it as is */
1960
2149
        ERTS_BIF_PREP_RET(result, ret);
1961
2150
        break;
1962
2151
    case DB_ERROR_SYSRES:
1967
2156
        break;
1968
2157
    }
1969
2158
 
1970
 
    db_unlock(tb, LCK_WRITE);
1971
 
 
1972
2159
    erts_match_set_release_result(BIF_P);
1973
2160
 
1974
2161
    return result;
1987
2174
 
1988
2175
    tptr = tuple_val(a1);
1989
2176
    ASSERT(arityval(*tptr) >= 1)
1990
 
    if ((tb = db_get_table(p, tptr[1], DB_READ, LCK_WRITE)) == NULL) {
 
2177
    if ((tb = db_get_table(p, tptr[1], DB_READ, LCK_READ)) == NULL) {
1991
2178
        BIF_ERROR(p, BADARG);
1992
2179
    }
1993
2180
 
1994
2181
    cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret);
1995
2182
 
 
2183
    if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) {
 
2184
        unfix_table_locked(p, tb);
 
2185
    }
 
2186
    db_unlock(tb, LCK_READ);
 
2187
 
1996
2188
    switch (cret) {
1997
2189
    case DB_ERROR_NONE:
1998
 
        if(IS_HASH_TABLE(tb->common.status) &&
1999
 
           !DID_TRAP(p,ret) &&
2000
 
           !ONLY_WRITER(p,tb)) {
2001
 
            /* We did trap, but no more... */
2002
 
            unfix_table_locked(p, tb);
2003
 
        } /* Otherwise keep it fixed */
2004
2190
        ERTS_BIF_PREP_RET(result, ret);
2005
2191
        break;
2006
2192
    case DB_ERROR_SYSRES:
2007
 
        if(IS_HASH_TABLE(tb->common.status) && !ONLY_WRITER(p,tb)) {
2008
 
            unfix_table_locked(p, tb);
2009
 
        }
2010
2193
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
2011
2194
        break;
2012
2195
    default:
2013
 
        if(IS_HASH_TABLE(tb->common.status) && !ONLY_WRITER(p,tb)) {
2014
 
            unfix_table_locked(p, tb);
2015
 
        }
2016
2196
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
2017
2197
        break;
2018
2198
    }
2019
2199
 
2020
 
    db_unlock(tb, LCK_WRITE);
2021
2200
    erts_match_set_release_result(p);
2022
2201
 
2023
2202
    return result;
2028
2207
    BIF_RETTYPE result;
2029
2208
    DbTable* tb;
2030
2209
    int cret;
 
2210
    enum DbIterSafety safety;
2031
2211
    Eterm ret;
2032
2212
 
2033
2213
    CHECK_TABLES();
2035
2215
     * Make sure that the table exists.
2036
2216
     */
2037
2217
 
2038
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
 
2218
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
2039
2219
        BIF_ERROR(BIF_P, BADARG);
2040
2220
    }
 
2221
    safety = ITERATION_SAFETY(BIF_P,tb);
 
2222
    if (safety == ITER_UNSAFE) {
 
2223
        local_fix_table(tb);
 
2224
    }
2041
2225
    cret = tb->common.meth->db_select_count(BIF_P,tb,BIF_ARG_2, &ret);
2042
2226
 
 
2227
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
2228
        fix_table_locked(BIF_P, tb);
 
2229
    }
 
2230
    if (safety == ITER_UNSAFE) {
 
2231
        local_unfix_table(tb);
 
2232
    }
 
2233
    db_unlock(tb, LCK_READ);
2043
2234
    switch (cret) {
2044
2235
    case DB_ERROR_NONE:
2045
 
        if(IS_HASH_TABLE(tb->common.status) &&
2046
 
           DID_TRAP(BIF_P,ret) &&
2047
 
           !ONLY_WRITER(BIF_P,tb)) {
2048
 
            /* We will trap and as we're here this call wasn't a trap... */
2049
 
            fix_table_locked(BIF_P, tb);
2050
 
        } /* Otherwise keep it as is */
2051
2236
        ERTS_BIF_PREP_RET(result, ret);
2052
2237
        break;
2053
2238
    case DB_ERROR_SYSRES:
2057
2242
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
2058
2243
        break;
2059
2244
    }
2060
 
    db_unlock(tb, LCK_WRITE);
2061
2245
 
2062
2246
    erts_match_set_release_result(BIF_P);
2063
2247
 
2070
2254
    BIF_RETTYPE result;
2071
2255
    DbTable* tb;
2072
2256
    int cret;
 
2257
    enum DbIterSafety safety;
2073
2258
    Eterm ret;
2074
2259
    Sint chunk_size;
2075
2260
 
2078
2263
     * Make sure that the table exists.
2079
2264
     */
2080
2265
 
2081
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
 
2266
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
2082
2267
        BIF_ERROR(BIF_P, BADARG);
2083
2268
    }
2084
2269
    
2085
2270
    /* Chunk size strictly greater than 0 */
2086
2271
    if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
2087
 
        db_unlock(tb, LCK_WRITE);
 
2272
        db_unlock(tb, LCK_READ);
2088
2273
        BIF_ERROR(BIF_P, BADARG);
2089
2274
    }
2090
 
 
 
2275
    safety = ITERATION_SAFETY(BIF_P,tb);
 
2276
    if (safety == ITER_UNSAFE) {
 
2277
        local_fix_table(tb);
 
2278
    }
2091
2279
    cret = tb->common.meth->db_select_chunk(BIF_P,tb,
2092
2280
                                            BIF_ARG_2, chunk_size, 
2093
2281
                                            1 /* reversed */, &ret);
 
2282
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
2283
        fix_table_locked(BIF_P, tb);
 
2284
    }
 
2285
    if (safety == ITER_UNSAFE) {
 
2286
        local_unfix_table(tb);
 
2287
    }
 
2288
    db_unlock(tb, LCK_READ);
2094
2289
    switch (cret) {
2095
2290
    case DB_ERROR_NONE:
2096
 
        if(IS_HASH_TABLE(tb->common.status) &&
2097
 
           DID_TRAP(BIF_P,ret) &&
2098
 
           !ONLY_WRITER(BIF_P,tb)) {
2099
 
            /* We will trap and as we're here this call wasn't a trap... */
2100
 
            fix_table_locked(BIF_P, tb);
2101
 
        } /* Otherwise keep it as is */
2102
2291
        ERTS_BIF_PREP_RET(result, ret);
2103
2292
        break;
2104
2293
    case DB_ERROR_SYSRES:
2108
2297
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
2109
2298
        break;
2110
2299
    }
2111
 
 
2112
 
    db_unlock(tb, LCK_WRITE);
2113
 
 
2114
2300
    erts_match_set_release_result(BIF_P);
2115
 
 
2116
2301
    return result;
2117
2302
}
2118
2303
 
2126
2311
    BIF_RETTYPE result;
2127
2312
    DbTable* tb;
2128
2313
    int cret;
 
2314
    enum DbIterSafety safety;
2129
2315
    Eterm ret;
2130
2316
 
2131
2317
    CHECK_TABLES();
2133
2319
     * Make sure that the table exists.
2134
2320
     */
2135
2321
 
2136
 
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_WRITE)) == NULL) {
 
2322
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
2137
2323
        BIF_ERROR(BIF_P, BADARG);
2138
2324
    }
2139
 
 
 
2325
    safety = ITERATION_SAFETY(BIF_P,tb);
 
2326
    if (safety == ITER_UNSAFE) {
 
2327
        local_fix_table(tb);
 
2328
    }
2140
2329
    cret = tb->common.meth->db_select(BIF_P,tb,BIF_ARG_2,
2141
2330
                                      1 /*reversed*/, &ret);
2142
2331
 
 
2332
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
 
2333
        fix_table_locked(BIF_P, tb);
 
2334
    }    
 
2335
    if (safety == ITER_UNSAFE) {
 
2336
        local_unfix_table(tb);
 
2337
    }
 
2338
    db_unlock(tb, LCK_READ);
2143
2339
    switch (cret) {
2144
2340
    case DB_ERROR_NONE:
2145
 
        if(IS_HASH_TABLE(tb->common.status) && DID_TRAP(BIF_P,ret) &&
2146
 
           !ONLY_WRITER(BIF_P,tb)) {
2147
 
            /* We will trap and as we're here this call wasn't a trap... */
2148
 
            fix_table_locked(BIF_P, tb);
2149
 
        } /* Otherwise keep it as is */
2150
2341
        ERTS_BIF_PREP_RET(result, ret);
2151
2342
        break;
2152
2343
    case DB_ERROR_SYSRES:
2156
2347
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
2157
2348
        break;
2158
2349
    }
2159
 
 
2160
 
    db_unlock(tb, LCK_WRITE);
2161
 
 
2162
2350
    erts_match_set_release_result(BIF_P);
2163
 
 
2164
2351
    return result;
2165
2352
}
2166
2353
 
2208
2395
BIF_RETTYPE ets_info_1(BIF_ALIST_1)
2209
2396
{
2210
2397
    static Eterm fields[] = {am_protection, am_keypos, am_type, am_named_table,
2211
 
                                 am_node, am_size, am_name, am_owner, am_memory};
 
2398
        am_node, am_size, am_name, am_heir, am_owner, am_memory};
2212
2399
    Eterm results[sizeof(fields)/sizeof(Eterm)];
2213
2400
    DbTable* tb;
2214
2401
    Eterm res;
2215
2402
    int i;
2216
2403
    Eterm* hp;
 
2404
    /*Process* rp = NULL;*/
 
2405
    Eterm owner;
2217
2406
 
2218
2407
    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) {
2219
2408
        if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) {
2221
2410
        }
2222
2411
        BIF_ERROR(BIF_P, BADARG);
2223
2412
    }
 
2413
 
 
2414
    owner = tb->common.owner;
 
2415
 
 
2416
    /* If/when we implement lockless private tables:
 
2417
    if ((tb->common.status & DB_PRIVATE) && owner != BIF_P->id) {
 
2418
        db_unlock(tb, LCK_READ);
 
2419
        rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
 
2420
                                       owner, ERTS_PROC_LOCK_MAIN);
 
2421
        if (rp == NULL) {
 
2422
            BIF_RET(am_undefined);
 
2423
        }
 
2424
        if (rp == ERTS_PROC_LOCK_BUSY) {
 
2425
            ERTS_BIF_YIELD1(bif_export[BIF_ets_info_1], BIF_P, BIF_ARG_1);
 
2426
        }
 
2427
        if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL
 
2428
            || tb->common.owner != owner) {
 
2429
            if (BIF_P != rp)
 
2430
                erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
 
2431
            if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) {
 
2432
                BIF_RET(am_undefined);
 
2433
            }
 
2434
            BIF_ERROR(BIF_P, BADARG);
 
2435
        }
 
2436
    }*/
2224
2437
    for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
2225
2438
        results[i] = table_info(BIF_P, tb, fields[i]);
2226
2439
        ASSERT(is_value(results[i]));
2227
2440
    }
2228
2441
    db_unlock(tb, LCK_READ);
2229
2442
 
 
2443
    /*if (rp != NULL && rp != BIF_P)
 
2444
        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);*/
 
2445
 
2230
2446
    hp = HAlloc(BIF_P, 5*sizeof(fields)/sizeof(Eterm));
2231
2447
    res = NIL;
2232
2448
    for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
2422
2638
 
2423
2639
    meta_pid_to_tab->common.id = NIL;
2424
2640
    meta_pid_to_tab->common.the_name = am_true;
2425
 
    meta_pid_to_tab->common.status = (DB_NORMAL | DB_BAG | DB_LHASH | 
2426
 
                                      DB_PUBLIC);
 
2641
    meta_pid_to_tab->common.status = (DB_NORMAL | DB_BAG | DB_PUBLIC | DB_FINE_LOCKED);
2427
2642
#ifdef ERTS_SMP
2428
2643
    meta_pid_to_tab->common.type
2429
2644
        = meta_pid_to_tab->common.status & ERTS_ETS_TABLE_TYPES;
2430
2645
    /* Note, 'type' is *read only* from now on... */
 
2646
    meta_pid_to_tab->common.is_thread_safe = 0;
2431
2647
#endif
2432
2648
    meta_pid_to_tab->common.keypos = 1;
2433
2649
    meta_pid_to_tab->common.owner  = NIL;
2434
 
    meta_pid_to_tab->common.nitems = 0;
 
2650
    erts_smp_atomic_init(&meta_pid_to_tab->common.nitems, 0);
2435
2651
    meta_pid_to_tab->common.slot   = -1;
2436
2652
    meta_pid_to_tab->common.meth   = &db_hash;
2437
2653
 
2438
 
    db_init_lock(meta_pid_to_tab, "meta_pid_to_tab");
 
2654
    erts_refc_init(&meta_pid_to_tab->common.ref, 1);
 
2655
    erts_refc_init(&meta_pid_to_tab->common.fixref, 0);
 
2656
    /* Neither rwlock or fixlock used
 
2657
    db_init_lock(meta_pid_to_tab, "meta_pid_to_tab", "meta_pid_to_tab_FIX");*/
2439
2658
 
2440
2659
    if (db_create_hash(NULL, meta_pid_to_tab) != DB_ERROR_NONE) {
2441
2660
        erl_exit(1,"Unable to create ets metadata tables.");
2451
2670
 
2452
2671
    meta_pid_to_fixed_tab->common.id = NIL;
2453
2672
    meta_pid_to_fixed_tab->common.the_name = am_true;
2454
 
    meta_pid_to_fixed_tab->common.status = (DB_NORMAL | DB_BAG | DB_LHASH | 
2455
 
                                            DB_PUBLIC);
 
2673
    meta_pid_to_fixed_tab->common.status = (DB_NORMAL | DB_BAG | DB_PUBLIC | DB_FINE_LOCKED);
2456
2674
#ifdef ERTS_SMP
2457
2675
    meta_pid_to_fixed_tab->common.type
2458
2676
        = meta_pid_to_fixed_tab->common.status & ERTS_ETS_TABLE_TYPES;
2459
2677
    /* Note, 'type' is *read only* from now on... */
 
2678
    meta_pid_to_fixed_tab->common.is_thread_safe = 0;
2460
2679
#endif
2461
2680
    meta_pid_to_fixed_tab->common.keypos = 1;
2462
2681
    meta_pid_to_fixed_tab->common.owner  = NIL;
2463
 
    meta_pid_to_fixed_tab->common.nitems = 0;
 
2682
    erts_smp_atomic_init(&meta_pid_to_fixed_tab->common.nitems, 0);
2464
2683
    meta_pid_to_fixed_tab->common.slot   = -1;
2465
2684
    meta_pid_to_fixed_tab->common.meth   = &db_hash;
2466
2685
 
2467
 
    db_init_lock(meta_pid_to_fixed_tab, "meta_pid_to_fixed_tab");
 
2686
    erts_refc_init(&meta_pid_to_fixed_tab->common.ref, 1);
 
2687
    erts_refc_init(&meta_pid_to_fixed_tab->common.fixref, 0);
 
2688
    /* Neither rwlock or fixlock used
 
2689
    db_init_lock(meta_pid_to_fixed_tab, "meta_pid_to_fixed_tab", "meta_pid_to_fixed_tab_FIX");*/
2468
2690
 
2469
2691
    if (db_create_hash(NULL, meta_pid_to_fixed_tab) != DB_ERROR_NONE) {
2470
2692
        erl_exit(1,"Unable to create ets metadata tables.");
2556
2778
{
2557
2779
    ASSERT(state->slots.clean_ix <= state->slots.ix);
2558
2780
    if (state->slots.clean_ix < state->slots.ix) {
2559
 
        db_lock(meta_pid_to_tab, LCK_WRITE);
 
2781
        db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
2560
2782
        if (state->slots.size < ARRAY_CHUNK
2561
2783
            && state->slots.ix == state->slots.size) {
2562
2784
            Eterm dummy;
2563
 
            db_erase_hash(NULL,meta_pid_to_tab,pid,&dummy);
 
2785
            db_erase_hash(meta_pid_to_tab,pid,&dummy);
2564
2786
        }
2565
2787
        else {
2566
2788
            int ix;
2570
2792
                                    pid,
2571
2793
                                    state->slots.arr[ix]);
2572
2794
        }
2573
 
        db_unlock(meta_pid_to_tab, LCK_WRITE);
 
2795
        db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
2574
2796
        state->slots.clean_ix = state->slots.ix;
2575
2797
    }
2576
2798
}
2580
2802
{
2581
2803
    ASSERT(state->slots.clean_ix <= state->slots.ix);
2582
2804
    if (state->slots.clean_ix < state->slots.ix) {
2583
 
        db_lock(meta_pid_to_fixed_tab, LCK_WRITE);
 
2805
        db_meta_lock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
2584
2806
        if (state->slots.size < ARRAY_CHUNK
2585
2807
            && state->slots.ix == state->slots.size) {
2586
2808
            Eterm dummy;
2587
 
            db_erase_hash(NULL,meta_pid_to_fixed_tab,pid,&dummy);
 
2809
            db_erase_hash(meta_pid_to_fixed_tab,pid,&dummy);
2588
2810
        }
2589
2811
        else {
2590
2812
            int ix;
2594
2816
                                    pid,
2595
2817
                                    state->slots.arr[ix]);
2596
2818
        }
2597
 
        db_unlock(meta_pid_to_fixed_tab, LCK_WRITE);
 
2819
        db_meta_unlock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
2598
2820
        state->slots.clean_ix = state->slots.ix;
2599
2821
    }
2600
2822
}
2601
2823
 
 
2824
/* In: Table LCK_WRITE
 
2825
** Return TRUE : ok, table not mine and NOT locked anymore.
 
2826
** Return FALSE: failed, table still mine (LCK_WRITE)
 
2827
*/
 
2828
static int give_away_to_heir(Process* p, DbTable* tb)
 
2829
{
 
2830
    Process* to_proc;
 
2831
    ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
 
2832
    Eterm buf[5];
 
2833
    Eterm to_pid;
 
2834
    Eterm heir_data;
 
2835
 
 
2836
    ASSERT(tb->common.owner == p->id);
 
2837
    ASSERT(is_internal_pid(tb->common.heir));
 
2838
    ASSERT(tb->common.heir != p->id);
 
2839
retry:
 
2840
    to_pid = tb->common.heir;
 
2841
    to_proc = erts_pid2proc_opt(p, ERTS_PROC_LOCK_MAIN,
 
2842
                                to_pid, to_locks,
 
2843
                                ERTS_P2P_FLG_TRY_LOCK);
 
2844
    if (to_proc == ERTS_PROC_LOCK_BUSY) {
 
2845
        db_ref(tb); /* while unlocked */
 
2846
        db_unlock(tb,LCK_WRITE);    
 
2847
        to_proc = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN,
 
2848
                                to_pid, to_locks);    
 
2849
        db_lock(tb,LCK_WRITE);
 
2850
        tb = db_unref(tb);
 
2851
        ASSERT(tb != NULL);
 
2852
    
 
2853
        if (tb->common.owner != p->id) {
 
2854
            if (to_proc != NULL ) {
 
2855
                erts_smp_proc_unlock(to_proc, to_locks);
 
2856
            }
 
2857
            db_unlock(tb,LCK_WRITE);
 
2858
            return !0; /* ok, someone already gave my table away */
 
2859
        }
 
2860
        if (tb->common.heir != to_pid) {  /* someone changed the heir */ 
 
2861
            if (to_proc != NULL ) {
 
2862
                erts_smp_proc_unlock(to_proc, to_locks);
 
2863
            }
 
2864
            if (to_pid == p->id || to_pid == am_none) {
 
2865
                return 0; /* no real heir, table still mine */
 
2866
            }
 
2867
            goto retry;
 
2868
        }
 
2869
    }
 
2870
    if (to_proc == NULL) {
 
2871
        return 0; /* heir not alive, table still mine */
 
2872
    }
 
2873
    if (erts_cmp_timeval(&to_proc->started, &tb->common.heir_started) != 0) {
 
2874
        erts_smp_proc_unlock(to_proc, to_locks);
 
2875
        return 0; /* heir dead and pid reused, table still mine */
 
2876
    }
 
2877
    db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
 
2878
    db_erase_bag_exact2(meta_pid_to_tab, tb->common.owner,
 
2879
                        make_small(tb->common.slot));
 
2880
    
 
2881
    to_proc->flags |= F_USING_DB;
 
2882
    tb->common.owner = to_pid;
 
2883
    
 
2884
    db_put_hash(meta_pid_to_tab,
 
2885
                TUPLE2(buf,to_pid,make_small(tb->common.slot)),
 
2886
                0);
 
2887
    db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
 
2888
    
 
2889
    db_unlock(tb,LCK_WRITE);
 
2890
    heir_data = tb->common.heir_data;
 
2891
    if (!is_immed(heir_data)) {
 
2892
        Eterm* tpv = DBTERM_BUF((DbTerm*)heir_data); /* tuple_val */
 
2893
        ASSERT(arityval(*tpv) == 1);
 
2894
        heir_data = tpv[1];
 
2895
    }
 
2896
    erts_send_message(p, to_proc, &to_locks,
 
2897
                      TUPLE4(buf, am_ETS_TRANSFER, tb->common.id, p->id, heir_data), 
 
2898
                      0);
 
2899
    erts_smp_proc_unlock(to_proc, to_locks);
 
2900
    return !0;
 
2901
}
 
2902
 
2602
2903
/*
2603
2904
 * erts_db_process_exiting() is called when a process terminates.
2604
2905
 * It returns 0 when completely done, and !0 when it wants to
2627
2928
        switch (state->op) {
2628
2929
        case ErtsDbProcCleanupOpGetTables:
2629
2930
            state->slots.size = ARRAY_CHUNK;
2630
 
            db_lock(meta_pid_to_tab, LCK_READ);
 
2931
            db_meta_lock(meta_pid_to_tab, LCK_READ);
2631
2932
            ret = db_get_element_array(meta_pid_to_tab,
2632
2933
                                       pid,
2633
2934
                                       2,
2634
2935
                                       state->slots.arr,
2635
2936
                                       &state->slots.size);
2636
 
            db_unlock(meta_pid_to_tab, LCK_READ);
 
2937
            db_meta_unlock(meta_pid_to_tab, LCK_READ);
2637
2938
            if (ret == DB_ERROR_BADKEY) {
2638
2939
                /* Done with tables; now fixations */
2639
2940
                state->progress = ErtsDbProcCleanupProgressFixations;
2664
2965
                    db_lock_take_over_ref(tb, LCK_WRITE);
2665
2966
                    /* Ownership may have changed since
2666
2967
                       we looked up the table. */
2667
 
                    if (tb->common.owner != pid)
2668
 
                        do_yield = 0;
 
2968
                    if (tb->common.owner != pid) {
 
2969
                        do_yield = 0;
 
2970
                        db_unlock(tb, LCK_WRITE);
 
2971
                    }
 
2972
                    else if (tb->common.heir != am_none
 
2973
                             && tb->common.heir != pid
 
2974
                             && give_away_to_heir(c_p, tb)) {
 
2975
                        do_yield = 0;
 
2976
                    }
2669
2977
                    else {
2670
2978
                        int first_call;
2671
2979
#ifdef HARDDEBUG
2676
2984
#endif
2677
2985
                        first_call = (tb->common.status & DB_DELETE) == 0;
2678
2986
                        if (first_call) {
2679
 
                            first_call = 1;
2680
 
                            
2681
2987
                            /* Clear all access bits. */
2682
2988
                            tb->common.status &= ~(DB_PROTECTED
2683
2989
                                                   | DB_PUBLIC
2687
2993
                            if (is_atom(tb->common.id))
2688
2994
                                remove_named_tab(tb->common.id);
2689
2995
 
 
2996
                            free_heir_data(tb);
2690
2997
                            free_fixations_locked(tb);
2691
2998
                        }
2692
2999
 
2693
3000
                        do_yield = free_table_cont(c_p, tb, first_call, 0);
2694
 
                    }
2695
 
                    db_unlock(tb, LCK_WRITE);
 
3001
                        db_unlock(tb, LCK_WRITE);
 
3002
                    }               
2696
3003
                    if (do_yield)
2697
3004
                        goto yield;
2698
3005
                }
2707
3014
 
2708
3015
        case ErtsDbProcCleanupOpGetFixations:
2709
3016
            state->slots.size = ARRAY_CHUNK;
2710
 
            db_lock(meta_pid_to_fixed_tab, LCK_READ);
 
3017
            db_meta_lock(meta_pid_to_fixed_tab, LCK_READ);
2711
3018
            ret = db_get_element_array(meta_pid_to_fixed_tab, 
2712
3019
                                       pid,
2713
3020
                                       2,
2714
3021
                                       state->slots.arr,
2715
3022
                                       &state->slots.size);
2716
 
            db_unlock(meta_pid_to_fixed_tab, LCK_READ);
 
3023
            db_meta_unlock(meta_pid_to_fixed_tab, LCK_READ);
2717
3024
 
2718
3025
            if (ret == DB_ERROR_BADKEY) {
2719
3026
                /* Done */
2742
3049
                meta_main_tab_unlock(ix);
2743
3050
                if (tb) {
2744
3051
                    int reds;
2745
 
                    DbFixation **pp;
 
3052
                    DbFixation** pp;
2746
3053
 
2747
 
                    db_lock_take_over_ref(tb, LCK_WRITE);
 
3054
                    db_lock_take_over_ref(tb, LCK_WRITE_REC);
 
3055
                    #ifdef ERTS_SMP
 
3056
                    erts_smp_mtx_lock(&tb->common.fixlock);
 
3057
                    #endif
2748
3058
                    reds = 10;
2749
 
 
2750
 
                    for (pp = &(tb->common.fixations);
2751
 
                         *pp;
2752
 
                         pp = &((*pp)->next)) {
 
3059
                    
 
3060
                    for (pp = &tb->common.fixations; *pp != NULL;
 
3061
                          pp = &(*pp)->next) {
2753
3062
                        if ((*pp)->pid == pid) {
2754
 
                            DbFixation *fix = *pp;
2755
 
                            *pp = (*pp)->next;
 
3063
                            DbFixation* fix = *pp;
 
3064
                            erts_refc_add(&tb->common.fixref,-fix->counter,0);
 
3065
                            *pp = fix->next;
2756
3066
                            erts_db_free(ERTS_ALC_T_DB_FIXATION,
2757
 
                                         tb,
2758
 
                                         (void *) fix,
2759
 
                                         sizeof(DbFixation));
 
3067
                                         tb, fix, sizeof(DbFixation));
2760
3068
                            ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
2761
3069
                            break;
2762
3070
                        }
2763
3071
                    }
2764
 
                    if (tb->common.fixations == NULL) {
2765
 
                        if (IS_HASH_TABLE(tb->common.status)) {
2766
 
                            db_unfix_table_hash(&(tb->hash));
2767
 
                            reds += 40;
2768
 
                        }
2769
 
                        tb->common.status &= ~DB_FIXED;
 
3072
                    #ifdef ERTS_SMP
 
3073
                    erts_smp_mtx_unlock(&tb->common.fixlock);
 
3074
                    #endif
 
3075
                    if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)) {
 
3076
                        db_unfix_table_hash(&(tb->hash));
 
3077
                        reds += 40;
2770
3078
                    }
2771
 
                    db_unlock(tb, LCK_WRITE);
 
3079
                    db_unlock(tb, LCK_WRITE_REC);
2772
3080
                    BUMP_REDS(c_p, reds);
2773
3081
                }
2774
3082
                state->slots.ix++;
2819
3127
    return !0;
2820
3128
}
2821
3129
 
2822
 
/*  SMP note: table must be WRITE locked */
 
3130
/*  SMP note: table only need to be LCK_READ locked */
2823
3131
static void fix_table_locked(Process* p, DbTable* tb)
2824
3132
{
2825
3133
    DbFixation *fix;
2826
 
    Eterm dummy;
2827
3134
    Eterm meta_tuple[3];
2828
3135
 
2829
 
    if (!(tb->common.status & DB_FIXED)) { 
2830
 
        tb->common.status |= DB_FIXED;
 
3136
#ifdef ERTS_SMP
 
3137
    erts_smp_mtx_lock(&tb->common.fixlock);
 
3138
#endif
 
3139
    erts_refc_inc(&tb->common.fixref,1);
 
3140
    fix = tb->common.fixations;
 
3141
    if (fix == NULL) { 
2831
3142
        get_now(&(tb->common.megasec),
2832
3143
                &(tb->common.sec), 
2833
3144
                &(tb->common.microsec));
2834
3145
    }
2835
 
    for (fix = tb->common.fixations; fix != NULL; fix = fix->next) {
2836
 
        if (fix->pid == p->id) {
2837
 
            ++(fix->counter);
2838
 
            break;
 
3146
    else {
 
3147
        for (; fix != NULL; fix = fix->next) {
 
3148
            if (fix->pid == p->id) {
 
3149
                ++(fix->counter);
 
3150
#ifdef ERTS_SMP
 
3151
                erts_smp_mtx_unlock(&tb->common.fixlock);
 
3152
#endif
 
3153
                return;
 
3154
            }
2839
3155
        }
2840
3156
    }
2841
 
    if (fix == NULL) {
2842
 
        fix = (DbFixation *) erts_db_alloc(ERTS_ALC_T_DB_FIXATION,
2843
 
                                           tb, sizeof(DbFixation));
2844
 
        ERTS_ETS_MISC_MEM_ADD(sizeof(DbFixation));
2845
 
        fix->pid = p->id;
2846
 
        fix->counter = 1;
2847
 
        fix->next = tb->common.fixations;
2848
 
        tb->common.fixations = fix;
2849
 
        p->flags |= F_USING_DB;        
2850
 
        db_lock(meta_pid_to_fixed_tab, LCK_WRITE);
2851
 
        if (db_put_hash(NULL,meta_pid_to_fixed_tab,
2852
 
                        TUPLE2(meta_tuple, 
2853
 
                               p->id, 
2854
 
                               make_small(tb->common.slot)),
2855
 
                        &dummy)
2856
 
            != DB_ERROR_NONE) {
2857
 
            erl_exit(1,"Could not insert ets metadata"
2858
 
                     " in safe_fixtable.");
2859
 
        }       
2860
 
        db_unlock(meta_pid_to_fixed_tab, LCK_WRITE);
2861
 
    }
 
3157
    fix = (DbFixation *) erts_db_alloc(ERTS_ALC_T_DB_FIXATION,
 
3158
                                       tb, sizeof(DbFixation));
 
3159
    ERTS_ETS_MISC_MEM_ADD(sizeof(DbFixation));
 
3160
    fix->pid = p->id;
 
3161
    fix->counter = 1;
 
3162
    fix->next = tb->common.fixations;
 
3163
    tb->common.fixations = fix;
 
3164
#ifdef ERTS_SMP
 
3165
    erts_smp_mtx_unlock(&tb->common.fixlock);
 
3166
#endif
 
3167
    p->flags |= F_USING_DB;        
 
3168
    db_meta_lock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
 
3169
    if (db_put_hash(meta_pid_to_fixed_tab,
 
3170
                    TUPLE2(meta_tuple, p->id, make_small(tb->common.slot)),
 
3171
                    0) != DB_ERROR_NONE) {
 
3172
        erl_exit(1,"Could not insert ets metadata in safe_fixtable.");
 
3173
    }   
 
3174
    db_meta_unlock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
2862
3175
}
2863
3176
 
2864
 
/*  SMP note: table must be WRITE locked */
 
3177
/*  SMP note: hash table need to be at least LCK_WRITE_REC locked
 
3178
              tree need only to be LCK_READ */
2865
3179
static void unfix_table_locked(Process* p,  DbTable* tb)
2866
3180
{
2867
 
    DbFixation **pp;
2868
 
    DbFixation *fix;
2869
 
    
2870
 
    for (pp = &(tb->common.fixations); *pp != NULL; pp = &((*pp)->next)) {
 
3181
    DbFixation** pp;
 
3182
 
 
3183
#ifdef ERTS_SMP
 
3184
    erts_smp_mtx_lock(&tb->common.fixlock);
 
3185
#endif
 
3186
    for (pp = &tb->common.fixations; *pp != NULL; pp = &(*pp)->next) {
2871
3187
        if ((*pp)->pid == p->id) {
2872
 
            --((*pp)->counter);
2873
 
            ASSERT((*pp)->counter >= 0);
2874
 
            if ((*pp)->counter == 0) {
2875
 
                fix = *pp;
2876
 
                *pp = (*pp)->next;
2877
 
                db_lock(meta_pid_to_fixed_tab, LCK_WRITE);
2878
 
                db_erase_bag_exact2(meta_pid_to_fixed_tab,
2879
 
                                    p->id,
2880
 
                                    make_small(tb->common.slot));
2881
 
                db_unlock(meta_pid_to_fixed_tab, LCK_WRITE);
2882
 
                erts_db_free(ERTS_ALC_T_DB_FIXATION,
2883
 
                             tb, (void *) fix, sizeof(DbFixation));
2884
 
                ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
 
3188
            DbFixation* fix = *pp;
 
3189
            erts_refc_dec(&tb->common.fixref,0);
 
3190
            --(fix->counter);
 
3191
            ASSERT(fix->counter >= 0);
 
3192
            if (fix->counter > 0) {
 
3193
                break;
2885
3194
            }
2886
 
            break;
 
3195
            *pp = fix->next;
 
3196
#ifdef ERTS_SMP
 
3197
            erts_smp_mtx_unlock(&tb->common.fixlock);
 
3198
#endif
 
3199
            db_meta_lock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
 
3200
            db_erase_bag_exact2(meta_pid_to_fixed_tab,
 
3201
                                p->id, make_small(tb->common.slot));
 
3202
            db_meta_unlock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
 
3203
            erts_db_free(ERTS_ALC_T_DB_FIXATION,
 
3204
                         tb, (void *) fix, sizeof(DbFixation));
 
3205
            ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
 
3206
            goto unlocked;
2887
3207
        }
2888
3208
    }
2889
 
    if (tb->common.fixations == NULL) {
2890
 
        if (IS_HASH_TABLE(tb->common.status)) {
2891
 
            db_unfix_table_hash(&(tb->hash));
2892
 
        }
2893
 
        tb->common.status &= ~DB_FIXED;
 
3209
#ifdef ERTS_SMP
 
3210
    erts_smp_mtx_unlock(&tb->common.fixlock);
 
3211
#endif
 
3212
unlocked:
 
3213
 
 
3214
    if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)) {
 
3215
        db_unfix_table_hash(&(tb->hash));
2894
3216
    }
2895
3217
}
2896
3218
 
2897
 
/* Assume that tb is locked (write) */
 
3219
/* Assume that tb is WRITE locked */
2898
3220
static void free_fixations_locked(DbTable *tb)
2899
3221
{
2900
3222
    DbFixation *fix;
2903
3225
    fix = tb->common.fixations;
2904
3226
    while (fix != NULL) {
2905
3227
        next_fix = fix->next;
2906
 
        db_lock(meta_pid_to_fixed_tab, LCK_WRITE);
 
3228
        db_meta_lock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
2907
3229
        db_erase_bag_exact2(meta_pid_to_fixed_tab,
2908
3230
                            fix->pid,
2909
3231
                            make_small(tb->common.slot));
2910
 
        db_unlock(meta_pid_to_fixed_tab, LCK_WRITE);
 
3232
        db_meta_unlock(meta_pid_to_fixed_tab, LCK_WRITE_REC);
2911
3233
        erts_db_free(ERTS_ALC_T_DB_FIXATION,
2912
3234
                     tb, (void *) fix, sizeof(DbFixation));
2913
3235
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
2917
3239
    tb->common.fixations = NULL;
2918
3240
}
2919
3241
 
 
3242
static void set_heir(Process* me, DbTable* tb, Eterm heir, Eterm heir_data)
 
3243
{       
 
3244
    tb->common.heir = heir;
 
3245
    if (heir == am_none) {
 
3246
        return;
 
3247
    }
 
3248
    if (heir == me->id) {
 
3249
        tb->common.heir_started = me->started;
 
3250
    }
 
3251
    else {
 
3252
        Process* heir_proc= erts_pid2proc_opt(me, ERTS_PROC_LOCK_MAIN, heir,
 
3253
                                              0, ERTS_P2P_FLG_SMP_INC_REFC);
 
3254
        if (heir_proc != NULL) {
 
3255
            tb->common.heir_started = heir_proc->started;
 
3256
            erts_smp_proc_dec_refc(heir_proc);
 
3257
        } else {
 
3258
            tb->common.heir = am_none;
 
3259
        }
 
3260
    }
 
3261
 
 
3262
    if (!is_immed(heir_data)) {
 
3263
        Eterm tmp[2];
 
3264
        /* Make a dummy 1-tuple around data to use db_get_term() */
 
3265
        heir_data = (Eterm) db_get_term(&tb->common, NULL, 0,
 
3266
                                        TUPLE1(tmp,heir_data));
 
3267
        ASSERT(!is_immed(heir_data));
 
3268
    }
 
3269
    tb->common.heir_data = heir_data;
 
3270
}
 
3271
 
 
3272
static void free_heir_data(DbTable* tb)
 
3273
{
 
3274
    if (tb->common.heir != am_none && !is_immed(tb->common.heir_data)) {
 
3275
        DbTerm* p = (DbTerm*) tb->common.heir_data;
 
3276
        db_free_term_data(p);
 
3277
        erts_db_free(ERTS_ALC_T_DB_TERM, tb, (void *)p,
 
3278
                     sizeof(DbTerm) + (p->size-1)*sizeof(Eterm));
 
3279
    }
 
3280
    #ifdef DEBUG
 
3281
    tb->common.heir_data = am_undefined;
 
3282
    #endif
 
3283
}
2920
3284
 
2921
3285
static BIF_RETTYPE ets_delete_trap(Process *p, Eterm cont)
2922
3286
{
2956
3320
    }
2957
3321
#endif
2958
3322
 
2959
 
    result = tb->common.meth->db_free_table_continue(tb, first);
 
3323
    result = tb->common.meth->db_free_table_continue(tb);
2960
3324
 
2961
3325
    if (result == 0) {
2962
3326
#ifdef HARDDEBUG
2978
3342
        meta_main_tab_unlock(tb->common.slot);
2979
3343
 
2980
3344
        if (clean_meta_tab) {
2981
 
            db_lock(meta_pid_to_tab, LCK_WRITE);
 
3345
            db_meta_lock(meta_pid_to_tab, LCK_WRITE_REC);
2982
3346
            db_erase_bag_exact2(meta_pid_to_tab,tb->common.owner,
2983
3347
                                make_small(tb->common.slot));
2984
 
            db_unlock(meta_pid_to_tab, LCK_WRITE);
 
3348
            db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC);
2985
3349
        }
2986
3350
        db_unref(tb);
2987
3351
        BUMP_REDS(p, 100);
2994
3358
    Eterm ret = THE_NON_VALUE;
2995
3359
 
2996
3360
    if (What == am_size) {
2997
 
        ret = make_small(tb->common.nitems);
 
3361
        ret = make_small(erts_smp_atomic_read(&tb->common.nitems));
2998
3362
    } else if (What == am_type) {
2999
3363
        if (tb->common.status & DB_SET)  {
3000
3364
            ret = am_set;
3014
3378
        ret = erts_make_integer(words, p);
3015
3379
    } else if (What == am_owner) {
3016
3380
        ret = tb->common.owner;
 
3381
    } else if (What == am_heir) {
 
3382
        ret = tb->common.heir;
3017
3383
    } else if (What == am_protection) {
3018
3384
        if (tb->common.status & DB_PRIVATE) 
3019
3385
            ret = am_private;
3036
3402
        print_table(ERTS_PRINT_STDOUT, NULL, 1, tb);
3037
3403
        ret = am_true;
3038
3404
    } else if (What == am_atom_put("fixed",5)) { 
3039
 
        if (tb->common.status & DB_FIXED)
 
3405
        if (IS_FIXED(tb))
3040
3406
            ret = am_true;
3041
3407
        else
3042
3408
            ret = am_false;
3043
3409
    } else if (What == am_atom_put("kept_objects",12)) {
3044
 
        ret = make_small(tb->common.kept_items);
3045
 
    } else if (What == am_atom_put("safe_fixed",10)) { 
3046
 
        if (tb->common.fixations != NULL) {
 
3410
        ret = make_small(IS_HASH_TABLE(tb->common.status)
 
3411
                         ? db_kept_items_hash(&tb->hash) : 0);
 
3412
    } else if (What == am_atom_put("safe_fixed",10)) {
 
3413
#ifdef ERTS_SMP
 
3414
        erts_smp_mtx_lock(&tb->common.fixlock);
 
3415
#endif
 
3416
        if (IS_FIXED(tb)) {
3047
3417
            Uint need;
3048
3418
            Eterm *hp;
3049
3419
            Eterm tpl, lst;
3069
3439
        } else {
3070
3440
            ret = am_false;
3071
3441
        }
 
3442
#ifdef ERTS_SMP
 
3443
        erts_smp_mtx_unlock(&tb->common.fixlock);
 
3444
#endif
 
3445
    } else if (What == am_atom_put("buckets",7)) {
 
3446
        ret = make_small(erts_smp_atomic_read(IS_HASH_TABLE(tb->common.status)
 
3447
                                              ? &tb->hash.nactive
 
3448
                                              : &tb->common.nitems));
3072
3449
    }
3073
3450
    return ret;
3074
3451
}
3080
3457
 
3081
3458
    tb->common.meth->db_print(to, to_arg, show, tb);
3082
3459
 
3083
 
    erts_print(to, to_arg, "Objects: %d\n", tb->common.nitems);
 
3460
    erts_print(to, to_arg, "Objects: %d\n", (int)erts_smp_atomic_read(&tb->common.nitems));
3084
3461
    erts_print(to, to_arg, "Words: %bpu\n",
3085
3462
               (Uint) ((erts_smp_atomic_read(&tb->common.memory_size)
3086
3463
                        + sizeof(Uint)