mailsignaturevalidator.py 15.4 KB
Newer Older
Markus Kötter's avatar
Markus Kötter committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

#!/bin/env python3
import argparse
import logging
import logging.config
import configparser
import datetime
import time
import imaplib
import re
import itertools
import email
import os
import stat

import M2Crypto.SMIME
import M2Crypto.X509
import M2Crypto
import hashlib

import asn1crypto.cms
from OpenSSL import crypto
from asn1crypto import x509, core, pem
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding

from jinja2 import FileSystemLoader, Environment
from email.mime.text import MIMEText
import smtplib


log = logging.getLogger()
Markus Kötter's avatar
Markus Kötter committed
33
log.setLevel(logging.DEBUG)
Markus Kötter's avatar
Markus Kötter committed
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138

logging.config.dictConfig({
        'version': 1,
        'formatters': {
                'detailed': {
                        'class': 'logging.Formatter',
                        'format': '%(asctime)s %(name)-9s %(levelname)-4s %(message)s'
                }
        },
        'handlers': {
                'console': {
                        'class': 'logging.StreamHandler',
                        'level': 'DEBUG',
						'formatter': 'detailed',
                },
        },
        'root': {
                'level': 'DEBUG',
                'handlers': ['console']
        },
}
)

def pemfiles(imdir):
	return map(lambda name: os.path.join(imdir, name), filter(lambda x: os.path.splitext(x)[1] == '.pem', os.listdir(imdir)))

def prepare(args):
	cfg = configparser.ConfigParser()

	if args.config:
		s = os.stat(args.config.name)
		if s.st_uid != os.geteuid():
			raise PermissionError('{} has to be owned by uid {}'.format(args.config.name, os.geteuid()))
		if s.st_mode & (stat.S_IRWXG|stat.S_IRWXO|stat.S_IXUSR):
			raise PermissionError('{} has to be 0600'.format(args.config.name))
		cfg.read_file(args.config)

	server = cfg.get('credentials', 'server')
	user = cfg.get('credentials', 'user')

	password = cfg.get('credentials', 'password', fallback=None)
	if password is None:
		print("enter password for {} on server {}".format(user,server))
		password = input(">")
		cfg.set('credentials','password', password)
	return cfg

def login(cfg):
	m = imaplib.IMAP4_SSL(cfg.get('credentials','server'))
	m.login(cfg.get('credentials','user'), cfg.get('credentials','password'))
#	m.select("[Gmail]/All Mail")
	return m

def grouper(iterable, n, fillvalue=None):
	"Collect data into fixed-length chunks or blocks"
	# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
	args = [iter(iterable)] * n
	return itertools.zip_longest(*args, fillvalue=fillvalue)


def perms(directory):
	if not os.path.isdir(directory):
		raise ValueError("{} is not a directory".format(directory))
	if not os.access(directory, os.W_OK):
		raise PermissionError("{} is not writable".format(directory))


def validate(mail):
	"""
	validate signature using M2Crypto SMIME (OpenSSL pkcs7_verify)
	:param mail:
	:param intermediates:
	:return:
	"""
	if 'multipart/signed' == mail.get_content_type():
		if len(mail.get_payload()) != 2:
			raise ValueError()

		# extract signature & data
		if mail.get_payload(0).get_content_type() == 'application/pkcs7-signature':
			(p7, data) = mail.get_payload()
		else:
			(data, p7) = mail.get_payload()

		# convert
		_p7 = p7.get_payload(decode=True)
		p7 = M2Crypto.SMIME.load_pkcs7_bio_der(M2Crypto.BIO.MemoryBuffer(_p7))
		data = data.as_string().encode('utf-8').replace(b'\n', b'\r\n')

		# prepare x509 validation
		# root certificates
		store = M2Crypto.X509.X509_Store()
		for i in pemfiles('certs/root'):
			store.load_info(i)

		# possible signers
		stack = p7.get0_signers(M2Crypto.X509.X509_Stack())

		# smime
		s = M2Crypto.SMIME.SMIME()
		s.set_x509_store(store)
		s.set_x509_stack(stack)

		try:
			# hash data
