~storm/storm/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from urllib.parse import quote

from storm.exceptions import URIError


class URI(object):
    """A representation of a Uniform Resource Identifier (URI).

    This is intended exclusively for database connection URIs.

    @ivar username: The username part of the URI, or C{None}.
    @ivar password: The password part of the URI, or C{None}.
    @ivar host: The host part of the URI, or C{None}.
    @type port: L{int}
    @ivar port: The port part of the URI, or C{None}.
    @ivar database: The part of the URI representing the database name, or
        C{None}.
    """

    username = None
    password = None
    host = None
    port = None
    database = None

    def __init__(self, uri_str):
        try:
            self.scheme, rest = uri_str.split(":", 1)
        except ValueError:
            raise URIError("URI has no scheme: %s" % repr(uri_str))

        self.options = {}

        if "?" in rest:
            rest, options = rest.split("?", 1)
            for pair in options.split("&"):
                key, value = pair.split("=", 1)
                self.options[unescape(key)] = unescape(value)
        if rest:
            if not rest.startswith("//"):
                self.database = unescape(rest)
            else:
                rest = rest[2:]
                if "/" in rest:
                    rest, database = rest.split("/", 1)
                    self.database = unescape(database)
                if "@" in rest:
                    userpass, hostport = rest.split("@", 1)
                else:
                    userpass = None
                    hostport = rest
                if hostport:
                    if ":" in hostport:
                        host, port = hostport.rsplit(":", 1)
                        self.host = unescape(host)
                        if port:
                            self.port = int(port)
                    else:
                        self.host = unescape(hostport)
                if userpass is not None:
                    if ":" in userpass:
                        username, password = userpass.rsplit(":", 1)
                        self.username = unescape(username)
                        self.password = unescape(password)
                    else:
                        self.username = unescape(userpass)

    def copy(self):
        uri = object.__new__(self.__class__)
        uri.__dict__.update(self.__dict__)
        uri.options = self.options.copy()
        return uri

    def __str__(self):
        tokens = [self.scheme, ":"]
        append = tokens.append
        if (self.username is not None or self.password is not None or
            self.host is not None or self.port is not None):
            append("//")
            if self.username is not None or self.password is not None:
                if self.username is not None:
                    append(escape(self.username))
                if self.password is not None:
                    append(":")
                    append(escape(self.password))
                append("@")
            if self.host is not None:
                append(escape(self.host))
            if self.port is not None:
                append(":")
                append(str(self.port))
            append("/")
        if self.database is not None:
            append(escape(self.database, "/"))
        if self.options:
            options = ["%s=%s" % (escape(key), escape(value))
                       for key, value in sorted(self.options.items())]
            append("?")
            append("&".join(options))
        return "".join(tokens)


def escape(s, safe=""):
    return quote(s, safe)


def unescape(s):
    if "%" not in s:
        return s
    i = 0
    j = s.find("%")
    r = []
    while j != -1:
        r.append(s[i:j])
        i = j+3
        r.append(chr(int(s[j+1:i], 16)))
        j = s.find("%", i)
    r.append(s[i:])
    return "".join(r)