Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2527498
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
29 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/utils/activesynccli.py b/utils/activesynccli.py
new file mode 100755
index 00000000..3173ec09
--- /dev/null
+++ b/utils/activesynccli.py
@@ -0,0 +1,344 @@
+#!/bin/env python3
+
+"""
+activesynccli.py
+ --host apps.kolabnow.com
+ --devicetype WindowsOutlook15
+ --deviceid windowsascli
+ --user user@kolab.org --password Secret
+ --verbose
+ list --folder INBOX
+
+# Dependencies
+
+ dnf install libwbxml-devel
+ pip install --global-option=build_ext --global-option="-I/usr/include/libwbxml-1.0/wbxml/" git+https://github.com/Apheleia-IT/python-wbxml#egg=wbxml
+
+"""
+
+import argparse
+import base64
+import http.client
+import urllib.parse
+import struct
+import xml.etree.ElementTree as ET
+import ssl
+import wbxml
+
+
+def decode_timezone(tz):
+ decoded = base64.b64decode(tz)
+ bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded)
+ print(f" TimeZone bias: {bias}min")
+ print(f" Standard Name: {standardName.decode()}")
+ year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', standardDate)
+ print(f" Standard Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}")
+ print(f" Daylight Name: {daylightName.decode()}")
+ year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', daylightDate)
+ print(f" Daylight Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}")
+ print(f" Daylight Bias: {daylightBias}min")
+ print()
+
+
+def http_request(url, method, params=None, headers=None, body=None):
+ """
+ Perform an HTTP request.
+ """
+
+ # print(url)
+ parsed_url = urllib.parse.urlparse(url)
+ # print("Connecting to ", parsed_url.netloc)
+ if url.startswith('https://'):
+ conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = ssl._create_unverified_context())
+ else:
+ conn = http.client.HTTPConnection(parsed_url.netloc, 80)
+
+ if params is None:
+ params = {}
+
+ if headers is None:
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
+ }
+
+ if body is None:
+ body = urllib.parse.urlencode(params)
+
+ # print("Requesting", parsed_url.geturl(), "From", parsed_url.netloc)
+ conn.request(method, parsed_url.geturl(), body, headers)
+ response = conn.getresponse()
+
+ # Handle redirects
+ if response.status in (301, 302,):
+ # print("Following redirect ", response.getheader('location', ''))
+ return http_request(
+ urllib.parse.urljoin(url, response.getheader('location', '')),
+ method,
+ params,
+ headers,
+ body)
+
+ if not response.status == 200:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return response
+
+
+def basic_auth_headers(username, password):
+ user_and_pass = base64.b64encode(
+ f"{username}:{password}".encode("ascii")
+ ).decode("ascii")
+
+ return {
+ "Authorization": "Basic {}".format(user_and_pass)
+ }
+
+
+def try_get(name, url, verbose, headers = None, body = None):
+ response = http_request(
+ url,
+ "GET",
+ None,
+ headers,
+ body
+ )
+ success = response.status == 200
+ if not success:
+ print(f"=> Error: {name} is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+class ActiveSync:
+ def __init__(self, options):
+ self.host = options.host
+ self.username = options.username
+ self.password = options.password
+ self.verbose = options.verbose
+
+ if options.deviceid:
+ self.deviceid = options.deviceid
+ else:
+ self.deviceid = 'v140Device'
+
+ if options.devicetype:
+ self.devicetype = options.devicetype
+ else:
+ self.devicetype = 'iphone'
+
+ if hasattr(options, 'folder') and options.folder:
+ self.folder = options.folder
+ else:
+ self.folder = None
+
+
+ def send_request(self, command, request, extra_args = None):
+ body = wbxml.xml_to_wbxml(request)
+
+ headers = {
+ "Host": self.host,
+ **basic_auth_headers(self.username, self.password)
+ }
+
+ headers.update(
+ {
+ "Content-Type": "application/vnd.ms-sync.wbxml",
+ 'MS-ASProtocolVersion': "14.0",
+ }
+ )
+
+ if extra_args is None:
+ extra_args = ""
+
+ return http_request(
+ f"https://{self.host}/Microsoft-Server-ActiveSync?Cmd={command}&User={self.username}&DeviceId={self.deviceid}&DeviceType={self.devicetype}{extra_args}",
+ "POST",
+ None,
+ headers,
+ body
+ )
+
+
+ def check(self):
+ headers = {
+ "Host": self.host,
+ **basic_auth_headers(self.username, self.password)
+ }
+
+ response = http_request(
+ f"https://{self.host}/Microsoft-Server-ActiveSync",
+ "OPTIONS",
+ None,
+ headers,
+ None
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if not success:
+ print("=> Error: Activesync is not available")
+ else:
+ # Sanity check of the data
+ assert response.getheader('MS-Server-ActiveSync', '')
+ assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
+ assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
+
+ if self.verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+ def fetch(self, collection_id, sync_key = 0):
+ request = """
+ <?xml version="1.0" encoding="utf-8"?>
+ <!DOCTYPE AirSync PUBLIC "-//AIRSYNC//DTD AirSync//EN" "http://www.microsoft.com/">
+ <Sync xmlns="uri:AirSync" xmlns:AirSyncBase="uri:AirSyncBase">
+ <Collections>
+ <Collection>
+ <SyncKey>{sync_key}</SyncKey>
+ <CollectionId>{collection_id}</CollectionId>
+ <DeletesAsMoves>0</DeletesAsMoves>
+ <DeletesAsMoves>0</DeletesAsMoves>
+ <WindowSize>512</WindowSize>
+ <Options>
+ <FilterType>0</FilterType>
+ <MIMESupport>2</MIMESupport>
+ <MIMETruncation>8</MIMETruncation>
+ <BodyPreference xmlns="uri:AirSyncBase">
+ <Type>4</Type>
+ <AllOrNone>1</AllOrNone>
+ </BodyPreference>
+ </Options>
+ </Collection>
+ </Collections>
+ <WindowSize>512</WindowSize>
+ </Sync>
+ """.replace(' ', '').replace('\n', '')
+
+ response = self.send_request('Sync', request.format(collection_id=collection_id, sync_key=sync_key))
+
+ assert response.status == 200
+
+ result = wbxml.wbxml_to_xml(response.read())
+
+ if self.verbose:
+ print(result)
+
+ root = ET.fromstring(result)
+ xmlns = "http://synce.org/formats/airsync_wm5/airsync"
+ sync_key = root.find(f".//{{{xmlns}}}SyncKey").text
+ more_available = (len(root.findall(f".//{{{xmlns}}}MoreAvailable")) == 1)
+ if self.verbose:
+ print("Current SyncKey:", sync_key)
+
+ for add in root.findall(f".//{{{xmlns}}}Add"):
+ serverId = add.find(f"{{{xmlns}}}ServerId").text
+ print(" ServerId", serverId)
+ applicationData = add.find(f"{{{xmlns}}}ApplicationData")
+
+ calxmlns = "http://synce.org/formats/airsync_wm5/calendar"
+ subject = applicationData.find(f"{{{calxmlns}}}Subject")
+ if subject is not None:
+ print(" Subject", subject.text)
+ startTime = applicationData.find(f"{{{calxmlns}}}StartTime")
+ if startTime is not None:
+ print(" StartTime", startTime.text)
+ timeZone = applicationData.find(f"{{{calxmlns}}}TimeZone")
+ if timeZone is not None:
+ decode_timezone(timeZone.text)
+ #the dates are encoded like so: vstdyear/vstdmonth/vstdday/vstdweek/vstdhour/vstdminute/vstdsecond/vstdmillis
+ decoded = base64.b64decode(timeZone.text)
+ bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded)
+ print(f" TimeZone bias: {bias}min")
+ print("")
+
+
+ print("\n")
+
+ # Fetch after the initial sync
+ if sync_key == "1":
+ print("after initial sync")
+ self.fetch(collection_id, sync_key)
+
+ # Fetch more
+ if more_available:
+ print("more available")
+ print(root.findall(f".//{{{xmlns}}}MoreAvailable"))
+ self.fetch(collection_id, sync_key)
+
+
+
+ def list(self):
+ request = """
+ <?xml version="1.0" encoding="utf-8"?>
+ <!DOCTYPE ActiveSync PUBLIC "-//MICROSOFT//DTD ActiveSync//EN" "http://www.microsoft.com/">
+ <FolderSync xmlns="FolderHierarchy:">
+ <SyncKey>0</SyncKey>
+ </FolderSync>
+ """.replace(' ', '').replace('\n', '')
+
+ response = self.send_request('FolderSync', request)
+
+ assert response.status == 200
+
+ result = wbxml.wbxml_to_xml(response.read())
+
+ if self.verbose:
+ print(result)
+
+ root = ET.fromstring(result)
+ xmlns = "http://synce.org/formats/airsync_wm5/folderhierarchy"
+ sync_key = root.find(f".//{{{xmlns}}}SyncKey").text
+ if self.verbose:
+ print("Current SyncKey:", sync_key)
+
+ for add in root.findall(f".//{{{xmlns}}}Add"):
+ displayName = add.find(f"{{{xmlns}}}DisplayName").text
+ serverId = add.find(f"{{{xmlns}}}ServerId").text
+ print("ServerId", serverId)
+ print("DisplayName", displayName)
+
+ if self.folder and displayName == self.folder:
+ self.fetch(serverId)
+
+
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--host", help="Host")
+ parser.add_argument("--user", help="Username")
+ parser.add_argument("--password", help="User password")
+ parser.add_argument("--verbose", action='store_true', help="Verbose output")
+ parser.add_argument("--deviceid", help="Device identifier ")
+ parser.add_argument("--devicetype", help="devicetype (WindowsOutlook15, iphone)")
+
+ subparsers = parser.add_subparsers()
+
+ parser_list = subparsers.add_parser('decode_timezone')
+ parser_list.add_argument("timezone", help="Base64 encoded timezone string ('Lv///0lyYW....///w==') ")
+ parser_list.set_defaults(func=lambda args: decode_timezone(args.timezone))
+
+ parser_list = subparsers.add_parser('list')
+ parser_list.add_argument("--folder", help="Folder")
+ parser_list.set_defaults(func=lambda args: ActiveSync(args).list())
+
+ parser_check = subparsers.add_parser('check')
+ parser_check.set_defaults(func=lambda args: ActiveSync(args).check())
+
+ options = parser.parse_args()
+
+ if 'func' in options:
+ options.func(options)
+
+
+
+if __name__ == "__main__":
+ main()
diff --git a/utils/generatemail.py b/utils/generatemail.py
new file mode 100755
index 00000000..23945817
--- /dev/null
+++ b/utils/generatemail.py
@@ -0,0 +1,97 @@
+#!/bin/env python3
+"""
+ Generate a bunch of dummy messages in a folder.
+"""
+# import glob
+# import os
+from datetime import datetime, timedelta
+import random
+import argparse
+
+
+mailtemplate = '''
+Return-Path: <christian@example.ch>
+Received: from imapb010.mykolab.com ([unix socket])
+ by imapb010.mykolab.com (Cyrus 2.5.10-49-g2e214b4-Kolab-2.5.10-8.1.el7.kolab_14) with LMTPA;
+ Wed, 09 Aug 2017 18:37:01 +0200
+X-Sieve: CMU Sieve 2.4
+Received: from int-mx002.mykolab.com (unknown [10.9.13.2])
+ by imapb010.mykolab.com (Postfix) with ESMTPS id 0A93910A25047
+ for <christian@example.ch>; Wed, 9 Aug 2017 18:37:01 +0200 (CEST)
+Received: from int-subm002.mykolab.com (unknown [10.9.37.2])
+ by int-mx002.mykolab.com (Postfix) with ESMTPS id EC06AF6E
+ for <christian@example.ch>; Wed, 9 Aug 2017 18:37:00 +0200 (CEST)
+MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="=_291b8e96564265636432c6d494e02322"
+Date: {date}
+From: "Mollekopf, Christian" <christian@example.ch>
+To: christian@example.ch
+Subject: {subject}
+Message-ID: {messageid}
+
+--=_291b8e96564265636432c6d494e02322
+Content-Type: multipart/alternative;
+ boundary="=_ceff0fd19756f45ed1295ee2069ff8e0"
+
+--=_ceff0fd19756f45ed1295ee2069ff8e0
+Content-Transfer-Encoding: 7bit
+Content-Type: text/plain; charset=US-ASCII
+
+sdlkjsdjf
+--=_ceff0fd19756f45ed1295ee2069ff8e0
+Content-Transfer-Encoding: quoted-printable
+Content-Type: text/html; charset=UTF-8
+
+<html><head><meta http-equiv=3D"Content-Type" content=3D"text/html; charset=
+=3DUTF-8" /></head><body style=3D'font-size: 10pt; font-family: Verdana,Gen=
+eva,sans-serif'>
+<p>sdlkjsdjf</p>
+
+</body></html>
+
+--=_ceff0fd19756f45ed1295ee2069ff8e0--
+
+--=_291b8e96564265636432c6d494e02322
+Content-Transfer-Encoding: base64
+Content-Type: text/plain;
+ name=xorg.conf
+Content-Disposition: attachment;
+ filename=xorg.conf;
+ size=211
+
+U2VjdGlvbiAiRGV2aWNlIgogICAgSWRlbnRpZmllciAgICAgIkRldmljZTAiCiAgICBEcml2ZXIg
+{attachment}ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi
+CiAgICBPcHRpb24gIlVzZUVESUQiICJ0cnVlIgpFbmRTZWN0aW9uCg==
+--=_291b8e96564265636432c6d494e02322--
+'''.strip()
+
+
+def populatemailbox(target_directory, count):
+ dtstamp = datetime.utcnow()
+
+ # Reproducible results
+ random.seed(30)
+
+ for i in range(1, count + 1):
+ dtstamp = dtstamp - timedelta(seconds=600)
+
+ attachmentMultiplier = 50000 * random.randint(0, 10) # Approx 20 MB
+ result = mailtemplate.format(
+ messageid="<foobar{}@example.org>".format(i),
+ subject="Foobar {}".format(i),
+ date=dtstamp.strftime("%a, %d %b %Y %H:%M:%S %z"),
+ attachment='ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi\n' * attachmentMultiplier
+ )
+ fname = "{}/{}.".format(target_directory, i)
+ with open(fname, 'wb') as f:
+ f.write(result.encode())
+
+
+parser = argparse.ArgumentParser(description='Generate some mail.')
+parser.add_argument('target_directory', help='the target directory')
+parser.add_argument('--count', help='Number of emails to generate', type=int)
+
+args = parser.parse_args()
+
+populatemailbox(args.target_directory, args.count)
diff --git a/utils/kolabendpointtester.py b/utils/kolabendpointtester.py
new file mode 100755
index 00000000..28075a9b
--- /dev/null
+++ b/utils/kolabendpointtester.py
@@ -0,0 +1,452 @@
+#!/bin/env python3
+
+"""
+kolabendpointtester.py
+ --host apps.kolabnow.com
+ --user user@kolab.org
+ --password Secret
+ --dav https://apps.kolabnow.com
+ --fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
+"""
+
+import sys
+import traceback
+import socket
+import ssl
+import argparse
+from base64 import b64encode
+import http.client
+import urllib.parse
+import dns.resolver
+
+SSLNOVERIFY = False
+
+def print_assertion_failure():
+ """
+ Print an error message about a failed assertion
+ """
+ _, _, trace = sys.exc_info()
+ tb_info = traceback.extract_tb(trace)
+ _filename, line, _func, text = tb_info[-1]
+ print(f" ERROR assertion on line {line} failed on {text}")
+
+
+def http_request(url, method, params=None, headers=None, body=None):
+ """
+ Perform an HTTP request.
+ """
+
+ parsed_url = urllib.parse.urlparse(url)
+ # print("Connecting to ", parsed_url.netloc)
+ if url.startswith('https://'):
+ conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
+ else:
+ conn = http.client.HTTPConnection(parsed_url.netloc, 80)
+
+ if params is None:
+ params = {}
+
+ if headers is None:
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
+ }
+
+ if body is None:
+ body = urllib.parse.urlencode(params)
+
+ print("Requesting", parsed_url.path, "From", parsed_url.netloc)
+ conn.request(method, parsed_url.geturl(), body, headers)
+ response = conn.getresponse()
+
+ # Handle redirects
+ if response.status in (301, 302,):
+ print("Following redirect ", response.getheader('location', ''))
+ return http_request(
+ urllib.parse.urljoin(url, response.getheader('location', '')),
+ method,
+ params,
+ headers,
+ body)
+
+ return response
+
+
+def basic_auth_headers(username, password):
+ user_and_pass = b64encode(
+ f"{username}:{password}".encode("ascii")
+ ).decode("ascii")
+
+ return {
+ "Authorization": "Basic {}".format(user_and_pass)
+ }
+
+
+def try_get(name, url, verbose, headers = None, body = None):
+ response = http_request(
+ url,
+ "GET",
+ None,
+ headers,
+ body
+ )
+ success = response.status == 200
+ if not success:
+ print(f"=> Error: {name} is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+def discover_principal(url, username, password, verbose = False):
+ body = '<d:propfind xmlns:d="DAV:" xmlns:cs="https://calendarserver.org/ns/"><d:prop><d:resourcetype /><d:displayname /></d:prop></d:propfind>'
+
+ headers = {
+ "Content-Type": "application/xml; charset=utf-8",
+ "Depth": "infinity",
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"{url}/principals/{username}/",
+ "PROPFIND",
+ None,
+ headers,
+ body
+ )
+
+ success = response.status == 207
+ if not success:
+ print("=> Error: Caldav is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+def test_freebusy_authenticated(url, username, password, verbose = False):
+ # Request our own freebusy authenticated
+ return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
+
+
+def test_freebusy_unauthenticated(url, username, password, verbose = False):
+ return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
+
+
+def test_autoconfig(host, username, password, verbose = False):
+ if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
+ return False
+ if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
+ return False
+
+# TODO
+# def test_007_well_known_outlook():
+# body = '''<Autodiscover \
+# xmlns="http://schemas.microsoft.com/exchange/autodiscover/outlook/requestschema/2006">
+# <Request>
+# <EMailAddress>admin@example.local</EMailAddress>
+# <AcceptableResponseSchema>
+# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
+# </AcceptableResponseSchema>
+# </Request>
+# </Autodiscover>'''
+
+# headers = {
+# "Content-Type": "text/xml; charset=utf-8"
+# }
+# response = http_post(
+# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# )
+# assert response.status == 200
+# data = response.read()
+# decoded = codecs.decode(data)
+# # Sanity check of the data
+# assert '<Server>example.local</Server>' in decoded
+# assert "admin@example.local" in decoded
+
+# # Ensure the alternative urls also work
+# assert http_post(
+# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# ).status == 200
+
+# assert http_post(
+# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# ).status == 200
+
+
+def test_autodiscover_activesync(host, username, password, verbose = False):
+ """
+ We expect something along the lines of
+
+ <?xml version="1.0" encoding="UTF-8"?>
+ <Autodiscover xmlns="http://schemas.microsoft.com/exchange/autodiscover/responseschema/2006">
+ <Response xmlns="http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006">
+ <User>
+ <DisplayName>User Name</DisplayName>
+ <EMailAddress>user@example.com</EMailAddress>
+ </User>
+ <Action>
+ <Settings>
+ <Server>
+ <Type>MobileSync</Type>
+ <Url>https://kolab.example.com/Microsoft-Server-ActiveSync</Url>
+ <Name>https://kolab.example.com/Microsoft-Server-ActiveSync</Name>
+ </Server>
+ </Settings>
+ </Action>
+ </Response>
+ </Autodiscover>
+ """
+
+ body = f'''<Autodiscover \
+xmlns="http://schemas.microsoft.com/exchange/autodiscover/mobilesync/requestschema/2006">
+ <Request>
+ <EMailAddress>{username}</EMailAddress>
+ <AcceptableResponseSchema>
+ http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
+ </AcceptableResponseSchema>
+ </Request>
+</Autodiscover>'''
+
+ headers = {
+ "Content-Type": "text/xml; charset=utf-8",
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"https://{host}/autodiscover/autodiscover.xml",
+ "POST",
+ None,
+ headers,
+ body
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if success:
+ try:
+ # Sanity check of the data
+ assert "<Type>MobileSync</Type>" in data
+ assert f"<Url>https://{host}/Microsoft-Server-ActiveSync</Url>" in data
+ assert username in data
+ except AssertionError:
+ print_assertion_failure()
+ success = False
+
+ if not success:
+ print("=> Error: Activesync autodiscover is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+def test_activesync(host, username, password, verbose = False):
+ headers = {
+ "Host": host,
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"https://{host}/Microsoft-Server-ActiveSync",
+ "OPTIONS",
+ None,
+ headers,
+ None
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if success:
+ try:
+ assert response.getheader('MS-Server-ActiveSync', '')
+ assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
+ assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
+ except AssertionError:
+ print_assertion_failure()
+ success = False
+
+ if not success:
+ print("=> Error: Activesync is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+def test_dns(host, verbose = False):
+ success = True
+ try:
+ answers = dns.resolver.resolve(host, 'MX')
+ for rdata in answers:
+ print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on MX record")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on MX record")
+
+ try:
+ answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
+ for rdata in answers:
+ print(' autodiscover CNAME', rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on autodiscover. CNAME entry")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on autodiscover. CNAME entry")
+
+ srv_records = [
+ f"_autodiscover._tcp.{host}",
+ f"_caldav._tcp.{host}",
+ f"_caldavs._tcp.{host}",
+ f"_carddav._tcp.{host}",
+ f"_carddavs._tcp.{host}",
+ f"_imap._tcp.{host}",
+ f"_imaps._tcp.{host}",
+ f"_sieve._tcp.{host}",
+ f"_submission._tcp.{host}",
+ f"_webdav._tcp.{host}",
+ f"_webdavs._tcp.{host}",
+ ]
+
+ for record in srv_records:
+ try:
+ answers = dns.resolver.resolve(record, 'SRV')
+ for rdata in answers:
+ print(" ", record, rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on record", record)
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on record", record)
+
+ if not success:
+ print(f"=> Error: Dns entires on {host} not available")
+
+ return success
+
+
+def test_email_dns(host, verbose = False):
+ success = True
+
+ srv_records = [
+ f"_autodiscover._tcp.{host}"
+ ]
+
+ for record in srv_records:
+ try:
+ answers = dns.resolver.resolve(record, 'SRV')
+ for rdata in answers:
+ print(" ", record, rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on record", record)
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on record", record)
+
+ if not success:
+ print(f"=> Error: Dns entires on {host} not available")
+
+ return success
+
+def test_certificates(host, davhost, imaphost, verbose):
+ success = True
+ hosts = [
+ (host, 443),
+ ]
+
+ if davhost:
+ hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
+ if imaphost:
+ hosts.append((imaphost, 993))
+ hosts.append((imaphost, 587))
+
+ context = ssl.create_default_context()
+
+ for hosttuple in hosts:
+ hostname, _port = hosttuple
+ try:
+ conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
+ conn.connect(hosttuple)
+ cert = conn.getpeercert()
+ if verbose:
+ print(cert)
+ except OSError as err:
+ print(" ERROR on peer", hosttuple, err)
+ success = False
+
+ if not success:
+ print("=> Error: Not all certificates are valid")
+
+ return success
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--host", help="Host")
+ parser.add_argument("--username", help="Username")
+ parser.add_argument("--password", help="User password")
+ parser.add_argument("--imap", help="IMAP URI")
+ parser.add_argument("--dav", help="DAV URI")
+ parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
+ parser.add_argument("--verbose", action='store_true', help="Verbose output")
+ options = parser.parse_args()
+
+ if options.dav:
+ if discover_principal(options.dav, options.username, options.password, options.verbose):
+ print("=> Caldav is available")
+
+ if discover_principal("https://" + options.host + "/.well-known/caldav", options.username, options.password, options.verbose):
+ print("=> Caldav on .well-known/caldav is available")
+
+ if test_autoconfig(options.host, options.username, options.password, options.verbose):
+ print("=> Autoconf available")
+
+ if test_autodiscover_activesync(options.host, options.username, options.password, options.verbose):
+ print("=> Activesync Autodsicovery available")
+
+ if test_activesync(options.host, options.username, options.password, options.verbose):
+ print("=> Activesync available")
+
+ if options.fb and test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
+ print("=> Authenticated Freebusy is available")
+
+ # We rely on the activesync test to have generated the token for unauthenticated access.
+ if options.fb and test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
+ print("=> Unauthenticated Freebusy is available")
+
+ if test_dns(options.host, options.verbose):
+ print(f"=> DNS entries on {options.host} available")
+
+ userhost = options.username.split('@')[1]
+ if test_email_dns(userhost, options.verbose):
+ print(f"=> DNS entries on {userhost} available")
+
+ if test_certificates(options.host, options.dav, options.imap, options.verbose):
+ print("=> All certificates are valid")
+
+
+if __name__ == "__main__":
+ main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jan 30, 8:15 PM (1 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
426038
Default Alt Text
(29 KB)
Attached To
Mode
R2 kolab
Attached
Detach File
Event Timeline
Log In to Comment