forked from twisted/twisted
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_sslverify.py
3337 lines (2714 loc) · 113 KB
/
test_sslverify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2005 Divmod, Inc. See LICENSE file for details
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet._sslverify}.
"""
from __future__ import division, absolute_import
import sys
import itertools
import datetime
from zope.interface import implementer
from twisted.python.reflect import requireModule
skipSSL = None
skipSNI = None
skipNPN = None
skipALPN = None
if requireModule("OpenSSL"):
import ipaddress
from twisted.internet import ssl
from OpenSSL import SSL
from OpenSSL.crypto import get_elliptic_curves
from OpenSSL.crypto import PKey, X509
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import (
PrivateFormat, NoEncryption
)
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.serialization import Encoding
try:
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.set_npn_advertise_callback(lambda c: None)
except NotImplementedError:
skipNPN = "OpenSSL 1.0.1 or greater required for NPN support"
try:
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.set_alpn_select_callback(lambda c: None)
except NotImplementedError:
skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support"
else:
skipSSL = "OpenSSL is required for SSL tests."
skipSNI = skipSSL
skipNPN = skipSSL
skipALPN = skipSSL
from twisted.test.test_twisted import SetAsideModule
from twisted.test.iosim import connectedServerAndClient
from twisted.internet.error import ConnectionClosed
from twisted.python.compat import nativeString
from twisted.python.filepath import FilePath
from twisted.python.modules import getModule
from twisted.trial import unittest, util
from twisted.internet import protocol, defer, reactor
from twisted.internet._idna import _idnaText
from twisted.internet.error import CertificateError, ConnectionLost
from twisted.internet import interfaces
from incremental import Version
if not skipSSL:
from twisted.internet.ssl import platformTrust, VerificationError
from twisted.internet import _sslverify as sslverify
from twisted.protocols.tls import TLSMemoryBIOFactory
# A couple of static PEM-format certificates to be used by various tests.
A_HOST_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
fXzCWdG0O/3Lk2SRM0I=
-----END CERTIFICATE-----
"""
A_PEER_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
yqDtGhklsWW3ZwBzEh5VEOUp
-----END CERTIFICATE-----
"""
A_KEYPAIR = getModule(__name__).filePath.sibling('server.pem').getContent()
def counter(counter=itertools.count()):
"""
Each time we're called, return the next integer in the natural numbers.
"""
return next(counter)
def makeCertificate(**kw):
keypair = PKey()
keypair.generate_key(TYPE_RSA, 1024)
certificate = X509()
certificate.gmtime_adj_notBefore(0)
certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
for xname in certificate.get_issuer(), certificate.get_subject():
for (k, v) in kw.items():
setattr(xname, k, nativeString(v))
certificate.set_serial_number(counter())
certificate.set_pubkey(keypair)
certificate.sign(keypair, "md5")
return keypair, certificate
def certificatesForAuthorityAndServer(serviceIdentity=u'example.com'):
"""
Create a self-signed CA certificate and server certificate signed by the
CA.
@param serviceIdentity: The identity (hostname) of the server.
@type serviceIdentity: L{unicode}
@return: a 2-tuple of C{(certificate_authority_certificate,
server_certificate)}
@rtype: L{tuple} of (L{sslverify.Certificate},
L{sslverify.PrivateCertificate})
"""
commonNameForCA = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, u'Testing Example CA')]
)
commonNameForServer = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, u'Testing Example Server')]
)
oneDay = datetime.timedelta(1, 0, 0)
privateKeyForCA = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
backend=default_backend()
)
publicKeyForCA = privateKeyForCA.public_key()
caCertificate = (
x509.CertificateBuilder()
.subject_name(commonNameForCA)
.issuer_name(commonNameForCA)
.not_valid_before(datetime.datetime.today() - oneDay)
.not_valid_after(datetime.datetime.today() + oneDay)
.serial_number(x509.random_serial_number())
.public_key(publicKeyForCA)
.add_extension(
x509.BasicConstraints(ca=True, path_length=9), critical=True,
)
.sign(
private_key=privateKeyForCA, algorithm=hashes.SHA256(),
backend=default_backend()
)
)
privateKeyForServer = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
backend=default_backend()
)
publicKeyForServer = privateKeyForServer.public_key()
try:
ipAddress = ipaddress.ip_address(serviceIdentity)
except ValueError:
subjectAlternativeNames = [
x509.DNSName(serviceIdentity.encode("idna").decode("ascii"))
]
else:
subjectAlternativeNames = [x509.IPAddress(ipAddress)]
serverCertificate = (
x509.CertificateBuilder()
.subject_name(commonNameForServer)
.issuer_name(commonNameForCA)
.not_valid_before(datetime.datetime.today() - oneDay)
.not_valid_after(datetime.datetime.today() + oneDay)
.serial_number(x509.random_serial_number())
.public_key(publicKeyForServer)
.add_extension(
x509.BasicConstraints(ca=False, path_length=None), critical=True,
)
.add_extension(
x509.SubjectAlternativeName(
subjectAlternativeNames
),
critical=True,
)
.sign(
private_key=privateKeyForCA, algorithm=hashes.SHA256(),
backend=default_backend()
)
)
caSelfCert = sslverify.Certificate.loadPEM(
caCertificate.public_bytes(Encoding.PEM)
)
serverCert = sslverify.PrivateCertificate.loadPEM(
b"\n".join([privateKeyForServer.private_bytes(
Encoding.PEM,
PrivateFormat.TraditionalOpenSSL,
NoEncryption(),
),
serverCertificate.public_bytes(Encoding.PEM)])
)
return caSelfCert, serverCert
def _loopbackTLSConnection(serverOpts, clientOpts):
"""
Common implementation code for both L{loopbackTLSConnection} and
L{loopbackTLSConnectionInMemory}. Creates a loopback TLS connection
using the provided server and client context factories.
@param serverOpts: An OpenSSL context factory for the server.
@type serverOpts: C{OpenSSLCertificateOptions}, or any class with an
equivalent API.
@param clientOpts: An OpenSSL context factory for the client.
@type clientOpts: C{OpenSSLCertificateOptions}, or any class with an
equivalent API.
@return: 5-tuple of server-tls-protocol, server-inner-protocol,
client-tls-protocol, client-inner-protocol and L{IOPump}
@rtype: L{tuple}
"""
class GreetingServer(protocol.Protocol):
greeting = b"greetings!"
def connectionMade(self):
self.transport.write(self.greeting)
class ListeningClient(protocol.Protocol):
data = b''
lostReason = None
def dataReceived(self, data):
self.data += data
def connectionLost(self, reason):
self.lostReason = reason
clientWrappedProto = ListeningClient()
serverWrappedProto = GreetingServer()
plainClientFactory = protocol.Factory()
plainClientFactory.protocol = lambda: clientWrappedProto
plainServerFactory = protocol.Factory()
plainServerFactory.protocol = lambda: serverWrappedProto
clientFactory = TLSMemoryBIOFactory(
clientOpts, isClient=True,
wrappedFactory=plainServerFactory
)
serverFactory = TLSMemoryBIOFactory(
serverOpts, isClient=False,
wrappedFactory=plainClientFactory
)
sProto, cProto, pump = connectedServerAndClient(
lambda: serverFactory.buildProtocol(None),
lambda: clientFactory.buildProtocol(None)
)
return sProto, cProto, serverWrappedProto, clientWrappedProto, pump
def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None):
"""
Create a loopback TLS connection with the given trust and keys.
@param trustRoot: the C{trustRoot} argument for the client connection's
context.
@type trustRoot: L{sslverify.IOpenSSLTrustRoot}
@param privateKeyFile: The name of the file containing the private key.
@type privateKeyFile: L{str} (native string; file name)
@param chainedCertFile: The name of the chained certificate file.
@type chainedCertFile: L{str} (native string; file name)
@return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
@rtype: L{tuple}
"""
class ContextFactory(object):
def getContext(self):
"""
Create a context for the server side of the connection.
@return: an SSL context using a certificate and key.
@rtype: C{OpenSSL.SSL.Context}
"""
ctx = SSL.Context(SSL.TLSv1_METHOD)
if chainedCertFile is not None:
ctx.use_certificate_chain_file(chainedCertFile)
ctx.use_privatekey_file(privateKeyFile)
# Let the test author know if they screwed something up.
ctx.check_privatekey()
return ctx
serverOpts = ContextFactory()
clientOpts = sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot)
return _loopbackTLSConnection(serverOpts, clientOpts)
def loopbackTLSConnectionInMemory(trustRoot, privateKey,
serverCertificate, clientProtocols=None,
serverProtocols=None,
clientOptions=None):
"""
Create a loopback TLS connection with the given trust and keys. Like
L{loopbackTLSConnection}, but using in-memory certificates and keys rather
than writing them to disk.
@param trustRoot: the C{trustRoot} argument for the client connection's
context.
@type trustRoot: L{sslverify.IOpenSSLTrustRoot}
@param privateKey: The private key.
@type privateKey: L{str} (native string)
@param serverCertificate: The certificate used by the server.
@type chainedCertFile: L{str} (native string)
@param clientProtocols: The protocols the client is willing to negotiate
using NPN/ALPN.
@param serverProtocols: The protocols the server is willing to negotiate
using NPN/ALPN.
@param clientOptions: The type of C{OpenSSLCertificateOptions} class to
use for the client. Defaults to C{OpenSSLCertificateOptions}.
@return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
@rtype: L{tuple}
"""
if clientOptions is None:
clientOptions = sslverify.OpenSSLCertificateOptions
clientCertOpts = clientOptions(
trustRoot=trustRoot,
acceptableProtocols=clientProtocols
)
serverCertOpts = sslverify.OpenSSLCertificateOptions(
privateKey=privateKey,
certificate=serverCertificate,
acceptableProtocols=serverProtocols,
)
return _loopbackTLSConnection(serverCertOpts, clientCertOpts)
def pathContainingDumpOf(testCase, *dumpables):
"""
Create a temporary file to store some serializable-as-PEM objects in, and
return its name.
@param testCase: a test case to use for generating a temporary directory.
@type testCase: L{twisted.trial.unittest.TestCase}
@param dumpables: arguments are objects from pyOpenSSL with a C{dump}
method, taking a pyOpenSSL file-type constant, such as
L{OpenSSL.crypto.FILETYPE_PEM} or L{OpenSSL.crypto.FILETYPE_ASN1}.
@type dumpables: L{tuple} of L{object} with C{dump} method taking L{int}
returning L{bytes}
@return: the path to a file where all of the dumpables were dumped in PEM
format.
@rtype: L{str}
"""
fname = testCase.mktemp()
with open(fname, "wb") as f:
for dumpable in dumpables:
f.write(dumpable.dump(FILETYPE_PEM))
return fname
class DataCallbackProtocol(protocol.Protocol):
def dataReceived(self, data):
d, self.factory.onData = self.factory.onData, None
if d is not None:
d.callback(data)
def connectionLost(self, reason):
d, self.factory.onLost = self.factory.onLost, None
if d is not None:
d.errback(reason)
class WritingProtocol(protocol.Protocol):
byte = b'x'
def connectionMade(self):
self.transport.write(self.byte)
def connectionLost(self, reason):
self.factory.onLost.errback(reason)
class FakeContext(object):
"""
Introspectable fake of an C{OpenSSL.SSL.Context}.
Saves call arguments for later introspection.
Necessary because C{Context} offers poor introspection. cf. this
U{pyOpenSSL bug<https://bugs.launchpad.net/pyopenssl/+bug/1173899>}.
@ivar _method: See C{method} parameter of L{__init__}.
@ivar _options: L{int} of C{OR}ed values from calls of L{set_options}.
@ivar _certificate: Set by L{use_certificate}.
@ivar _privateKey: Set by L{use_privatekey}.
@ivar _verify: Set by L{set_verify}.
@ivar _verifyDepth: Set by L{set_verify_depth}.
@ivar _mode: Set by L{set_mode}.
@ivar _sessionID: Set by L{set_session_id}.
@ivar _extraCertChain: Accumulated L{list} of all extra certificates added
by L{add_extra_chain_cert}.
@ivar _cipherList: Set by L{set_cipher_list}.
@ivar _dhFilename: Set by L{load_tmp_dh}.
@ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths}
@ivar _ecCurve: Set by L{set_tmp_ecdh}
"""
_options = 0
def __init__(self, method):
self._method = method
self._extraCertChain = []
self._defaultVerifyPathsSet = False
self._ecCurve = None
def set_options(self, options):
self._options |= options
def use_certificate(self, certificate):
self._certificate = certificate
def use_privatekey(self, privateKey):
self._privateKey = privateKey
def check_privatekey(self):
return None
def set_mode(self, mode):
"""
Set the mode. See L{SSL.Context.set_mode}.
@param mode: See L{SSL.Context.set_mode}.
"""
self._mode = mode
def set_verify(self, flags, callback):
self._verify = flags, callback
def set_verify_depth(self, depth):
self._verifyDepth = depth
def set_session_id(self, sessionID):
self._sessionID = sessionID
def add_extra_chain_cert(self, cert):
self._extraCertChain.append(cert)
def set_cipher_list(self, cipherList):
self._cipherList = cipherList
def load_tmp_dh(self, dhfilename):
self._dhFilename = dhfilename
def set_default_verify_paths(self):
"""
Set the default paths for the platform.
"""
self._defaultVerifyPathsSet = True
def set_tmp_ecdh(self, curve):
"""
Set an ECDH curve. Should only be called by OpenSSL 1.0.1
code.
@param curve: See L{OpenSSL.SSL.Context.set_tmp_ecdh}
"""
self._ecCurve = curve
class ClientOptionsTests(unittest.SynchronousTestCase):
"""
Tests for L{sslverify.optionsForClientTLS}.
"""
if skipSSL:
skip = skipSSL
def test_extraKeywords(self):
"""
When passed a keyword parameter other than C{extraCertificateOptions},
L{sslverify.optionsForClientTLS} raises an exception just like a
normal Python function would.
"""
error = self.assertRaises(
TypeError,
sslverify.optionsForClientTLS,
hostname=u'alpha', someRandomThing=u'beta',
)
self.assertEqual(
str(error),
"optionsForClientTLS() got an unexpected keyword argument "
"'someRandomThing'"
)
def test_bytesFailFast(self):
"""
If you pass L{bytes} as the hostname to
L{sslverify.optionsForClientTLS} it immediately raises a L{TypeError}.
"""
error = self.assertRaises(
TypeError,
sslverify.optionsForClientTLS, b'not-actually-a-hostname.com'
)
expectedText = (
"optionsForClientTLS requires text for host names, not " +
bytes.__name__
)
self.assertEqual(str(error), expectedText)
def test_dNSNameHostname(self):
"""
If you pass a dNSName to L{sslverify.optionsForClientTLS}
L{_hostnameIsDnsName} will be True
"""
options = sslverify.optionsForClientTLS(u'example.com')
self.assertTrue(options._hostnameIsDnsName)
def test_IPv4AddressHostname(self):
"""
If you pass an IPv4 address to L{sslverify.optionsForClientTLS}
L{_hostnameIsDnsName} will be False
"""
options = sslverify.optionsForClientTLS(u'127.0.0.1')
self.assertFalse(options._hostnameIsDnsName)
def test_IPv6AddressHostname(self):
"""
If you pass an IPv6 address to L{sslverify.optionsForClientTLS}
L{_hostnameIsDnsName} will be False
"""
options = sslverify.optionsForClientTLS(u'::1')
self.assertFalse(options._hostnameIsDnsName)
class FakeChooseDiffieHellmanEllipticCurve(object):
"""
A fake implementation of L{_ChooseDiffieHellmanEllipticCurve}
"""
def __init__(self, versionNumber, openSSLlib, openSSLcrypto):
"""
A no-op constructor.
"""
def configureECDHCurve(self, ctx):
"""
A null configuration.
@param ctx: An L{OpenSSL.SSL.Context} that would be
configured.
"""
class OpenSSLOptionsTestsMixin(object):
"""
A mixin for L{OpenSSLOptions} test cases creates client and server
certificates, signs them with a CA, and provides a L{loopback}
that creates TLS a connections with them.
"""
if skipSSL:
skip = skipSSL
serverPort = clientConn = None
onServerLost = onClientLost = None
def setUp(self):
"""
Create class variables of client and server certificates.
"""
self.sKey, self.sCert = makeCertificate(
O=b"Server Test Certificate",
CN=b"server")
self.cKey, self.cCert = makeCertificate(
O=b"Client Test Certificate",
CN=b"client")
self.caCert1 = makeCertificate(
O=b"CA Test Certificate 1",
CN=b"ca1")[1]
self.caCert2 = makeCertificate(
O=b"CA Test Certificate",
CN=b"ca2")[1]
self.caCerts = [self.caCert1, self.caCert2]
self.extraCertChain = self.caCerts
def tearDown(self):
if self.serverPort is not None:
self.serverPort.stopListening()
if self.clientConn is not None:
self.clientConn.disconnect()
L = []
if self.onServerLost is not None:
L.append(self.onServerLost)
if self.onClientLost is not None:
L.append(self.onClientLost)
return defer.DeferredList(L, consumeErrors=True)
def loopback(self, serverCertOpts, clientCertOpts,
onServerLost=None, onClientLost=None, onData=None):
if onServerLost is None:
self.onServerLost = onServerLost = defer.Deferred()
if onClientLost is None:
self.onClientLost = onClientLost = defer.Deferred()
if onData is None:
onData = defer.Deferred()
serverFactory = protocol.ServerFactory()
serverFactory.protocol = DataCallbackProtocol
serverFactory.onLost = onServerLost
serverFactory.onData = onData
clientFactory = protocol.ClientFactory()
clientFactory.protocol = WritingProtocol
clientFactory.onLost = onClientLost
self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
self.clientConn = reactor.connectSSL('127.0.0.1',
self.serverPort.getHost().port, clientFactory, clientCertOpts)
class OpenSSLOptionsTests(OpenSSLOptionsTestsMixin, unittest.TestCase):
"""
Tests for L{sslverify.OpenSSLOptions}.
"""
def setUp(self):
"""
Same as L{OpenSSLOptionsTestsMixin.setUp}, but it also patches
L{sslverify._ChooseDiffieHellmanEllipticCurve}.
"""
super(OpenSSLOptionsTests, self).setUp()
self.patch(sslverify, "_ChooseDiffieHellmanEllipticCurve",
FakeChooseDiffieHellmanEllipticCurve)
def test_constructorWithOnlyPrivateKey(self):
"""
C{privateKey} and C{certificate} make only sense if both are set.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions, privateKey=self.sKey
)
def test_constructorWithOnlyCertificate(self):
"""
C{privateKey} and C{certificate} make only sense if both are set.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions, certificate=self.sCert
)
def test_constructorWithCertificateAndPrivateKey(self):
"""
Specifying C{privateKey} and C{certificate} initializes correctly.
"""
opts = sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
certificate=self.sCert)
self.assertEqual(opts.privateKey, self.sKey)
self.assertEqual(opts.certificate, self.sCert)
self.assertEqual(opts.extraCertChain, [])
def test_constructorDoesNotAllowVerifyWithoutCACerts(self):
"""
C{verify} must not be C{True} without specifying C{caCerts}.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions,
privateKey=self.sKey, certificate=self.sCert, verify=True
)
def test_constructorDoesNotAllowLegacyWithTrustRoot(self):
"""
C{verify}, C{requireCertificate}, and C{caCerts} must not be specified
by the caller (to be I{any} value, even the default!) when specifying
C{trustRoot}.
"""
self.assertRaises(
TypeError,
sslverify.OpenSSLCertificateOptions,
privateKey=self.sKey, certificate=self.sCert,
verify=True, trustRoot=None, caCerts=self.caCerts,
)
self.assertRaises(
TypeError,
sslverify.OpenSSLCertificateOptions,
privateKey=self.sKey, certificate=self.sCert,
trustRoot=None, requireCertificate=True,
)
def test_constructorAllowsCACertsWithoutVerify(self):
"""
It's currently a NOP, but valid.
"""
opts = sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
certificate=self.sCert,
caCerts=self.caCerts)
self.assertFalse(opts.verify)
self.assertEqual(self.caCerts, opts.caCerts)
def test_constructorWithVerifyAndCACerts(self):
"""
Specifying C{verify} and C{caCerts} initializes correctly.
"""
opts = sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
certificate=self.sCert,
verify=True,
caCerts=self.caCerts)
self.assertTrue(opts.verify)
self.assertEqual(self.caCerts, opts.caCerts)
def test_constructorSetsExtraChain(self):
"""
Setting C{extraCertChain} works if C{certificate} and C{privateKey} are
set along with it.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
extraCertChain=self.extraCertChain,
)
self.assertEqual(self.extraCertChain, opts.extraCertChain)
def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self):
"""
A C{extraCertChain} without C{privateKey} doesn't make sense and is
thus rejected.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions,
certificate=self.sCert,
extraCertChain=self.extraCertChain,
)
def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self):
"""
A C{extraCertChain} without C{certificate} doesn't make sense and is
thus rejected.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions,
privateKey=self.sKey,
extraCertChain=self.extraCertChain,
)
def test_extraChainFilesAreAddedIfSupplied(self):
"""
If C{extraCertChain} is set and all prerequisites are met, the
specified chain certificates are added to C{Context}s that get
created.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
extraCertChain=self.extraCertChain,
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
self.assertEqual(self.sKey, ctx._privateKey)
self.assertEqual(self.sCert, ctx._certificate)
self.assertEqual(self.extraCertChain, ctx._extraCertChain)
def test_extraChainDoesNotBreakPyOpenSSL(self):
"""
C{extraCertChain} doesn't break C{OpenSSL.SSL.Context} creation.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
extraCertChain=self.extraCertChain,
)
ctx = opts.getContext()
self.assertIsInstance(ctx, SSL.Context)
def test_acceptableCiphersAreAlwaysSet(self):
"""
If the user doesn't supply custom acceptable ciphers, a shipped secure
default is used. We can't check directly for it because the effective
cipher string we set varies with platforms.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
self.assertEqual(opts._cipherString.encode('ascii'), ctx._cipherList)
def test_givesMeaningfulErrorMessageIfNoCipherMatches(self):
"""
If there is no valid cipher that matches the user's wishes,
a L{ValueError} is raised.
"""
self.assertRaises(
ValueError,
sslverify.OpenSSLCertificateOptions,
privateKey=self.sKey,
certificate=self.sCert,
acceptableCiphers=
sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString('')
)
def test_honorsAcceptableCiphersArgument(self):
"""
If acceptable ciphers are passed, they are used.
"""
@implementer(interfaces.IAcceptableCiphers)
class FakeAcceptableCiphers(object):
def selectCiphers(self, _):
return [sslverify.OpenSSLCipher(u'sentinel')]
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
acceptableCiphers=FakeAcceptableCiphers(),
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
self.assertEqual(b'sentinel', ctx._cipherList)
def test_basicSecurityOptionsAreSet(self):
"""
Every context must have C{OP_NO_SSLv2}, C{OP_NO_COMPRESSION}, and
C{OP_CIPHER_SERVER_PREFERENCE} set.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
options = (SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION |
SSL.OP_CIPHER_SERVER_PREFERENCE)
self.assertEqual(options, ctx._options & options)
def test_modeIsSet(self):
"""
Every context must be in C{MODE_RELEASE_BUFFERS} mode.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
self.assertEqual(SSL.MODE_RELEASE_BUFFERS, ctx._mode)
def test_singleUseKeys(self):
"""
If C{singleUseKeys} is set, every context must have
C{OP_SINGLE_DH_USE} and C{OP_SINGLE_ECDH_USE} set.
"""
opts = sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
enableSingleUseKeys=True,
)
opts._contextFactory = FakeContext
ctx = opts.getContext()
options = SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE
self.assertEqual(options, ctx._options & options)
def test_methodIsDeprecated(self):
"""
Passing C{method} to L{sslverify.OpenSSLCertificateOptions} is
deprecated.
"""
sslverify.OpenSSLCertificateOptions(
privateKey=self.sKey,
certificate=self.sCert,
method=SSL.SSLv23_METHOD,
)
message = ("Passing method to twisted.internet.ssl.CertificateOptions "
"was deprecated in Twisted 17.1.0. Please use a "
"combination of insecurelyLowerMinimumTo, raiseMinimumTo, "
"and lowerMaximumSecurityTo instead, as Twisted will "
"correctly configure the method.")
warnings = self.flushWarnings([self.test_methodIsDeprecated])
self.assertEqual(1, len(warnings))