2
# Copyright 2009-2010 Joshua Roesslein
3
# See LICENSE for details.
6
from socket import timeout
7
from threading import Thread
11
from tweepy.auth import BasicAuthHandler
12
from tweepy.models import Status
13
from tweepy.api import API
14
from tweepy.error import TweepError
16
from tweepy.utils import import_simplejson
17
json = import_simplejson()
22
class StreamListener(object):
24
def __init__(self, api=None):
25
self.api = api or API()
27
def on_data(self, data):
28
"""Called when raw data is received from connection.
30
Override this method if you wish to manually handle
31
the stream data. Return False to stop stream and close connection.
34
if 'in_reply_to_status_id' in data:
35
status = Status.parse(self.api, json.loads(data))
36
if self.on_status(status) is False:
38
elif 'delete' in data:
39
delete = json.loads(data)['delete']['status']
40
if self.on_delete(delete['id'], delete['user_id']) is False:
43
if self.on_limit(json.loads(data)['limit']['track']) is False:
46
def on_status(self, status):
47
"""Called when a new status arrives"""
50
def on_delete(self, status_id, user_id):
51
"""Called when a delete notice arrives for a status"""
54
def on_limit(self, track):
55
"""Called when a limitation notice arrives"""
58
def on_error(self, status_code):
59
"""Called when a non-200 status code is returned"""
63
"""Called when stream connection times out"""
69
host = 'stream.twitter.com'
71
def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
72
retry_time = 10.0, snooze_time = 5.0, buffer_size=1500, headers=None):
73
self.auth = BasicAuthHandler(username, password)
75
self.timeout = timeout
76
self.retry_count = retry_count
77
self.retry_time = retry_time
78
self.snooze_time = snooze_time
79
self.buffer_size = buffer_size
80
self.listener = listener
82
self.headers = headers or {}
87
self.auth.apply_auth(None, None, self.headers, None)
94
if self.retry_count and error_counter > self.retry_count:
95
# quit if error count greater than retry count
98
conn = httplib.HTTPConnection(self.host)
100
conn.sock.settimeout(self.timeout)
101
conn.request('POST', self.url, self.body, headers=self.headers)
102
resp = conn.getresponse()
103
if resp.status != 200:
104
if self.listener.on_error(resp.status) is False:
107
sleep(self.retry_time)
110
self._read_loop(resp)
112
if self.listener.on_timeout() == False:
114
if self.running is False:
117
sleep(self.snooze_time)
118
except Exception, exception:
119
# any other exception is fatal, so kill loop
130
def _read_loop(self, resp):
143
length = length.strip()
149
# read data and pass into listener
150
data = resp.read(length)
151
if self.listener.on_data(data) is False:
154
def _start(self, async):
157
Thread(target=self._run).start()
161
def firehose(self, count=None, async=False):
163
raise TweepError('Stream object already connected!')
164
self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
166
self.url += '&count=%s' % count
169
def retweet(self, async=False):
171
raise TweepError('Stream object already connected!')
172
self.url = '/%i/statuses/retweet.json?delimited=length' % STREAM_VERSION
175
def sample(self, count=None, async=False):
177
raise TweepError('Stream object already connected!')
178
self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
180
self.url += '&count=%s' % count
183
def filter(self, follow=None, track=None, async=False):
185
self.headers['Content-type'] = "application/x-www-form-urlencoded"
187
raise TweepError('Stream object already connected!')
188
self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
190
params['follow'] = ','.join(map(str, follow))
192
params['track'] = ','.join(map(str, track))
193
self.body = urllib.urlencode(params)
196
def disconnect(self):
197
if self.running is False: