~ken-vandine/ubuntu/lucid/gwibber/twitter-oauth

« back to all changes in this revision

Viewing changes to gwibber/microblog/twitter.py

  • Committer: Ken VanDine
  • Date: 2010-08-31 20:08:35 UTC
  • mfrom: (1.1.21 upstream)
  • Revision ID: ken.vandine@canonical.com-20100831200835-i7l1y21kdikiq582
Tags: 2.30.2~bzr742-0ubuntu1
releasing version 2.30.2~bzr742-0ubuntu1

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import network, util, htmllib
2
 
from util import log
3
 
from util import exceptions
 
1
import network, util, htmllib, re
 
2
import gnomekeyring
 
3
from oauth import oauth
 
4
from util import log, exceptions
4
5
from gettext import lgettext as _
5
6
log.logger.name = "Twitter"
6
7
 
9
10
  "version": "1.0",
10
11
  
11
12
  "config": [
12
 
    "private:password",
 
13
    "private:secret_token",
 
14
    "access_token",
13
15
    "username",
14
16
    "color",
15
17
    "receive_enabled",
16
18
    "send_enabled",
17
19
  ],
18
20
 
19
 
  "authtype": "login",
 
21
  "authtype": "oauth1a",
20
22
  "color": "#729FCF",
21
23
 
22
24
  "features": [
44
46
}
45
47
 
46
48
URL_PREFIX = "https://twitter.com"
 
49
API_PREFIX = "https://api.twitter.com/1"
47
50
 
48
51
def unescape(s):
49
52
  p = htmllib.HTMLParser(None)
53
56
 
54
57
class Client:
55
58
  def __init__(self, acct):
 
59
    if not acct.has_key("access_token") and not acct.has_key("secret_token"):
 
60
      raise exceptions.GwibberProtocolError("keyring")
 
61
    if acct.has_key("secret_token") and acct.has_key("password"): acct.pop("password")
56
62
    self.account = acct
 
63
    self.sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
 
64
    self.consumer = oauth.OAuthConsumer(*util.resources.get_twitter_keys())
 
65
    self.token = oauth.OAuthToken(acct["access_token"], acct["secret_token"])
57
66
 
58
67
  def _common(self, data):
59
68
    m = {}; 
60
 
    m["id"] = str(data["id"])
61
 
    m["protocol"] = "twitter"
62
 
    m["account"] = self.account["_id"]
63
 
    m["time"] = util.parsetime(data["created_at"])
64
 
    m["text"] = unescape(data["text"])
65
 
    m["to_me"] = ("@%s" % self.account["username"]) in data["text"]
66
 
 
67
 
    m["html"] = util.linkify(data["text"],
68
 
      ((util.PARSE_HASH, '#<a class="hash" href="%s#search?q=\\1">\\1</a>' % URL_PREFIX),
69
 
      (util.PARSE_NICK, '@<a class="nick" href="%s/\\1">\\1</a>' % URL_PREFIX)), escape=False)
70
 
 
71
 
    m["content"] = util.linkify(data["text"],
72
 
      ((util.PARSE_HASH, '#<a class="hash" href="gwibber:/tag?acct=%s&query=\\1">\\1</a>' % m["account"]),
73
 
      (util.PARSE_NICK, '@<a class="nick" href="gwibber:/user?acct=%s&name=\\1">\\1</a>' % m["account"])), escape=False)
74
 
 
 
69
    try:
 
70
      m["id"] = str(data["id"])
 
71
      m["protocol"] = "twitter"
 
72
      m["account"] = self.account["_id"]
 
73
      m["time"] = util.parsetime(data["created_at"])
 
74
      m["text"] = unescape(data["text"])
 
75
      m["to_me"] = ("@%s" % self.account["username"]) in data["text"]
 
76
 
 
77
      m["html"] = util.linkify(data["text"],
 
78
        ((util.PARSE_HASH, '#<a class="hash" href="%s#search?q=\\1">\\1</a>' % URL_PREFIX),
 
79
        (util.PARSE_NICK, '@<a class="nick" href="%s/\\1">\\1</a>' % URL_PREFIX)), escape=False)
 
