~ubuntu-branches/debian/squeeze/erlang/squeeze

« back to all changes in this revision

Viewing changes to erts/emulator/beam/dist.c

  • Committer: Bazaar Package Importer
  • Author(s): Erlang Packagers, Sergei Golovan
  • Date: 2006-12-03 17:07:44 UTC
  • mfrom: (2.1.11 feisty)
  • Revision ID: james.westby@ubuntu.com-20061203170744-rghjwupacqlzs6kv
Tags: 1:11.b.2-4
[ Sergei Golovan ]
Fixed erlang-base and erlang-base-hipe prerm scripts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
 
39
39
#define PASS_THROUGH 'p'        /* This code should go */
40
40
 
41
 
byte *dist_buf;
42
 
int dist_buf_size;
43
 
 
44
41
/* distribution trap functions */
45
42
Export* dsend2_trap = NULL;
46
43
Export* dsend3_trap = NULL;
53
50
Export* dmonitor_p_trap = NULL;
54
51
 
55
52
/* local variables */
56
 
static Process *net_kernel;    /* we don't want to look it up */
57
 
static Eterm* dmem;
58
53
 
59
 
#define DMEM_SIZE (14+2+REF_THING_SIZE) /* Enough to hold any control msg */
 
54
static int node_is_alive;
60
55
 
61
56
/* forward declarations */
62
57
 
63
58
static int clear_dist_entry(DistEntry*);
64
 
static int pack_and_send(DistEntry*, Eterm, Eterm, int);
 
59
static int pack_and_send(Process*, Uint32, DistEntry*, Eterm, Eterm, int);
65
60
 
66
61
static Uint no_caches;
67
62
 
 
63
static void
 
64
init_alive(void)
 
65
{
 
66
    node_is_alive = 0;
 
67
}
 
68
 
 
69
static void
 
70
set_alive(void)
 
71
{
 
72
    node_is_alive = 1;
 
73
}
 
74
 
 
75
static void
 
76
set_not_alive(void)
 
77
{
 
78
    node_is_alive = 0;
 
79
}
 
80
 
 
81
static ERTS_INLINE int
 
82
is_alive(void)
 
83
{
 
84
    return node_is_alive;
 
85
}
 
86
 
 
87
int erts_is_alive(void)
 
88
{
 
89
    return is_alive();
 
90
}
 
91
 
68
92
void clear_cache(DistEntry *dep)
69
93
{
70
94
    ErlCache* cp;
120
144
*/
121
145
 
122
146
 
123
 
int is_node_name(char *ptr, int len)
 
147
static int is_node_name(char *ptr, int len)
124
148
{
125
149
   int c = '\0';                /* suppress use-before-set warning */
126
150
   int pos = 0;
154
178
    if(is_not_atom(a))
155
179
        return 0;
156
180
    i = atom_val(a);
157
 
    ASSERT((i > 0) && (i < atom_table_size) &&  (atom_tab(i) != NULL));
158
 
    return is_node_name(atom_tab(i)->name, atom_tab(i)->len);
 
181
    ASSERT((i > 0) && (i < atom_table_size()) &&  (atom_tab(i) != NULL));
 
182
    return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len);
159
183
}
160
184
 
 
185
typedef struct {
 
186
    DistEntry *dep;
 
187
} NetExitsContext;
 
188
 
161
189
/* 
162
190
** This function is called when a distribution 
163
191
** port or process terminates
164
192
*/
165
 
 
166
 
int do_net_exits(DistEntry *dep)
167
 
{
168
 
    Eterm tup;
169
 
    Eterm name;
170
 
    Eterm item;
171
 
    ErlLink* lnk;
172
 
    Process *rp;
 
193
static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
 
194
{
 
195
    Process *rp;
 
196
    ErtsMonitor *rmon;
 
197
    DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
 
198
    Uint32 rp_locks = ERTS_PROC_LOCK_LINK;
 
199
 
 
200
    rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
 
201
    if (!rp) {
 
202
        ASSERT(0); /* ? */
 
203
        goto done;
 
204
    }
 
205
 
 
206
    if (mon->type == MON_ORIGIN) {
 
207
        /* local pid is beeing monitored */
 
208
        rmon = erts_remove_monitor(&(rp->monitors),mon->ref);
 
209
        /* ASSERT(rmon != NULL); nope, can happen during process exit */
 
210
        if (rmon != NULL) {
 
211
            erts_destroy_monitor(rmon);
 
212
        }
 
213
    } else {
 
214
        Eterm lhp[3];
 
215
        Eterm watched;
 
216
        ASSERT(mon->type == MON_TARGET);
 
217
        rmon = erts_remove_monitor(&(rp->monitors),mon->ref);
 
218
        /* ASSERT(rmon != NULL); can happen during process exit */
 
219
        if (rmon != NULL) {
 
220
            ASSERT(is_atom(rmon->name) || is_nil(rmon->name));
 
221
            watched = (is_atom(rmon->name)
 
222
                       ? TUPLE2(lhp, rmon->name, dep->sysname)
 
223
                       : rmon->pid);
 
224
#ifdef ERTS_SMP
 
225
            rp_locks |= ERTS_PROC_LOCKS_MSG_SEND;
 
226
            erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND);
 
227
#endif
 
228
            erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, 
 
229
                                       watched, am_noconnection);
 
230
            erts_destroy_monitor(rmon);
 
231
        }
 
232
    }
 
233
    erts_smp_proc_unlock(rp, rp_locks);
 
234
 done:
 
235
    erts_destroy_monitor(mon);
 
236
}
 
237
        
 
238
typedef struct {
 
239
    NetExitsContext *necp;
 
240
    ErtsLink *lnk;
 
241
} LinkNetExitsContext;
 
242
 
 
243
/* 
 
244
** This is the function actually doing the job of sending exit messages
 
245
** for links in a dist entry upon net_exit (the node goes down), NB,
 
246
** only process links, not node monitors are handled here, 
 
247
** they reside in a separate tree....
 
248
*/
 
249
static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp)
 
250
{
 
251
    ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */
 
252
    ErtsLink *rlnk;
 
253
    Process *rp;
 
254
 
 
255
    ASSERT(lnk->type == LINK_PID);
 
256
    if (is_internal_pid(lnk->pid)) {
 
257
        int xres;
 
258
        Uint32 rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
 
259
 
 
260
        rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
 
261
        if (!rp) {
 
262
            goto done;
 
263
        }
 
264
 
 
265
        rlnk = erts_remove_link(&(rp->nlinks), sublnk->pid);
 
266
        xres = erts_send_exit_signal(NULL,
 
267
                                     sublnk->pid,
 
268
                                     rp,
 
269
                                     &rp_locks,
 
270
                                     am_noconnection,
 
271
                                     NIL,
 
272
                                     NULL,
 
273
                                     0);
 
274
 
 
275
        if (rlnk) {
 
276
            erts_destroy_link(rlnk);
 
277
            if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
 
278
                /* We didn't exit the process and it is traced */
 
279
                trace_proc(NULL, rp, am_getting_unlinked, sublnk->pid);
 
280
            }
 
281
        }
 
282
        erts_smp_proc_unlock(rp, rp_locks);
 
283
    }
 
284
 done:
 
285
    erts_destroy_link(sublnk);
 
286
 
 
287
}
 
288
    
 
289
 
 
290
 
 
291
 
 
292
 
 
293
/* 
 
294
** This function is called when a distribution 
 
295
** port or process terminates, once for each link on the high level, 
 
296
** it in turn traverses the link subtree for the specific link node...
 
297
*/
 
298
static void doit_link_net_exits(ErtsLink *lnk, void *vnecp)
 
299
{
 
300
    LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk};
 
301
    ASSERT(lnk->type == LINK_PID)
 
302
    erts_sweep_links(lnk->root, &doit_link_net_exits_sub, (void *) &lnec);
 
303
#ifdef DEBUG
 
304
    lnk->root = NULL;
 
305
#endif
 
306
    erts_destroy_link(lnk);
 
307
}
 
308
 
 
309
 
 
310
static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
 
311
{
 
312
    DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
 
313
    Eterm name = dep->sysname;
 
314
    Process *rp;
 
315
    ErtsLink *rlnk;
 
316
    Uint i,n;
 
317
    ASSERT(lnk->type == LINK_NODE)
 
318
    if (is_internal_pid(lnk->pid)) {
 
319
        Uint32 rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_MSG_SEND;
 
320
        rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
 
321
        if (!rp) {
 
322
            goto done;
 
323
        }
 
324
        rlnk = erts_remove_link(&(rp->nlinks), name);
 
325
        if (rlnk != NULL) {
 
326
            ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE));
 
327
            erts_destroy_link(rlnk);
 
328
        }
 
329
        n = ERTS_LINK_ROOT_AS_UINT(lnk);
 
330
#ifdef ERTS_SMP
 
331
        /* Drop messages if receiver has a pending exit ... */
 
332
        if (!ERTS_PROC_PENDING_EXIT(rp))
 
333
#endif
 
334
        {
 
335
            for (i = 0; i < n; ++i) {
 
336
                ErlHeapFragment* bp;
 
337
                ErlOffHeap *ohp;
 
338
                Eterm tup;
 
339
                Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks);
 
340
                tup = TUPLE2(hp, am_nodedown, name);
 
341
                erts_queue_message(rp, rp_locks, bp, tup, NIL);
 
342
            }
 
343
        }
 
344
        erts_smp_proc_unlock(rp, rp_locks);
 
345
    }
 
346
 done:
 
347
    erts_destroy_link(lnk);
 
348
}
 
349
 
 
350
        
 
351
/*
 
352
 * proc is currently running or exiting process.
 
353
 */
 
354
int erts_do_net_exits(DistEntry *dep)
 
355
{
173
356
    if (dep == erts_this_dist_entry) {  /* Net kernel has died (clean up!!) */
174
357
        DistEntry *tdep;
 
358
        (void) erts_smp_io_lock();
 
359
        erts_smp_mtx_lock(&erts_dist_table_mtx);
175
360
 
176
361
        /* KILL all port controllers */
177
362
        while(erts_visible_dist_entries || erts_hidden_dist_entries) {
183
368
                   && (erts_port[internal_port_index(tdep->cid)].status
184
369
                       & DISTRIBUTION)
185
370
                   && erts_port[internal_port_index(tdep->cid)].dist_entry);
 
371
            erts_smp_mtx_unlock(&erts_dist_table_mtx);
186
372
            /* will call do_net_exists !!! */
187
 
            do_exit_port(tdep->cid, tdep->cid, am_killed);
 
373
            erts_do_exit_port(tdep->cid, tdep->cid, am_killed);
 
374
            erts_smp_mtx_lock(&erts_dist_table_mtx);
188
375
        }
189
376
 
190
 
        net_kernel = NULL;
 
377
        erts_smp_mtx_unlock(&erts_dist_table_mtx);
 
378
 
 
379
        erts_smp_io_unlock();
 
380
        erts_smp_block_system(ERTS_BS_FLG_ALLOW_GC);
191
381
        erts_set_this_node(am_Noname, 0);
 
382
        set_not_alive();
 
383
        erts_smp_release_system();
 
384
 
192
385
    }
