Fix broken modification dates in Nextcloud folders
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
9.3 KiB

import argparse
import datetime
import filecmp
import multiprocessing
import os
import tempfile
import time
import urllib.parse
import sys
import uuid
import xml.etree.ElementTree as ET
from pathlib import Path
import requests
# Path prefixes of the WebDAV endpoints used by this script; the user name is appended to each of them
FILES_PATH_PREFIX = "/remote.php/dav/files/"
VERSIONS_PATH_PREFIX = "/remote.php/dav/versions/"
# the threshold for file timestamps (dates older than this are considered invalid)
# NOTE: this is a naive datetime, just like the timestamps propfind() parses from the server response
DATE_THRESHOLD = datetime.datetime(1990, 1, 1)
# we only need one session for the whole script
SESSION = requests.Session()
def propfind(path, auth):
    """
    List everything below a WebDAV path via a single PROPFIND request.

    For every entry the Last Modified timestamp, the resource type and the FileID are requested.

    :param path: The full URL of the file or directory in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: An iterator of dictionaries, one for every entry except ``path`` itself. Every dictionary has the keys
        "path" (the href exactly as reported by the server) and "resource_type" (a list of tag names). The keys
        "last_modified" (a naive datetime) and "file_id" (an int) are only present if the response contained a
        usable value for them.
    """
    # "infinity" asks the server for the whole tree below ``path`` in one response
    headers = {"Depth": "infinity"}
    # This body requests only the last-modified date, the resource type and the file id
    requested_data = """
        <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
            <d:prop>
                <d:getlastmodified />
                <d:resourcetype />
                <oc:fileid />
            </d:prop>
        </d:propfind>
        """
    # Session.request() also applies the proxy/TLS settings from the environment, which sending a manually prepared
    # request does not do; this keeps PROPFIND consistent with the plain requests calls used for up- and downloads
    resp = SESSION.request("PROPFIND", path, headers=headers, auth=auth, data=requested_data)
    et = ET.fromstring(resp.text)

    # hrefs come back percent-encoded and collections carry a trailing slash, so both sides are normalised before
    # they are compared; otherwise the requested resource itself would not be recognised
    own_path = urllib.parse.unquote(path).rstrip("/")

    for dav_response in et.findall('{DAV:}response'):
        entry = {}
        entry["path"] = dav_response.find("{DAV:}href").text
        # skip this entry itself
        if own_path.endswith(urllib.parse.unquote(entry["path"]).rstrip("/")):
            continue

        props = dav_response.find("{DAV:}propstat").find("{DAV:}prop")

        # A missing or unparseable date leaves the key out; callers treat a missing key as a broken timestamp
        try:
            entry["last_modified"] = datetime.datetime.strptime(
                props.find("{DAV:}getlastmodified").text,
                "%a, %d %b %Y %H:%M:%S GMT"
            )
        except (AttributeError, TypeError, ValueError):
            pass

        entry["resource_type"] = []
        try:
            for resourcetype in props.find("{DAV:}resourcetype"):
                entry["resource_type"].append(resourcetype.tag)
        except (AttributeError, TypeError):
            pass

        try:
            entry["file_id"] = int(props.find("{http://owncloud.org/ns}fileid").text)
        except (AttributeError, TypeError, ValueError):
            pass

        yield entry
def find_valid_version(versions, threshold=None):
    """
    Collect all versions whose timestamp is younger than the threshold.

    :param versions: An iterator as returned by propfind()
    :param threshold: Timestamps that are not strictly younger than this datetime count as invalid.
        Defaults to DATE_THRESHOLD.
    :return: A dictionary that maps the Last Modified timestamp of every valid version to its entry, or None if
        there is no valid version. If several versions share one timestamp, only the last of them is kept.
    """
    if threshold is None:
        threshold = DATE_THRESHOLD
    valid_versions = {
        version["last_modified"]: version
        for version in versions
        if "last_modified" in version and threshold < version["last_modified"]
    }
    # callers expect None rather than an empty dictionary
    return valid_versions or None
def download_file(path, auth):
    """
    Download one file and save it in a new temporary file on the local device.

    :param path: The full URL of the file in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: The path of the created file or '' if no file could be downloaded. The caller is responsible for
        deleting the file again.
    """
    r = requests.request(
        method='get',
        url=path,
        auth=auth
    )
    if r.status_code != 200:
        return ''

    # Use a unique temporary file instead of the last URL component inside the working directory: that name could
    # overwrite an unrelated local file and is not unique across files from different cloud folders
    handle, filename = tempfile.mkstemp(prefix="nextcloud-date-fix-")
    with os.fdopen(handle, 'wb') as file:
        file.write(r.content)
    return filename
def upload_file(local_path, remote_path, auth):
    """
    Uploads a file to the cloud

    :param local_path: File path of the file to be uploaded
    :param remote_path: Full URL where it should be uploaded on the cloud
    :param auth: Auth data for the HTTP request
    :return: True if the server answered with a 2xx status code, False otherwise
    """
    # read inside a context manager so that the file handle is closed before the request is sent
    with open(local_path, 'rb') as file:
        payload = file.read()
    r = requests.put(
        url=remote_path,
        auth=auth,
        data=payload
    )
    return 200 <= r.status_code < 300