80
 
 
81
      m["content"] = util.linkify(data["text"],
 
82
        ((util.PARSE_HASH, '#<a class="hash" href="gwibber:/tag?acct=%s&query=\\1">\\1</a>' % m["account"]),
 
83
        (util.PARSE_NICK, '@<a class="nick" href="gwibber:/user?acct=%s&name=\\1">\\1</a>' % m["account"])), escape=False)
 
84
    except: 
 
85
      log.logger.error("%s failure - %s", PROTOCOL_INFO["name"], data)
 
86
 
75
87
    return m
76
88
 
77
89
  def _message(self, data):
 
90
    if type(data) == type(None):
 
91
      return []
 
92
 
78
93
    m = self._common(data)
79
94
    m["source"] = data.get("source", False)
80
95
    
122
137
    return m
123
138
 
124
139
  def _get(self, path, parse="message", post=False, single=False, **args):
125
 
    url = "/".join((URL_PREFIX, path))
126
 
    
127
 
    data = network.Download(url, util.compact(args) or None, post,
128
 
        self.account["username"], self.account["password"]).get_json()
129
 
 
130
 
    # error is "Could not authenticate you" for failed auth
131
 
    try:
132
 
      if single: return [getattr(self, "_%s" % parse)(data)]
133
 
      if parse: return [getattr(self, "_%s" % parse)(m) for m in data]
134
 
      else: return []
135
 
    except:
136
 
      if data.has_key("error"):
137
 
        if "authenticate" in data["error"]:
138
 
          raise exceptions.GwibberProtocolError("auth", self.account["protocol"], self.account["username"], data["error"])
 
140
    url = "/".join((API_PREFIX, path))
 
141
 
 
142
    request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, self.token,
 
143
        http_method=post and "POST" or "GET", http_url=url, parameters=util.compact(args))
 
144
    request.sign_request(self.sigmethod, self.consumer, self.token)
 
145
    
 
146
    if post:
 
147
      data = network.Download(request.to_url(), util.compact(args), post).get_json()
 
148
    else:
 
149
      data = network.Download(request.to_url(), None, post).get_json()
 
150
 
 
151
    if isinstance(data, dict) and data.get("errors", 0):
 
152
      if "authenticate" in data["errors"][0]["message"]:
 
153
        raise exceptions.GwibberProtocolError("auth",
 
154
            self.account["protocol"], self.account["username"], data["errors"][0]["message"])
 
155
      else:
 
156
        for error in data["errors"]:
 
157
          log.logger.info("Twitter failure - %s", error["message"])
 
158
        return []
 
159
    elif isinstance(data, dict) and data.get("error", 0):
 
160
      log.logger.error("%s failure - %s", PROTOCOL_INFO["name"], data["error"])
 
161
      return []
 
162
    elif isinstance(data, str):
 
163
      log.logger.error("%s unexpected result - %s", PROTOCOL_INFO["name"], data)
 
164
      return []
 
165
    
 
166
    if single: return [getattr(self, "_%s" % parse)(data)]
 
167
    if parse: return [getattr(self, "_%s" % parse)(m) for m in data]
 
168
    else: return []
139
169
 
140
170
  def _search(self, **args):
141
171
    data = network.Download("http://search.twitter.com/search.json", util.compact(args))
174
204
 
175
205
  def send(self, message):
176
206
    return self._get("statuses/update.json", post=True, single=True,
177
 
        status=message, source="gwibbernet")
178
 
 
 
207
        status=message)
 
208
  
179
209
  def send_thread(self, message, target):
180
210
    return self._get("statuses/update.json", post=True, single=True,
181
 
        status=message, source="gwibbernet", in_reply_to_status_id=target["id"])
182
 
 
 
211
        status=message, in_reply_to_status_id=target["id"])