139
140
141
142
143
144
145
146
			micalg = mail.get_param('micalg')
			if '.' in micalg: # oid
				# use OpenSSL OBJ_txt2obj to resolve
				micalg = M2Crypto.m2.obj_txt2obj(micalg, 1)
				micalg = M2Crypto.m2.obj_obj2txt(micalg, 0)
				micalg = micalg.decode('ascii')
			else:
				micalg = micalg.lower().replace('-', '') or 'sha1' # sha-256 -> sha256
Markus Kötter's avatar
Markus Kötter committed
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
			digest = M2Crypto.BIO.MemoryBuffer(hashlib.new(micalg, data).digest())
			try:
				# compare hash with signed hash in smime signature
				s.verify(p7, digest, flags=M2Crypto.SMIME.PKCS7_DETACHED | M2Crypto.SMIME.PKCS7_SIGNED)
			except M2Crypto.SMIME.PKCS7_Error as e0:
				log.warning("Invalid Signature")
				return False, _p7, data, micalg, digest
		except Exception as e1:
			log.exception(e1)
			raise ValueError(e1)
		else:
			log.info("Valid signature")
			return True, _p7, data, micalg, digest
	else:
		raise ValueError("not signed")


def verify_chain(certs, cert_pem):
	"""
	verify chain using pyca/cryptography
	:param certs:
	:param cert_pem:
	:return:
	"""
	# Create a X590StoreContext with the cert and trusted certs
	# and verify the the chain of trust
	store = crypto.X509Store()
	store.add_cert(crypto.load_certificate(crypto.FILETYPE_PEM, open('certs/root/dfnglobal2-ttelesecca.pem', 'rb').read()))

	for c in certs:
		store.add_cert(c)

	ctx = crypto.X509StoreContext(store, cert_pem)
	# Returns None if certificate can be validated
	try:
		ctx.verify_certificate()
#		chain = ctx.get1_chain()
		return True
	except crypto.X509StoreContextError:
		return False
	except Exception as e:
		log.exception(e)
		return False

Markus Kötter's avatar
Markus Kötter committed
191
def render(p7, data, micalg, digest, debug=False, tplname='default.tpl'):
Markus Kötter's avatar
Markus Kötter committed
192
193
194
195
196
197
198
199
200
201
202
	"""
	render signature details

	:param p7:
	:param data:
	:param micalg:
	:param digest:
	:return:
	"""
	loader = FileSystemLoader(os.path.abspath(os.path.normpath(os.path.join('.','tpl'))))
	env = Environment(loader=loader, extensions=['jinja2.ext.loopcontrols'])
Markus Kötter's avatar
Markus Kötter committed
203
	template = env.get_template(tplname)
Markus Kötter's avatar
Markus Kötter committed
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254

	# endesive had some ideas how to
	# https://github.com/m32/endesive/blob/master/endesive/verifier.py#L41
	cms = asn1crypto.cms.ContentInfo.load(p7)
	signed_data = cms['content']
	signature = signed_data['signer_infos'][0]['signature'].native
	algo = signed_data['digest_algorithms'][0]['algorithm'].native
	attrs = signed_data['signer_infos'][0]['signed_attrs']
	mdData = getattr(hashlib, algo)(data).digest()
	if attrs is not None and not isinstance(attrs, core.Void):
		mdSigned = None
		for attr in attrs:
			if attr['type'].native == 'message_digest':
				mdSigned = attr['values'].native[0]
		signedData = attrs.dump()
		signedData = b'\x31' + signedData[1:]
	else:
		mdSigned = mdData
		signedData = data
	hashok = mdData == mdSigned

	serial = signed_data['signer_infos'][0]['sid'].native['serial_number']
	public_key = None
	for cert in signed_data['certificates']:
		if serial == cert.native['tbs_certificate']['serial_number']:
			cert = cert.dump()
			cert = pem.armor(u'CERTIFICATE', cert)
			sigcert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
			public_key = sigcert.get_pubkey().to_cryptography_key()
			break

	try:
		public_key.verify(
			signature,
			signedData,
			padding.PKCS1v15(),
			getattr(hashes, algo.upper())()
		)
		signatureok = True
	except:
		signatureok = False

	certs = [crypto.load_certificate(crypto.FILETYPE_PEM, pem.armor(u'CERTIFICATE', cert.dump()).decode()) for cert in signed_data['certificates']]
	rcerts = []
	for i,cert in enumerate(certs):
		subcerts = [certs[j] for j in filter(lambda c: c != i, range(len(certs)))]
		certok = verify_chain(subcerts, cert)
		rcerts.append((cert, certok))