193
 
    else {
194
 
        lnk = dep->links;
195
 
        dep->links = NULL;
196
 
 
197
 
        while (lnk != NULL) {
198
 
            item = lnk->item;
199
 
            switch(lnk->type) {
200
 
            case LNK_LINK:
201
 
                if (is_internal_pid(item)) {
202
 
                    if ((rp = pid2proc(item)) == NULL)
203
 
                        break;
204
 
                    if (rp->flags & F_TRAPEXIT) {
205
 
                        ErlLink **rlinkpp = 
206
 
                            find_link(&rp->links, LNK_LINK, lnk->data, NIL);
207
 
                        del_link(rlinkpp);
208
 
                        deliver_exit_message(lnk->data, rp, am_noconnection);
209
 
                        if (IS_TRACED_FL(rp, F_TRACE_PROCS) 
210
 
                            && rlinkpp != NULL) {
211
 
                            trace_proc(NULL, rp, 
212
 
                                       am_getting_unlinked, lnk->data);
213
 
                        }
214
 
                    }
215
 
                    else
216
 
                        schedule_exit(rp, am_noconnection);
217
 
                }
218
 
                break;
219
 
 
220
 
            case LNK_LINK1:
221
 
                if (is_external_pid(item)
222
 
                    && external_pid_dist_entry(item) != erts_this_dist_entry) {
223
 
                   /* We are being monitored */
224
 
                   if ((rp = pid2proc(lnk->data)) == NULL)
225
 
                      break;
226
 
                   del_link(find_link_by_ref(&rp->links, lnk->ref));
227
 
                } else {
228
 
                   Eterm lhp[3];
229
 
                   Eterm watched;
230
 
                   /* We are monitoring */
231
 
                   if ((rp = pid2proc(item)) == NULL)
232
 
                      break;
233
 
                   ASSERT(is_pid(lnk->data) || is_atom(lnk->data));
234
 
                   watched = (is_atom(lnk->data)
235
 
                              ? TUPLE2(&lhp[0], lnk->data, dep->sysname)
236
 
                              : lnk->data);
237
 
                   queue_monitor_message(rp, lnk->ref, am_process, 
238
 
                                         watched, am_noconnection);
239
 
                   del_link(find_link_by_ref(&rp->links, lnk->ref));
240
 
                }
241
 
                break;
242
 
 
243
 
            case LNK_NODE:
244
 
                name = dep->sysname;
245
 
                if (is_internal_pid(item)) {
246
 
                    ErlHeapFragment* bp;
247
 
                    Eterm* hp;
248
 
                    if ((rp = pid2proc(item)) == NULL)
249
 
                        break;
250
 
                    del_link(find_link(&rp->links,LNK_NODE,name,NIL));
251
 
                    bp = new_message_buffer(3);
252
 
                    hp = bp->mem;
253
 
                    tup = TUPLE2(hp, am_nodedown, name);
254
 
                    queue_message_tt(rp, bp, tup, NIL);
255
 
                    if (SAVED_HEAP_TOP(rp) == NULL) {
256
 
                        SAVED_HEAP_TOP(rp) = HEAP_TOP(rp);
257
 
                        HEAP_TOP(rp) = HEAP_LIMIT(rp);
258
 
                    }
259
 
                    MSO(rp).overhead = HEAP_SIZE(rp);
260
 
                    BUMP_ALL_REDS(rp);
261
 
                }
262
 
                break;
263
 
                
264
 
            case LNK_OMON:
265
 
            case LNK_TMON:
266
 
            default:
267
 
                erl_exit(1, "bad link type in dist links\n");
268
 
            }
269
 
            del_link(&lnk);
270
 
        }
 
386
    else { /* recursive call via erts_do_exit_port() will end up here */
 
387
        NetExitsContext nec = {dep};
 
388
        ErtsLink *nlinks;
 
389
        ErtsLink *node_links;
 
390
        ErtsMonitor *monitors;
 
391
 
 
392
        ERTS_SMP_LC_ASSERT(erts_smp_lc_io_is_locked());
 
393
 
 
394
        erts_smp_dist_entry_lock(dep);
 
395
        monitors        = dep->monitors;
 
396
        nlinks          = dep->nlinks;
 
397
        node_links      = dep->node_links;
 
398
        dep->monitors   = NULL;
 
399
        dep->nlinks     = NULL;
 
400
        dep->node_links = NULL;
 
401
        erts_smp_dist_entry_unlock(dep);
 
402
 
 
403
        erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec);
 
404
        erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec);
 
405
        erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec);
 
406
 
271
407
        clear_dist_entry(dep);
272
408
    }
273
409
    return 1;
281
417
 
282
418
void init_dist(void)
283
419
{
284
 
    net_kernel = NULL;
 
420
    init_alive();
285
421
 
286
422
    no_caches = 0;
287
423
 
288
 
    dist_buf = tmp_buf;   /* This is the buffer we encode into */
289
 
    dist_buf_size = TMP_BUF_SIZE - 20;
290
 
 
291
 
    dmem = (Eterm *) erts_alloc(ERTS_ALC_T_DMSG_BLD_BUF,
292
 
                                DMEM_SIZE * sizeof(Eterm));
293
 
 
294
424
    /* Lookup/Install all references to trap functions */
295
425
    dsend2_trap = trap_function(am_dsend,2);
296
426
    dsend3_trap = trap_function(am_dsend,3);
297
427
    /*    dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
298
428
    dlink_trap = trap_function(am_dlink,1);
299
429
    dunlink_trap = trap_function(am_dunlink,1);
300
 
    dmonitor_node_trap = trap_function(am_dmonitor_node,2);
 
430
    dmonitor_node_trap = trap_function(am_dmonitor_node,3);
301
431
    dgroup_leader_trap = trap_function(am_dgroup_leader,2);
302
432
    dexit_trap = trap_function(am_dexit, 2);
303
433
    dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
307
437
{
308
438
    clear_cache(dep);
309
439
    erts_set_dist_entry_not_connected(dep);
310
 
    dep->links = NULL;
 
440
    erts_smp_dist_entry_lock(dep);
 
441
    dep->nlinks = NULL;
 
442
    dep->node_links = NULL;
 
443
    dep->monitors = NULL;
311
444
    dep->status = 0;
 
445
    erts_smp_dist_entry_unlock(dep);
312
446
    return 0;
313
447
}
314
448
 
315
449
/*
 
450
 * SMP NOTE on dist_*() functions:
 
451
 *
 
452
 * Requirements for usage of dist_*() functions:
 
453
 *   I/O lock, lock on dep has to be held, and if c_p != NULL, at least
 
454
 *   main lock has to be held on c_p.
 
455
 *
 
456
 * Also note that lock on dep will be released and reacquired,
 
457
 * and that lock(s) on c_p may be released and reacquired.
 
458
 *
 
459
 */
 
460
 
 
461
/*
316
462
** Send a DOP_LINK link message
317
463
*/
318
 
int dist_link(DistEntry *dep, Eterm local, Eterm remote)
319
 
{
320
 
    Eterm ctl = TUPLE3(dmem, make_small(DOP_LINK), local, remote);
321
 
 
322
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 0);
323
 
}
324
 
 
325
 
 
326
 
int dist_unlink(DistEntry *dep, Eterm local, Eterm remote)
327
 
{
328
 
    Eterm ctl = TUPLE3(dmem, make_small(DOP_UNLINK), local, remote);
329
 
 
330
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 0);
331
 
}
332
 
 
333
 
int dist_m_exit(DistEntry *dep, Eterm watcher, Eterm watched, 
 
464
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
465
int dist_link(Process *c_p, Uint32 c_p_locks,
 
466
              DistEntry *dep, Eterm local, Eterm remote)
 
467
{
 
468
    Eterm ctl_heap[4];
 
469
    Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote);
 
470
 
 
471
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 0);
 
472
}
 
473
 
 
474
 
 
475
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
476
int dist_unlink(Process *c_p, Uint32 c_p_locks,
 
477
                DistEntry *dep, Eterm local, Eterm remote)
 
478
{
 
479
    Eterm ctl_heap[4];
 
480
    Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote);
 
481
 
 
482
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 0);
 
483
}
 
484
 
 
485
 
 
486
/* A local process that's beeing monitored by a remote one exits. We send:
 
487
   {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason},
 
488
   which is rather sad as only the ref is needed, no pid's... */
 
489
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
490
int dist_m_exit(Process *c_p, Uint32 c_p_locks,
 
491
                DistEntry *dep, Eterm watcher, Eterm watched, 
334
492
                Eterm ref, Eterm reason)
335
493
{
336
494
    Eterm ctl;
337
 
    Eterm *hp = dmem;
 
495
    Eterm ctl_heap[6];
338
496
 
339
 
    ctl = TUPLE5(hp, make_small(DOP_MONITOR_P_EXIT),
 
497
    ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
340
498
                 watched, watcher, ref, reason);
341
499
 
342
 
    del_link(find_link_by_ref(&(dep->links), ref));
 
500
#ifdef DEBUG
 
501
    ASSERT(!erts_lookup_monitor(dep->monitors, ref));
 
502
#endif
343
503
 
344
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 1);
 
504
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 1);
345
505
}
346
 
 
347
 
int dist_monitor(DistEntry *dep, Eterm watcher, Eterm watched, Eterm ref)
 
506
/* We want to monitor a process (named or unnamed) on another node, we send:
 
507
   {DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's
 
508
   needed on the other side... */
 
509
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
510
int dist_monitor(Process *c_p, Uint32 c_p_locks,
 
511
                 DistEntry *dep, Eterm watcher, Eterm watched, Eterm ref)
348
512
{
349
513
    Eterm ctl;
350
 
    Eterm *hp = dmem;
 
514
    Eterm ctl_heap[5];
351
515
 
352
 
    ctl = TUPLE4(hp,
 
516
    ctl = TUPLE4(&ctl_heap[0],
353
517
                 make_small(DOP_MONITOR_P),
354
518
                 watcher, watched, ref);
355
519
 
356
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 0);
 
520
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 0);
357
521
}
358
522
 
359
 
int dist_demonitor(DistEntry *dep, Eterm watcher, Eterm watched, 
 
523
/* A local process monitoring a remote one wants to stop monitoring, either 
 
524
   because of a demonitor bif call or because the local process died. We send
 
525
   {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again
 
526
   rather redundant as only the ref will be needed on the other side... */
 
527
 
 
528
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
529
int dist_demonitor(Process *c_p, Uint32 c_p_locks,
 
530
                   DistEntry *dep, Eterm watcher, Eterm watched, 
360
531
                   Eterm ref, int force)
361
532
{
362
533
    Eterm ctl;
363
 
    Eterm *hp = dmem;
 
534
    Eterm ctl_heap[5];
364
535
 
365
 
    ctl = TUPLE4(hp,
 
536
    ctl = TUPLE4(&ctl_heap[0],
366
537
                 make_small(DOP_DEMONITOR_P),
367
538
                 watcher, watched, ref);
368
539
 
369
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, force);
 
540
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, force);
370
541
}
371
542
 
372
 
int dist_send(Process* sender, DistEntry *dep, Eterm remote, Eterm message)
 
543
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
544
int dist_send(Process* sender, Uint32 sender_locks,
 
545
              DistEntry *dep, Eterm remote, Eterm message)
373
546
{
374
547
    Eterm ctl;
 
548
    Eterm ctl_heap[5];
375
549
    Eterm token = NIL;
376
550
 
377
551
    if (SEQ_TRACE_TOKEN(sender) != NIL) {
381
555
    }
382
556
 
383
557
    if (token != NIL)
384
 
        ctl = TUPLE4(dmem,make_small(DOP_SEND_TT), am_Cookie, remote, token);
 
558
        ctl = TUPLE4(&ctl_heap[0],
 
559
                     make_small(DOP_SEND_TT), am_Cookie, remote, token);
385
560
    else
386
 
        ctl = TUPLE3(dmem,make_small(DOP_SEND), am_Cookie, remote);
387
 
    return pack_and_send(dep, ctl, message, 0);
 
561
        ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
 
562
    return pack_and_send(sender, sender_locks, dep, ctl, message, 0);
388
563
}
389
564
 
390
 
int dist_reg_send(Process* sender, DistEntry *dep, Eterm remote_name, Eterm message)
 
565
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
566
int dist_reg_send(Process* sender, Uint32 sender_locks,
 
567
                  DistEntry *dep, Eterm remote_name, Eterm message)
391
568
{
392
569
    Eterm ctl;
 
570
    Eterm ctl_heap[6];
393
571
    Eterm token = NIL;
394
572
 
395
573
    if (SEQ_TRACE_TOKEN(sender) != NIL) {
399
577
    }
400
578
 
401
579
    if (token != NIL)
402
 
        ctl = TUPLE5(dmem,make_small(DOP_REG_SEND_TT),
 
580
        ctl = TUPLE5(&ctl_heap[0], make_small(DOP_REG_SEND_TT),
403
581
                     sender->id, am_Cookie, remote_name, token);
404
582
    else
405
 
        ctl = TUPLE4(dmem,make_small(DOP_REG_SEND),
 
583
        ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND),
406
584
                     sender->id, am_Cookie, remote_name);
407
 
    return pack_and_send(dep, ctl, message, 0);
 
585
    return pack_and_send(sender, sender_locks, dep, ctl, message, 0);
408
586
}
409
587
 
410
588
/* local has died, deliver the exit signal to remote
412
590
** this implies that the driver must always be ready to queue
413
591
** data even if it has signaled that it is busy !!!
414
592
*/
415
 
int dist_exit_tt(DistEntry *dep, Eterm local, Eterm remote, 
 
593
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
594
int dist_exit_tt(Process* c_p, Uint32 c_p_locks,
 
595
                 DistEntry *dep, Eterm local, Eterm remote, 
416
596
                 Eterm reason, Eterm token)
417
597
{
418
598
    Eterm ctl;
 
599
    Eterm ctl_heap[6];
 
600
    ErtsLink *lnk, *sublnk;
419
601
 
420
602
    if (token != NIL) { 
421
603
        /* token should be updated by caller */
422
604
        seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
423
 
        ctl = TUPLE5(dmem, make_small(DOP_EXIT_TT), local, remote, token, reason);
 
605
        ctl = TUPLE5(&ctl_heap[0],
 
606
                     make_small(DOP_EXIT_TT), local, remote, token, reason);
424
607
    } else {
425
 
        ctl = TUPLE4(dmem, make_small(DOP_EXIT), local, remote, reason);
426
 
    }
427
 
    del_link(find_link(&(dep->links), LNK_LINK, local, remote));
428
 
 
429
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 1);  /* forced, i.e ignore busy */
 
608
        ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
 
609
    }
 
