~ubuntu-branches/ubuntu/lucid/couchdb/lucid

« back to all changes in this revision

Viewing changes to src/couchdb/couch_rep_changes_feed.erl

  • Committer: Bazaar Package Importer
  • Author(s): Chad Miller, Ken VanDine
  • Date: 2009-09-25 19:18:26 UTC
  • mfrom: (1.2.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20090925191826-km9018omfgxosvah
Tags: 0.10.0~svn818859-0ubuntu1
* Import code from pre-release branch.  (LP: #427860, #408909)
        + Build and System Integration:
          * Changed `couchdb` script configuration options.
          * Added default.d and local.d configuration directories to load
            sequence.
        + HTTP Interface:
          * Added optional cookie-based authentication handler.
          * Added optional two-legged OAuth authentication handler.
* Packaging of couchdb-bin must replace pre-split couchdb.
  (LP: #432219)
* Move all of /etc and /var out of the couchdb-bin package and into the
  couchdb package.
* /etc/couchdb must be in couchdb-bin, as config files are needed
  by all servers.
  + So the couchdb user must be managed by couchdb-bin.
  + Split postinst/postrm files to manage different files.
  + Set Replaces of couchdb-bin by couchdb so that config files
    migrate.
* Update the version number in postrm.  (!)
* Use the new "Breaks" field in control file to help split pkg.

[Ken VanDine]
* debian/couchdb.install:
  - removed /var
* debian/rules:
  - removed bootstrap
* debian/couchdb.postinst:
  - Added the debhelper tag

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
-record (state, {
26
26
    changes_from = nil,
27
27
    changes_loop = nil,
 
28
    init_args,
28
29
    last_seq,
29
30
    conn = nil,
30
31
    reqid = nil,
44
45
stop(Server) ->
45
46
    gen_server:call(Server, stop).
46
47
 
47
 
init([_Parent, #http_db{}=Source, Since, PostProps]) ->
 
48
init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
48
49
    process_flag(trap_exit, true),
49
50
    Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
50
51
    false ->
66
67
    receive
67
68
    {ibrowse_async_headers, ReqId, "200", _} ->
68
69
        ibrowse:stream_next(ReqId),
69
 
        {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId}};
 
70
        {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId, init_args=Args}};
70
71
    {ibrowse_async_headers, ReqId, Code, Hdrs} when Code=="301"; Code=="302" ->
71
72
        catch ibrowse:stop_worker_process(Pid),
72
73
        Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
73
74
        %% TODO use couch_httpc:request instead of start_http_request
74
75
        {Pid2, ReqId2} = start_http_request(Url2),
75
76
        receive {ibrowse_async_headers, ReqId2, "200", _} ->
76
 
            {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2}}
 
77
            {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args}}
77
78
        after 30000 ->
78
79
            {stop, changes_timeout}
79
80
        end;
82
83
        ?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []),
83
84
        Self = self(),
84
85
        BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end),
85
 
        {ok, #state{last_seq=Since, changes_loop=BySeqPid}};
 
86
        {ok, #state{last_seq=Since, changes_loop=BySeqPid, init_args=Args}};
86
87
    {ibrowse_async_headers, ReqId, Code, _} ->
87
88
        {stop, {changes_error_code, list_to_integer(Code)}}
88
89
    after 10000 ->
89
90
        {stop, changes_timeout}
90
91
    end;
91
92
 
92
 
init([_Parent, Source, Since, PostProps]) ->
 
93
init([_Parent, Source, Since, PostProps] = InitArgs) ->
93
94
    process_flag(trap_exit, true),
94
95
    Server = self(),
95
96
    ChangesPid =
104
105
            send_local_changes_forever(Server, Source, Since)
105
106
        end)
106
107
    end,
107
 
    {ok, #state{changes_loop=ChangesPid}}.
 
108
    {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
108
109
 
109
110
handle_call({add_change, Row}, From, State) ->
110
111
    handle_add_change(Row, From, State);
113
114
    handle_next_changes(From, State);
114
115
    
115
116
handle_call(stop, _From, State) ->
116
 
    #state{
117
 
        changes_loop = ChangesPid,
118
 
        conn = Conn
119
 
    } = State,
120
 
    if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end,
121
 
    if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end,
122
117
    {stop, normal, ok, State}.
123
118
 
124
119
handle_cast(_Msg, State) ->
127
122
handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
128
123
    handle_headers(list_to_integer(Code), Hdrs, State);
129
124
 
 
125
handle_info({ibrowse_async_response, Id, {error,connection_closed}},
 
126
        #state{reqid=Id}=State) ->
 
127
    handle_retry(State);
 
128
 
130
129
handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) ->
131
130
    {stop, {error, E}, State};
132
131
 
148
147
    ?LOG_DEBUG("unexpected message at changes_feed ~p", [Msg]),
149
148
    {noreply, State}.
150
149
 
151
 
terminate(_Reason, #state{conn=Pid}) when is_pid(Pid) ->
152
 
    catch ibrowse:stop_worker_process(Pid),
153
 
    ok;
154
 
terminate(_Reason, _State) ->
 
150
terminate(_Reason, State) ->
 
151
    #state{
 
152
        changes_loop = ChangesPid,
 
153
        conn = Conn
 
154
    } = State,
 
155
    if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end,
 
156
    if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end,
155
157
    ok.
156
158
 
157
159
code_change(_OldVsn, State, _Extra) ->
222
224
        rows = Rows
223
225
    } = State,
224
226
    NewState = try
225
 
        Row = decode_row(<<Partial/binary, Chunk/binary>>),
 
227
        Row = {Props} = decode_row(<<Partial/binary, Chunk/binary>>),
226
228
        case State of
227
229
        #state{reply_to=nil} ->
228
230
            State#state{
229
231
                count = Count+1,
 
232
                last_seq = proplists:get_value(<<"seq">>, Props),
230
233
                partial_chunk = <<>>,
231
234
                rows=queue:in(Row,Rows)
232
235
            };
246
249
    gen_server:reply(State#state.reply_to, complete),
247
250
    {stop, normal, State}.
248
251
 
 
252
handle_retry(State) ->
 
253
    ?LOG_DEBUG("retrying changes feed because our connection closed", []),
 
254
    #state{
 
255
        count = Count,
 
256
        init_args = [_, Source, _, PostProps],
 
257
        last_seq = Since,
 
258
        reply_to = ReplyTo,
 
259
        rows = Rows
 
260
    } = State,
 
261
    case init([nil, Source, Since, PostProps]) of
 
262
    {ok, State1} ->
 
263
        MergedState = State1#state{
 
264
            count = Count,
 
265
            reply_to = ReplyTo,
 
266
            rows = Rows
 
267
        },
 