#	chain = getchain(M2Crypto.SMIME.load_pkcs7_bio_der(M2Crypto.BIO.MemoryBuffer(p7)), digest)

Markus Kötter's avatar
Markus Kötter committed
255
	return template.render(hashok=hashok, hash={'micalg':algo.upper(), 'digest':mdData}, signatureok=signatureok, certok=min([c[1] for c in rcerts]), cert=sigcert, certs=rcerts, cms=cms, debug=debug)
Markus Kötter's avatar
Markus Kötter committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338

g_chain = []


def getchain(p7, data):
	"""
	FIXME - does not work - not yet possible

	Idea was: get a valid chain using known but untrusted intermediate certificates
	this valid chain could be used to advise which certificates were missing

	X509_STORE_CTX_get1_chain seems handy, but it is not possible to use a custom list of untrusted intermediate
	certificates for PKCS7_verify
	There is no X509_verify in M2Crypto which could be used in conjunction with X509_Store & X509_Store_Context
	There is X509_verify in pyOpenSSL, but it lacks X509_STORE_CTX get1_chain

	:param p7:
	:param data:
	:return:
	"""
	global g_chain
	def verify_cb(ok, ctx):
		print(ctx)
		global g_chain
		if True: #ok == 1:  # no error
			g_chain = [i for i in ctx.get1_chain()]
			for i in g_chain:
				print('s: {} {}'.format(i.get_subject().as_text(), i.get_serial_number()))
				print('i: {} {}'.format(i.get_issuer().as_text(), i.get_serial_number()))
		else:
			g_chain = []
		return ok

	store = M2Crypto.X509.X509_Store()
	store.set_verify_cb(verify_cb)

	# root
	for i in pemfiles('certs/root/'):
		store.load_info(i)

	# intermediates
	stack = M2Crypto.X509.X509_Stack()
	for i in list(pemfiles('certs/intermediate/'))[::-1]:
		c = M2Crypto.X509.load_cert(i)
		print('stack {}'.format(c.get_subject().as_text()))
		stack.push(c)

	for i in stack:
		print('a {} {}'.format(i.get_subject().as_text(), i.get_serial_number()))

	stack = p7.get0_signers(stack)

	for i in stack:
		print('b {} {}'.format(i.get_subject().as_text(), i.get_serial_number()))

	# smime
	s = M2Crypto.SMIME.SMIME()
	s.set_x509_store(store)
	s.set_x509_stack(stack)
	try:
		s.verify(p7, data, flags=M2Crypto.SMIME.PKCS7_DETACHED | M2Crypto.SMIME.PKCS7_SIGNED)
		print("ok")
	except M2Crypto.SMIME.PKCS7_Error:
		print("fail")
		pass

	return g_chain

def respond(cfg, body, old):
	msg = MIMEText(body)
	msg["Subject"] = 'Re: ' + old['Subject']
	msg["From"] = old['To']
	msg["To"] = old['From']

	msg.add_header("In-Reply-To", old['Message-ID'])
	msg.add_header("References", old['Message-ID'])

	s = smtplib.SMTP("smtp.uni-hannover.de", 587)
	s.starttls()
	s.login(cfg.get('credentials','user'), cfg.get('credentials','password'))
	s.sendmail(old['To'], [old['From']], msg.as_string())