610
    
 
611
    lnk = erts_lookup_link(dep->nlinks, local);
 
612
 
 
613
    if (lnk != NULL) {
 
614
        sublnk = erts_remove_link(&(lnk->root), remote);
 
615
        if (sublnk != NULL) {
 
616
            erts_destroy_link(sublnk);
 
617
        }
 
618
        if (lnk->root == NULL) {
 
619
            erts_destroy_link(erts_remove_link(&(dep->nlinks), local));
 
620
        }
 
621
    }
 
622
 
 
623
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 1);  /* forced, i.e ignore busy */
430
624
}
431
625
 
432
 
int dist_exit(DistEntry *dep, Eterm local, Eterm remote, Eterm reason)
 
626
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
627
int dist_exit(Process* c_p, Uint32 c_p_locks,
 
628
              DistEntry *dep, Eterm local, Eterm remote, Eterm reason)
433
629
{
434
 
    Eterm ctl = TUPLE4(dmem, make_small(DOP_EXIT), local, remote, reason);
435
 
    del_link(find_link(&(dep->links), LNK_LINK, local, remote));
436
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 1);  /* forced, i.e ignore busy */
 
630
    Eterm ctl_heap[5];
 
631
    Eterm ctl = TUPLE4(&ctl_heap[0],
 
632
                       make_small(DOP_EXIT), local, remote, reason);
 
633
    ErtsLink *lnk, *sublnk;
 
634
 
 
635
    lnk = erts_lookup_link(dep->nlinks, local);
 
636
 
 
637
    if (lnk != NULL) {
 
638
        sublnk = erts_remove_link(&(lnk->root), remote);
 
639
        if (sublnk != NULL) {
 
640
            erts_destroy_link(sublnk);
 
641
        }
 
642
        if (lnk->root == NULL) {
 
643
            erts_destroy_link(erts_remove_link(&(dep->nlinks), local));
 
644
        }
 
645
    }
 
646
 
 
647
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 1);  /* forced, i.e ignore busy */
437
648
}
438
649
 
439
650
/* internal version of dist_exit2 that force send through busy port */
440
 
int dist_exit2(DistEntry *dep, Eterm local, Eterm remote, Eterm reason)
 
651
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
652
int dist_exit2(Process* c_p, Uint32 c_p_locks,
 
653
               DistEntry *dep, Eterm local, Eterm remote, Eterm reason)
441
654
{
442
 
    Eterm ctl = TUPLE4(dmem, make_small(DOP_EXIT2), local, remote, reason);
 
655
    Eterm ctl_heap[5];
 
656
    Eterm ctl = TUPLE4(&ctl_heap[0],
 
657
                       make_small(DOP_EXIT2), local, remote, reason);
443
658
 
444
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 0);
 
659
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 0);
445
660
}
446
661
 
447
662
 
448
 
int dist_group_leader(DistEntry *dep, Eterm leader, Eterm remote)
 
663
/* SMP NOTE: See "SMP NOTE on dist_*() functions" above */
 
664
int dist_group_leader(Process* c_p, Uint32 c_p_locks,
 
665
                      DistEntry *dep, Eterm leader, Eterm remote)
449
666
{
450
 
    Eterm ctl = TUPLE3(dmem, make_small(DOP_GROUP_LEADER), leader, remote);
 
667
    Eterm ctl_heap[4];
 
668
    Eterm ctl = TUPLE3(&ctl_heap[0],
 
669
                       make_small(DOP_GROUP_LEADER), leader, remote);
451
670
 
452
 
    return pack_and_send(dep, ctl, THE_NON_VALUE, 0);
 
671
    return pack_and_send(c_p, c_p_locks, dep, ctl, THE_NON_VALUE, 0);
453
672
}
454
673
 
455
674
/*
461
680
**
462
681
**   assert  hlen == 0 !!!
463
682
*/
464
 
int net_mess2(DistEntry *dep, byte *hbuf, int hlen, byte *buf, int len)
 
683
int erts_net_message(DistEntry *dep, byte *hbuf, int hlen, byte *buf, int len)
465
684
{
466
 
    ErlLink **lnkp;
467
685
    byte *t;
468
686
    int i;
469
687
    int ctl_len;
476
694
    Eterm message;
477
695
    Eterm reason;
478
696
    Process* rp;
479
 
#ifndef SHARED_HEAP
480
697
    Eterm ctl_default[64];
481
698
    Eterm* ctl = ctl_default;
482
 
#endif
483
699
    ErlOffHeap off_heap;
484
700
    Eterm* hp;
485
 
    int type;
 
701
    Eterm* hp_end;
 
702
    Sint type;
486
703
    Eterm token;
487
704
    Eterm token_size;
488
705
    int orig_len = len;
 
706
    ErtsMonitor *mon;
 
707
    ErtsLink *lnk, *sublnk;
 
708
    int res;
489
709
 
490
710
    /* Thanks to Luke Gorrie */
491
711
    off_heap.mso = NULL;
492
 
#ifndef SHARED_HEAP
 
712
#ifndef HYBRID /* FIND ME! */
493
713
    off_heap.funs = NULL;
494
714
#endif
495
715
    off_heap.overhead = 0;
496
716
    off_heap.externals = NULL;
497
717
 
498
 
    if (net_kernel == NULL)  /* XXX check if this may trig */
 
718
    ERTS_SMP_CHK_NO_PROC_LOCKS;
 
719
 
 
720
    ERTS_SMP_LC_ASSERT(erts_smp_lc_io_is_locked());
 
721
 
 
722
    if (!is_alive())
499
723
        return 0;
500
724
    if (hlen > 0)
501
725
        goto data_error;
506
730
#if defined(PURIFY)
507
731
#  define PURIFY_MSG(msg) \
508
732
    purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg)
 
733
#elif defined(VALGRIND)
 
734
#  define PURIFY_MSG(msg) \
 
735
    VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
 
736
#  define PURIFY_MSG
509
737
#else
510
738
#  define PURIFY_MSG(msg)
511
739
#endif
522
750
        goto data_error;
523
751
    }
524
752
    orig_ctl_len = ctl_len;
525
 
#ifdef SHARED_HEAP
526
 
    hp = erts_global_alloc(ctl_len);
527
 
#else
528
753
    if (ctl_len > sizeof(ctl_default)/sizeof(ctl_default[0])) {
529
754
        ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
530
755
    }
531
756
    hp = ctl;
532
 
#endif
533
757
 
 
758
    erts_smp_dist_entry_lock(dep);
534
759
    arg = erts_from_external_format(dep, &hp, &t, &off_heap);
 
760
    erts_smp_dist_entry_unlock(dep);
535
761
    if (is_non_value(arg)) {
536
762
        PURIFY_MSG("data error");
537
763
        goto data_error;
541
767
 
542
768
    if (is_not_tuple(arg) || 
543
769
        (tuple = tuple_val(arg), arityval(*tuple) < 1) ||
544
 
        is_not_small(tuple[1]))
545
 
    {
546
 
        cerr_pos = 0;
547
 
        erl_printf(CBUF, "Invalid distribution message: ");
548
 
        ldisplay(arg, CBUF, 200);
549
 
        send_error_to_logger(NIL);
 
770
        is_not_small(tuple[1])) {
 
771
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
 
772
        erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
 
773
        erts_send_error_to_logger_nogl(dsbufp);
550
774
        goto data_error;
551
775
    }
552
776
 
557
781
        from = tuple[2];
558
782
        to   = tuple[3];  /* local proc to link to */
559
783
 
560
 
        if ((rp = pid2proc(to)) == NULL) {
 
784
        rp = erts_pid2proc_opt(NULL, 0,
 
785
                               to, ERTS_PROC_LOCK_LINK,
 
786
                               ERTS_P2P_FLG_ALLOW_OTHER_X);
 
787
        erts_smp_dist_entry_lock(dep);
 
788
        if (!rp) {
561
789
            /* This is tricky (we MUST force a distributed send) */
562
790
            /* We may send it to net_kernel and let it do the job !!! */
563
 
            dist_exit(dep, to, from, am_noproc);
564
 
            break;
565
 
        }
566
 
        if (find_link(&rp->links,LNK_LINK, from, NIL) != NULL)
567
 
            break;
568
 
        dep->links = new_link(dep->links,LNK_LINK, to, from);
569
 
        rp->links = new_link(rp->links, LNK_LINK, from, NIL);
 
791
            dist_exit(NULL, 0, dep, to, from, am_noproc);
 
792
            erts_smp_dist_entry_unlock(dep);
 
793
            break;
 
794
        }
 
795
 
 
796
        res = erts_add_link(&(rp->nlinks), LINK_PID, from);
 
797
 
 
798
        if (res < 0) {
 
799
            /* It was already there! Lets skip the rest... */
 
800
            erts_smp_dist_entry_unlock(dep);
 
801
            erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
 
802
            break;
 
803
        }
 
804
        lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->id);
 
805
        erts_add_link(&(lnk->root), LINK_PID, from);
 
806
        erts_smp_dist_entry_unlock(dep);
 
807
 
570
808
        if (IS_TRACED_FL(rp, F_TRACE_PROCS))
571
809
            trace_proc(NULL, rp, am_getting_linked, from);
 
810
 
 
811
        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
572
812
        break;
573
813
 
574
814
    case DOP_UNLINK: {
575
 
        ErlLink **rlinkpp;
576
815
        from = tuple[2];
577
816
        to = tuple[3];
578
817
        
579
 
        if ((rp = pid2proc(to)) == NULL)
 
818
        rp = erts_pid2proc_opt(NULL, 0,
 
819
                               to, ERTS_PROC_LOCK_LINK,
 
820
                               ERTS_P2P_FLG_ALLOW_OTHER_X);
 
821
        if (!rp)
580
822
            break;
581
 
        rlinkpp = find_link(&rp->links, LNK_LINK, from, NIL);
582
 
        del_link(rlinkpp);
583
 
        del_link(find_link(&dep->links, LNK_LINK, to, from));
584
 
        
585
 
        if (IS_TRACED_FL(rp, F_TRACE_PROCS) && rlinkpp != NULL) {
 
823
 
 
824
        erts_smp_dist_entry_lock(dep);
 
825
        lnk = erts_remove_link(&(rp->nlinks), from);
 
826
        if (lnk != NULL) {
 
827
            erts_destroy_link(lnk);
 
828
        }
 
829
 
 
830
        lnk = erts_lookup_link(dep->nlinks, rp->id); 
 
831
        if (lnk != NULL) {
 
832
            sublnk = erts_remove_link(&(lnk->root), from);
 
833
            if (sublnk != NULL) {
 
834
                erts_destroy_link(sublnk);
 
835
            }
 
836
            if (lnk->root == NULL) {
 
837
                erts_destroy_link(erts_remove_link(&(dep->nlinks), rp->id));
 
838
            } 
 
839
        }
 
840
 
 
841
        erts_smp_dist_entry_unlock(dep);
 
842
 
 
843
        if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) {
586
844
            trace_proc(NULL, rp, am_getting_unlinked, from);
587
845
        }
 
846
 
 
847
        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
588
848
        break;
589
849
    }
590
850
    
591
851
    case DOP_MONITOR_P: {
592
 
        Eterm watched_p;
 
852
        /* A remote process wants to monitor us, we get:
 
853
           {DOP_MONITOR_P, Remote pid, local pid or name, ref} */
 
854
        Eterm name;
593
855
 
594
856
        watcher = tuple[2];
595
857
        watched = tuple[3];  /* local proc to monitor */
596
858
        ref     = tuple[4];
597
859
 
598
 
        watched_p = watched;
599
 
 
600
860
        if (is_atom(watched)) {
601
 
            rp = whereis_process(watched);
602
 
            if ((rp == NULL) || (rp->status == P_EXITING)) {
603
 
                dist_m_exit(dep, watcher, watched, ref, am_noproc);
604
 
                break;
605
 
            }
606
 
            watched = rp->id;
607
 
        } else if ((rp = pid2proc(watched)) == NULL) {
608
 
           dist_m_exit(dep, watcher, watched, ref, am_noproc);
609
 
           break;
610
 
        }
611
 
        dep->links = new_ref_link(dep->links, LNK_LINK1, watcher, watched, ref);
612
 
        rp->links = new_ref_link(rp->links, LNK_LINK1, watcher, watched_p, ref);
 
861
            name = watched;
 
862
            rp = erts_whereis_process(NULL, 0, watched, ERTS_PROC_LOCK_LINK, 1);
 
863
        }
 
