# Fix broken modification dates in Nextcloud folders.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 193 lines
# 7.2 KiB

import argparse
import datetime
import filecmp
import os
import tempfile
import time
import urllib.parse
import sys
import xml.etree.ElementTree as ET
import requests
# Module-wide constants.
# WebDAV path (appended to the server base URL) in front of "<user>/<path>" for regular files
FILES_PATH_PREFIX = "/remote.php/dav/files/"
# WebDAV path (appended to the server base URL) in front of "<user>/versions/<fileid>" for file versions
VERSIONS_PATH_PREFIX = "/remote.php/dav/versions/"
# Cut-off for file timestamps: any date older than this one is considered invalid
DATE_THRESHOLD = datetime.datetime(year=1990, month=1, day=1)
# We only need one session for the whole script: it is created once at import time and
# propfind() sends its requests through it.
SESSION = requests.Session()
def propfind(path, auth):
    """
    List the direct children of a WebDAV resource via a PROPFIND request (Depth: 1).

    :param path: The full URL of the resource in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: An iterator of dictionaries, one for every directory entry (the requested resource itself is
        skipped). Every entry has the keys "path" (the href exactly as reported by the server) and
        "resource_type" (list of the tag names found inside <d:resourcetype>, e.g. "{DAV:}collection").
        "last_modified" (naive datetime parsed from the reported GMT timestamp) and "file_id" (int) are only
        present if the server reported a usable value. If the server answers with an HTTP error status, a
        warning is written to stderr and nothing is yielded.
    """
    # Prefix of the ElementTree path leading from a <d:response> to its properties. Searching through *all*
    # <d:propstat> elements (instead of only the first one) still finds the properties when the server
    # splits found and not-found properties into separate propstat blocks.
    prop_path = "{DAV:}propstat/{DAV:}prop/"
    # Depth 1: only list the direct children, do not descend further into subdirectories
    # TODO: we could probably be faster if we did
    headers = {"Depth": "1"}
    # This body requests only the last-modified timestamp, the resource type and the file id
    requested_data = """
        <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
            <d:prop>
                <d:getlastmodified />
                <d:resourcetype />
                <oc:fileid />
            </d:prop>
        </d:propfind>
        """
    # Session.request() handles custom verbs and, unlike send(Request.prepare()), applies the
    # session and environment settings (cookies, proxies, CA bundle) to the request.
    resp = SESSION.request("PROPFIND", path, headers=headers, auth=auth, data=requested_data)
    if not resp.ok:
        print(
            "PROPFIND {} failed with HTTP status {}".format(path, resp.status_code),
            file=sys.stderr
        )
        return
    # Parse the raw bytes so that the XML parser itself takes care of the document's encoding
    root = ET.fromstring(resp.content)
    # Normalised form of the requested URL, used to recognise the entry describing the resource itself.
    # The server reports percent-encoded hrefs and may or may not append a trailing slash, so both sides
    # are compared unquoted and without trailing slashes.
    own_path = urllib.parse.unquote(path).rstrip("/")
    for dav_response in root.findall("{DAV:}response"):
        href = dav_response.findtext("{DAV:}href")
        if not href:
            # without an href we cannot address the entry at all
            continue
        # skip the entry for the requested resource itself
        if own_path.endswith(urllib.parse.unquote(href).rstrip("/")):
            continue
        entry = {"path": href}

        # findtext() returns None for a missing element and '' for an empty one; both mean "not reported"
        last_modified = dav_response.findtext(prop_path + "{DAV:}getlastmodified")
        if last_modified:
            try:
                entry["last_modified"] = datetime.datetime.strptime(
                    last_modified.strip(),
                    "%a, %d %b %Y %H:%M:%S GMT"
                )
            except ValueError:
                # An unparsable timestamp is treated like a missing one instead of aborting the listing
                print(
                    "Cannot parse timestamp {!r} of {}".format(last_modified, href),
                    file=sys.stderr
                )

        resourcetype = dav_response.find(prop_path + "{DAV:}resourcetype")
        entry["resource_type"] = [] if resourcetype is None else [child.tag for child in resourcetype]

        file_id = dav_response.findtext(prop_path + "{http://owncloud.org/ns}fileid")
        if file_id:
            try:
                entry["file_id"] = int(file_id)
            except ValueError:
                # a non-numeric file id is treated like a missing one
                pass
        yield entry
def find_valid_version(versions):
    """
    Pick the version entry with the most recent valid timestamp.

    Only entries that carry a "last_modified" value strictly younger than DATE_THRESHOLD are considered. If
    several entries share the most recent timestamp, the first one delivered by the iterator wins.

    :param versions: An iterator as returned by propfind()
    :return: The entry (dictionary) with the most recent date, or None if no entry has a valid date
    """
    # Entries without a timestamp fall back to the threshold itself and are therefore filtered out
    dated_candidates = (
        candidate for candidate in versions
        if candidate.get("last_modified", DATE_THRESHOLD) > DATE_THRESHOLD
    )
    # max() returns the first maximal element; default=None covers "no valid version at all"
    return max(dated_candidates, key=lambda candidate: candidate["last_modified"], default=None)
