Page MenuHomePhorge

No OneTemporary

diff --git a/utils/kolabendpointtester.py b/utils/kolabendpointtester.py
index 064d600f..922e30ce 100755
--- a/utils/kolabendpointtester.py
+++ b/utils/kolabendpointtester.py
@@ -1,635 +1,675 @@
#!/bin/env python3
"""
kolabendpointtester.py
--host apps.kolabnow.com
--user user@kolab.org
--password Secret
--dav https://apps.kolabnow.com
--fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
"""
import sys
import traceback
import socket
import ssl
import argparse
from base64 import b64encode
import http.client
import urllib.parse
from imaplib import IMAP4
from imaplib import IMAP4_SSL
from smtplib import SMTP
from smtplib import SMTP_SSL
import dns.resolver
+# print('\033[31m' + 'some red text')
+
+RED='\033[31m'
+GREEN='\033[32m'
+RESET='\033[39m'
+
SSLNOVERIFY = False
+def print_error(msg):
+ print(RED + f"=> ERROR: {msg}")
+ print(RESET) # and reset to default color
+
+def print_success(msg):
+ print(GREEN + f"=> {msg}")
+ print(RESET) # and reset to default color
+
+
def print_assertion_failure():
"""
Print an error message about a failed assertion
"""
_, _, trace = sys.exc_info()
tb_info = traceback.extract_tb(trace)
_filename, line, _func, text = tb_info[-1]
print(f" ERROR assertion on line {line} failed on {text}")
def http_request(url, method, params=None, headers=None, body=None, verbose=False):
"""
Perform an HTTP request.
"""
parsed_url = urllib.parse.urlparse(url)
# print("Connecting to ", parsed_url.netloc)
if url.startswith('https://'):
conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
else:
conn = http.client.HTTPConnection(parsed_url.netloc, 80)
if verbose:
conn.set_debuglevel(9)
conn.connect()
if params is None:
params = {}
if headers is None:
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
if body is None:
body = urllib.parse.urlencode(params)
# Assemble a relative url
url = urllib.parse.urlunsplit(["", "", parsed_url.path, parsed_url.query, parsed_url.fragment])
print(f"Requesting {url} From {parsed_url.netloc} Using {method}")
conn.request(method, url, body, headers)
response = conn.getresponse()
# Handle redirects
if response.status in (301, 302,):
print("Following redirect ", response.getheader('location', ''))
return http_request(
urllib.parse.urljoin(url, response.getheader('location', '')),
method,
params,
headers,
body,
verbose)
return response
def basic_auth_headers(username, password):
user_and_pass = b64encode(
f"{username}:{password}".encode("ascii")
).decode("ascii")
return {
"Authorization": "Basic {}".format(user_and_pass)
}
-
def try_get(name, url, verbose, headers = None, body = None):
- response = http_request(
- url,
- "GET",
- None,
- headers,
- body,
- verbose
- )
- success = response.status == 200
+ try:
+ response = http_request(
+ url,
+ "GET",
+ None,
+ headers,
+ body,
+ verbose
+ )
+ success = response.status == 200
+ except http.client.RemoteDisconnected:
+ print("Remote disconnected")
+ print_error(f"{name} is not available")
+ return False
+
if not success:
- print(f"=> Error: {name} is not available")
+ print_error(f"{name} is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_caldav_redirect(host, username, password, verbose):
headers = {
"Content-Type": "application/xml; charset=utf-8",
"Depth": "infinity",
**basic_auth_headers(username, password)
}
- response = http_request(
- "https://" + host + "/.well-known/caldav",
- "GET",
- None,
- headers,
- None,
- verbose
- )
+ try:
+ response = http_request(
+ "https://" + host + "/.well-known/caldav",
+ "GET",
+ None,
+ headers,
+ None,
+ verbose
+ )
+ except http.client.RemoteDisconnected:
+ print("Remote disconnected")
+ print_error(".well-known/caldav is not available")
+ return False
success = response.status in (200, 301, 302)
if not success:
- print("=> Error: .well-known/caldav is not available")
+ print_error(".well-known/caldav is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def discover_principal(url, username, password, verbose):
body = '<d:propfind xmlns:d="DAV:" xmlns:cs="https://calendarserver.org/ns/"><d:prop><d:resourcetype /><d:displayname /></d:prop></d:propfind>'
headers = {
"Content-Type": "application/xml; charset=utf-8",
"Depth": "infinity",
**basic_auth_headers(username, password)
}
- response = http_request(
- f"{url}/principals/{username}/",
- "PROPFIND",
- None,
- headers,
- body,
- verbose
- )
+ try:
+ response = http_request(
+ f"{url}/principals/{username}/",
+ "PROPFIND",
+ None,
+ headers,
+ body,
+ verbose
+ )
+ except http.client.RemoteDisconnected:
+ print("Remote disconnected")
+ print_error("Caldav is not available")
+ return False
success = response.status == 207
if not success:
- print("=> Error: Caldav is not available")
+ print_error("Caldav is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_freebusy_authenticated(url, username, password, verbose = False):
# Request our own freebusy authenticated
return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
def test_freebusy_unauthenticated(url, username, password, verbose = False):
return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
def test_autoconfig(host, username, password, verbose = False):
if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
return True
# TODO
# def test_007_well_known_outlook():
# body = '''<Autodiscover \
# xmlns="http://schemas.microsoft.com/exchange/autodiscover/outlook/requestschema/2006">
# <Request>
# <EMailAddress>admin@example.local</EMailAddress>
# <AcceptableResponseSchema>
# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
# </AcceptableResponseSchema>
# </Request>
# </Autodiscover>'''
# headers = {
# "Content-Type": "text/xml; charset=utf-8"
# }
# response = http_post(
# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
# None,
# headers,
# body
# )
# assert response.status == 200
# data = response.read()
# decoded = codecs.decode(data)
# # Sanity check of the data
# assert '<Server>example.local</Server>' in decoded
# assert "admin@example.local" in decoded
# # Ensure the alternative urls also work
# assert http_post(
# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
# assert http_post(
# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
def test_autodiscover_activesync(host, activesynchost, username, password, verbose = False):
"""
We expect something along the lines of
<?xml version="1.0" encoding="UTF-8"?>
<Autodiscover xmlns="http://schemas.microsoft.com/exchange/autodiscover/responseschema/2006">
<Response xmlns="http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006">
<User>
<DisplayName>User Name</DisplayName>
<EMailAddress>user@example.com</EMailAddress>
</User>
<Action>
<Settings>
<Server>
<Type>MobileSync</Type>
<Url>https://kolab.example.com/Microsoft-Server-ActiveSync</Url>
<Name>https://kolab.example.com/Microsoft-Server-ActiveSync</Name>
</Server>
</Settings>
</Action>
</Response>
</Autodiscover>
"""
body = f'''<Autodiscover \
xmlns="http://schemas.microsoft.com/exchange/autodiscover/mobilesync/requestschema/2006">
<Request>
<EMailAddress>{username}</EMailAddress>
<AcceptableResponseSchema>
http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
</AcceptableResponseSchema>
</Request>
</Autodiscover>'''
headers = {
"Content-Type": "text/xml; charset=utf-8",
**basic_auth_headers(username, password)
}
- response = http_request(
- f"https://{host}/autodiscover/autodiscover.xml",
- "POST",
- None,
- headers,
- body,
- verbose
- )
+ try:
+ response = http_request(
+ f"https://{host}/autodiscover/autodiscover.xml",
+ "POST",
+ None,
+ headers,
+ body,
+ verbose
+ )
+ except http.client.RemoteDisconnected:
+ print("Remote disconnected")
+ print_error("Activesync autodiscover is not available")
+ return False
success = response.status == 200
data = response.read().decode()
if success:
try:
# Sanity check of the data
assert "<Type>MobileSync</Type>" in data
assert f"<Url>https://{activesynchost}/Microsoft-Server-ActiveSync</Url>" in data
assert username in data
except AssertionError:
print(data)
print_assertion_failure()
success = False
if not success:
- print("=> Error: Activesync autodiscover is not available")
+ print_error("Activesync autodiscover is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_activesync(host, username, password, verbose = False):
headers = {
"Host": host,
**basic_auth_headers(username, password)
}
- response = http_request(
- f"https://{host}/Microsoft-Server-ActiveSync",
- "OPTIONS",
- None,
- headers,
- None,
- verbose
- )
+ try:
+ response = http_request(
+ f"https://{host}/Microsoft-Server-ActiveSync",
+ "OPTIONS",
+ None,
+ headers,
+ None,
+ verbose
+ )
+ except http.client.RemoteDisconnected:
+ print("Remote disconnected")
+ print_error("Activesync is not available")
+ return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert response.getheader('MS-Server-ActiveSync', '')
assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
except AssertionError:
print_assertion_failure()
success = False
if not success:
- print("=> Error: Activesync is not available")
+ print_error("Activesync is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_dns(host, verbose = False):
success = True
try:
answers = dns.resolver.resolve(host, 'MX')
for rdata in answers:
print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on MX record")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on MX record")
try:
answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
for rdata in answers:
print(' autodiscover CNAME', rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
except dns.resolver.NoAnswer:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
srv_records = [
f"_autodiscover._tcp.{host}",
f"_caldav._tcp.{host}",
f"_caldavs._tcp.{host}",
f"_carddav._tcp.{host}",
f"_carddavs._tcp.{host}",
f"_imap._tcp.{host}",
f"_imaps._tcp.{host}",
f"_sieve._tcp.{host}",
f"_submission._tcp.{host}",
f"_webdav._tcp.{host}",
f"_webdavs._tcp.{host}",
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
- print(f"=> Error: Dns entires on {host} not available")
+ print(f"=> ERROR: Dns entires on {host} not available")
return success
def test_email_dns(host, verbose = False):
success = True
srv_records = [
f"_autodiscover._tcp.{host}"
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
- print(f"=> Error: Dns entires on {host} not available")
+ print(f"=> ERROR: Dns entires on {host} not available")
return success
def test_imap(host, user, password, verbose):
success = True
hosts = [
(host, 993, True, False),
# (host, 143, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
imap = IMAP4_SSL(host=host, port=port)
else:
imap = IMAP4(host=host, port=port)
if starttls:
imap.starttls()
imap.login(user, password)
status, list_response = imap.list()
assert status == 'OK'
for folder in list_response:
if 'INBOX' in folder.decode('utf-8'):
inbox_found = True
assert inbox_found
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
- print("=> Error: IMAP failed")
+ print_error("IMAP failed")
return success
def test_smtp(host, user, password, verbose):
success = True
hosts = [
(host, 465, True, False),
(host, 587, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
smtp = SMTP_SSL(host=host, port=port)
else:
smtp = SMTP(host=host, port=port)
if starttls:
smtp.starttls()
# check we have an open socket
assert smtp.sock
# run a no-operation, which is basically a server-side pass-through
status, _response = smtp.noop()
assert status == 250
status, _response = smtp.login(user, password)
assert status == 235
status, _response = smtp.quit()
assert status == 221
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
- print("=> Error: SMTP failed")
+ print_error("SMTP failed")
return success
def test_certificates(host, davhost, imaphost, verbose):
success = True
hosts = [
(host, 443),
]
if davhost:
hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
if imaphost:
hosts.append((imaphost, 993))
hosts.append((imaphost, 465))
context = ssl.create_default_context()
for hosttuple in hosts:
hostname, _port = hosttuple
try:
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
conn.connect(hosttuple)
cert = conn.getpeercert()
if verbose:
print(f"Certificate for {hosttuple}: {cert}")
except OSError as err:
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
- print("=> Error: Not all certificates are valid")
+ print_error("Not all certificates are valid")
return success
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="Host")
parser.add_argument("--username", help="Username")
parser.add_argument("--password", help="User password")
parser.add_argument("--imap", help="IMAP URI")
parser.add_argument("--dav", help="DAV URI")
parser.add_argument("--activesync", help="ActiveSync URI")
parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
parser.add_argument("--verbose", action='store_true', help="Verbose output")
options = parser.parse_args()
error = False
if options.dav:
if discover_principal(options.dav, options.username, options.password, options.verbose):
- print("=> Caldav is available")
+ print_success("Caldav is available")
else:
error = True
if options.host:
if test_caldav_redirect(options.host, options.username, options.password, options.verbose):
- print("=> Caldav on .well-known/caldav is available")
+ print_success("Caldav on .well-known/caldav is available")
else:
# Kolabnow doesn't support this atm (it offers the redirect on apps.kolabnow.com
error = False
if test_autoconfig(options.host, options.username, options.password, options.verbose):
- print("=> Autoconf available")
+ print_success("Autoconf available")
else:
error = True
if options.activesync:
if test_autodiscover_activesync(options.host, options.activesync, options.username, options.password, options.verbose):
- print("=> Activesync Autodsicovery available")
+ print_success("Activesync Autodsicovery available")
else:
# Kolabnow doesn't support this
error = False
if test_activesync(options.activesync, options.username, options.password, options.verbose):
- print("=> Activesync available")
+ print_success("Activesync available")
else:
error = True
if options.fb:
if test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
- print("=> Authenticated Freebusy is available")
+ print_success("Authenticated Freebusy is available")
else:
error = True
# We rely on the activesync test to have generated the token for unauthenticated access.
if test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
- print("=> Unauthenticated Freebusy is available")
+ print_success("Unauthenticated Freebusy is available")
else:
error = True
if test_dns(options.host, options.verbose):
print(f"=> DNS entries on {options.host} available")
else:
error = True
userhost = options.username.split('@')[1]
if test_email_dns(userhost, options.verbose):
print(f"=> DNS entries on {userhost} available")
else:
error = True
if test_certificates(options.host, options.dav, options.imap, options.verbose):
- print("=> All certificates are valid")
+ print_success("All certificates are valid")
else:
error = True
if options.imap:
if test_imap(options.imap, options.username, options.password, options.verbose):
- print("=> IMAP is available")
+ print_success("IMAP is available")
else:
error = True
if test_smtp(options.imap, options.username, options.password, options.verbose):
- print("=> SMTP is available")
+ print_success("SMTP is available")
else:
error = True
if error:
print("At least one check failed")
sys.exit(1)
if __name__ == "__main__":
main()

File Metadata

Mime Type
text/x-diff
Expires
Sat, Apr 5, 1:14 AM (7 h, 24 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
175735
Default Alt Text
(22 KB)

Event Timeline