864
        else {
 
865
            name = NIL;
 
866
            rp = erts_pid2proc_opt(NULL, 0,
 
867
                                   watched, ERTS_PROC_LOCK_LINK,
 
868
                                   ERTS_P2P_FLG_ALLOW_OTHER_X);
 
869
        }
 
870
 
 
871
        erts_smp_dist_entry_lock(dep);
 
872
        
 
873
        if (!rp)
 
874
            dist_m_exit(NULL, 0, dep, watcher, watched, ref, am_noproc);
 
875
        else {
 
876
            if (is_atom(watched))
 
877
                watched = rp->id;
 
878
            erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name);
 
879
            erts_add_monitor(&(rp->monitors), MON_TARGET, ref, watcher, name);
 
880
            erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
 
881
        }
 
882
 
 
883
        erts_smp_dist_entry_unlock(dep);
 
884
 
613
885
        break;
614
886
    }
615
887
 
616
888
    case DOP_DEMONITOR_P:
 
889
        /* A remote node informs us that a local pid in no longer monitored
 
890
           We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref},
 
891
           We need only the ref of course */
 
892
 
617
893
        /* watcher = tuple[2]; */
618
894
        /* watched = tuple[3]; May be an atom in case of monitor name */
619
895
        ref = tuple[4];
620
896
 
621
 
        lnkp = find_link_by_ref(&dep->links, ref);
622
 
        if (lnkp == NULL)
623
 
            break;
624
 
        watched = (*lnkp)->data;
625
 
        if ((rp = pid2proc(watched)) == NULL)
626
 
           break;
627
 
        del_link(lnkp);
628
 
        del_link(find_link_by_ref(&rp->links, ref));
 
897
        erts_smp_dist_entry_lock(dep);
 
898
        mon = erts_remove_monitor(&(dep->monitors),ref);
 
899
        /* ASSERT(mon != NULL); can happen in case of broken dist message */
 
900
        if (mon == NULL) {
 
901
            erts_smp_dist_entry_unlock(dep);
 
902
            break;
 
903
        }
 
904
        watched = mon->pid;
 
905
        erts_destroy_monitor(mon);
 
906
        erts_smp_dist_entry_unlock(dep);
 
907
        rp = erts_pid2proc_opt(NULL, 0,
 
908
                               watched, ERTS_PROC_LOCK_LINK,
 
909
                               ERTS_P2P_FLG_ALLOW_OTHER_X);
 
910
        if (!rp) {
 
911
            break;
 
912
        }
 
913
        mon = erts_remove_monitor(&(rp->monitors),ref);
 
914
        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
 
915
        ASSERT(mon != NULL);
 
916
        if (mon == NULL) {
 
917
            break;
 
918
        }
 
919
        erts_destroy_monitor(mon);
629
920
        break;
630
921
 
631
922
    case DOP_NODE_LINK: /* XXX never sent ?? */
649
940
        }
650
941
        from = tuple[2];
651
942
        to = tuple[4];
652
 
        if ((rp = whereis_process(to)) == NULL) {
 
943
        rp = erts_whereis_process(NULL, 0, to, ERTS_PROC_LOCKS_MSG_SEND, 0);
 
944
        if (!rp || ERTS_PROC_PENDING_EXIT(rp)) {
 
945
            ErlHeapFragment* msg;
 
946
#ifdef ERTS_SMP
 
947
            if (rp)
 
948
                erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_MSG_SEND);
 
949
#endif
653
950
            /*
654
951
             * The receiver is invalid, but we must decode the
655
952
             * message anyway to keep the atom cache up-to-date.
656
953
             */
657
 
            ErlHeapFragment* msg = new_message_buffer(i);
 
954
            msg = new_message_buffer(i);
658
955
            hp = msg->mem;
659
956
            (void) erts_from_external_format(dep, &hp, &t, &msg->off_heap);
660
957
            free_message_buffer(msg);
661
958
        } else {
662
 
            hp = HAlloc(rp, i + token_size);
663
 
            message = erts_from_external_format(dep, &hp, &t, &MSO(rp));
 
959
            Uint32 locks = ERTS_PROC_LOCKS_MSG_SEND;
 
960
            ErlHeapFragment *bp;
 
961
            ErlOffHeap *ohp;
 
962
            hp = erts_alloc_message_heap(i+token_size,&bp,&ohp,rp,&locks);
 
963
            hp_end = hp + i + token_size;
 
964
            message = erts_from_external_format(dep, &hp, &t, ohp);
664
965
            if (is_non_value(message)) {
665
966
                PURIFY_MSG("data error");
666
967
                goto data_error;
669
970
                token = NIL;
670
971
            } else {
671
972
                token = tuple[5];
672
 
                token = copy_struct(token, token_size, &hp, &MSO(rp));
673
 
            }
674
 
            queue_message_tt(rp, NULL, message, token);
675
 
 
 
973
                token = copy_struct(token, token_size, &hp, ohp);
 
974
            }
 
975
            if (!bp) {
 
976
                HRelease(rp,hp_end,hp);
 
977
            }
 
978
            else {
 
979
                Uint final_size = hp - &bp->mem[0];
 
980
                Eterm brefs[2] = {message, token};
 
981
                ASSERT(i + token_size - (hp_end - hp) == final_size);
 
982
                bp = erts_resize_message_buffer(bp, final_size, &brefs[0], 2);
 
983
                message = brefs[0];
 
984
                token = brefs[1];
 
985
            }
 
986
            erts_queue_message(rp, locks, bp, message, token);
 
987
            erts_smp_proc_unlock(rp, locks);
676
988
        }
677
989
        break;
678
990
 
691
1003
        }
692
1004
 
693
1005
        to = tuple[3];
694
 
        if ((rp = pid2proc(to)) == NULL) {
 
1006
        rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCKS_MSG_SEND);
 
1007
        if (!rp || ERTS_PROC_PENDING_EXIT(rp)) {
 
1008
            ErlHeapFragment* msg;
 
1009
#ifdef ERTS_SMP
 
1010
            if (rp)
 
1011
                erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_MSG_SEND);
 
1012
#endif
695
1013
            /*
696
1014
             * The receiver is invalid, but we must decode the
697
1015
             * message anyway to keep the atom cache up-to-date.
698
1016
             */
699
 
            ErlHeapFragment* msg = new_message_buffer(i);
 
1017
            msg = new_message_buffer(i);
700
1018
            hp = msg->mem;
701
1019
            (void) erts_from_external_format(dep, &hp, &t, &msg->off_heap);
702
1020
            free_message_buffer(msg);
703
1021
        } else {
704
 
            hp = HAlloc(rp, i + token_size);
705
 
            message = erts_from_external_format(dep, &hp, &t, &MSO(rp));
 
1022
            Uint32 locks = ERTS_PROC_LOCKS_MSG_SEND;
 
1023
            ErlOffHeap *ohp;
 
1024
            ErlHeapFragment *bp;
 
1025
            hp = erts_alloc_message_heap(i+token_size,&bp,&ohp,rp,&locks);
 
1026
 
 
1027
            hp_end = hp + i + token_size;
 
1028
            message = erts_from_external_format(dep, &hp, &t, ohp);
706
1029
            if (is_non_value(message)) {
707
1030
                PURIFY_MSG("data error");
708
1031
                goto data_error;
711
1034
                token = NIL;
712
1035
            } else {
713
1036
                token = tuple[4];
714
 
                token = copy_struct(token, token_size, &hp, &MSO(rp));
715
 
            }
716
 
            queue_message_tt(rp, NULL, message, token);
717
 
 
 
1037
                token = copy_struct(token, token_size, &hp, ohp);
 
1038
            }
 
1039
            if (!bp) {
 
1040
                HRelease(rp,hp_end,hp);
 
1041
            }
 
1042
            else {
 
1043
                Uint final_size = hp - &bp->mem[0];
 
1044
                Eterm brefs[2] = {message, token};
 
1045
                ASSERT(i + token_size - (hp_end - hp) == final_size);
 
1046
                bp = erts_resize_message_buffer(bp, final_size, &brefs[0], 2);
 
1047
                message = brefs[0];
 
1048
                token = brefs[1];
 
1049
            }
 
1050
            erts_queue_message(rp, locks, bp, message, token);
 
1051
            erts_smp_proc_unlock(rp, locks);
718
1052
        }
719
1053
        break;
720
1054
 
721
1055
    case DOP_MONITOR_P_EXIT: {
 
1056
        /* We are monitoring a process on the remote node which dies, we get
 
1057
           {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
 
1058
           
 
1059
 
722
1060
        Eterm lhp[3];
 
1061
        Eterm sysname;
 
1062
        Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK;
723
1063
 
724
1064
        /* watched = tuple[2]; */  /* remote proc which died */
725
 
        watcher = tuple[3];
 
1065
        /* watcher = tuple[3]; */
726
1066
        ref     = tuple[4];
727
1067
        reason  = tuple[5];
728
1068
 
729
 
        if ((rp = pid2proc(watcher)) == NULL)
730
 
           break;
731
 
 
732
 
        lnkp = find_link_by_ref(&rp->links, ref);
733
 
        if (lnkp == NULL)
734
 
            break;
735
 
        
736
 
        del_link(lnkp);
737
 
 
738
 
        lnkp = find_link_by_ref(&dep->links, ref);
739
 
        
740
 
        ASSERT(is_pid((*lnkp)->data) || is_atom((*lnkp)->data));
741
 
        watched = (is_atom((*lnkp)->data)
742
 
                   ? TUPLE2(&lhp[0], (*lnkp)->data, dep->sysname)
743
 
                   : (*lnkp)->data);
744
 
        
745
 
        queue_monitor_message(rp, ref, am_process, watched, reason);
746
 
        del_link(lnkp);
 
1069
        erts_smp_dist_entry_lock(dep);
 
1070
        sysname = dep->sysname;
 
1071
        mon = erts_remove_monitor(&(dep->monitors), ref);
 
1072
        /*
 
1073
         * If demonitor was performed at the same time as the
 
1074
         * monitored process exits, monitoring side will have
 
1075
         * removed info about monitor. In this case, do nothing
 
1076
         * and everything will be as it should.
 
1077
         */
 
1078
        erts_smp_dist_entry_unlock(dep);
 
1079
        if (mon == NULL) {
 
1080
            break;
 
1081
        }
 
1082
        rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
 
1083
        if (rp == NULL) {
 
1084
            break;
 
1085
        }
 
1086
 
 
1087
        erts_destroy_monitor(mon);
 
1088
 
 
1089
        mon = erts_remove_monitor(&(rp->monitors),ref);
 
1090
 
 
1091
        if (mon == NULL) {
 
1092
            erts_smp_proc_unlock(rp, rp_locks);
 
1093
            break;
 
1094
        }
 
1095
        
 
1096
        watched = (is_not_nil(mon->name)
 
1097
                   ? TUPLE2(&lhp[0], mon->name, sysname)
 
1098
                   : mon->pid);
 
1099
        
 
1100
        erts_queue_monitor_message(rp, &rp_locks,
 
1101
                                   ref, am_process, watched, reason);
 
1102
        erts_smp_proc_unlock(rp, rp_locks);
 
1103
        erts_destroy_monitor(mon);
747
1104
        break;
748
1105
    }
749
1106
 
750
1107
    case DOP_EXIT_TT:
751
 
    case DOP_EXIT:
 
