aboutsummaryrefslogtreecommitdiffstats
path: root/awlsim/common/net.py
blob: b11dd8ce875e42c466a57ea996e56887042e6905 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# -*- coding: utf-8 -*-
#
# AWL simulator - networking utility functions
#
# Copyright 2013-2018 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import division, absolute_import, print_function, unicode_literals
#from awlsim.common.cython_support cimport * #@cy
from awlsim.common.compat import *

from awlsim.common.util import *
from awlsim.common.exceptions import *
import socket
import os


__all__ = [
	"AF_UNIX",
	"SocketErrors",
	"netGetAddrInfo",
	"netPortIsUnused",
]


if hasattr(socket, "AF_UNIX"): #@nocov
	AF_UNIX = socket.AF_UNIX
else: #@nocov
	AF_UNIX = None

SocketErrors = (socket.error if hasattr(socket, "error") else OSError,
		BlockingIOError,
		ConnectionError)


def netGetAddrInfo(host, port, family = None):
	"""getaddrinfo() wrapper.
	"""
	socktype = socket.SOCK_STREAM
	if family in {None, socket.AF_UNSPEC}:
		# First try IPv4
		try:
			family, socktype, proto, canonname, sockaddr =\
				socket.getaddrinfo(host, port,
						   socket.AF_INET,
						   socktype)[0]
		except socket.gaierror as e:
			if excErrno(e) == socket.EAI_ADDRFAMILY:
				# Also try IPv6
				family, socktype, proto, canonname, sockaddr =\
					socket.getaddrinfo(host, port,
							   socket.AF_INET6,
							   socktype)[0]
			else:
				raise e
	else:
		family, socktype, proto, canonname, sockaddr =\
			socket.getaddrinfo(host, port,
					   family,
					   socktype)[0]
	return (family, socktype, sockaddr)

def netPortIsUnused(host, port):
	"""Check if a port is not used.
	"""
	sock = None
	_SocketErrors = SocketErrors
	try:
		family, socktype, sockaddr = netGetAddrInfo(host, port)
		if family == AF_UNIX:
			if not os.path.exists(sockaddr):
				return True
			return False
		sock = socket.socket(family, socktype)
		sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		sock.bind(sockaddr)
	except _SocketErrors as e:
		return False
	finally:
		if sock:
			with suppressAllExc:
				if hasattr(sock, "shutdown"):
					sock.shutdown(socket.SHUT_RDWR)
			with suppressAllExc:
				sock.close()
	return True
bues.ch cgit interface