1
package AnyEvent::Redis;
7
use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
11
use AnyEvent::Redis::Protocol;
19
my($class, %args) = @_;
21
my $host = delete $args{host} || '127.0.0.1';
22
my $port = delete $args{port} || 6379;
24
if (my $encoding = $args{encoding}) {
25
$args{encoding} = Encode::find_encoding($encoding);
26
croak qq{Encoding "$encoding" not found} unless ref $args{encoding};
40
$self->{cmd_cb} or return $self->connect($cmd, @_);
41
$self->{cmd_cb}->($cmd, @_);
48
(my $method = $AUTOLOAD) =~ s/.*:://;
49
$self->run_cmd($method, @_);
54
$self->{all_cv} = shift if @_;
55
unless ($self->{all_cv}) {
56
$self->{all_cv} = AE::cv;
63
delete $self->{cmd_cb};
65
$self->{on_error}->(@_) if $self->{on_error};
73
$cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
75
push @{$self->{connect_queue}}, [ $cv, @_ ];
78
return $cv if $self->{sock};
80
$self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
83
my $err = "Can't connect Redis server: $!";
89
binmode $fh; # ensure bytes until we decode
91
my $hd = AnyEvent::Handle->new(
93
on_error => sub { $_[0]->destroy;
95
$self->cleanup($_[2]);
98
on_eof => sub { $_[0]->destroy;
99
$self->cleanup('connection closed');
101
encoding => $self->{encoding},
104
$self->{cmd_cb} = sub {
105
$self->all_cv->begin;
110
$cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
111
$cb = pop if ref $_[-1] eq 'CODE';
115
my $send = join("\r\n",
117
map { ('$' . length $_ => $_) }
118
(uc($command), map { $self->{encoding} && length($_)
119
? $self->{encoding}->encode($_)
125
$hd->push_write($send);
127
# Are we already subscribed to anything?
128
if($self->{sub} && %{$self->{sub}}) {
130
croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
131
unless $command =~ /^p?(?:un)?subscribe$/i;
133
# Remember subscriptions
134
$self->{sub}->{$_} ||= [$cv, $cb] for @_;
136
} elsif ($command !~ /^p?subscribe$/i) {
144
($self->{on_error} || sub { die @_ })->($_);
148
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
151
if($command eq 'info') {
152
$res = { map { split /:/, $_, 2 } split /\r\n/, $res };
153
} elsif($command eq 'keys' && !ref $res) {
154
# Older versions of Redis (1.2) need this
155
$res = [split / /, $res];
159
$err ? $cv->croak($res) : $cv->send($res);
163
croak "Must provide a CODE reference for subscriptions" unless $cb;
165
# Remember subscriptions
166
$self->{sub}->{$_} ||= [$cv, $cb] for @_;
168
my $res_cb; $res_cb = sub {
170
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
174
my $action = lc $res->[0];
175
warn "$action $res->[1]" if DEBUG;
177
if($action eq 'message') {
178
$self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);
180
} elsif($action eq 'pmessage') {
181
$self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);
183
} elsif($action eq 'subscribe' || $action eq 'psubscribe') {
184
$self->{sub_count} = $res->[2];
186
} elsif($action eq 'unsubscribe' || $action eq 'punsubscribe') {
187
$self->{sub_count} = $res->[2];
188
$self->{sub}->{$res->[1]}[0]->send;
189
delete $self->{sub}->{$res->[1]};
193
warn "Unknown pubsub action: $action";
197
if($self->{sub_count} || %{$self->{sub}}) {
198
# Carry on reading while we are subscribed
210
for my $queue (@{$self->{connect_queue} || []}) {
211
my($cv, @args) = @$queue;
212
$self->{cmd_cb}->(@args, $cv);
229
AnyEvent::Redis - Non-blocking Redis client
235
my $redis = AnyEvent::Redis->new(
239
on_error => sub { warn @_ },
243
$redis->set( 'foo'=> 'bar', sub { warn "SET!" } );
244
$redis->get( 'foo', sub { my $value = shift } );
246
my ($key, $value) = ('list_key', 123);
247
$redis->lpush( $key, $value );
248
$redis->lpop( $key, sub { my $value = shift });
251
my $cv = $redis->lpop( $key );
252
$cv->cb(sub { my $value = $_[0]->recv });
256
AnyEvent::Redis is a non-blocking (event-driven) Redis client.
258
This module is an AnyEvent user; you must install and use a supported event loop.
260
=head1 ESTABLISHING A CONNECTION
262
To create a new connection, use the new() method with the following attributes:
266
=item host => <HOSTNAME>
268
B<Required.> The hostname or literal address of the server.
272
Optional. The server port.
274
=item encoding => <ENCODING>
276
Optional. Encode and decode data (when storing and retrieving, respectively)
277
according to I<ENCODING> (C<"utf8"> is recommended or see L<Encode::Supported>
278
for details on possible I<ENCODING> values).
280
Omit if you intend to handle raw binary data with this connection.
282
=item on_error => $cb->($errmsg)
284
Optional. Callback that will be fired if a connection or database-level error
285
occurs. The error message will be passed to the callback as the sole argument.
291
All methods supported by your version of Redis should be supported.
293
=head2 Normal commands
295
There are two alternative approaches for handling results from commands:
299
=item * L<AnyEvent::CondVar> based:
301
my $cv = $redis->command(
302
# arguments to command
316
my($result, $err) = $cv->recv
325
my($result, $err) = @_;
328
(Callback is a wrapper around the C<$cv> approach.)
334
The subscription methods (C<subscribe> and C<psubscribe>) must be used with a callback:
336
my $cv = $redis->subscribe("test", sub {
337
my($message, $channel[, $actual_channel]) = @_;
338
# ($actual_channel is provided for pattern subscriptions.)
341
The C<$cv> condition will be met on unsubscribing from the channel.
343
Due to limitations of the Redis protocol the only valid commands on a
344
connection with an active subscription are subscribe and unsubscribe commands.
346
=head2 Common methods
364
The Redis command reference (L<http://redis.io/commands>) lists all commands
369
This requires Redis >= 1.2.
373
Tatsuhiko Miyagawa E<lt>miyagawa@bulknews.netE<gt> 2009-
377
This library is free software; you can redistribute it and/or modify
378
it under the same terms as Perl itself.
402
L<Redis>, L<AnyEvent>