1108
    case DOP_EXIT: {
 
1109
        Uint32 rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
752
1110
        /* 'from', which 'to' is linked to, died */
753
1111
        if (type == DOP_EXIT) {
754
1112
           from = tuple[2];
763
1121
        }
764
1122
        if (is_not_internal_pid(to) && is_not_internal_port(to))
765
1123
            break;
766
 
        del_link(find_link(&dep->links, LNK_LINK, to, from));
767
 
 
768
 
        if (is_internal_pid(to)) {
769
 
            ErlLink **rlinkpp;
770
 
            rp = internal_pid_index(to) < erts_max_processes ? 
771
 
                process_tab[internal_pid_index(to)] : NULL;
772
 
            if (INVALID_PID(rp, to))
773
 
                break;
774
 
            rlinkpp = find_link(&rp->links, LNK_LINK, from, NIL);
775
 
            del_link(rlinkpp);
 
1124
 
 
1125
        erts_smp_dist_entry_lock(dep);
 
1126
        lnk = erts_lookup_link(dep->nlinks, to); 
 
1127
        if (lnk != NULL) {
 
1128
            sublnk = erts_remove_link(&(lnk->root), from);
 
1129
            if (sublnk != NULL) {
 
1130
                erts_destroy_link(sublnk);
 
1131
            }
 
1132
            if (lnk->root == NULL) {
 
1133
                erts_destroy_link(erts_remove_link(&(dep->nlinks), to));
 
1134
            } 
 
1135
        }
 
1136
 
 
1137
        erts_smp_dist_entry_unlock(dep);
 
1138
 
 
1139
        rp = erts_pid2proc(NULL, 0, to, rp_locks);
 
1140
        if (rp) {
 
1141
            lnk = erts_remove_link(&(rp->nlinks), from);
 
1142
 
 
1143
            /* If lnk == NULL, we have unlinked on this side, i.e.
 
1144
             * ignore exit.
 
1145
             */
 
1146
            if (lnk) {
 
1147
                int xres;
 
1148
                erts_destroy_link(lnk);
776
1149
#if 0
777
 
            /* Arndt: Maybe it should never be 'kill', but it can be,
778
 
               namely when a linked process does exit(kill). Until we know
779
 
               whether that is incorrect and what should happen instead,
780
 
               we leave the assertion out. */
781
 
            ASSERT(reason != am_kill); /* should never be kill (killed) */
 
1150
                /* Arndt: Maybe it should never be 'kill', but it can be,
 
1151
                   namely when a linked process does exit(kill). Until we know
 
1152
                   whether that is incorrect and what should happen instead,
 
1153
                   we leave the assertion out. */
 
1154
                ASSERT(reason != am_kill); /* should never be kill (killed) */
782
1155
#endif
783
 
            if (rp->flags & F_TRAPEXIT) {
784
 
                /* token updated by remote node */
785
 
                deliver_exit_message_tt(from, rp, reason, token);
786
 
                if (IS_TRACED_FL(rp, F_TRACE_PROCS) && rlinkpp != NULL) {
787
 
                    trace_proc(NULL, rp, am_getting_unlinked, from);
788
 
                }
789
 
            } else if (reason == am_normal) {
790
 
                if (IS_TRACED_FL(rp, F_TRACE_PROCS) && rlinkpp != NULL) {
791
 
                    trace_proc(NULL, rp, am_getting_unlinked, from);
792
 
                }
793
 
            } else {
794
 
                schedule_exit(rp, reason);
 
1156
                xres = erts_send_exit_signal(NULL,
 
1157
                                             from,
 
1158
                                             rp,
 
1159
                                             &rp_locks, 
 
1160
                                             reason,
 
1161
                                             token,
 
1162
                                             NULL,
 
1163
                                             ERTS_XSIG_FLG_IGN_KILL);
 
1164
                if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
 
1165
                    /* We didn't exit the process and it is traced */
 
1166
                    trace_proc(NULL, rp, am_getting_unlinked, from);
 
1167
                }
795
1168
            }
 
1169
            erts_smp_proc_unlock(rp, rp_locks);
796
1170
        }
797
 
        else {
 
1171
        else if (is_internal_port(to)) {
798
1172
            /* Internal port */
799
1173
            int ix = internal_port_index(to);
800
1174
            if (! INVALID_PORT(erts_port+ix, to)) {
801
 
                del_link(find_link(&erts_port[ix].links,LNK_LINK,from,NIL));
 
1175
                lnk = erts_remove_link(&(erts_port[ix].nlinks), from);
 
1176
                if (lnk != NULL) {
 
1177
                    erts_destroy_link(lnk);
 
1178
                }
802
1179
            }
803
 
            do_exit_port(to, from, reason);
 
1180
            erts_do_exit_port(to, from, reason);
804
1181
        }
805
1182
        break;
806
 
 
 
1183
    }
807
1184
    case DOP_EXIT2_TT:
808
 
    case DOP_EXIT2:
 
1185
    case DOP_EXIT2: {
 
1186
        Uint32 rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
809
1187
        /* 'from' is send an exit signal to 'to' */
810
1188
        if (type == DOP_EXIT2) {
811
1189
           from = tuple[2];
818
1196
           token = tuple[4];
819
1197
           reason = tuple[5];
820
1198
        }
821
 
        if (is_internal_pid(to)) {
822
 
            rp = internal_pid_index(to) < erts_max_processes ?
823
 
                process_tab[internal_pid_index(to)] : NULL;
824
 
            if (INVALID_PID(rp, to))
825
 
                break;
826
 
            if (reason == am_kill)
827
 
                schedule_exit(rp, am_killed);
828
 
            else if (rp->flags & F_TRAPEXIT)
829
 
                /* token updated by remote node */
830
 
                deliver_exit_message_tt(from, rp, reason, token);
831
 
            else if (reason != am_normal)
832
 
                schedule_exit(rp, reason);
 
1199
        rp = erts_pid2proc(NULL, 0, to, rp_locks);
 
1200
        if (rp) {
 
1201
            (void) erts_send_exit_signal(NULL,
 
1202
                                         from,
 
1203
                                         rp,
 
1204
                                         &rp_locks,
 
1205
                                         reason,
 
1206
                                         token,
 
1207
                                         NULL,
 
1208
                                         0);
 
1209
            erts_smp_proc_unlock(rp, rp_locks);
833
1210
        }
834
1211
        break;
835
 
 
 
1212
    }
836
1213
    case DOP_GROUP_LEADER:
837
1214
        from = tuple[2];   /* Group leader  */
838
1215
        to = tuple[3];     /* new member */
839
 
        if (is_not_internal_pid(to))
840
 
            break;
841
 
        rp = internal_pid_index(to) < erts_max_processes ?
842
 
            process_tab[internal_pid_index(to)] : NULL;
843
 
        if (INVALID_PID(rp, to))
844
 
            break;
845
 
        /*
846
 
         * XXX Multi-thread note: Allocating on another process's heap.
847
 
         */
848
 
        if (is_pid(from))
849
 
            rp->group_leader = STORE_NC_IN_PROC(rp, from);
 
1216
        if (is_not_pid(from))
 
1217
            break;
 
1218
 
 
1219
        rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN);
 
1220
        if (!rp)
 
1221
            break;
 
1222
        rp->group_leader = STORE_NC_IN_PROC(rp, from);
 
1223
        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
850
1224
        break;
851
1225
 
852
 
    default:
853
 
        cerr_pos = 0;
854
 
        erl_printf(CBUF, "Illegal value in distribution dispatch switch: ");
855
 
        ldisplay(arg, CBUF, 200);
856
 
        send_error_to_logger(NIL);
857
 
        PURIFY_MSG("data error");
858
 
        goto data_error;
 
1226
    default: {
 
1227
            erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
 
1228
            erts_dsprintf(dsbufp,
 
1229
                          "Illegal value in distribution dispatch switch: "
 
1230
                          "%.200T",
 
1231
                          arg);
 
1232
            erts_send_error_to_logger_nogl(dsbufp);
 
1233
            PURIFY_MSG("data error");
 
1234
            goto data_error;
 
1235
        }
859
1236
    }
860
1237
 
861
1238
    if (off_heap.mso) {
864
1241
    if (off_heap.externals) {
865
1242
        erts_cleanup_externals(off_heap.externals);
866
1243
    }
867
 
#ifndef SHARED_HEAP
 
1244
#ifndef HYBRID /* FIND ME! */
868
1245
    if (off_heap.funs) {
869
1246
        erts_cleanup_funs(off_heap.funs);
870
1247
    }
872
1249
        erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
873
1250
    }
874
1251
#endif
 
1252
    ERTS_SMP_CHK_NO_PROC_LOCKS;
875
1253
    return 0;
876
1254
 
877
 
 data_error:
878
 
    {
879
 
        char *initial = "Got invalid data on distribution channel, "
880
 
            "offending packet is: <<";
881
 
        char *trailer =">>";
882
 
        int inilen = strlen(initial)+strlen(trailer);
883
 
        int toprint = orig_len;
884
 
 
885
 
        cerr_pos = 0;
886
 
        if (toprint*4 >= (TMP_BUF_SIZE - inilen))
887
 
            toprint = (TMP_BUF_SIZE - 1 - inilen) / 4;
888
 
        erl_printf(CBUF,"%s",initial);
889
 
        for(i = 0; i < toprint; ++i) {
890
 
            if(i < toprint - 1) {
891
 
                erl_printf(CBUF,"%d,",(int)buf[i]);
892
 
            } else {
893
 
                erl_printf(CBUF,"%d",(int)buf[i]);
894
 
            }
895
 
        }
896
 
        erl_printf(CBUF,trailer);
897
 
        erts_send_warning_to_logger(NIL,tmp_buf,cerr_pos);
 
1255
 data_error: {
 
1256
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
 
1257
        erts_dsprintf(dsbufp,
 
1258
                      "Got invalid data on distribution channel, "
 
1259
                      "offending packet is: <<");
 
1260
        for(i = 0; i < orig_len; ++i)
 
1261
            erts_dsprintf(dsbufp, i ? ",%b8u" : "%b8u", buf[i]);
 
1262
        erts_dsprintf(dsbufp, ">>");
 
1263
        erts_send_warning_to_logger_nogl(dsbufp);
898
1264
    }
899
1265
    if (off_heap.mso) {
900
1266
        erts_cleanup_mso(off_heap.mso);
902
1268
    if (off_heap.externals) {
903
1269
        erts_cleanup_externals(off_heap.externals);
904
1270
    }
905
 
#ifndef SHARED_HEAP
 
1271
#ifndef HYBRID /* FIND ME! */
906
1272
    if (off_heap.funs) {
907
1273
        erts_cleanup_funs(off_heap.funs);
908
1274
    }
910
1276
        erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
911
1277
    }
912
1278
#endif
913
 
    do_exit_port(dep->cid, dep->cid, am_killed);
 
1279
    erts_do_exit_port(dep->cid, dep->cid, am_killed);
 
1280
    ERTS_SMP_CHK_NO_PROC_LOCKS;
914
1281
    return -1;
915
1282
}
916
1283
 
929
1296
/*         0 on ok      */
930
1297
/*         1 on resend  */
931
1298
 
 
1299
#define DEFAULT_TMP_DIST_BUF_SZ (8*1024)
932
1300
 
933
 
static int pack_and_send(DistEntry *dep, Eterm ctl, Eterm mess, int force_busy)
 
1301
static int pack_and_send(Process *c_p, Uint32 c_p_locks,
 
1302
                         DistEntry *dep, Eterm ctl, Eterm mess, int force_busy)
934
1303
{
 
1304
    byte *bufp;
 
1305
    Uint bufsz;
935
1306
    byte *t;
936
1307
    Port* p;
937
1308
    Eterm cid = dep->cid;
938
1309
 
939
 
    if (erts_this_node->sysname == am_Noname)
 
1310
    if (!is_alive())
940
1311
        return -1;
941
1312
    if (cid == NIL)
942
1313
        return 0;
949
1320
        return 0;
950
1321
    if (!force_busy && (p->status & PORT_BUSY))
951
1322
        return 1;
 
1323
 
952
1324
#ifdef MESS_DEBUG
953
 
    if (is_value(mess)) {
954
 
        erl_printf(CERR,">>ctl+mess>> ");
955
 
        display(ctl,CERR);
956
 
        erl_printf(CERR," && ");
957
 
        display(mess,CERR);
958
 
        erl_printf(CERR,"\n\r");
959
 
    }
960
 
    else {
961
 
        erl_printf(CERR,">> ");
962
 
        display(ctl,CERR);
963
 
        erl_printf(CERR, "\n\r");
964
 
    }
 
1325
    if (is_value(mess))
 
1326
        erts_printf(stderr, ">>ctl+mess>> %T && %T\n", ctl, mess);
 
1327
    else
 
1328
        erts_printf(stderr, ">> %T\n", ctl);
965
1329
#endif
966
 
    t = dist_buf;
 
1330
    bufp = (byte *) erts_alloc(ERTS_ALC_T_TMP_DIST_BUF, DEFAULT_TMP_DIST_BUF_SZ);
 
1331
    bufsz = DEFAULT_TMP_DIST_BUF_SZ;
 
1332
    t = bufp;
967
1333
    *t++ = PASS_THROUGH;          /* not needed !!! */
968
 
    erts_to_external_format(dep, ctl, &t);
 
1334
 
 
1335
    erts_to_external_format(dep, ctl, &t, &bufp, &bufsz);
969
1336
    if (is_value(mess))
970
 
        erts_to_external_format(dep, mess, &t);
971
 
    dist_port_command(p, dist_buf, t-dist_buf);
 
1337
        erts_to_external_format(dep, mess, &t, &bufp, &bufsz);
 
1338
 
 
1339
#ifdef ERTS_SMP
 
1340
    /*
 
1341
     * When we call dist_port_command we should only hold the io lock.
 
1342
     */
 
1343
    erts_smp_dist_entry_unlock(dep);
 
1344
    if (!c_p)
 
1345
        c_p_locks = 0;
 
1346
    if (c_p_locks)
 
1347
        erts_smp_proc_unlock(c_p, c_p_locks);
 
1348
 
 
1349
    ERTS_SMP_CHK_NO_PROC_LOCKS;
 
1350
    ERTS_SMP_LC_ASSERT(erts_smp_lc_io_is_locked());
 
1351
#endif
 
1352
 
 
1353
    dist_port_command(p, bufp, t-bufp);
 
1354
    erts_free(ERTS_ALC_T_TMP_DIST_BUF, (void *) bufp);
 
1355
 
 
1356
#ifdef ERTS_SMP
 
1357
    /* Restore locks as held when pack_and_send() was called...
 
1358
     * Lock order of interest:
 
1359
     * - io lock
 
1360
     * - process locks
 
1361
     * - dist entry
 
1362
     */
 
1363
    if (c_p_locks)
 
1364
        erts_smp_proc_lock(c_p, c_p_locks);
 
1365
    erts_smp_dist_entry_lock(dep);
 
1366
#endif
 
1367
 
972
1368
    return 0;
973
1369
}
974
1370
 
 
1371
struct print_to_data {
 
1372
    int to;
 
1373
    void *arg;
 
1374
};
 
1375
 
 
1376
static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp)
 