Markus Kötter's avatar
Markus Kötter committed
339
def process(cfg, m, msgids, action, moveto=None, debug=False):
340
341
342
343
344
345
346
347
	for msgid in filter(lambda x: x != '', msgids[0].decode('utf-8').split(' ')[::-1]):
		try:
			typ, msg = m.fetch(msgid, '(RFC822)')
		except imaplib.IMAP4.error as e:
			log.exception(e)
			log.info("msgids {}".format(msgids))
			log.info("msgid {}".format(msgid))
			return
Markus Kötter's avatar
Markus Kötter committed
348
349
350
		for part in msg:
			if not isinstance(part, tuple):
				continue
Markus Kötter's avatar
Markus Kötter committed
351
			# FIXME python3.6 + will have EmailMessage and require policy=email.policy.compat32 …
Markus Kötter's avatar
Markus Kötter committed
352
			mail = email.message_from_bytes(part[1])
Markus Kötter's avatar
Markus Kötter committed
353

354
355
356
			HEADERS = frozenset(['From', 'To', 'Date', 'Subject'])
			if len(set(mail.keys()) & HEADERS) != 4:
				log.warning("missing header {}".format(HEADERS - set(mail.keys())))
Markus Kötter's avatar
Markus Kötter committed
357
				continue
358
			date = email.utils.parsedate_to_datetime(mail['Date'])
Markus Kötter's avatar
Markus Kötter committed
359
360
361
			Subject = mail['Subject']
			if '+debug' in Subject:
				debug = True
Markus Kötter's avatar
Markus Kötter committed
362
			log.info('Processing {date} "{From}" "{To}" "{Subject}"'.format(date=date, **mail))
363
364
			# valid is 3 state
			# True False None, None is error
Markus Kötter's avatar
Markus Kötter committed
365
366
367
368
369
			try:
				valid, p7, data, micalg, digest = validate(mail)
			except Exception as e:
				log.exception(e)
				valid = None
370
371
			else:
				try:
Markus Kötter's avatar
Markus Kötter committed
372
373
					response = render(p7, data, micalg, digest, debug=debug)
					respond(cfg, response, mail)
374
375
376
				except Exception as e:
					log.exception(e)
					valid = None
Markus Kötter's avatar
Markus Kötter committed
377
			finally:
378
379
380
381
382
				if action == 'move':
					# move
					assert valid in moveto
					target = moveto[valid]
					log.info('Moving to {}'.format(target))
383
					state, _ = m.copy(msgid, target)
384
385
386
387
388
389
390
391
392
					assert state == 'OK'
					state, _ = m.store(msgid, '+FLAGS', '\Deleted')
					assert state == 'OK'
				elif action == 'see':
					log.debug("Marked as Seen")
					m.store(msgid, '+FLAGS', '\Seen')
				elif action == 'delete':
					m.store(msgid, '+FLAGS', '\Deleted')
					log.debug("Deleted Message")
393
				m.expunge()
394
395
396

			# this msg is processed
			break
Markus Kötter's avatar
Markus Kötter committed
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470


