Skip to content

Commit

Permalink
s/plugin_map/auth_plugin_map/
Browse files Browse the repository at this point in the history
  • Loading branch information
methane committed Jan 4, 2016
1 parent 4ce7d7e commit a126c4e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
33 changes: 16 additions & 17 deletions pymysql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def __init__(self, host=None, user=None, password="",
compress=None, named_pipe=None, no_delay=None,
autocommit=False, db=None, passwd=None, local_infile=False,
max_allowed_packet=16*1024*1024, defer_connect=False,
plugin_map={}):
auth_plugin_map={}):
"""
Establish a connection to the MySQL database. Accepts several
arguments:
Expand Down Expand Up @@ -572,12 +572,11 @@ def __init__(self, host=None, user=None, password="",
max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB)
defer_connect: Don't explicitly connect on contruction - wait for connect call.
(default: False)
plugin_map: Map of plugin names to a class that processes that plugin. The class
will take the Connection object as the argument to the constructor. The class
needs an authenticate method taking an authentication packet as an argument.
For the dialog plugin, a prompt(echo, prompt) method can be used (if no
authenticate method) for returning a string from the user.
auth_plugin_map: A dict of plugin names to a class that processes that plugin.
The class will take the Connection object as the argument to the constructor.
The class needs an authenticate method taking an authentication packet as
an argument. For the dialog plugin, a prompt(echo, prompt) method can be used
(if no authenticate method) for returning a string from the user. (experimental)
db: Alias for database. (for compatibility to MySQLdb)
passwd: Alias for password. (for compatibility to MySQLdb)
"""
Expand Down Expand Up @@ -673,7 +672,7 @@ def _config(key, arg):
self.sql_mode = sql_mode
self.init_command = init_command
self.max_allowed_packet = max_allowed_packet
self.plugin_map = plugin_map
self._auth_plugin_map = auth_plugin_map
if defer_connect:
self.socket = None
else:
Expand Down Expand Up @@ -1075,18 +1074,15 @@ def _request_authentication(self):

data = data_init + self.user + b'\0'

authresp = ''
authresp = b''
if self._auth_plugin_name == 'mysql_native_password':
authresp = _scramble(self.password.encode('latin1'), self.salt)

if self.server_capabilities & CLIENT.PLUGIN_AUTH_LENENC_CLIENT_DATA:
data += lenenc_int(len(authresp))
data += authresp
data += lenenc_int(len(authresp)) + authresp
elif self.server_capabilities & CLIENT.SECURE_CONNECTION:
length = len(authresp)
data += struct.pack('B', length)
data += authresp
else: # pragma: no cover - not testing against servers without secure auth (>=5.0)
data += struct.pack('B', len(authresp)) + authresp
else: # pragma: no cover - not testing against servers without secure auth (>=5.0)
data += authresp + b'\0'

if self.db and self.server_capabilities & CLIENT.CONNECT_WITH_DB:
Expand Down Expand Up @@ -1117,10 +1113,13 @@ def _request_authentication(self):
self.write_packet(data)
auth_packet = self._read_packet()

#TODO: ok packet or error packet?


def _process_auth(self, plugin_name, auth_packet):
plugin_class = self.plugin_map.get(plugin_name)
plugin_class = self._auth_plugin_map.get(plugin_name)
if not plugin_class:
plugin_class = self.plugin_map.get(plugin_name.decode('ascii'))
plugin_class = self._auth_plugin_map.get(plugin_name.decode('ascii'))
if plugin_class:
try:
handler = plugin_class(self)
Expand Down
24 changes: 14 additions & 10 deletions pymysql/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def realTestDialogAuthTwoQuestions(self):
self.databases[0]['db'], 'two_questions', 'notverysecret') as u:
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_2q', **self.db)
pymysql.connect(user='pymysql_2q', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_2q', auth_plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)

@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipIf(three_attempts_found, "three_attempts plugin already installed")
Expand All @@ -226,21 +226,21 @@ def realTestDialogAuthThreeAttempts(self):
TestAuthentication.Dialog.fail=True # fail just once. We've got three attempts after all
with TempUser(self.connections[0].cursor(), 'pymysql_3a@localhost',
self.databases[0]['db'], 'three_attempts', 'stillnotverysecret') as u:
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.DialogHandler}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': TestAuthentication.DialogHandler}, **self.db)
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': object}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': object}, **self.db)

with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.DefectiveHandler}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': TestAuthentication.DefectiveHandler}, **self.db)
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_3a', plugin_map={b'notdialogplugin': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'notdialogplugin': TestAuthentication.Dialog}, **self.db)
TestAuthentication.Dialog.m = {b'Password, please:': b'I do not know'}
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
TestAuthentication.Dialog.m = {b'Password, please:': None}
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
pymysql.connect(user='pymysql_3a', auth_plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)

@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipIf(pam_found, "pam plugin already installed")
Expand Down Expand Up @@ -286,12 +286,16 @@ def realTestPamAuth(self):
c = pymysql.connect(user=TestAuthentication.osuser, **db)
db['password'] = 'very bad guess at password'
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user=TestAuthentication.osuser, plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler}, **self.db)
pymysql.connect(user=TestAuthentication.osuser,
auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
**self.db)
except pymysql.OperationalError as e:
self.assertEqual(1045, e.args[0])
# we had 'bad guess at password' work with pam. Well at least we get a permission denied here
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user=TestAuthentication.osuser, plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler}, **self.db)
pymysql.connect(user=TestAuthentication.osuser,
auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
**self.db)
if grants:
# recreate the user
cur.execute(grants)
Expand Down

0 comments on commit a126c4e

Please sign in to comment.