1377
{
 
1378
    int to = ((struct print_to_data *) vptdp)->to;
 
1379
    void *arg = ((struct print_to_data *) vptdp)->arg;
 
1380
    Process *rp;
 
1381
    ErtsMonitor *rmon;
 
1382
    rp = erts_pid2proc_unlocked(mon->pid);
 
1383
    if (!rp || (rmon = erts_lookup_monitor(rp->monitors, mon->ref)) == NULL) {
 
1384
        erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid);
 
1385
    } else if (mon->type == MON_ORIGIN) {
 
1386
        /* Local pid is being monitored */
 
1387
        erts_print(to, arg, "Remotely monitored by: %T %T\n",
 
1388
                   mon->pid, rmon->pid);
 
1389
    } else {
 
1390
        erts_print(to, arg, "Remote monitoring: %T ", mon->pid);
 
1391
        if (is_not_atom(rmon->pid))
 
1392
            erts_print(to, arg, "%T\n", rmon->pid);
 
1393
        else
 
1394
            erts_print(to, arg, "{%T, %T}\n",
 
1395
                       rmon->name,
 
1396
                       rmon->pid); /* which in this case is the 
 
1397
                                      remote system name... */
 
1398
    }
 
1399
}    
 
1400
 
 
1401
static void print_monitor_info(int to, void *arg, ErtsMonitor *mon)
 
1402
{
 
1403
    struct print_to_data ptd = {to, arg};
 
1404
    erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd);
 
1405
}
 
1406
 
 
1407
typedef struct {
 
1408
    struct print_to_data *ptdp;
 
1409
    Eterm from;
 
1410
} PrintLinkContext;
 
1411
 
 
1412
static void doit_print_link_info2(ErtsLink *lnk, void *vpplc)
 
1413
{
 
1414
    PrintLinkContext *pplc = (PrintLinkContext *) vpplc;
 
1415
    erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n",
 
1416
               pplc->from, lnk->pid);
 
1417
}
 
1418
 
 
1419
static void doit_print_link_info(ErtsLink *lnk, void *vptdp)
 
1420
{
 
1421
    if (is_internal_pid(lnk->pid) && erts_pid2proc_unlocked(lnk->pid)) {
 
1422
        PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid};
 
1423
        erts_doforall_links(lnk->root, &doit_print_link_info2, &plc);
 
1424
    } 
 
1425
}
 
1426
 
 
1427
static void print_link_info(int to, void *arg, ErtsLink *lnk)
 
1428
{
 
1429
    struct print_to_data ptd = {to, arg};
 
1430
    erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd);
 
1431
}
 
1432
 
 
1433
typedef struct {
 
1434
    struct print_to_data ptd;
 
1435
    Eterm sysname;
 
1436
} PrintNodeLinkContext;
 
1437
    
 
1438
 
 
1439
static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext)
 
1440
{
 
1441
    PrintNodeLinkContext *pcontext = vpcontext;
 
1442
 
 
1443
    if (is_internal_pid(lnk->pid) && erts_pid2proc_unlocked(lnk->pid))
 
1444
        erts_print(pcontext->ptd.to, pcontext->ptd.arg,
 
1445
                   "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname);
 
1446
}
 
1447
 
 
1448
static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
 
1449
{
 
1450
    PrintNodeLinkContext context = {{to, arg}, sysname};
 
1451
    erts_doforall_links(lnk, &doit_print_nodelink_info, &context);
 
1452
}
 
1453
 
 
1454
 
975
1455
static int
976
 
info_dist_entry(CIO to, DistEntry *dep, int visible, int connected)
 
1456
info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
977
1457
{
978
 
  ErlLink* lnk;
979
1458
 
980
1459
  if (visible && connected) {
981
 
      erl_printf(to, "=visible_node:");
 
1460
      erts_print(to, arg, "=visible_node:");
982
1461
  } else if (connected) {
983
 
      erl_printf(to, "=hidden_node:");
 
1462
      erts_print(to, arg, "=hidden_node:");
984
1463
  } else {
985
 
      erl_printf(to, "=not_connected:");
 
1464
      erts_print(to, arg, "=not_connected:");
986
1465
  }
987
 
  erl_printf(to, "%d\n", dist_entry_channel_no(dep));
 
1466
  erts_print(to, arg, "%d\n", dist_entry_channel_no(dep));
988
1467
 
989
1468
  if(connected && is_nil(dep->cid)) {
990
 
    erl_printf(to,"Error: Not connected node still registered as connected:");
991
 
    display(dep->sysname, to);
992
 
    erl_printf(to, "\n");
 
1469
    erts_print(to, arg,
 
1470
               "Error: Not connected node still registered as connected:%T\n",
 
1471
               dep->sysname);
993
1472
    return 0;
994
1473
  }
995
1474
 
996
1475
  if(!connected && is_not_nil(dep->cid)) {
997
 
    erl_printf(to,"Error: Connected node not registered as connected:");
998
 
    display(dep->sysname, to);
999
 
    erl_printf(to, "\n");
 
1476
    erts_print(to, arg,
 
1477
               "Error: Connected node not registered as connected:%T\n",
 
1478
               dep->sysname);
1000
1479
    return 0;
1001
1480
  }
1002
1481
 
1003
 
  erl_printf(to, "Name: ");
1004
 
  display(dep->sysname, to);
 
1482
  erts_print(to, arg, "Name: %T", dep->sysname);
1005
1483
#ifdef DEBUG
1006
 
  erl_printf(to," (refc=%d)", dep->refc);
 
1484
  erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 1));
1007
1485
#endif
1008
 
  erl_printf(to, "\n");
 
1486
  erts_print(to, arg, "\n");
1009
1487
  if (!connected && is_nil(dep->cid)) {
1010
 
    if (dep->links) {
1011
 
      erl_printf(to,"Error: Got links to not connected node:");
1012
 
      display(dep->sysname, to);
1013
 
      erl_printf(to, "\n");
 
1488
    if (dep->nlinks) {
 
1489
      erts_print(to, arg, "Error: Got links to not connected node:%T\n",
 
1490
                 dep->sysname);
1014
1491
    }
1015
1492
    return 0;
1016
1493
  }
1017
1494
 
1018
 
  erl_printf(to, "Controller: ");
1019
 
  display(dep->cid, to);
1020
 
  erl_printf(to, "\n");
1021
 
 
1022
 
  erts_print_node_info(to, dep->sysname, NULL, NULL);
1023
 
 
1024
 
  if ((lnk = dep->links)) {
1025
 
    while(lnk) {
1026
 
      switch(lnk->type) {
1027
 
      case LNK_LINK:
1028
 
        if (is_internal_pid(lnk->item)) {
1029
 
          if (pid2proc(lnk->item) == NULL)
1030
 
            break;
1031
 
          erl_printf (to, "Remote link: ");
1032
 
          display(lnk->item,to);
1033
 
          erl_printf (to, " ");
1034
 
          display(lnk->data,to);
1035
 
        }
1036
 
        break;
1037
 
      case LNK_LINK1:
1038
 
        if (is_external_pid(lnk->item)
1039
 
            && external_pid_dist_entry(lnk->item) != erts_this_dist_entry) {
1040
 
          /* We are being monitored */
1041
 
          if (pid2proc(lnk->data) == NULL)
1042
 
            break;
1043
 
          erl_printf(to, "Remotely monitored by: ");
1044
 
          display(lnk->data,to);
1045
 
          erl_printf (to, " ");
1046
 
          display(lnk->item,to);
1047
 
        } else {
1048
 
          /* We are monitoring */
1049
 
          if (pid2proc(lnk->item) == NULL)
1050
 
            break;
1051
 
          erl_printf (to, "Remote monitoring: ");
1052
 
          display(lnk->item,to);
1053
 
          erl_printf (to, " ");
1054
 
          if (is_not_atom(lnk->data)) {
1055
 
            display(lnk->data, to);
1056
 
          } else {
1057
 
            erl_printf (to,"{");
1058
 
            display(lnk->data,to);
1059
 
            erl_printf (to,", ");
1060
 
            display(dep->sysname,to);
1061
 
            erl_printf (to,"}");
1062
 
          }
1063
 
        }
1064
 
        break;
1065
 
      case LNK_NODE:
1066
 
        if (is_internal_pid(lnk->item)) {
1067
 
          if (pid2proc(lnk->item) == NULL)
1068
 
            break;
1069
 
          erl_printf (to, "Remote monitoring: ");
1070
 
          display(lnk->item,to);
1071
 
          erl_printf (to, " ");
1072
 
          display(dep->sysname,to);
1073
 
        }
1074
 
        break;
1075
 
      case LNK_OMON:
1076
 
      case LNK_TMON:
1077
 
      default:
1078
 
        erl_printf (to, "Error: Bad remote link type (%d) found", lnk->type);
1079
 
      }
1080
 
      erl_printf(to," \n");
1081
 
      lnk = lnk->next;
1082
 
    }
1083
 
  }
 
1495
  erts_print(to, arg, "Controller: %T\n", dep->cid, to);
 
1496
 
 
1497
  erts_print_node_info(to, arg, dep->sysname, NULL, NULL);
 
1498
  print_monitor_info(to, arg, dep->monitors);
 
1499
  print_link_info(to, arg, dep->nlinks);
 
1500
  print_nodelink_info(to, arg, dep->node_links, dep->sysname);
1084
1501
 
1085
1502
  return 0;
1086
1503
    
1087
1504
}
1088
 
int distribution_info(CIO to)           /* Called by break handler */
 
1505
int distribution_info(int to, void *arg)        /* Called by break handler */
1089
1506
{
1090
1507
    DistEntry *dep;
1091
1508
 
1092
 
    erl_printf(to, "=node:");
1093
 
    display(erts_this_dist_entry->sysname, to);
1094
 
    erl_printf(to, "\n");
1095
 
 
 
1509
    erts_print(to, arg, "=node:%T\n", erts_this_dist_entry->sysname);
 
1510
 
1096
1511
    if (erts_this_node->sysname == am_Noname) {
1097
 
        erl_printf(to, "=no_distribution\n");
 
1512
        erts_print(to, arg, "=no_distribution\n");
1098
1513
        return(0);
1099
1514
    }
1100
1515
 
1101
1516
#if 0
1102
1517
    if (!erts_visible_dist_entries && !erts_hidden_dist_entries) 
1103
 
      erl_printf(to,"Alive but not holding any connections \n");
 
1518
      erts_print(to, arg, "Alive but not holding any connections \n");
1104
1519
#endif
1105
1520
 
1106
1521
    for(dep = erts_visible_dist_entries; dep; dep = dep->next) {
1107
 
      info_dist_entry(to, dep, 1, 1);
 
1522
      info_dist_entry(to, arg, dep, 1, 1);
1108
1523
    }
1109
1524
 
1110
1525
    for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
1111
 
      info_dist_entry(to, dep, 0, 1);
 
1526
      info_dist_entry(to, arg, dep, 0, 1);
1112
1527
    }
1113
1528
 
1114
1529
    for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
1115
 
        info_dist_entry(to, dep, 0, 0);
 
1530
        info_dist_entry(to, arg, dep, 0, 0);
1116
1531
    }
1117
1532
 
1118
1533
    return(0);
1128
1543
 
1129
1544
void upp(byte *buf, int sz)
1130
1545
{
1131
 
    bin_write(CERR,buf,sz);
 
1546
    bin_write(ERTS_PRINT_STDERR,NULL,buf,sz);
1132
1547
}
1133
1548
 
1134
1549
void print_pass_through(DistEntry *dep, byte *t, int len)
1140
1555
    Uint i;
1141
1556
 