268
        {noreply, MergedState};
 
269
    _ ->
 
270
        {stop, {error, connection_closed}, State}
 
271
    end.
 
272
 
249
273
by_seq_loop(Server, Source, StartSeq) ->
250
274
    Req = Source#http_db{
251
275
        resource = "_all_docs_by_seq",
277
301
decode_row(<<",\n", Rest/binary>>) ->
278
302
    decode_row(Rest);
279
303
decode_row(Row) ->
280
 
    {[Seq, Id, {<<"changes">>,C}]} = ?JSON_DECODE(Row),
 
304
    {Props} = ?JSON_DECODE(Row),
 
305
    Seq = proplists:get_value(<<"seq">>, Props),
 
306
    Id = proplists:get_value(<<"id">>, Props),
 
307
    C = proplists:get_value(<<"changes">>, Props),
281
308
    C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C],
282
 
    {[Seq, Id, {<<"changes">>,C2}]}.
 
309
    {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}.
 
310
 
283
311
 
284
312
flush_updated_messages() ->
285
313
    receive updated -> flush_updated_messages()
288
316
 
289
317
local_update_notification(Self, DbName, {updated, DbName}) ->
290
318
    Self ! updated;
291
 
local_update_notification(Self, DbName, {deleted, DbName}) ->
292
 
    Self ! deleted;
293
319
local_update_notification(_, _, _) ->
294
320
    ok.
295
321
 
342
368
    {Pid, Id}.
343
369
 
344
370
wait_db_updated() ->
345
 
    receive deleted ->
346
 
        exit(deleted)
347
 
    after 0 ->
348
 
        receive updated ->
349
 
            flush_updated_messages()
350
 
        end
 
371
    receive updated ->
 
372
        flush_updated_messages()
351
373
    end.