def content_equal(original, fixed_version, auth):
    """
    Compares the content of the local original with one version from the cloud.

    :param original: Local filename of the entry with wrong timestamp ('' if it could not be downloaded).
    :param fixed_version: Full URL of the version that is compared with the original.
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: True if both files have identical content. False otherwise, also if one or both files couldn't
        be downloaded
    """
    # without a local original there is nothing to compare, so do not download the version at all
    if original == '':
        return False
    fixed = download_file(fixed_version, auth)
    if fixed == '':
        return False
    try:
        # Byte-wise comparison. An additional shallow comparison cannot change the result: it is True whenever the
        # contents are equal.
        return filecmp.cmp(original, fixed, shallow=False)
    finally:
        # the downloaded version is only needed for the comparison
        os.remove(fixed)
def restore_file(packed):
    """
    Handles one file. Searches for the latest older version with intact timestamp and identical content and restores
    it; if there is none, the file is uploaded again instead. Prints a message if neither worked.

    :param packed: data needed for one file: (entry, arguments, auth). entry represents the original file (as
        yielded by propfind()), arguments are the runtime arguments and auth is for the http authentication
    """
    entry, arguments, auth = packed
    file_url = arguments.server + entry['path']
    original = download_file(file_url, auth)
    try:
        restored = False
        # Versions are only useful if there is a local copy to compare them with and the server reported a file id
        if original != '' and "file_id" in entry:
            versions_url = arguments.server + VERSIONS_PATH_PREFIX + arguments.username \
                + "/versions/" + str(entry["file_id"])
            fixed_versions = find_valid_version(propfind(versions_url, auth)) or {}
            # sort dates descending to start with latest version
            for date in sorted(fixed_versions, reverse=True):
                version_url = arguments.server + fixed_versions[date]['path']
                if content_equal(original, version_url, auth):
                    # found latest matching version
                    restored = restore_by_version(version_url, auth, arguments)
                    break  # stop looking any further
        if not restored:
            restored = restore_by_touch(file_url, original, auth)
        if not restored:
            print('File couldn\'t be restored: ' + entry['path'])
    finally:
        # '' means nothing was downloaded; trying to remove it would raise and abort all remaining files
        if original != '':
            os.remove(original)
def restore_by_version(path_version, auth, args):
    """
    Restores the given old version of a file by sending a MOVE request for it.

    :param path_version: cloud path to the version to be restored
    :param auth: Auth data for the HTTP request
    :param args: Runtime arguments (server and username are used)
    :return: True if the server answered with a 2xx status code, False otherwise
    """
    # the destination is named with a random uuid (uuid4)
    destination = "".join([args.server, VERSIONS_PATH_PREFIX, args.username, "/restore/", str(uuid.uuid4())])
    response = requests.request(
        method='move',
        url=path_version,
        auth=auth,
        headers={"Destination": destination}
    )
    return 200 <= response.status_code < 300
def restore_by_touch(path, local_path, auth):
    """
    Restores a file by "touching" it: the already downloaded local copy is simply uploaded to the cloud again.

    :param path: The cloud path to the file in question
    :param local_path: The local path to the file in question ('' if there is no local copy)
    :param auth: Auth data for the HTTP request
    :return: True if the restoring was successful, False otherwise.
    """
    # guard clause: nothing to upload without a local copy
    if local_path == '':
        return False
    return upload_file(local_path, path, auth)
if __name__ == "__main__":
    # get all necessary data from the command line
    # NOTE(review): the password is passed as a command line argument and may therefore be visible to other local
    # users in the process list; the interface is kept as it is, so prefer an App Token
    argparser = argparse.ArgumentParser(description="Fix broken dates in Nextcloud folders.")
    argparser.add_argument("server", help="The base URL of the Nextcloud server.")
    argparser.add_argument("username", help="The user to log in as.")
    argparser.add_argument("password", help="The password for accessing Nextcloud. Hint: Use an App Token!")
    argparser.add_argument(
        "-p", "--path",
        default="/",
        help="The path to search, relative to the user's root. Default: /",
        dest="search_path"
    )
    arguments = argparser.parse_args()

    # All URLs are built by plain concatenation, so make sure the joints have exactly one slash: drop trailing
    # slashes from the server URL and make the search path start with one
    arguments.server = arguments.server.rstrip("/")
    search_path = arguments.search_path
    if not search_path.startswith("/"):
        search_path = "/" + search_path

    # Prepare HTTP Basic Authentication
    auth = requests.auth.HTTPBasicAuth(arguments.username, arguments.password)

    # Prepare the path we want to use
    mainpath = FILES_PATH_PREFIX + arguments.username + search_path

    # Collect all entries below the search path that have no timestamp or one older than the threshold
    url = arguments.server + mainpath
    wrongtime = [
        entry for entry in propfind(url, auth)
        if "last_modified" not in entry or entry["last_modified"] < DATE_THRESHOLD
    ]
    print()

    # Replace every entry with a wrong timestamp by a version with intact timestamp or touch it.
    # The work is handed to a process pool (currently with a single worker); the context manager makes sure the
    # worker processes are cleaned up once map() has returned.
    data = [(entry, arguments, auth) for entry in wrongtime]
    with multiprocessing.Pool(processes=1) as pool_obj:
        pool_obj.map(restore_file, data)