1142
1557
    if ((i = decode_size(t, len)) == -1) {
1143
 
        erl_printf(CERR,"Bailing out in decode_size control message\n\r");
 
1558
        erts_printf(stderr, "Bailing out in decode_size control message\n");
1144
1559
        upp(orig, len);
1145
1560
        erl_exit(1, "Bailing out in decode_size control message\n");
1146
1561
    }
1148
1563
    hp = ctl->mem;
1149
1564
    i = erts_from_external_format(dep, &hp, &t, &ctl->mso);
1150
1565
    if (is_non_value(i)) {
1151
 
        erl_printf(CERR,"Bailing out in erts_from_external_format control message\n\r");
 
1566
        erts_printf(stderr, "Bailing out in erts_from_external_format control "
 
1567
                    "message\n");
1152
1568
        upp(orig, len);
1153
 
        erl_exit(1, "Bailing out in erts_from_external_format control message\n");
 
1569
        erl_exit(1, "Bailing out in erts_from_external_format control "
 
1570
                 "message\n");
1154
1571
    }
1155
 
    erl_printf(CERR,"GOT: ");
1156
 
    display(i, CERR);
 
1572
    erts_printf(stderr, "GOT: %T", i);
1157
1573
    if (t >= (orig + len)) {
1158
 
        erl_printf(CERR,"\n\r");
 
1574
        erts_printf(stderr, "\n");
1159
1575
        free_message_buffer(ctl);
1160
1576
        return;
1161
1577
    }
1162
1578
    if ((i = decode_size(t, len)) == -1) {
1163
 
        erl_printf(CERR,"Bailing out in decode_size second element\n\r");
 
1579
        erts_printf(stderr, "Bailing out in decode_size second element\n");
1164
1580
        upp(orig, len);
1165
1581
        erl_exit(1, "Bailing out in decode_size second element\n");
1166
1582
    }
1168
1584
    hp = msg->mem;
1169
1585
    i = erts_from_external_format(dep, &hp, &t, &msg->mso);
1170
1586
    if (is_non_value(i)) {
1171
 
        erl_printf(CERR,"Bailing out in erts_from_external_format second element\n\r");
 
1587
        erts_printf(stderr, "Bailing out in erts_from_external_format second "
 
1588
                    "element\n");
1172
1589
        upp(orig, len);
1173
 
        erl_exit(1, "Bailing out in erts_from_external_format second element\n");
 
1590
        erl_exit(1, "Bailing out in erts_from_external_format second "
 
1591
                 "element\n");
1174
1592
    }
1175
 
    display(i, CERR);
1176
 
    erl_printf(CERR, "\n\r");
 
1593
    erts_printf(stderr, "%T\n", i);
1177
1594
    free_message_buffer(msg);
1178
1595
    return;
1179
1596
}
1204
1621
/**********************************************************************
1205
1622
 ** Set the node name of current node fail if node already is set.
1206
1623
 ** setnode(name@host, Creation)
1207
 
 ** loads functions pointer to trap_functions from module (erl_net?)
 
1624
 ** loads functions pointer to trap_functions from module erlang.
1208
1625
 **    erlang:dsend/2
1209
1626
 **    erlang:dlink/1
1210
1627
 **    erlang:dunlink/1
1211
 
 **    erlang:dmonitor_node/2
 
1628
 **    erlang:dmonitor_node/3
1212
1629
 **    erlang:dgroup_leader/2
1213
1630
 **    erlang:dexit/2
1214
1631
 **  -- are these needed ?
1217
1634
 
1218
1635
BIF_RETTYPE setnode_2(BIF_ALIST_2)
1219
1636
{
 
1637
    Process *net_kernel;
1220
1638
    Uint creation;
1221
1639
 
1222
1640
    /* valid creation ? */
1231
1649
 
1232
1650
    if (BIF_ARG_1 == am_Noname) /* cant use this name !! */
1233
1651
        goto error;
1234
 
    if (net_kernel != NULL)     /* net_kernel must be down */
 
1652
    if (is_alive())     /* must not be alive! */
1235
1653
        goto error;
1236
1654
 
1237
1655
    /* Check that all trap functions are defined !! */
1247
1665
        goto error;
1248
1666
    }
1249
1667
 
1250
 
    if ((net_kernel = whereis_process(am_net_kernel)) == NULL)
 
1668
    net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN,
 
1669
                                      am_net_kernel, ERTS_PROC_LOCK_MAIN, 0);
 
1670
    if (!net_kernel)
1251
1671
        goto error;
 
1672
 
1252
1673
    /* By setting dist_entry==erts_this_dist_entry and DISTRIBUTION on
1253
1674
       net_kernel do_net_exist will be called when net_kernel
1254
1675
       is terminated !! */
1255
1676
    net_kernel->dist_entry = erts_this_dist_entry;
1256
 
    erts_this_dist_entry->refc++;
 
1677
    erts_refc_inc(&erts_this_dist_entry->refc, 2);
1257
1678
    net_kernel->flags |= F_DISTRIBUTION;
1258
1679
 
 
1680
    if (net_kernel != BIF_P)
 
1681
        erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN);
 
1682
 
 
1683
#ifdef DEBUG
 
1684
    erts_smp_mtx_lock(&erts_dist_table_mtx);
1259
1685
    ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries);
 
1686
    erts_smp_mtx_unlock(&erts_dist_table_mtx);
 
1687
#endif
1260
1688
 
 
1689
    erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
 
1690
    erts_smp_block_system(ERTS_BS_FLG_ALLOW_GC);
1261
1691
    erts_set_this_node(BIF_ARG_1, (Uint32) creation);
 
1692
    set_alive();
 
1693
    erts_smp_release_system();
 
1694
    erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
1262
1695
 
1263
1696
    BIF_RET(am_true);
1264
1697
 
1283
1716
 
1284
1717
BIF_RETTYPE setnode_3(BIF_ALIST_3)
1285
1718
{
 
1719
    BIF_RETTYPE res;
1286
1720
    Uint flags;
1287
1721
    unsigned long version;
1288
1722
    Eterm ic, oc;
1289
1723
    Eterm *tp;
1290
 
    DistEntry *dep;
 
1724
    DistEntry *dep = NULL;
1291
1725
    int ix;
1292
1726
 
1293
1727
    /*
1294
1728
     * Check and pick out arguments
1295
1729
     */
1296
1730
 
 
1731
    erts_smp_io_safe_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
 
1732
 
1297
1733
    if (!is_node_name_atom(BIF_ARG_1) ||
1298
1734
        is_not_internal_port(BIF_ARG_2) ||
1299
1735
        (erts_this_node->sysname == am_Noname)) {
1317
1753
 
1318
1754
    /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
1319
1755
    if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
1320
 
        cerr_pos = 0;
1321
 
        display(BIF_P->id, CBUF);
1322
 
        if (BIF_P->reg) {
1323
 
            erl_printf(CBUF, " (");
1324
 
            display(BIF_P->reg->name, CBUF);
1325
 
            erl_printf(CBUF, ")");
1326
 
        }
1327
 
        erl_printf(CBUF, " attempted to enable connection to node ");
1328
 
        display(BIF_ARG_1, CBUF);
1329
 
        erl_printf(CBUF, " which is not able to handle extended references.\n");
1330
 
        send_error_to_logger(NIL);
 
1756
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
 
1757
        erts_dsprintf(dsbufp, "%T", BIF_P->id);
 
1758
        if (BIF_P->reg)
 
1759
            erts_dsprintf(dsbufp, " (%T)", BIF_P->reg->name);
 
1760
        erts_dsprintf(dsbufp,
 
1761
                      " attempted to enable connection to node %T "
 
1762
                      "which is not able to handle extended references.\n",
 
1763
                      BIF_ARG_1);
 
1764
        erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
1331
1765
        goto error;
1332
1766
    }
1333
1767
 
1336
1770
     */
1337
1771
 
1338
1772
    /* get dist_entry */
1339
 
    if ((dep = erts_find_or_insert_dist_entry(BIF_ARG_1))
1340
 
        == erts_this_dist_entry) {
 
1773
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
 
1774
    if (dep == erts_this_dist_entry)
1341
1775
        goto error;
1342
 
    } else if (!dep) {
1343
 
        BIF_ERROR(BIF_P,SYSTEM_LIMIT); /* Should never happen!!! */
1344
 
    }
 
1776
    else if (!dep)
 
1777
        goto system_limit; /* Should never happen!!! */
1345
1778
 
1346
 
    if (dep->cid == BIF_ARG_2)
 
1779
    if (dep->cid == BIF_ARG_2) {
 
1780
        erts_deref_dist_entry(dep);
1347
1781
        goto done;
 
1782
    }
1348
1783
    /* We may have a sync problem here ?? */
1349
1784
    if (dep->cid != NIL)
1350
1785
        goto error;
1353
1788
    if ((INVALID_PORT(erts_port+ix, BIF_ARG_2)) 
1354
1789
        || (erts_port[ix].status & EXITING)
1355
1790
        || (erts_port[ix].dist_entry != NULL))
1356
 
      goto error;
 
1791
        goto error;
1357
1792
 
1358
1793
    erts_port[ix].status |= DISTRIBUTION;
1359
1794
 
1360
 
    if (erts_port[ix].dist_entry) {
1361
 
        DEREF_DIST_ENTRY(erts_port[ix].dist_entry);
1362
 
    }
1363
 
 
1364
1795
    erts_port[ix].dist_entry = dep;
1365
 
    dep->refc++;
1366
1796
 
1367
 
     if (!(flags & DFLAG_ATOM_CACHE) ||
 
1797
    if (!(flags & DFLAG_ATOM_CACHE) ||
1368
1798
        (!(flags & DFLAG_PUBLISHED) && !(flags & DFLAG_HIDDEN_ATOM_CACHE))
1369
1799
        /* Nodes which cannot use atom cache on non-published connections
1370
1800
           doesn't send the DFLAG_HIDDEN_ATOM_CACHE flag. */) {
1378
1808
 
1379
1809
    erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
1380
1810
 
1381
 
done:
1382
 
    BIF_RET(am_true);
1383
 
 
1384
 
error:
1385
 
    BIF_ERROR(BIF_P, BADARG);
 
1811
 done:
 
1812
    ERTS_BIF_PREP_RET(res, am_true);
 
1813
 
 
1814
 done_error:
 
1815
 
 
1816
    erts_smp_io_unlock();
 
1817
 
 
1818
    return res;
 
1819
 
 
1820
    /* Errors ... */
 
1821
 
 
1822
 error:
 
1823
    ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
 
1824
    if (dep)
 
1825
        erts_deref_dist_entry(dep);
 
1826
    goto done_error;
 
1827
 
 
1828
 system_limit:
 
1829
    ERTS_BIF_PREP_ERROR(res, BIF_P, SYSTEM_LIMIT);
 
1830
    goto done_error;
1386
1831
}
1387
1832
 
1388
1833
 
1394
1839
    Eterm local;
1395
1840
    Eterm remote;
1396
1841
    DistEntry *rdep;
1397
 
    Process *lp;
1398
 
    Eterm exit_value = (BIF_ARG_2 == am_kill) ? am_killed : BIF_ARG_2;
1399
1842
 
1400
1843
    local = BIF_ARG_1;
1401
1844
    remote = BIF_ARG_3;
1416
1859
 
1417
1860
    /* Check that local is local */
1418
1861
    if (is_internal_pid(local)) {
1419
 
 
1420
 
        if ((lp = pid2proc(local)) == NULL)
1421
 
            BIF_RET(am_true); /* ignore */
1422
 
 
1423
 
        if ((lp->flags & F_TRAPEXIT) && (BIF_ARG_2 != am_kill))
1424
 
            deliver_exit_message(remote, lp, exit_value);
1425
 
        else if (BIF_ARG_2 != am_normal)
1426
 
            schedule_exit(lp, exit_value);
 
1862
        Process *lp;
 
1863
        Uint32 lp_locks;
 
1864
        if (BIF_P->id == local) {
 
1865
            lp_locks = ERTS_PROC_LOCKS_ALL;
 
1866
            lp = BIF_P;
 
1867
            erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR);
 
1868
        }
 
1869
        else {
 
1870
            lp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
 
1871
            lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, local, lp_locks);
 
1872
            if (!lp) {
 
1873
                BIF_RET(am_true); /* ignore */
 
1874
            }
 
1875
        }
1427
1876
        
1428
 
        if (BIF_P->status != P_RUNNING) {
1429
 
            BIF_P->fvalue = exit_value;
1430
 
            KILL_CATCHES(BIF_P);
1431
 
            BIF_ERROR(BIF_P, USER_EXIT);
1432
 
        }
 
1877
        (void) erts_send_exit_signal(BIF_P,
 
1878
                                     remote,
 
1879
                                     lp,
 
1880
                                     &lp_locks,
 
1881
                                     BIF_ARG_2,
 
1882
                                     NIL,
 
1883
                                     NULL,
 
1884
                                     0);
 
1885
        if (lp == BIF_P) {
 
1886
             /*
 
1887
              * We may have exited current process and may have to take action.
 
1888
              */
 
1889
#ifdef ERTS_SMP
 
1890
             ERTS_SMP_BIF_CHK_PENDING_EXIT(BIF_P, lp_locks);
 
1891
             lp_locks &= ~ERTS_PROC_LOCK_MAIN;
 
1892
#else
 
1893
             ERTS_BIF_CHK_EXITED(BIF_P);
 
1894
#endif
 
1895
         }
 
1896
         erts_smp_proc_unlock(lp, lp_locks);
1433
1897
    }
