You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
249 lines
9.3 KiB
249 lines
9.3 KiB
import argparse
|
|
import datetime
|
|
import filecmp
|
|
import multiprocessing
|
|
import os
|
|
import tempfile
|
|
import time
|
|
import urllib.parse
|
|
import sys
|
|
import uuid
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
# Module-wide constants

# WebDAV path prefixes; the user name is appended directly after them
FILES_PATH_PREFIX = "/remote.php/dav/files/"
VERSIONS_PATH_PREFIX = "/remote.php/dav/versions/"

# Cut-off for plausible file timestamps: anything older is considered invalid
DATE_THRESHOLD = datetime.datetime(year=1990, month=1, day=1)

# One HTTP session object is enough for the whole script
SESSION = requests.Session()
|
|
|
|
|
|
def propfind(path, auth):
    """
    List a WebDAV resource and everything below it via one PROPFIND request.

    :param path: The full URL of the file or collection in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: An iterator of dictionaries, one for every entry of the PROPFIND response except the queried resource
             itself. Each dictionary always has "path" (the href exactly as sent by the server) and
             "resource_type" (list of child tag names of the resourcetype element, possibly empty).
             "last_modified" (naive datetime parsed from the GMT timestamp) and "file_id" (int) are only present
             when the server sent a value that could be parsed.
    """
    # "infinity" asks the server for the complete subtree in a single response,
    # so callers see nested files without issuing further requests
    headers = {"Depth": "infinity"}
    # This body requests only the last-modified timestamp, the resource type and the file id
    requested_data = """
    <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
        <d:prop>
            <d:getlastmodified />
            <d:resourcetype />
            <oc:fileid />
        </d:prop>
    </d:propfind>
    """
    req = requests.Request("PROPFIND", path, headers=headers, auth=auth, data=requested_data)
    resp = SESSION.send(req.prepare())
    et = ET.fromstring(resp.text)

    # hrefs may be percent-encoded and may carry a trailing slash the request URL lacks (or vice versa);
    # normalise the request URL once so the self-entry check below is not fooled by either
    requested = urllib.parse.unquote(path).rstrip("/")

    for dav_response in et.findall("{DAV:}response"):
        href = dav_response.findtext("{DAV:}href")
        if not href:
            # a response without an href cannot be addressed, nothing useful to report
            continue

        # skip the entry describing the queried resource itself
        normalised_href = urllib.parse.unquote(href).rstrip("/")
        if path.endswith(href) or (normalised_href and requested.endswith(normalised_href)):
            continue

        props = dav_response.find("{DAV:}propstat/{DAV:}prop")
        if props is None:
            # malformed response element without any properties
            continue

        entry = {"path": href}

        # A missing, empty or unparseable timestamp simply leaves "last_modified" unset;
        # callers treat an absent key as a broken date
        try:
            entry["last_modified"] = datetime.datetime.strptime(
                props.findtext("{DAV:}getlastmodified"),
                "%a, %d %b %Y %H:%M:%S GMT"
            )
        except (TypeError, ValueError):
            pass

        resourcetype = props.find("{DAV:}resourcetype")
        if resourcetype is None:
            entry["resource_type"] = []
        else:
            entry["resource_type"] = [child.tag for child in resourcetype]

        # Same best-effort handling for the file id: absent or non-numeric values are skipped
        try:
            entry["file_id"] = int(props.findtext("{http://owncloud.org/ns}fileid"))
        except (TypeError, ValueError):
            pass

        yield entry
|
|
|
|
|
|
def find_valid_version(versions):
    """
    Collect all versions whose timestamp is younger than DATE_THRESHOLD.

    :param versions: An iterator of entries as returned by propfind()
    :return: A dictionary mapping each valid "last_modified" timestamp to its entry (for equal timestamps the
             entry seen last wins), or None if no entry has a valid timestamp
    """
    valid = {
        candidate["last_modified"]: candidate
        for candidate in versions
        if "last_modified" in candidate and candidate["last_modified"] > DATE_THRESHOLD
    }
    if not valid:
        return None
    return valid
|
|
|
|
|
|
def download_file(path, auth):
    """
    This function downloads one file and saves it in a fresh temporary file on the local device.

    A unique temporary file is used instead of the remote file name so that a download can never overwrite an
    unrelated file in the working directory or collide with another download of an equally named file.
    The caller is responsible for deleting the returned file.

    :param path: The full URL of the file in question
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: The path of the created file or '' if no file could be downloaded (any status other than 200)
    """
    r = requests.get(url=path, auth=auth)
    if r.status_code != 200:
        return ''

    fd, filename = tempfile.mkstemp(prefix="nc_restore_")
    try:
        with os.fdopen(fd, 'wb') as file:
            file.write(r.content)
    except OSError:
        # do not leave a half-written temporary file behind
        os.remove(filename)
        raise
    return filename
|
|
|
|
|
|
def upload_file(local_path, remote_path, auth):
    """
    Uploads a file to the cloud with an HTTP PUT request

    :param local_path: File path of the file to be uploaded
    :param remote_path: Full URL where it should be uploaded on the cloud
    :param auth: Auth data for the HTTP request
    :return: True if the server answered with a 2xx status, False otherwise
    """
    # read inside a context manager so the file handle is closed before the request is sent
    with open(local_path, 'rb') as local_file:
        payload = local_file.read()

    r = requests.put(
        url=remote_path,
        auth=auth,
        data=payload
    )
    return 200 <= r.status_code < 300
