~ubuntu-branches/ubuntu/oneiric/libanyevent-redis-perl/oneiric

« back to all changes in this revision

Viewing changes to lib/AnyEvent/Redis.pm

  • Committer: Bazaar Package Importer
  • Author(s): Alessandro Ghedini
  • Date: 2011-02-09 19:31:48 UTC
  • Revision ID: james.westby@ubuntu.com-20110209193148-asa3jeyaol6nikd2
Tags: upstream-0.23
ImportĀ upstreamĀ versionĀ 0.23

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package AnyEvent::Redis;
 
2
 
 
3
use strict;
 
4
use 5.008_001;
 
5
our $VERSION = '0.23';
 
6
 
 
7
use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
 
8
use AnyEvent;
 
9
use AnyEvent::Handle;
 
10
use AnyEvent::Socket;
 
11
use AnyEvent::Redis::Protocol;
 
12
use Try::Tiny;
 
13
use Carp qw(croak);
 
14
use Encode ();
 
15
 
 
16
our $AUTOLOAD;
 
17
 
 
18
sub new {
 
19
    my($class, %args) = @_;
 
20
 
 
21
    my $host = delete $args{host} || '127.0.0.1';
 
22
    my $port = delete $args{port} || 6379;
 
23
 
 
24
    if (my $encoding = $args{encoding}) {
 
25
        $args{encoding} = Encode::find_encoding($encoding);
 
26
        croak qq{Encoding "$encoding" not found} unless ref $args{encoding};
 
27
    }
 
28
 
 
29
    bless {
 
30
        host => $host,
 
31
        port => $port,
 
32
        %args,
 
33
    }, $class;
 
34
}
 
35
 
 
36
sub run_cmd {
 
37
    my $self = shift;
 
38
    my $cmd  = shift;
 
39
 
 
40
    $self->{cmd_cb} or return $self->connect($cmd, @_);
 
41
    $self->{cmd_cb}->($cmd, @_);
 
42
}
 
43
 
 
44
sub DESTROY { }
 
45
 
 
46
sub AUTOLOAD {
 
47
    my $self = shift;
 
48
    (my $method = $AUTOLOAD) =~ s/.*:://;
 
49
    $self->run_cmd($method, @_);
 
50
}
 
51
 
 
52
sub all_cv {
 
53
    my $self = shift;
 
54
    $self->{all_cv} = shift if @_;
 
55
    unless ($self->{all_cv}) {
 
56
        $self->{all_cv} = AE::cv;
 
57
    }
 
58
    $self->{all_cv};
 
59
}
 
60
 
 
61
sub cleanup {
 
62
    my $self = shift;
 
63
    delete $self->{cmd_cb};
 
64
    delete $self->{sock};
 
65
    $self->{on_error}->(@_) if $self->{on_error};
 
66
}
 