1434
1898
    else if (is_internal_port(local)) {
1435
 
        do_exit_port(local, remote, BIF_ARG_2);
1436
 
        if (BIF_P->status != P_RUNNING) {
1437
 
            BIF_P->fvalue = (BIF_ARG_2 == am_kill) ? am_killed : BIF_ARG_2;
1438
 
            KILL_CATCHES(BIF_P);
1439
 
            BIF_ERROR(BIF_P, USER_EXIT);         
1440
 
        }
 
1899
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
 
1900
        erts_do_exit_port(local, remote, BIF_ARG_2);
 
1901
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
 
1902
        ERTS_BIF_CHK_EXITED(BIF_P);
1441
1903
    }
1442
1904
    else if ((is_external_pid(local) || is_external_port(local))
1443
1905
             && external_dist_entry(local) == erts_this_dist_entry) {
1460
1922
    Eterm remote;
1461
1923
    Process *lp;
1462
1924
    DistEntry *dep = BIF_P->dist_entry;
 
1925
    ErtsLink *lnk;
 
1926
 
 
1927
    erts_smp_io_safe_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
1463
1928
 
1464
1929
    /* Must be called from distribution process */
1465
1930
    if (!(BIF_P->flags & F_DISTRIBUTION) || (dep == NULL))
1471
1936
    if (is_not_pid(remote) || (pid_dist_entry(remote) != dep))
1472
1937
        goto error;
1473
1938
    if (is_pid(local)) {
1474
 
        if ((lp = pid2proc(local)) == NULL) {
1475
 
            dist_exit(dep, local, remote,  am_noproc);
 
1939
        lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN,
 
1940
                           local, ERTS_PROC_LOCK_LINK);
 
1941
        erts_smp_dist_entry_lock(dep);
 
1942
        if (!lp) {
 
1943
            ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P);
 
1944
            dist_exit(BIF_P, ERTS_PROC_LOCK_MAIN,
 
1945
                      dep, local, remote,  am_noproc);
 
1946
            erts_smp_dist_entry_unlock(dep);
 
1947
            erts_smp_io_unlock();
1476
1948
            BIF_RET(am_true);
1477
1949
        }
1478
1950
    }
1479
1951
    else /* no ports yet */
1480
1952
        goto error;
1481
1953
 
1482
 
    if (find_link(&lp->links, LNK_LINK, remote,NIL) != NULL)
1483
 
        BIF_RET(am_true);
1484
 
 
1485
 
    dep->links = new_link(dep->links,LNK_LINK, local, remote);
1486
 
    lp->links = new_link(lp->links,LNK_LINK,remote,NIL);
 
1954
    erts_smp_io_unlock();
 
1955
 
 
1956
    if (erts_add_link(&(lp->nlinks), LINK_PID, remote) < 0) {
 
1957
        erts_smp_dist_entry_unlock(dep);
 
1958
        BIF_RET(am_true);
 
1959
    }
 
1960
    lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, lp->id);
 
1961
 
 
1962
    erts_add_link(&(lnk->root), LINK_PID, remote);
 
1963
 
 
1964
    erts_smp_dist_entry_unlock(dep);
1487
1965
 
1488
1966
    if (IS_TRACED_FL(lp, F_TRACE_PROCS))
1489
1967
        trace_proc(BIF_P, lp, am_getting_linked, remote);
 
1968
 
 
1969
    erts_smp_proc_unlock(lp, ERTS_PROC_LOCK_LINK);
1490
1970
    BIF_RET(am_true);
1491
1971
 
1492
1972
 error:
 
1973
    erts_smp_io_unlock();
1493
1974
    BIF_ERROR(BIF_P, BADARG);
1494
1975
}
1495
1976
 
1502
1983
    Eterm remote = BIF_ARG_2;
1503
1984
    Process *lp;
1504
1985
    DistEntry *dep = BIF_P->dist_entry;
 
1986
    ErtsLink *lnk, *sublnk;
1505
1987
 
1506
1988
    /* Must be called from distribution process */
1507
1989
    if (!(BIF_P->flags & F_DISTRIBUTION) || (dep == NULL))
1512
1994
        goto error;
1513
1995
 
1514
1996
    if (is_pid(local)) {
1515
 
        if ((lp = pid2proc(local)) == NULL)
 
1997
        lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN,
 
1998
                           local, ERTS_PROC_LOCK_LINK);
 
1999
        if (!lp) {
 
2000
            ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P);
1516
2001
            BIF_RET(am_true);
 
2002
        }
1517
2003
    }
1518
2004
    else /* no ports yet */
1519
2005
        goto error;
1520
2006
 
 
2007
    erts_smp_dist_entry_lock(dep);
 
2008
 
1521
2009
    /* unlink and ignore errors */
1522
 
    del_link(find_link(&lp->links, LNK_LINK, remote, NIL));
1523
 
    del_link(find_link(&dep->links, LNK_LINK, local, remote));
 
2010
    lnk = erts_lookup_link(dep->nlinks, local);
 
2011
 
 
2012
    if (lnk != NULL) {
 
2013
        sublnk = erts_remove_link(&(lnk->root), remote);
 
2014
        if (sublnk != NULL) {
 
2015
            erts_destroy_link(sublnk);
 
2016
        }
 
2017
        if (lnk->root == NULL) {
 
2018
            erts_destroy_link(erts_remove_link(&(dep->nlinks), local));
 
2019
        }
 
2020
    }
 
2021
    lnk = erts_remove_link(&(lp->nlinks), remote);
 
2022
    if (lnk != NULL) {
 
2023
        erts_destroy_link(lnk);
 
2024
    }
 
2025
    erts_smp_dist_entry_unlock(dep);
 
2026
 
1524
2027
    if (IS_TRACED_FL(lp, F_TRACE_PROCS))
1525
2028
        trace_proc(BIF_P, lp, am_unlink, remote);
 
2029
    erts_smp_proc_unlock(lp, ERTS_PROC_LOCK_LINK);
1526
2030
    BIF_RET(am_true);
1527
2031
 
1528
2032
 error:
1595
2099
 
1596
2100
    length = 0;
1597
2101
 
 
2102
    erts_smp_mtx_lock(&erts_dist_table_mtx);
 
2103
 
1598
2104
    ASSERT(erts_no_of_not_connected_dist_entries >= 0);
1599
2105
    ASSERT(erts_no_of_hidden_dist_entries >= 0);
1600
2106
    ASSERT(erts_no_of_visible_dist_entries >= 0);
1609
2115
 
1610
2116
    result = NIL;
1611
2117
 
1612
 
    if (length == 0)
1613
 
      BIF_RET(result);
 
2118
    if (length == 0) {
 
2119
        erts_smp_mtx_unlock(&erts_dist_table_mtx);
 
2120
        BIF_RET(result);
 
2121
    }
1614
2122
 
1615
2123
    hp = HAlloc(BIF_P, 2*length);
1616
2124
 
1637
2145
        hp += 2;
1638
2146
    }
1639
2147
    ASSERT(endp == hp);
1640
 
 
 
2148
    erts_smp_mtx_unlock(&erts_dist_table_mtx);
1641
2149
    BIF_RET(result);
1642
2150
}
1643
2151
 
1646
2154
 
1647
2155
BIF_RETTYPE is_alive_0(BIF_ALIST_0)
1648
2156
{
1649
 
    if (erts_this_node->sysname == am_Noname)
1650
 
        BIF_RET(am_false);
1651
 
    BIF_RET(am_true);
 
2157
    Eterm res = is_alive() ? am_true : am_false;
 
2158
    BIF_RET(res);
1652
2159
}
1653
2160
 
1654
2161
/**********************************************************************/
1655
 
/* monitor_node(Node, Bool) -> Bool */
 
2162
/* erlang:monitor_node(Node, Bool, Options) -> Bool */
1656
2163
 
1657
 
BIF_RETTYPE monitor_node_2(BIF_ALIST_2)
 
2164
BIF_RETTYPE monitor_node_3(BIF_ALIST_3)
1658
2165
{
1659
2166
    DistEntry *dep;
 
2167
    ErtsLink *lnk;
 
2168
    Eterm l;
 
2169
 
 
2170
    for (l = BIF_ARG_3; l != NIL && is_list(l); l = CDR(list_val(l))) {
 
2171
        Eterm t = CAR(list_val(l));
 
2172
        /* allow_passive_connect the only available option right now */
 
2173
        if (t != am_allow_passive_connect) {
 
2174
            BIF_ERROR(BIF_P, BADARG);
 
2175
        }
 
2176
    }
 
2177
    if (l != NIL) {
 
2178
        BIF_ERROR(BIF_P, BADARG);
 
2179
    }
1660
2180
 
1661
2181
    if (is_not_atom(BIF_ARG_1) ||
1662
2182
        ((BIF_ARG_2 != am_true) && (BIF_ARG_2 != am_false)) ||
1665
2185
        BIF_ERROR(BIF_P, BADARG);
1666
2186
    }
1667
2187
    if ((dep = erts_sysname_to_connected_dist_entry(BIF_ARG_1)) == NULL) {
1668
 
        BIF_TRAP2(dmonitor_node_trap, BIF_P, BIF_ARG_1, BIF_ARG_2);
 
2188
        BIF_TRAP3(dmonitor_node_trap, BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
1669
2189
    }
1670
 
    if (dep == erts_this_dist_entry) 
1671
 
        BIF_RET(am_true);
 
2190
    if (dep == erts_this_dist_entry)
 
2191
        goto done;
 
2192
 
 
2193
    erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK);
 
2194
    erts_smp_dist_entry_lock(dep);
 
2195
 
1672
2196
    if (BIF_ARG_2 == am_true) {
1673
2197
        ASSERT(dep->cid != NIL);
1674
 
        dep->links = new_link(dep->links, LNK_NODE, BIF_P->id, NIL);
1675
 
 
1676
 
        BIF_P->links = new_link(BIF_P->links, LNK_NODE, BIF_ARG_1, NIL);
1677
 
        BIF_RET(am_true);
 
2198
        lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, 
 
2199
                                      BIF_P->id);
 
2200
        ++ERTS_LINK_ROOT_AS_UINT(lnk);
 
2201
        lnk = erts_add_or_lookup_link(&(BIF_P->nlinks), LINK_NODE, BIF_ARG_1);
 
2202
        ++ERTS_LINK_ROOT_AS_UINT(lnk);
1678
2203
    }
1679
2204
    else  {
1680
 
        del_link(find_link(&(dep->links), LNK_NODE, BIF_P->id,NIL));
1681
 
        del_link(find_link(&(BIF_P->links), LNK_NODE, BIF_ARG_1, NIL));
1682
 
        BIF_RET(am_true);
 
2205
        lnk = erts_lookup_link(dep->node_links, BIF_P->id);
 
2206
        if (lnk != NULL) {
 
2207
            if ((--ERTS_LINK_ROOT_AS_UINT(lnk)) == 0) {
 
2208
                erts_destroy_link(erts_remove_link(&(dep->node_links), 
 
2209
                                                   BIF_P->id));
 
2210
            }
 
2211
        }
 
2212
        lnk = erts_lookup_link(BIF_P->nlinks, BIF_ARG_1);
 
2213
        if (lnk != NULL) {
 
2214
            if ((--ERTS_LINK_ROOT_AS_UINT(lnk)) == 0) {
 
2215
                erts_destroy_link(erts_remove_link(&(BIF_P->nlinks), 
 
2216
                                                   BIF_ARG_1));
 
2217
            }
 
2218
        }
1683
2219
    }
1684
 
}
 
2220
 
 
2221
    erts_smp_dist_entry_unlock(dep);
 
2222
    erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK);
 
2223
 
 
2224
 done:
 
2225
    erts_deref_dist_entry(dep);
 
2226
    BIF_RET(am_true);
 
2227
}
 
2228
 
 
2229
/* monitor_node(Node, Bool) -> Bool */
 
2230
 
 
2231
BIF_RETTYPE monitor_node_2(BIF_ALIST_2)
 
2232
{
 
2233
    BIF_RET(monitor_node_3(BIF_P,BIF_ARG_1,BIF_ARG_2,NIL));
 
2234
}
 
2235