Bug 1247994 - Upgrade vendored requests package to 2.9.1; r=mshal
Previously, we We were running version 2.5.1 of requests. Newer versions have several bug fixes and even address a CVE. Source was obtained from https://pypi.python.org/packages/source/r/requests/requests-2.9.1.tar.gz and checked in without modification. This should be a rubber stamp review. MozReview-Commit-ID: 9tFSVJFfwGh
This commit is contained in:
@@ -9,6 +9,7 @@ import os
|
||||
import pickle
|
||||
import unittest
|
||||
import collections
|
||||
import contextlib
|
||||
|
||||
import io
|
||||
import requests
|
||||
@@ -16,7 +17,9 @@ import pytest
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.auth import HTTPDigestAuth, _basic_auth_str
|
||||
from requests.compat import (
|
||||
Morsel, cookielib, getproxies, str, urljoin, urlparse, is_py3, builtin_str)
|
||||
Morsel, cookielib, getproxies, str, urljoin, urlparse, is_py3,
|
||||
builtin_str, OrderedDict
|
||||
)
|
||||
from requests.cookies import cookiejar_from_dict, morsel_to_cookie
|
||||
from requests.exceptions import (ConnectionError, ConnectTimeout,
|
||||
InvalidSchema, InvalidURL, MissingSchema,
|
||||
@@ -32,6 +35,11 @@ try:
|
||||
except ImportError:
|
||||
import io as StringIO
|
||||
|
||||
try:
|
||||
from multiprocessing.pool import ThreadPool
|
||||
except ImportError:
|
||||
ThreadPool = None
|
||||
|
||||
if is_py3:
|
||||
def u(s):
|
||||
return s
|
||||
@@ -40,20 +48,33 @@ else:
|
||||
return s.decode('unicode-escape')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpbin(httpbin):
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
httpbin_url = httpbin.url.rstrip('/') + '/'
|
||||
|
||||
def inner(*suffix):
|
||||
return urljoin(httpbin_url, '/'.join(suffix))
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpsbin_url(httpbin_secure):
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
httpbin_url = httpbin_secure.url.rstrip('/') + '/'
|
||||
|
||||
def inner(*suffix):
|
||||
return urljoin(httpbin_url, '/'.join(suffix))
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
# Requests to this URL should always fail with a connection timeout (nothing
|
||||
# listening on that port)
|
||||
TARPIT = "http://10.255.255.1"
|
||||
HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/')
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
HTTPBIN = HTTPBIN.rstrip('/') + '/'
|
||||
|
||||
|
||||
def httpbin(*suffix):
|
||||
"""Returns url for HTTPBIN resource."""
|
||||
return urljoin(HTTPBIN, '/'.join(suffix))
|
||||
|
||||
|
||||
class RequestsTestCase(unittest.TestCase):
|
||||
class TestRequests(object):
|
||||
|
||||
_multiprocess_can_split_ = True
|
||||
|
||||
@@ -97,13 +118,13 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert pr.url == req.url
|
||||
assert pr.body == 'life=42'
|
||||
|
||||
def test_no_content_length(self):
|
||||
def test_no_content_length(self, httpbin):
|
||||
get_req = requests.Request('GET', httpbin('get')).prepare()
|
||||
assert 'Content-Length' not in get_req.headers
|
||||
head_req = requests.Request('HEAD', httpbin('head')).prepare()
|
||||
assert 'Content-Length' not in head_req.headers
|
||||
|
||||
def test_override_content_length(self):
|
||||
def test_override_content_length(self, httpbin):
|
||||
headers = {
|
||||
'Content-Length': 'not zero'
|
||||
}
|
||||
@@ -124,19 +145,35 @@ class RequestsTestCase(unittest.TestCase):
|
||||
"http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
|
||||
assert request.url == "http://example.com/path?key=value&a=b#fragment"
|
||||
|
||||
def test_mixed_case_scheme_acceptable(self):
|
||||
def test_params_original_order_is_preserved_by_default(self):
|
||||
param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1)))
|
||||
session = requests.Session()
|
||||
request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict)
|
||||
prep = session.prepare_request(request)
|
||||
assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
|
||||
|
||||
def test_params_bytes_are_encoded(self):
|
||||
request = requests.Request('GET', 'http://example.com',
|
||||
params=b'test=foo').prepare()
|
||||
assert request.url == 'http://example.com/?test=foo'
|
||||
|
||||
def test_binary_put(self):
|
||||
request = requests.Request('PUT', 'http://example.com',
|
||||
data=u"ööö".encode("utf-8")).prepare()
|
||||
assert isinstance(request.body, bytes)
|
||||
|
||||
def test_mixed_case_scheme_acceptable(self, httpbin):
|
||||
s = requests.Session()
|
||||
s.proxies = getproxies()
|
||||
parts = urlparse(httpbin('get'))
|
||||
schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://',
|
||||
'https://', 'HTTPS://', 'hTTps://', 'HttPs://']
|
||||
schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://']
|
||||
for scheme in schemes:
|
||||
url = scheme + parts.netloc + parts.path
|
||||
r = requests.Request('GET', url)
|
||||
r = s.send(r.prepare())
|
||||
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
|
||||
|
||||
def test_HTTP_200_OK_GET_ALTERNATIVE(self):
|
||||
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
s = requests.Session()
|
||||
s.proxies = getproxies()
|
||||
@@ -145,7 +182,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_302_ALLOW_REDIRECT_GET(self):
|
||||
def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
|
||||
r = requests.get(httpbin('redirect', '1'))
|
||||
assert r.status_code == 200
|
||||
assert r.history[0].status_code == 302
|
||||
@@ -155,7 +192,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
# r = requests.post(httpbin('status', '302'), data={'some': 'data'})
|
||||
# self.assertEqual(r.status_code, 200)
|
||||
|
||||
def test_HTTP_200_OK_GET_WITH_PARAMS(self):
|
||||
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
|
||||
heads = {'User-agent': 'Mozilla/5.0'}
|
||||
|
||||
r = requests.get(httpbin('user-agent'), headers=heads)
|
||||
@@ -163,25 +200,25 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert heads['User-agent'] in r.text
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
|
||||
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
|
||||
heads = {'User-agent': 'Mozilla/5.0'}
|
||||
|
||||
r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_set_cookie_on_301(self):
|
||||
def test_set_cookie_on_301(self, httpbin):
|
||||
s = requests.session()
|
||||
url = httpbin('cookies/set?foo=bar')
|
||||
s.get(url)
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
|
||||
def test_cookie_sent_on_redirect(self):
|
||||
def test_cookie_sent_on_redirect(self, httpbin):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
|
||||
assert 'Cookie' in r.json()['headers']
|
||||
|
||||
def test_cookie_removed_on_expire(self):
|
||||
def test_cookie_removed_on_expire(self, httpbin):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
@@ -194,18 +231,18 @@ class RequestsTestCase(unittest.TestCase):
|
||||
)
|
||||
assert 'foo' not in s.cookies
|
||||
|
||||
def test_cookie_quote_wrapped(self):
|
||||
def test_cookie_quote_wrapped(self, httpbin):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo="bar:baz"'))
|
||||
assert s.cookies['foo'] == '"bar:baz"'
|
||||
|
||||
def test_cookie_persists_via_api(self):
|
||||
def test_cookie_persists_via_api(self, httpbin):
|
||||
s = requests.session()
|
||||
r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
|
||||
assert 'foo' in r.request.headers['Cookie']
|
||||
assert 'foo' in r.history[0].request.headers['Cookie']
|
||||
|
||||
def test_request_cookie_overrides_session_cookie(self):
|
||||
def test_request_cookie_overrides_session_cookie(self, httpbin):
|
||||
s = requests.session()
|
||||
s.cookies['foo'] = 'bar'
|
||||
r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
||||
@@ -213,13 +250,13 @@ class RequestsTestCase(unittest.TestCase):
|
||||
# Session cookie should not be modified
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
|
||||
def test_request_cookies_not_persisted(self):
|
||||
def test_request_cookies_not_persisted(self, httpbin):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
||||
# Sending a request with cookies should not add cookies to the session
|
||||
assert not s.cookies
|
||||
|
||||
def test_generic_cookiejar_works(self):
|
||||
def test_generic_cookiejar_works(self, httpbin):
|
||||
cj = cookielib.CookieJar()
|
||||
cookiejar_from_dict({'foo': 'bar'}, cj)
|
||||
s = requests.session()
|
||||
@@ -230,7 +267,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
# Make sure the session cj is still the custom one
|
||||
assert s.cookies is cj
|
||||
|
||||
def test_param_cookiejar_works(self):
|
||||
def test_param_cookiejar_works(self, httpbin):
|
||||
cj = cookielib.CookieJar()
|
||||
cookiejar_from_dict({'foo': 'bar'}, cj)
|
||||
s = requests.session()
|
||||
@@ -238,13 +275,13 @@ class RequestsTestCase(unittest.TestCase):
|
||||
# Make sure the cookie was sent
|
||||
assert r.json()['cookies']['foo'] == 'bar'
|
||||
|
||||
def test_requests_in_history_are_not_overridden(self):
|
||||
def test_requests_in_history_are_not_overridden(self, httpbin):
|
||||
resp = requests.get(httpbin('redirect/3'))
|
||||
urls = [r.url for r in resp.history]
|
||||
req_urls = [r.request.url for r in resp.history]
|
||||
assert urls == req_urls
|
||||
|
||||
def test_history_is_always_a_list(self):
|
||||
def test_history_is_always_a_list(self, httpbin):
|
||||
"""
|
||||
Show that even with redirects, Response.history is always a list.
|
||||
"""
|
||||
@@ -254,7 +291,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert isinstance(resp.history, list)
|
||||
assert not isinstance(resp.history, tuple)
|
||||
|
||||
def test_headers_on_session_with_None_are_not_sent(self):
|
||||
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
|
||||
"""Do not send headers in Session.headers with None values."""
|
||||
ses = requests.Session()
|
||||
ses.headers['Accept-Encoding'] = None
|
||||
@@ -262,7 +299,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
prep = ses.prepare_request(req)
|
||||
assert 'Accept-Encoding' not in prep.headers
|
||||
|
||||
def test_user_agent_transfers(self):
|
||||
def test_user_agent_transfers(self, httpbin):
|
||||
|
||||
heads = {
|
||||
'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
|
||||
@@ -278,15 +315,15 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.get(httpbin('user-agent'), headers=heads)
|
||||
assert heads['user-agent'] in r.text
|
||||
|
||||
def test_HTTP_200_OK_HEAD(self):
|
||||
def test_HTTP_200_OK_HEAD(self, httpbin):
|
||||
r = requests.head(httpbin('get'))
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_200_OK_PUT(self):
|
||||
def test_HTTP_200_OK_PUT(self, httpbin):
|
||||
r = requests.put(httpbin('put'))
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
|
||||
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
|
||||
auth = ('user', 'pass')
|
||||
url = httpbin('basic-auth', 'user', 'pass')
|
||||
|
||||
@@ -301,48 +338,55 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_connection_error(self):
|
||||
def test_connection_error_invalid_domain(self):
|
||||
"""Connecting to an unknown domain should raise a ConnectionError"""
|
||||
with pytest.raises(ConnectionError):
|
||||
requests.get("http://fooobarbangbazbing.httpbin.org")
|
||||
requests.get("http://doesnotexist.google.com")
|
||||
|
||||
def test_connection_error_invalid_port(self):
|
||||
"""Connecting to an invalid port should raise a ConnectionError"""
|
||||
with pytest.raises(ConnectionError):
|
||||
requests.get("http://httpbin.org:1")
|
||||
requests.get("http://localhost:1", timeout=1)
|
||||
|
||||
def test_LocationParseError(self):
|
||||
"""Inputing a URL that cannot be parsed should raise an InvalidURL error"""
|
||||
with pytest.raises(InvalidURL):
|
||||
requests.get("http://fe80::5054:ff:fe5a:fc0")
|
||||
|
||||
def test_basicauth_with_netrc(self):
|
||||
def test_basicauth_with_netrc(self, httpbin):
|
||||
auth = ('user', 'pass')
|
||||
wrong_auth = ('wronguser', 'wrongpass')
|
||||
url = httpbin('basic-auth', 'user', 'pass')
|
||||
|
||||
def get_netrc_auth_mock(url):
|
||||
return auth
|
||||
requests.sessions.get_netrc_auth = get_netrc_auth_mock
|
||||
old_auth = requests.sessions.get_netrc_auth
|
||||
|
||||
# Should use netrc and work.
|
||||
r = requests.get(url)
|
||||
assert r.status_code == 200
|
||||
try:
|
||||
def get_netrc_auth_mock(url):
|
||||
return auth
|
||||
requests.sessions.get_netrc_auth = get_netrc_auth_mock
|
||||
|
||||
# Given auth should override and fail.
|
||||
r = requests.get(url, auth=wrong_auth)
|
||||
assert r.status_code == 401
|
||||
# Should use netrc and work.
|
||||
r = requests.get(url)
|
||||
assert r.status_code == 200
|
||||
|
||||
s = requests.session()
|
||||
# Given auth should override and fail.
|
||||
r = requests.get(url, auth=wrong_auth)
|
||||
assert r.status_code == 401
|
||||
|
||||
# Should use netrc and work.
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
s = requests.session()
|
||||
|
||||
# Given auth should override and fail.
|
||||
s.auth = wrong_auth
|
||||
r = s.get(url)
|
||||
assert r.status_code == 401
|
||||
# Should use netrc and work.
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_DIGEST_HTTP_200_OK_GET(self):
|
||||
# Given auth should override and fail.
|
||||
s.auth = wrong_auth
|
||||
r = s.get(url)
|
||||
assert r.status_code == 401
|
||||
finally:
|
||||
requests.sessions.get_netrc_auth = old_auth
|
||||
|
||||
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
|
||||
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
@@ -358,7 +402,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_DIGEST_AUTH_RETURNS_COOKIE(self):
|
||||
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
r = requests.get(url)
|
||||
@@ -367,14 +411,14 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.get(url, auth=auth)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self):
|
||||
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
s = requests.Session()
|
||||
s.get(url, auth=auth)
|
||||
assert s.cookies['fake'] == 'fake_value'
|
||||
|
||||
def test_DIGEST_STREAM(self):
|
||||
def test_DIGEST_STREAM(self, httpbin):
|
||||
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
@@ -385,7 +429,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.get(url, auth=auth, stream=False)
|
||||
assert r.raw.read() == b''
|
||||
|
||||
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
|
||||
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
|
||||
|
||||
auth = HTTPDigestAuth('user', 'wrongpass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
@@ -401,7 +445,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = s.get(url)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self):
|
||||
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
|
||||
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
@@ -409,7 +453,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.get(url, auth=auth)
|
||||
assert '"auth"' in r.request.headers['Authorization']
|
||||
|
||||
def test_POSTBIN_GET_POST_FILES(self):
|
||||
def test_POSTBIN_GET_POST_FILES(self, httpbin):
|
||||
|
||||
url = httpbin('post')
|
||||
post1 = requests.post(url).raise_for_status()
|
||||
@@ -427,7 +471,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
with pytest.raises(ValueError):
|
||||
requests.post(url, files=['bad file data'])
|
||||
|
||||
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
|
||||
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
|
||||
|
||||
url = httpbin('post')
|
||||
post1 = requests.post(url).raise_for_status()
|
||||
@@ -446,17 +490,17 @@ class RequestsTestCase(unittest.TestCase):
|
||||
with pytest.raises(ValueError):
|
||||
requests.post(url, files=['bad file data'])
|
||||
|
||||
def test_conflicting_post_params(self):
|
||||
def test_conflicting_post_params(self, httpbin):
|
||||
url = httpbin('post')
|
||||
with open('requirements.txt') as f:
|
||||
pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})")
|
||||
pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})")
|
||||
|
||||
def test_request_ok_set(self):
|
||||
def test_request_ok_set(self, httpbin):
|
||||
r = requests.get(httpbin('status', '404'))
|
||||
assert not r.ok
|
||||
|
||||
def test_status_raising(self):
|
||||
def test_status_raising(self, httpbin):
|
||||
r = requests.get(httpbin('status', '404'))
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
r.raise_for_status()
|
||||
@@ -464,11 +508,11 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.get(httpbin('status', '500'))
|
||||
assert not r.ok
|
||||
|
||||
def test_decompress_gzip(self):
|
||||
def test_decompress_gzip(self, httpbin):
|
||||
r = requests.get(httpbin('gzip'))
|
||||
r.content.decode('ascii')
|
||||
|
||||
def test_unicode_get(self):
|
||||
def test_unicode_get(self, httpbin):
|
||||
url = httpbin('/get')
|
||||
requests.get(url, params={'foo': 'føø'})
|
||||
requests.get(url, params={'føø': 'føø'})
|
||||
@@ -476,29 +520,29 @@ class RequestsTestCase(unittest.TestCase):
|
||||
requests.get(url, params={'foo': 'foo'})
|
||||
requests.get(httpbin('ø'), params={'foo': 'foo'})
|
||||
|
||||
def test_unicode_header_name(self):
|
||||
def test_unicode_header_name(self, httpbin):
|
||||
requests.put(
|
||||
httpbin('put'),
|
||||
headers={str('Content-Type'): 'application/octet-stream'},
|
||||
data='\xff') # compat.str is unicode.
|
||||
|
||||
def test_pyopenssl_redirect(self):
|
||||
requests.get('https://httpbin.org/status/301')
|
||||
def test_pyopenssl_redirect(self, httpsbin_url, httpbin_ca_bundle):
|
||||
requests.get(httpsbin_url('status', '301'), verify=httpbin_ca_bundle)
|
||||
|
||||
def test_urlencoded_get_query_multivalued_param(self):
|
||||
def test_urlencoded_get_query_multivalued_param(self, httpbin):
|
||||
|
||||
r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
|
||||
assert r.status_code == 200
|
||||
assert r.url == httpbin('get?test=foo&test=baz')
|
||||
|
||||
def test_different_encodings_dont_break_post(self):
|
||||
def test_different_encodings_dont_break_post(self, httpbin):
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': json.dumps({'a': 123})},
|
||||
params={'blah': 'asdf1234'},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_multipart_post(self):
|
||||
def test_unicode_multipart_post(self, httpbin):
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': u('ëlïxr')},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
@@ -519,7 +563,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_multipart_post_fieldnames(self):
|
||||
def test_unicode_multipart_post_fieldnames(self, httpbin):
|
||||
filename = os.path.splitext(__file__)[0] + '.py'
|
||||
r = requests.Request(method='POST',
|
||||
url=httpbin('post'),
|
||||
@@ -530,13 +574,24 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert b'name="stuff"' in prep.body
|
||||
assert b'name="b\'stuff\'"' not in prep.body
|
||||
|
||||
def test_unicode_method_name(self):
|
||||
def test_unicode_method_name(self, httpbin):
|
||||
files = {'file': open('test_requests.py', 'rb')}
|
||||
r = requests.request(
|
||||
method=u('POST'), url=httpbin('post'), files=files)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_custom_content_type(self):
|
||||
def test_unicode_method_name_with_request_object(self, httpbin):
|
||||
files = {'file': open('test_requests.py', 'rb')}
|
||||
s = requests.Session()
|
||||
req = requests.Request(u("POST"), httpbin('post'), files=files)
|
||||
prep = s.prepare_request(req)
|
||||
assert isinstance(prep.method, builtin_str)
|
||||
assert prep.method == "POST"
|
||||
|
||||
resp = s.send(prep)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_custom_content_type(self, httpbin):
|
||||
r = requests.post(
|
||||
httpbin('post'),
|
||||
data={'stuff': json.dumps({'a': 123})},
|
||||
@@ -546,38 +601,38 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert r.status_code == 200
|
||||
assert b"text/py-content-type" in r.request.body
|
||||
|
||||
def test_hook_receives_request_arguments(self):
|
||||
def test_hook_receives_request_arguments(self, httpbin):
|
||||
def hook(resp, **kwargs):
|
||||
assert resp is not None
|
||||
assert kwargs != {}
|
||||
|
||||
requests.Request('GET', HTTPBIN, hooks={'response': hook})
|
||||
requests.Request('GET', httpbin(), hooks={'response': hook})
|
||||
|
||||
def test_session_hooks_are_used_with_no_request_hooks(self):
|
||||
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
|
||||
hook = lambda x, *args, **kwargs: x
|
||||
s = requests.Session()
|
||||
s.hooks['response'].append(hook)
|
||||
r = requests.Request('GET', HTTPBIN)
|
||||
r = requests.Request('GET', httpbin())
|
||||
prep = s.prepare_request(r)
|
||||
assert prep.hooks['response'] != []
|
||||
assert prep.hooks['response'] == [hook]
|
||||
|
||||
def test_session_hooks_are_overriden_by_request_hooks(self):
|
||||
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
|
||||
hook1 = lambda x, *args, **kwargs: x
|
||||
hook2 = lambda x, *args, **kwargs: x
|
||||
assert hook1 is not hook2
|
||||
s = requests.Session()
|
||||
s.hooks['response'].append(hook2)
|
||||
r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]})
|
||||
r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
|
||||
prep = s.prepare_request(r)
|
||||
assert prep.hooks['response'] == [hook1]
|
||||
|
||||
def test_prepared_request_hook(self):
|
||||
def test_prepared_request_hook(self, httpbin):
|
||||
def hook(resp, **kwargs):
|
||||
resp.hook_working = True
|
||||
return resp
|
||||
|
||||
req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
|
||||
req = requests.Request('GET', httpbin(), hooks={'response': hook})
|
||||
prep = req.prepare()
|
||||
|
||||
s = requests.Session()
|
||||
@@ -586,7 +641,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
assert hasattr(resp, 'hook_working')
|
||||
|
||||
def test_prepared_from_session(self):
|
||||
def test_prepared_from_session(self, httpbin):
|
||||
class DummyAuth(requests.auth.AuthBase):
|
||||
def __call__(self, r):
|
||||
r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
|
||||
@@ -739,7 +794,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
# make sure one can use items multiple times
|
||||
assert list(items) == list(items)
|
||||
|
||||
def test_time_elapsed_blank(self):
|
||||
def test_time_elapsed_blank(self, httpbin):
|
||||
r = requests.get(httpbin('get'))
|
||||
td = r.elapsed
|
||||
total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
|
||||
@@ -778,7 +833,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
chunks = r.iter_content(decode_unicode=True)
|
||||
assert all(isinstance(chunk, str) for chunk in chunks)
|
||||
|
||||
def test_request_and_response_are_pickleable(self):
|
||||
def test_request_and_response_are_pickleable(self, httpbin):
|
||||
r = requests.get(httpbin('get'))
|
||||
|
||||
# verify we can pickle the original request
|
||||
@@ -810,8 +865,8 @@ class RequestsTestCase(unittest.TestCase):
|
||||
url = 'http://user:pass%23pass@complex.url.com/path?query=yes'
|
||||
assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_cannot_send_unprepared_requests(self):
|
||||
r = requests.Request(url=HTTPBIN)
|
||||
def test_cannot_send_unprepared_requests(self, httpbin):
|
||||
r = requests.Request(url=httpbin())
|
||||
with pytest.raises(ValueError):
|
||||
requests.Session().send(r)
|
||||
|
||||
@@ -825,7 +880,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert str(error) == 'message'
|
||||
assert error.response == response
|
||||
|
||||
def test_session_pickling(self):
|
||||
def test_session_pickling(self, httpbin):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
s = requests.Session()
|
||||
|
||||
@@ -835,7 +890,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = s.send(r.prepare())
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_fixes_1329(self):
|
||||
def test_fixes_1329(self, httpbin):
|
||||
"""
|
||||
Ensure that header updates are done case-insensitively.
|
||||
"""
|
||||
@@ -848,7 +903,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert headers['Accept'] == 'application/json'
|
||||
assert headers['ACCEPT'] == 'application/json'
|
||||
|
||||
def test_uppercase_scheme_redirect(self):
|
||||
def test_uppercase_scheme_redirect(self, httpbin):
|
||||
parts = urlparse(httpbin('html'))
|
||||
url = "HTTP://" + parts.netloc + parts.path
|
||||
r = requests.get(httpbin('redirect-to'), params={'url': url})
|
||||
@@ -893,14 +948,14 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert 'http://' in s2.adapters
|
||||
assert 'https://' in s2.adapters
|
||||
|
||||
def test_header_remove_is_case_insensitive(self):
|
||||
def test_header_remove_is_case_insensitive(self, httpbin):
|
||||
# From issue #1321
|
||||
s = requests.Session()
|
||||
s.headers['foo'] = 'bar'
|
||||
r = s.get(httpbin('get'), headers={'FOO': None})
|
||||
assert 'foo' not in r.request.headers
|
||||
|
||||
def test_params_are_merged_case_sensitive(self):
|
||||
def test_params_are_merged_case_sensitive(self, httpbin):
|
||||
s = requests.Session()
|
||||
s.params['foo'] = 'bar'
|
||||
r = s.get(httpbin('get'), params={'FOO': 'bar'})
|
||||
@@ -915,7 +970,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.Request('GET', url).prepare()
|
||||
assert r.url == url
|
||||
|
||||
def test_header_keys_are_native(self):
|
||||
def test_header_keys_are_native(self, httpbin):
|
||||
headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
|
||||
r = requests.Request('GET', httpbin('get'), headers=headers)
|
||||
p = r.prepare()
|
||||
@@ -925,7 +980,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert 'unicode' in p.headers.keys()
|
||||
assert 'byte' in p.headers.keys()
|
||||
|
||||
def test_can_send_nonstring_objects_with_files(self):
|
||||
def test_can_send_nonstring_objects_with_files(self, httpbin):
|
||||
data = {'a': 0.0}
|
||||
files = {'b': 'foo'}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
@@ -933,7 +988,20 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
|
||||
def test_can_send_file_object_with_non_string_filename(self):
|
||||
def test_can_send_bytes_bytearray_objects_with_files(self, httpbin):
|
||||
# Test bytes:
|
||||
data = {'a': 'this is a string'}
|
||||
files = {'b': b'foo'}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
# Test bytearrays:
|
||||
files = {'b': bytearray(b'foo')}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
|
||||
def test_can_send_file_object_with_non_string_filename(self, httpbin):
|
||||
f = io.BytesIO()
|
||||
f.name = 2
|
||||
r = requests.Request('POST', httpbin('post'), files={'f': f})
|
||||
@@ -941,7 +1009,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
|
||||
def test_autoset_header_values_are_native(self):
|
||||
def test_autoset_header_values_are_native(self, httpbin):
|
||||
data = 'this is a string'
|
||||
length = '16'
|
||||
req = requests.Request('POST', httpbin('post'), data=data)
|
||||
@@ -960,7 +1028,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
preq = req.prepare()
|
||||
assert test_url == preq.url
|
||||
|
||||
def test_auth_is_stripped_on_redirect_off_host(self):
|
||||
def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
|
||||
r = requests.get(
|
||||
httpbin('redirect-to'),
|
||||
params={'url': 'http://www.google.co.uk'},
|
||||
@@ -969,14 +1037,14 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert r.history[0].request.headers['Authorization']
|
||||
assert not r.request.headers.get('Authorization', '')
|
||||
|
||||
def test_auth_is_retained_for_redirect_on_host(self):
|
||||
def test_auth_is_retained_for_redirect_on_host(self, httpbin):
|
||||
r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
|
||||
h1 = r.history[0].request.headers['Authorization']
|
||||
h2 = r.request.headers['Authorization']
|
||||
|
||||
assert h1 == h2
|
||||
|
||||
def test_manual_redirect_with_partial_body_read(self):
|
||||
def test_manual_redirect_with_partial_body_read(self, httpbin):
|
||||
s = requests.Session()
|
||||
r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
|
||||
assert r1.is_redirect
|
||||
@@ -1009,7 +1077,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
adapter.build_response = build_response
|
||||
|
||||
def test_redirect_with_wrong_gzipped_header(self):
|
||||
def test_redirect_with_wrong_gzipped_header(self, httpbin):
|
||||
s = requests.Session()
|
||||
url = httpbin('redirect/1')
|
||||
self._patch_adapter_gzipped_redirect(s, url)
|
||||
@@ -1020,7 +1088,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert isinstance(s, builtin_str)
|
||||
assert s == "Basic dGVzdDp0ZXN0"
|
||||
|
||||
def test_requests_history_is_saved(self):
|
||||
def test_requests_history_is_saved(self, httpbin):
|
||||
r = requests.get(httpbin('redirect/5'))
|
||||
total = r.history[-1].history
|
||||
i = 0
|
||||
@@ -1028,7 +1096,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert item.history == total[0:i]
|
||||
i = i + 1
|
||||
|
||||
def test_json_param_post_content_type_works(self):
|
||||
def test_json_param_post_content_type_works(self, httpbin):
|
||||
r = requests.post(
|
||||
httpbin('post'),
|
||||
json={'life': 42}
|
||||
@@ -1037,6 +1105,39 @@ class RequestsTestCase(unittest.TestCase):
|
||||
assert 'application/json' in r.request.headers['Content-Type']
|
||||
assert {'life': 42} == r.json()['json']
|
||||
|
||||
def test_json_param_post_should_not_override_data_param(self, httpbin):
|
||||
r = requests.Request(method='POST', url=httpbin('post'),
|
||||
data={'stuff': 'elixr'},
|
||||
json={'music': 'flute'})
|
||||
prep = r.prepare()
|
||||
assert 'stuff=elixr' == prep.body
|
||||
|
||||
def test_response_iter_lines(self, httpbin):
|
||||
r = requests.get(httpbin('stream/4'), stream=True)
|
||||
assert r.status_code == 200
|
||||
|
||||
it = r.iter_lines()
|
||||
next(it)
|
||||
assert len(list(it)) == 3
|
||||
|
||||
def test_unconsumed_session_response_closes_connection(self, httpbin):
|
||||
s = requests.session()
|
||||
|
||||
with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
|
||||
pass
|
||||
|
||||
assert response._content_consumed is False
|
||||
assert response.raw.closed
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_response_iter_lines_reentrant(self, httpbin):
|
||||
"""Response.iter_lines() is not reentrant safe"""
|
||||
r = requests.get(httpbin('stream/4'), stream=True)
|
||||
assert r.status_code == 200
|
||||
|
||||
next(r.iter_lines())
|
||||
assert len(list(r.iter_lines())) == 3
|
||||
|
||||
|
||||
class TestContentEncodingDetection(unittest.TestCase):
|
||||
|
||||
@@ -1182,6 +1283,7 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
del othercid['spam']
|
||||
assert cid != othercid
|
||||
assert cid == {'spam': 'blueval', 'eggs': 'redval'}
|
||||
assert cid != object()
|
||||
|
||||
def test_setdefault(self):
|
||||
cid = CaseInsensitiveDict({'Spam': 'blueval'})
|
||||
@@ -1219,6 +1321,16 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
assert frozenset(cid.keys()) == keyset
|
||||
assert frozenset(cid) == keyset
|
||||
|
||||
def test_copy(self):
|
||||
cid = CaseInsensitiveDict({
|
||||
'Accept': 'application/json',
|
||||
'user-Agent': 'requests',
|
||||
})
|
||||
cid_copy = cid.copy()
|
||||
assert cid == cid_copy
|
||||
cid['changed'] = True
|
||||
assert cid != cid_copy
|
||||
|
||||
|
||||
class UtilsTestCase(unittest.TestCase):
|
||||
|
||||
@@ -1244,6 +1356,13 @@ class UtilsTestCase(unittest.TestCase):
|
||||
assert super_len(
|
||||
cStringIO.StringIO('but some how, some way...')) == 25
|
||||
|
||||
def test_super_len_correctly_calculates_len_of_partially_read_file(self):
|
||||
"""Ensure that we handle partially consumed file like objects."""
|
||||
from requests.utils import super_len
|
||||
s = StringIO.StringIO()
|
||||
s.write('foobarbogus')
|
||||
assert super_len(s) == 0
|
||||
|
||||
def test_get_environ_proxies_ip_ranges(self):
|
||||
"""Ensures that IP addresses are correctly matches with ranges
|
||||
in no_proxy variable."""
|
||||
@@ -1265,6 +1384,41 @@ class UtilsTestCase(unittest.TestCase):
|
||||
'http://localhost.localdomain:5000/v1.0/') == {}
|
||||
assert get_environ_proxies('http://www.requests.com/') != {}
|
||||
|
||||
def test_select_proxies(self):
|
||||
"""Make sure we can select per-host proxies correctly."""
|
||||
from requests.utils import select_proxy
|
||||
proxies = {'http': 'http://http.proxy',
|
||||
'http://some.host': 'http://some.host.proxy'}
|
||||
assert select_proxy('hTTp://u:p@Some.Host/path', proxies) == 'http://some.host.proxy'
|
||||
assert select_proxy('hTTp://u:p@Other.Host/path', proxies) == 'http://http.proxy'
|
||||
assert select_proxy('hTTps://Other.Host', proxies) is None
|
||||
|
||||
def test_guess_filename_when_int(self):
|
||||
from requests.utils import guess_filename
|
||||
assert None is guess_filename(1)
|
||||
|
||||
def test_guess_filename_when_filename_is_an_int(self):
|
||||
from requests.utils import guess_filename
|
||||
fake = type('Fake', (object,), {'name': 1})()
|
||||
assert None is guess_filename(fake)
|
||||
|
||||
def test_guess_filename_with_file_like_obj(self):
|
||||
from requests.utils import guess_filename
|
||||
from requests import compat
|
||||
fake = type('Fake', (object,), {'name': b'value'})()
|
||||
guessed_name = guess_filename(fake)
|
||||
assert b'value' == guessed_name
|
||||
assert isinstance(guessed_name, compat.bytes)
|
||||
|
||||
def test_guess_filename_with_unicode_name(self):
|
||||
from requests.utils import guess_filename
|
||||
from requests import compat
|
||||
filename = b'value'.decode('utf-8')
|
||||
fake = type('Fake', (object,), {'name': filename})()
|
||||
guessed_name = guess_filename(fake)
|
||||
assert filename == guessed_name
|
||||
assert isinstance(guessed_name, compat.str)
|
||||
|
||||
def test_is_ipv4_address(self):
|
||||
from requests.utils import is_ipv4_address
|
||||
assert is_ipv4_address('8.8.8.8')
|
||||
@@ -1301,6 +1455,22 @@ class UtilsTestCase(unittest.TestCase):
|
||||
assert username == percent_encoding_test_chars
|
||||
assert password == percent_encoding_test_chars
|
||||
|
||||
def test_requote_uri_with_unquoted_percents(self):
|
||||
"""Ensure we handle unquoted percent signs in redirects.
|
||||
|
||||
See: https://github.com/kennethreitz/requests/issues/2356
|
||||
"""
|
||||
from requests.utils import requote_uri
|
||||
bad_uri = 'http://example.com/fiz?buz=%ppicture'
|
||||
quoted = 'http://example.com/fiz?buz=%25ppicture'
|
||||
assert quoted == requote_uri(bad_uri)
|
||||
|
||||
def test_requote_uri_properly_requotes(self):
|
||||
"""Ensure requoting doesn't break expectations."""
|
||||
from requests.utils import requote_uri
|
||||
quoted = 'http://example.com/fiz?buz=%25ppicture'
|
||||
assert quoted == requote_uri(quoted)
|
||||
|
||||
|
||||
class TestMorselToCookieExpires(unittest.TestCase):
|
||||
|
||||
@@ -1361,13 +1531,13 @@ class TestMorselToCookieMaxAge(unittest.TestCase):
|
||||
|
||||
|
||||
class TestTimeout:
|
||||
def test_stream_timeout(self):
|
||||
def test_stream_timeout(self, httpbin):
|
||||
try:
|
||||
requests.get(httpbin('delay/10'), timeout=2.0)
|
||||
except requests.exceptions.Timeout as e:
|
||||
assert 'Read timed out' in e.args[0].args[0]
|
||||
|
||||
def test_invalid_timeout(self):
|
||||
def test_invalid_timeout(self, httpbin):
|
||||
with pytest.raises(ValueError) as e:
|
||||
requests.get(httpbin('get'), timeout=(3, 4, 5))
|
||||
assert '(connect, read)' in str(e)
|
||||
@@ -1376,7 +1546,7 @@ class TestTimeout:
|
||||
requests.get(httpbin('get'), timeout="foo")
|
||||
assert 'must be an int or float' in str(e)
|
||||
|
||||
def test_none_timeout(self):
|
||||
def test_none_timeout(self, httpbin):
|
||||
""" Check that you can set None as a valid timeout value.
|
||||
|
||||
To actually test this behavior, we'd want to check that setting the
|
||||
@@ -1388,7 +1558,7 @@ class TestTimeout:
|
||||
r = requests.get(httpbin('get'), timeout=None)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_read_timeout(self):
|
||||
def test_read_timeout(self, httpbin):
|
||||
try:
|
||||
requests.get(httpbin('delay/10'), timeout=(None, 0.1))
|
||||
assert False, "The recv() request should time out."
|
||||
@@ -1410,7 +1580,7 @@ class TestTimeout:
|
||||
except ConnectTimeout:
|
||||
pass
|
||||
|
||||
def test_encoded_methods(self):
|
||||
def test_encoded_methods(self, httpbin):
|
||||
"""See: https://github.com/kennethreitz/requests/issues/2316"""
|
||||
r = requests.request(b'GET', httpbin('get'))
|
||||
assert r.ok
|
||||
@@ -1461,7 +1631,7 @@ class TestRedirects:
|
||||
'proxies': {},
|
||||
}
|
||||
|
||||
def test_requests_are_updated_each_time(self):
|
||||
def test_requests_are_updated_each_time(self, httpbin):
|
||||
session = RedirectSession([303, 307])
|
||||
prep = requests.Request('POST', httpbin('post')).prepare()
|
||||
r0 = session.send(prep)
|
||||
@@ -1539,12 +1709,11 @@ def test_prepare_unicode_url():
|
||||
p.prepare(
|
||||
method='GET',
|
||||
url=u('http://www.example.com/üniçø∂é'),
|
||||
hooks=[]
|
||||
)
|
||||
assert_copy(p, p.copy())
|
||||
|
||||
|
||||
def test_urllib3_retries():
|
||||
def test_urllib3_retries(httpbin):
|
||||
from requests.packages.urllib3.util import Retry
|
||||
s = requests.Session()
|
||||
s.mount('http://', HTTPAdapter(max_retries=Retry(
|
||||
@@ -1554,5 +1723,24 @@ def test_urllib3_retries():
|
||||
with pytest.raises(RetryError):
|
||||
s.get(httpbin('status/500'))
|
||||
|
||||
|
||||
def test_urllib3_pool_connection_closed(httpbin):
|
||||
s = requests.Session()
|
||||
s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
|
||||
|
||||
try:
|
||||
s.get(httpbin('status/200'))
|
||||
except ConnectionError as e:
|
||||
assert u"Pool is closed." in str(e)
|
||||
|
||||
|
||||
def test_vendor_aliases():
|
||||
from requests.packages import urllib3
|
||||
from requests.packages import chardet
|
||||
|
||||
with pytest.raises(ImportError):
|
||||
from requests.packages import webbrowser
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user