def main():
	parser = argparse.ArgumentParser(description='Mail Cryptographic Signature Validator')
	parser.add_argument('-c', '--config', type=argparse.FileType('rt', encoding='UTF-8'), help='ini configuration')

	sub = parser.add_subparsers()

	cmd = sub.add_parser('list')
	def cmd_list(args):
		list_response_pattern = re.compile(u'\((?P<flags>.*?)\) "(?P<delimiter>.*)" (?P<name>.*)')
		try:
			m = login(cfg)
			t, data = m.list()
			for line in data:
				flags, delimiter, mailbox = list_response_pattern.match(line.decode('utf-8')).groups()
				state, data = m.status(mailbox, '(MESSAGES UNSEEN)')
				if state == 'OK':
					data = data[0].decode()[len(mailbox) + 2:-1] # skip "NAME (" ")"

					status = dict(grouper(data.split(' '),2))
					print('\t{} (all:{} unseen:{})'.format(mailbox, status['MESSAGES'], status['UNSEEN']))
		except Exception as e:
			log.exception(e)
		finally:
			m.logout()
	cmd.set_defaults(func=cmd_list)


	cmd = sub.add_parser('search')
	cmd.add_argument('mailbox')
	cmd.add_argument('subject', nargs='?', default=None)
	def cmd_search(args):
		try:
			m = login(cfg)
			t, data = m.select(args.mailbox, readonly=True)
			t, data = m.search(None, '(SUBJECT "{}")'.format(args.subject) if args.subject else 'ALL')
			for msgid in data[0].decode('utf-8').split(' '):
				typ, msg = m.fetch(msgid, '(RFC822)')
				for part in msg:
					if not isinstance(part, tuple):
						continue
					mail = email.message_from_bytes(part[1])
					if len(set(mail.keys()) & set(['From','To','Date','Subject'])) != 4:
						continue
					date = email.utils.parsedate_to_datetime(mail['Date'])
					print('{date}\n\t"{Subject}"\n\t\t{From} ->\n\t\t\t{To}'.format(date=date, **mail))
		except Exception as e:
			log.exception(e)
		finally:
			m.logout()
	cmd.set_defaults(func=cmd_search)

	cmd = sub.add_parser('store')
	cmd.add_argument('mailbox')
	cmd.add_argument('--subject', nargs='?', default=None)
	cmd.add_argument('--extension', action='append')
	cmd.add_argument('--unseen', default=False, action='store_true')
	cmd.add_argument('--action', choices=['see','delete'], default=None)
	cmd.add_argument('directory', type=str, default='/tmp/scan2mail/')
	def cmd_store(args):
		perms(args.directory)
		try:
			m = login(cfg)
			state, data = m.select(args.mailbox, readonly=args.action is None)
			assert state == 'OK'

			pattern = ''
			pattern += '(SUBJECT "{}") '.format(args.subject) if args.subject else 'ALL'
			pattern += '(UNSEEN) '.format(args.subject) if args.unseen else ''

			t, data = m.search(None, pattern)
			assert state == 'OK'

471
			process(cfg, m, data, args.extension, args.directory, args.action, moveto=None)
Markus Kötter's avatar
Markus Kötter committed
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488

		except Exception as e:
			log.exception(e)
		finally:
			m.logout()
	cmd.set_defaults(func=cmd_store)


	cmd = sub.add_parser('debug')
	cmd.add_argument('eml', type=argparse.FileType('rb'))
	def cmd_debug(args):
		mail = email.message_from_binary_file(args.eml)
		try:
			valid, p7, data, micalg, digest = validate(mail)
		except Exception as e:
			log.exception(e)
			return
Markus Kötter's avatar
Markus Kötter committed
489
		response = render(p7, data, micalg, digest, debug=True, tplname='debug.tpl')
Markus Kötter's avatar
Markus Kötter committed
490
491
492
493
494
		print(response)
	cmd.set_defaults(func=cmd_debug)

	cmd = sub.add_parser('run')
	def cmd_run(args):
495
496
497
498
499
500
		action = cfg.get('cmd_run', 'action')
		assert action in ['see','delete','move']
		moveto = {}
		if action == 'move':
			for k,v in {True:'good',False:'bad',None:'error'}.items():
				moveto[k] = cfg.get('cmd_run', 'move_on_{}'.format(v))
Markus Kötter's avatar
Markus Kötter committed
501
502
503
504
505
506
507
508
509
510
		pattern = '(UNSEEN) '
		pattern += '(SUBJECT "{}") '.format(cfg.get('cmd_run', 'subject'))

		while True:
			try:
				m = login(cfg)
				state, data = m.select(cfg.get('cmd_run','mailbox'), readonly=False)
				assert state == 'OK'
				while True:
					state, data = m.search(None, pattern)
511
					process(cfg, m, data, action, moveto)
Markus Kötter's avatar
Markus Kötter committed
512
513
514
					time.sleep(10)
			except Exception as e:
				log.exception(e)
515
			time.sleep(10)
Markus Kötter's avatar
Markus Kötter committed
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
	cmd.set_defaults(func=cmd_run)


	args = parser.parse_args()

	try:
		cfg = prepare(args)
	except Exception as e:
		log.exception(e)
		return

	if hasattr(args, 'func'):
		args.func(args)
	else:
		parser.print_usage()

if __name__ == '__main__':
	main()