aboutsummaryrefslogtreecommitdiffstats
path: root/libpwman/argon2.py
blob: 493d42cd6e152f81a3b38320864c200d96eb66a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# -*- coding: utf-8 -*-
"""
# Argon2 wrapper
# Copyright (c) 2023-2024 Michael Büsch <m@bues.ch>
# Licensed under the GNU/GPL version 2 or later.
"""

from libpwman.exception import PWManError

import gc
import os

__all__ = [
	"Argon2"
]

class Argon2:
	"""Abstraction layer for the Argon2 implementation.
	"""

	__singleton = None
	DEBUG = False

	@classmethod
	def get(cls):
		"""Get the Argon2 singleton.
		"""
		if cls.__singleton is None:
			cls.__singleton = cls()
		return cls.__singleton

	def __init__(self):
		self.__argon2cffi = None
		self.__argon2pure = None

		argon2lib = os.getenv("PWMAN_ARGON2LIB", "").lower().strip()

		if argon2lib in ("", "argon2-cffi", "argon2cffi"):
			# Try to use argon2-cffi
			try:
				import argon2
				self.__argon2cffi = argon2
				return
			except ImportError as e:
				pass

		if argon2lib == "argon2pure":
			# Use argon2pure, but only if explicitly selected,
			# because it's really really slow.
			try:
				import argon2pure
				self.__argon2pure = argon2pure
				return
			except ImportError as e:
				pass

		msg = "Python module import error."
		if argon2lib == "":
			msg += "\n'argon2-cffi' is not installed."
		else:
			msg += "\n'PWMAN_ARGON2LIB=%s' is not supported or not installed." % argon2lib
		raise PWManError(msg)

	def argon2id_v1p3(self, passphrase, salt, timeCost, memCost, parallel, keyLen):
		"""Run Argon2id v1.3.
		passphrase: The passphrase bytes.
		salt: The salt bytes.
		timeCost: The time cost, in number of iterations.
		memCost: The memory cost, in number of kiB.
		parallel: The number of parallel threads.
		keyLen: The number of bytes to return.
		"""

		# Check parameters.
		if (not isinstance(passphrase, bytes) or
		    len(passphrase) < 1 or
		    len(passphrase) > ((1 << 32) - 1)):
			raise PWManError("Argon2id: Invalid passphrase.")
		if (not isinstance(salt, bytes) or
		    len(salt) < 1 or
		    len(salt) > ((1 << 32) - 1)):
			raise PWManError("Argon2id: Invalid salt.")
		if (not isinstance(timeCost, int) or
		    timeCost < 1 or
		    timeCost > ((1 << 32) - 1)):
			raise PWManError("Argon2id: Invalid time cost.")
		if (not isinstance(parallel, int) or
		    parallel < 1 or
		    parallel > ((1 << 24) - 1)):
			raise PWManError("Argon2id: Invalid parallelism.")
		if (not isinstance(memCost, int) or
		    memCost < 8 * parallel or
		    memCost > ((1 << 32) - 1)):
			raise PWManError("Argon2id: Invalid memory cost.")
		if (not isinstance(keyLen, int) or
		    keyLen < 1 or
		    keyLen > ((1 << 32) - 1)):
			raise PWManError("Argon2id: Invalid hash length.")

		# Memory is locked (for security reasons)
		# and we might not have much of it.
		# Try to free some unused memory to avoid OOM.
		gc.collect()

		if self.DEBUG:
			import time
			begin = time.time()

		key = None
		try:
			if self.__argon2cffi is not None:
				# Use argon2-cffi.
				low_level = self.__argon2cffi.low_level
				key = low_level.hash_secret_raw(
					secret=passphrase,
					salt=salt,
					time_cost=timeCost,
					memory_cost=memCost,
					parallelism=parallel,
					hash_len=keyLen,
					type=low_level.Type.ID,
					version=0x13,
				)
			elif self.__argon2pure is not None:
				# Use argon2pure.
				# Avoid subprocesses:
				# Do not use multiprocessing to keep all memory locked.
				# Subprocesses do not inherit mlockall().
				argon2pure = self.__argon2pure
				key = argon2pure.argon2(
					password=passphrase,
					salt=salt,
					time_cost=timeCost,
					memory_cost=memCost,
					parallelism=parallel,
					tag_length=keyLen,
					type_code=argon2pure.ARGON2ID,
					threads=1, # no threads
					use_threads=True, # no subprocesses
					version=0x13,
				)
		except Exception as e:
			raise PWManError("Argon2 error: %s: %s" % (type(e), str(e)))
		if key is None:
			raise PWManError("Argon2 not implemented.")

		if self.DEBUG:
			print("Argon2id took %.02f s." % (time.time() - begin))

		return key

	@classmethod
	def quickSelfTest(cls):
		"""Run a quick algorithm self test.
		"""
		inst = cls.get()
		h = inst.argon2id_v1p3(
			passphrase=b"namwp",
			salt=(b"pwman"*4),
			timeCost=4,
			memCost=16,
			parallel=2,
			keyLen=32,
		)
		if h != bytes.fromhex("6aa4b71bbf34cce1383577f2fcedecf1074fa7e1f5a664e00cf92f509fb54a35"):
			raise PWManError("Argon2id: Quick self test failed.")
bues.ch cgit interface