67
 
 
68
sub connect {
 
69
    my $self = shift;
 
70
 
 
71
    my $cv;
 
72
    if (@_) {
 
73
        $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
 
74
        $cv ||= AE::cv;
 
75
        push @{$self->{connect_queue}}, [ $cv, @_ ];
 
76
    }
 
77
 
 
78
    return $cv if $self->{sock};
 
79
 
 
80
    $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
 
81
        my $fh = shift
 
82
            or do {
 
83
              my $err = "Can't connect Redis server: $!";
 
84
              $self->cleanup($err);
 
85
              $cv->croak($err);
 
86
              return
 
87
            };
 
88
 
 
89
        binmode $fh; # ensure bytes until we decode
 
90
 
 
91
        my $hd = AnyEvent::Handle->new(
 
92
            fh => $fh,
 
93
            on_error => sub { $_[0]->destroy;
 
94
                              if ($_[1]) {
 
95
                                  $self->cleanup($_[2]);
 
96
                              }
 
97
                          },
 
98
            on_eof   => sub { $_[0]->destroy;
 
99
                              $self->cleanup('connection closed');
 
100
                          },
 
101
            encoding => $self->{encoding},
 
102
        );
 
103
 
 
104
        $self->{cmd_cb} = sub {
 
105
            $self->all_cv->begin;
 
106
            my $command = shift;
 
107
 
 
108
            my($cv, $cb);
 
109
            if (@_) {
 
110
                $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
 
111
                $cb = pop if ref $_[-1] eq 'CODE';
 
112
            }
 
113
            $cv ||= AE::cv;
 
114
 
 
115
            my $send = join("\r\n",
 
116
                  "*" . (1 + @_),
 
117
                  map { ('$' . length $_ => $_) }
 
118
                        (uc($command), map { $self->{encoding} && length($_)
 
119
                                             ? $self->{encoding}->encode($_)
 
120
                                             : $_ } @_))
 
121
                . "\r\n";
 
122
 
 
123
            warn $send if DEBUG;
 
124
 
 
125
            $hd->push_write($send);
 
126
 
 
127
            # Are we already subscribed to anything?
 
128
            if($self->{sub} && %{$self->{sub}}) {
 
129
 
 
130
              croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
 
131
                unless $command =~ /^p?(?:un)?subscribe$/i;
 
132
 
 
133
              # Remember subscriptions
 
134
              $self->{sub}->{$_} ||= [$cv, $cb] for @_;
 
135
 
 
136
            } elsif ($command !~ /^p?subscribe$/i) {
 
137
 
 
138
                $cv->cb(sub {
 
139
                    my $cv = shift;
 
140
                    try {
 
141
                        my $res = $cv->recv;
 
142
                        $cb->($res);
 
143
                    } catch {
 
144
                        ($self->{on_error} || sub { die @_ })->($_);
 
145
                    }
 
146
                }) if $cb;
 
147
 
 
148
                $hd->push_read("AnyEvent::Redis::Protocol" => sub {
 
149
                        my($res, $err) = @_;
 
150
 
 
151
                        if($command eq 'info') {
 
152
                          $res = { map { split /:/, $_, 2 } split /\r\n/, $res };
 
153
                        } elsif($command eq 'keys' && !ref $res) {
 
154
                          # Older versions of Redis (1.2) need this
 
155
                          $res = [split / /, $res];
 
156
                        }
 
157
 
 
158
                        $self->all_cv->end;
 
159
                        $err ? $cv->croak($res) : $cv->send($res);
 
160
                });
 
161
 
 
162
            } else {
 
163
                croak "Must provide a CODE reference for subscriptions" unless $cb;
 
164
 
 
165
                # Remember subscriptions
 
166
                $self->{sub}->{$_} ||= [$cv, $cb] for @_;
 
167
 
 
168
                my $res_cb; $res_cb = sub {
 
169
 
 
170
                    $hd->push_read("AnyEvent::Redis::Protocol" => sub {
 
171
                            my($res, $err) = @_;
 
172
 
 
173
                            if(ref $res) {
 
174
                                my $action = lc $res->[0];
 
175
                                warn "$action $res->[1]" if DEBUG;
 
176
 
 
177
                                if($action eq 'message') {
 
178
                                    $self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);
 
179
 
 
180
                                } elsif($action eq 'pmessage') {
 
181
                                    $self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);
 
182
 
 
183
                                } elsif($action eq 'subscribe' || $action eq 'psubscribe') {
 
184
                                    $self->{sub_count} = $res->[2];
 
185
 
 
186
                                } elsif($action eq 'unsubscribe' || $action eq 'punsubscribe') {
 
187
                                    $self->{sub_count} = $res->[2];
 
188
                                    $self->{sub}->{$res->[1]}[0]->send;
 
189
                                    delete $self->{sub}->{$res->[1]};
 
190
                                    $self->all_cv->end;
 
191
 
 
192
                                } else {
 
193
                                    warn "Unknown pubsub action: $action";
 
194
                                }
 
195
                            }
 
196
 
 
197
                            if($self->{sub_count} || %{$self->{sub}}) {
 
198
                                # Carry on reading while we are subscribed
 
199
                                $res_cb->();
 
200
                            }
 
201
                        });
 
202
                };
 
203
 
 
204
                $res_cb->();
 
205
            }
 
206
 
 
207
            return $cv;
 
208
        };
 
209
 
 
210
        for my $queue (@{$self->{connect_queue} || []}) {
 
211
            my($cv, @args) = @$queue;
 
212
            $self->{cmd_cb}->(@args, $cv);
 
213
        }
 
214
 
 
215
    };
 
216
 
 
217
    return $cv;
 
218
}
 
219
 
 
220
1;
 
221
__END__
 
222
 
 
223
=encoding utf-8
 
224
 
 
225
=for stopwords
 
226
 
 
227
=head1 NAME
 
228
 
 
229
AnyEvent::Redis - Non-blocking Redis client
 
230
 
 
231
=head1 SYNOPSIS
 
232
 
 
233
  use AnyEvent::Redis;
 
234
 
 
235
  my $redis = AnyEvent::Redis->new(
 
236
      host => '127.0.0.1',
 
237
      port => 6379,
 
238
      encoding => 'utf8',
 
239
      on_error => sub { warn @_ },
 
240
  );
 
241
 
 
242
  # callback based
 
243
  $redis->set( 'foo'=> 'bar', sub { warn "SET!" } );
 
244
  $redis->get( 'foo', sub { my $value = shift } );
 
245
 
 
246
  my ($key, $value) = ('list_key', 123);
 
247
  $redis->lpush( $key, $value );
 
248
  $redis->lpop( $key, sub { my $value = shift });
 
249
 
 
250
  # condvar based
 
251
  my $cv = $redis->lpop( $key );
 
