~saurabhanandiit/gtg/exportFixed

« back to all changes in this revision

Viewing changes to GTG/backends/tweepy/streaming.py

Merge of my work on liblarch newbase and all the backends ported to liblarch
(which mainly means porting the datastore).
One failing test, will check it.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Tweepy
 
2
# Copyright 2009-2010 Joshua Roesslein
 
3
# See LICENSE for details.
 
4
 
 
5
import httplib
 
6
from socket import timeout
 
7
from threading import Thread
 
8
from time import sleep
 
9
import urllib
 
10
 
 
11
from tweepy.auth import BasicAuthHandler
 
12
from tweepy.models import Status
 
13
from tweepy.api import API
 
14
from tweepy.error import TweepError
 
15
 
 
16
from tweepy.utils import import_simplejson
 
17
json = import_simplejson()
 
18
 
 
19
STREAM_VERSION = 1
 
20
 
 
21
 
 
22
class StreamListener(object):
 
23
 
 
24
    def __init__(self, api=None):
 
25
        self.api = api or API()
 
26
 
 
27
    def on_data(self, data):
 
28
        """Called when raw data is received from connection.
 
29
 
 
30
        Override this method if you wish to manually handle
 
31
        the stream data. Return False to stop stream and close connection.
 
32
        """
 
33
 
 
34
        if 'in_reply_to_status_id' in data:
 
35
            status = Status.parse(self.api, json.loads(data))
 
36
            if self.on_status(status) is False:
 
37
                return False
 
38
        elif 'delete' in data:
 
39
            delete = json.loads(data)['delete']['status']
 
40
            if self.on_delete(delete['id'], delete['user_id']) is False:
 
41
                return False
 
42
        elif 'limit' in data:
 
43
            if self.on_limit(json.loads(data)['limit']['track']) is False:
 
44
                return False
 
45
 
 
46
    def on_status(self, status):
 
47
        """Called when a new status arrives"""
 
48
        return
 
49
 
 
50
    def on_delete(self, status_id, user_id):
 
51
        """Called when a delete notice arrives for a status"""
 
52
        return
 
53
 
 
54
    def on_limit(self, track):
 
55
        """Called when a limitation notice arrvies"""
 
56
        return
 
57
 
 
58
    def on_error(self, status_code):
 
59
        """Called when a non-200 status code is returned"""
 
60
        return False
 
61
 
 
62
    def on_timeout(self):
 
63
        """Called when stream connection times out"""
 
64
        return
 
65
 
 
66
 
 
67
class Stream(object):
 
68
 
 
69
    host = 'stream.twitter.com'
 
70
 
 
71
    def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
 
72
                    retry_time = 10.0, snooze_time = 5.0, buffer_size=1500, headers=None):
 
73
        self.auth = BasicAuthHandler(username, password)
 
74
        self.running = False
 
75
        self.timeout = timeout
 
76
        self.retry_count = retry_count
 
77
        self.retry_time = retry_time
 
78
        self.snooze_time = snooze_time
 
79
        self.buffer_size = buffer_size
 
80
        self.listener = listener
 
81
        self.api = API()
 
82
        self.headers = headers or {}
 
83
        self.body = None
 
84
 
 
85
    def _run(self):
 
86
        # setup
 
87
        self.auth.apply_auth(None, None, self.headers, None)
 
88
 
 
89
        # enter loop
 
90
        error_counter = 0
 
91
        conn = None
 
92
        exception = None
 
93
        while self.running:
 
94
            if self.retry_count and error_counter > self.retry_count:
 
95
                # quit if error count greater than retry count
 
96
                break
 
97
            try:
 
98
                conn = httplib.HTTPConnection(self.host)
 
99
                conn.connect()
 
100
                conn.sock.settimeout(self.timeout)
 
101
                conn.request('POST', self.url, self.body, headers=self.headers)
 
102
                resp = conn.getresponse()
 
103
                if resp.status != 200:
 
104
                    if self.listener.on_error(resp.status) is False:
 
105
                        break
 
106
                    error_counter += 1
 
107
                    sleep(self.retry_time)
 
108
                else:
 
109
                    error_counter = 0
 
110
                    self._read_loop(resp)
 
111
            except timeout:
 
112
                if self.listener.on_timeout() == False:
 
113
                    break
 
114
                if self.running is False:
 
115
                    break
 
116
                conn.close()
 
117
                sleep(self.snooze_time)
 
118
            except Exception, exception:
 
119
                # any other exception is fatal, so kill loop
 
120
                break
 
121
 
 
122
        # cleanup
 
123
        self.running = False
 
124
        if conn:
 
125
            conn.close()
 
126
 
 
127
        if exception:
 
128
            raise exception
 
129
 
 
130
    def _read_loop(self, resp):
 
131
        data = ''
 
132
        while self.running:
 
133
            if resp.isclosed():
 
134
                break
 
135
 
 
136
            # read length
 
137
            length = ''
 
138
            while True:
 
139
                c = resp.read(1)
 
140
                if c == '\n':
 
141
                    break
 
142
                length += c
 
143
            length = length.strip()
 
144
            if length.isdigit():
 
145
                length = int(length)
 
146
            else:
 
147
                continue
 
148
 
 
149
            # read data and pass into listener
 
150
            data = resp.read(length)
 
151
            if self.listener.on_data(data) is False:
 
152
                self.running = False
 
153
 
 
154
    def _start(self, async):
 
155
        self.running = True
 
156
        if async:
 
157
            Thread(target=self._run).start()
 
158
        else:
 
159
            self._run()
 
160
 
 
161
    def firehose(self, count=None, async=False):
 
162
        if self.running:
 
163
            raise TweepError('Stream object already connected!')
 
164
        self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
 
165
        if count:
 
166
            self.url += '&count=%s' % count
 
167
        self._start(async)
 
168
 
 
169
    def retweet(self, async=False):
 
170
        if self.running:
 
171
            raise TweepError('Stream object already connected!')
 
172
        self.url = '/%i/statuses/retweet.json?delimited=length' % STREAM_VERSION
 
173
        self._start(async)
 
174
 
 
175
    def sample(self, count=None, async=False):
 
176
        if self.running:
 
177
            raise TweepError('Stream object already connected!')
 
178
        self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
 
179
        if count:
 
180
            self.url += '&count=%s' % count
 
181
        self._start(async)
 
182
 
 
183
    def filter(self, follow=None, track=None, async=False):
 
184
        params = {}
 
185
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
 
186
        if self.running:
 
187
            raise TweepError('Stream object already connected!')
 
188
        self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
 
189
        if follow:
 
190
            params['follow'] = ','.join(map(str, follow))
 
191
        if track:
 
192
            params['track'] = ','.join(map(str, track))
 
193
        self.body = urllib.urlencode(params)
 
194
        self._start(async)
 
195
 
 
196
    def disconnect(self):
 
197
        if self.running is False:
 
198
            return
 
199
        self.running = False
 
200