|
|
|
|
|
|
def content_equal(original, fixed_version, auth):
    """
    Compares the local original with a downloaded version for replacement.

    :param original: Local filename of the entry with wrong timestamp ('' if it could not be downloaded)
    :param fixed_version: Full URL of the version for comparison with the original
    :param auth: Auth data for the HTTP request (e.g. a requests.auth.HTTPBasicAuth object)
    :return: True if both files have identical content. False otherwise, also if one or both files couldn't
             be downloaded
    """
    # without a local original there is nothing to compare, so do not download the version at all
    if original == '':
        return False

    fixed = download_file(fixed_version, auth)
    if fixed == '':
        return False

    try:
        # A byte-wise comparison alone decides the result: whenever it succeeds, a shallow
        # comparison of the same two files succeeds as well
        return filecmp.cmp(original, fixed, shallow=False)
    finally:
        # the downloaded version is only needed for the comparison
        os.remove(fixed)
|
|
|
|
|
|
def restore_file(packed):
    """
    Handles one file. Searches for the latest older version with intact timestamp and identical content and
    restores it; if there is none, the file is re-uploaded ("touched") instead.

    :param packed: data needed for one file: (entry, arguments, auth). entry represents the original file as
                   returned by propfind(), arguments are the runtime arguments and auth is for the http
                   authentication
    """
    entry, arguments, auth = packed

    # entries without a file id cannot be looked up in the versions tree
    fixed_versions = None
    if "file_id" in entry:
        fixed_versions = find_valid_version(propfind(
            arguments.server + VERSIONS_PATH_PREFIX + arguments.username + "/versions/" + str(entry["file_id"]),
            auth))

    original = download_file(arguments.server + entry['path'], auth)
    restored = False
    try:
        if fixed_versions:
            # walk the timestamps in descending order to start with the latest version
            for timestamp in sorted(fixed_versions, reverse=True):
                version_url = arguments.server + fixed_versions[timestamp]['path']
                if content_equal(original, version_url, auth):
                    # found latest matching version, stop looking any further
                    restored = restore_by_version(version_url, auth, arguments)
                    break

        if not restored:
            restored = restore_by_touch(arguments.server + entry['path'], original, auth)

        if not restored:
            print('File couldn\'t be restored: ' + entry['path'])
    finally:
        # download_file() returns '' on failure; only remove a file that really exists
        if original:
            os.remove(original)
|
|
|
|
|
|
def restore_by_version(path_version, auth, args):
    """
    Restores the given old version of a file by sending a MOVE request for it

    :param path_version: Full URL of the version to be restored
    :param auth: Auth data for the HTTP request
    :param args: Runtime arguments (server and username are used to build the destination)
    :return: True if the server answered with a 2xx status, False otherwise
    """
    # the destination name below the restore folder is a freshly generated random uuid
    restore_target = args.server + VERSIONS_PATH_PREFIX + args.username + "/restore/" + str(uuid.uuid4())
    response = requests.request(
        method='move',
        url=path_version,
        auth=auth,
        headers={"Destination": restore_target}
    )
    return 200 <= response.status_code < 300
|
|
|
|
|
|
def restore_by_touch(path, local_path, auth):
    """
    Restores a file by "touching" it: the already downloaded local copy is uploaded again to the same place

    :param path: The cloud URL of the file in question
    :param local_path: The local path of the downloaded copy, '' if there is none
    :param auth: Auth data for the HTTP request
    :return: True if the upload was successful, False otherwise (also if there is no local copy)
    """
    # nothing to upload without a local copy
    if local_path == '':
        return False
    return upload_file(local_path, path, auth)
|
|
|
|
|
|
if __name__ == "__main__":
    # get all necessary data from the command line
    argparser = argparse.ArgumentParser(description="Fix broken dates in Nextcloud folders.")
    argparser.add_argument("server", help="The base URL of the Nextcloud server.")
    argparser.add_argument("username", help="The user to log in as.")
    argparser.add_argument("password", help="The password for accessing Nextcloud. Hint: Use an App Token!")
    argparser.add_argument(
        "-p", "--path",
        default="/",
        help="The path to search, relative to the user's root. Default: /",
        dest="search_path"
    )
    arguments = argparser.parse_args()

    # All URLs are built by plain concatenation of server + absolute path, so make sure the pieces fit:
    # no trailing slash on the server, exactly one leading slash on the search path
    arguments.server = arguments.server.rstrip("/")
    if not arguments.search_path.startswith("/"):
        arguments.search_path = "/" + arguments.search_path

    # Prepare HTTP Basic Authentication
    auth = requests.auth.HTTPBasicAuth(arguments.username, arguments.password)

    # Prepare the path we want to use
    mainpath = FILES_PATH_PREFIX + arguments.username + arguments.search_path
    url = arguments.server + mainpath

    # Collect all entries below the search path whose timestamp is missing or older than the threshold
    wrongtime = [
        entry for entry in propfind(url, auth)
        if "last_modified" not in entry or entry["last_modified"] < DATE_THRESHOLD
    ]

    print()

    # Replace every entry with a wrong timestamp by a version with intact timestamp, or touch it.
    # The work is handed to a process pool (currently a single worker); the context manager
    # makes sure the worker is shut down once all entries have been handled.
    data = [(entry, arguments, auth) for entry in wrongtime]
    with multiprocessing.Pool(processes=1) as pool_obj:
        pool_obj.map(restore_file, data)
|