252
  $cv->cb(sub { my $value = $_[0]->recv });
 
253
 
 
254
=head1 DESCRIPTION
 
255
 
 
256
AnyEvent::Redis is a non-blocking (event-driven) Redis client.
 
257
 
 
258
This module is an AnyEvent user; you must install and use a supported event loop.
 
259
 
 
260
=head1 ESTABLISHING A CONNECTION
 
261
 
 
262
To create a new connection, use the new() method with the following attributes:
 
263
 
 
264
=over
 
265
 
 
266
=item host => <HOSTNAME>
 
267
 
 
268
B<Required.>  The hostname or literal address of the server.  
 
269
 
 
270
=item port => <PORT>
 
271
 
 
272
Optional.  The server port.
 
273
 
 
274
=item encoding => <ENCODING>
 
275
 
 
276
Optional.  Encode and decode data (when storing and retrieving, respectively)
 
277
according to I<ENCODING> (C<"utf8"> is recommended or see L<Encode::Supported>
 
278
for details on possible I<ENCODING> values).
 
279
 
 
280
Omit if you intend to handle raw binary data with this connection.
 
281
 
 
282
=item on_error => $cb->($errmsg)
 
283
 
 
284
Optional.  Callback that will be fired if a connection or database-level error
 
285
occurs.  The error message will be passed to the callback as the sole argument.
 
286
 
 
287
=back
 
288
 
 
289
=head1 METHODS
 
290
 
 
291
All methods supported by your version of Redis should be supported.
 
292
 
 
293
=head2 Normal commands
 
294
 
 
295
There are two alternative approaches for handling results from commands:
 
296
 
 
297
=over 4
 
298
 
 
299
=item * L<AnyEvent::CondVar> based:
 
300
 
 
301
  my $cv = $redis->command(
 
302
    # arguments to command
 
303
  );
 
304
 
 
305
  # Then...
 
306
  my $res;
 
307
  eval { 
 
308
      # Could die()
 
309
      $res = $cv->recv;
 
310
  }; 
 
311
  warn $@ if $@;
 
312
 
 
313
  # or...
 
314
  $cv->cb(sub {
 
315
    my($cv) = @_;
 
316
    my($result, $err) = $cv->recv
 
317
  });
 
318
 
 
319
 
 
320
=item * Callback:
 
321
 
 
322
  $redis->command(
 
323
    # arguments,
 
324
    sub {
 
325
      my($result, $err) = @_;
 
326
    });
 
327
 
 
328
(Callback is a wrapper around the C<$cv> approach.)
 
329
 
 
330
=back
 
331
 
 
332
=head2 Subscriptions
 
333
 
 
334
The subscription methods (C<subscribe> and C<psubscribe>) must be used with a callback:
 
335
 
 
336
  my $cv = $redis->subscribe("test", sub {
 
337
    my($message, $channel[, $actual_channel]) = @_;
 
338
    # ($actual_channel is provided for pattern subscriptions.)
 
339
  });
 
340
 
 
341
The C<$cv> condition will be met on unsubscribing from the channel.
 
342
 
 
343
Due to limitations of the Redis protocol the only valid commands on a
 
344
connection with an active subscription are subscribe and unsubscribe commands.
 
345
 
 
346
=head2 Common methods
 
347
 
 
348
=over 4
 
349
 
 
350
=item * get
 
351
 
 
352
=item * set
 
353
 
 
354
=item * hset
 
355
 
 
356
=item * hget
 
357
 
 
358
=item * lpush
 
359
 
 
360
=item * lpop
 
361
 
 
362
=back
 
363
 
 
364
The Redis command reference (L<http://redis.io/commands>) lists all commands
 
365
Redis supports.
 
366
 
 
367
=head1 REQUIREMENTS
 
368
 
 
369
This requires Redis >= 1.2.
 
370
 
 
371
=head1 COPYRIGHT
 
372
 
 
373
Tatsuhiko Miyagawa E<lt>miyagawa@bulknews.netE<gt> 2009-
 
374
 
 
375
=head1 LICENSE
 
376
 
 
377
This library is free software; you can redistribute it and/or modify
 
378
it under the same terms as Perl itself.
 
379
 
 
380
=head1 AUTHORS
 
381
 
 
382
Tatsuhiko Miyagawa
 
383
 
 
384
David Leadbeater
 
385
 
 
386
Chia-liang Kao
 
387
 
 
388
franck cuny
 
389
 
 
390
Lee Aylward
 
391
 
 
392
Joshua Barratt
 
393
 
 
394
Jeremy Zawodny
 
395
 
 
396
Leon Brocard
 
397
 
 
398
Michael S. Fischer
 
399
 
 
400
=head1 SEE ALSO
 
401
 
 
402
L<Redis>, L<AnyEvent>
 
403
 
 
404
=cut