def download_file(path, auth):
    """
    Download one file and save it in a fresh temporary file on the local device.

    :param path: The full URL of the file in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: The path of the created temporary file (the caller is responsible for deleting it), or '' if the
        server did not answer with HTTP status 200
    """
    # stream=True avoids holding the complete file in memory; the shared session reuses the connection
    with SESSION.get(path, auth=auth, stream=True) as response:
        if response.status_code != 200:
            return ''
        # Use a unique temporary file instead of a name derived from the URL: a URL-derived name in the
        # current working directory could overwrite (and, after cleanup by the caller, destroy) an
        # unrelated local file, and two downloads with the same basename would clobber each other.
        with tempfile.NamedTemporaryFile(mode='wb', prefix='nextcloud-dates-', delete=False) as file:
            try:
                for chunk in response.iter_content(chunk_size=65536):
                    file.write(chunk)
            except Exception:
                # do not leave a partial download behind (close first so removal also works on Windows)
                file.close()
                os.remove(file.name)
                raise
        return file.name
def content_equal(original_entry, fixed_version, auth):
    """
    Download two file versions and check whether their contents are identical.

    :param original_entry: URL of the entry with the wrong timestamp.
    :param fixed_version: URL of the version that is compared with the original.
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: True if both downloads succeeded and the files are byte-wise equal. False otherwise.
    """
    downloaded = []
    try:
        for url in (original_entry, fixed_version):
            local_copy = download_file(url, auth)
            if not local_copy:
                # download_file() signals a failed download with ''; nothing to compare then
                return False
            downloaded.append(local_copy)
        # Only the byte-wise comparison is meaningful here: a shallow comparison looks at the os.stat()
        # data (size, mtime) of the local copies, which reflects the download and not the server-side
        # metadata. The deep comparison alone yields the same result as "deep and shallow" did.
        return filecmp.cmp(downloaded[0], downloaded[1], shallow=False)
    finally:
        # always clean up the local copies, also if the download or the comparison raised
        for local_copy in downloaded:
            os.remove(local_copy)
# TODO: parallelize
def main():
    """
    Scan a Nextcloud folder tree for files with invalid modification dates and report what could be done.

    The folder tree below the search path is walked breadth-first via propfind(). Every file whose
    modification date is older than DATE_THRESHOLD is looked up in the versions store. The script only
    counts and does not modify anything on the server. It prints two numbers, one per line:
    first the number of files that have a version with a valid date and identical content
    ("restore"), then the number of all other files with an invalid date ("touch").
    """
    # local import: only this entry point needs it
    from collections import deque

    # get all necessary data from the command line
    argparser = argparse.ArgumentParser(description="Fix broken dates in Nextcloud folders.")
    argparser.add_argument("server", help="The base URL of the Nextcloud server.")
    argparser.add_argument("username", help="The user to log in as.")
    argparser.add_argument("password", help="The password for accessing Nextcloud. Hint: Use an App Token!")
    argparser.add_argument(
        "-p", "--path",
        default="/",
        help="The path to search, relative to the user's root. Default: /",
        dest="search_path"
    )
    arguments = argparser.parse_args()

    # Prepare HTTP Basic Authentication
    auth = requests.auth.HTTPBasicAuth(arguments.username, arguments.password)

    # All request URLs are built as <server> + <absolute path>, so avoid a doubled or missing slash
    server = arguments.server.rstrip("/")
    search_path = arguments.search_path
    if not search_path.startswith("/"):
        search_path = "/" + search_path

    # Prepare the path we want to use
    mainpath = FILES_PATH_PREFIX + arguments.username + search_path

    # Queue of all folders we still need to enter (deque: O(1) removal at the front)
    folders = deque([mainpath])
    # List of all entries with wrong time
    wrongtime = []

    restore_count = 0
    touch_count = 0

    # Iterate through all folders and check for wrong timestamps
    while folders:
        url = server + folders.popleft()
        # progress output: "+" per folder, "." per entry
        print("+", end="", flush=True)
        for entry in propfind(url, auth):
            print(".", end="", flush=True)
            # put directories in search list
            if "resource_type" in entry and "{DAV:}collection" in entry["resource_type"]:
                folders.append(entry["path"])
            # put files with wrong date in wrong date list
            # (we don't know what to do if a directory has an invalid date)
            elif "last_modified" in entry and entry["last_modified"] < DATE_THRESHOLD:
                wrongtime.append(entry)
    print()

    # Iterate through all files with wrong timestamps and check for versions with intact timestamp
    # NOTE: this loop could be moved into the loop above to handle things on-the-fly instead of all at once
    for entry in wrongtime:
        fixed_version = None
        # Versions are addressed by file id; without one there is nothing to look up
        if "file_id" in entry:
            fixed_version = find_valid_version(propfind(
                server + VERSIONS_PATH_PREFIX + arguments.username + "/versions/" + str(entry["file_id"]),
                auth))
        if fixed_version and content_equal(server + entry["path"], server + fixed_version["path"], auth):
            # a version with a valid date and the same content exists: its date could be restored
            restore_count += 1
        else:
            # no usable version: the file could only be touched
            touch_count += 1

    print(restore_count)
    print(touch_count)


if __name__ == "__main__":
    main()