# Utility module: shared constants and helper functions for loading, splitting
# and preprocessing German court decision texts (Leitsatz, Tenor, Tatbestand,
# Entscheidungsgruende).
import json
import multiprocessing
import os
import re
import shutil
import pandas as pd
import settings
pool_processes = 8
pool_maxtask = 10
pool_chunksize = 30
leitsatz_str = 'leitsatz'
tenor_str = 'tenor'
tatbestand_str = 'tatbestand'
entscheidungsgruende_str = 'entscheidungsgruende'
aktenzeichen_str = 'aktenzeichen'
rii_text_columns = [leitsatz_str, tenor_str, tatbestand_str, entscheidungsgruende_str]
sentence_marks = ['.', ',', ';', '!', '?']
pp_option_lemmatize = 'preprocessing: lemmatize the text'
pp_option_stopwords = 'preprocessing: remove stopwords'
pp_option_case_normalize = 'preprocessing: normalize cases / put to lower'
pp_option_remove_qout_marks_sing = 'preprocessing: remove qoutation marks around single words'
no_stopword_list = ['nicht', 'kein']
entsch_gr_start_sentences = ['II.', 'B.', 'B']
def server_path(current_path, path):
"""
Method to add path in case it is run on server.
:param current_path: Path to add when run on server
:param path: Path for local
:return: Final path for local or server
"""
if settings.server:
path = current_path + '/' + path
return path
def open_file(current_path, path, modes, encoding=None, newline=None):
"""
Wraps the builtin open function to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:param modes: Modes to apply
:param newline: newline option of the original method, if None nothing will be passed
:param encoding: encoding option of the original method, if None nothing will be passed
:return: the opened file
"""
if encoding is not None:
return open(server_path(current_path=current_path, path=path), modes, encoding=encoding)
if newline is not None:
return open(server_path(current_path=current_path, path=path), modes, newline=newline)
if newline is not None and encoding is not None:
return open(server_path(current_path=current_path, path=path), modes, encoding=encoding, newline=newline)
return open(server_path(current_path=current_path, path=path), modes)
def file_exists(current_path, path):
"""
Wraps the builtin exists function to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:return: True if the file exists
"""
return os.path.exists(server_path(current_path=current_path, path=path))
def list_dir_files(current_path, path):
"""
Wraps the builtin os.listdir function to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:return: The filenames of the directory
"""
return os.listdir(server_path(current_path=current_path, path=path))
def df_from_pickle(current_path, path):
"""
Wraps the pd.read_pickle function to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:return: The loaded dataframe
"""
return pd.read_pickle(server_path(current_path=current_path, path=path))
def df_to_json(current_path, path, dataframe):
"""
Wraps the df.to_json function to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:param dataframe: The dataframe to save
"""
dataframe.to_json(server_path(current_path=current_path, path=path))
def df_from_json(current_path, path):
"""
Wraps the json.load function in combination with a dataframe creation to adjust to server settings
:param current_path: path of the calling file to adjust for server (without /)
:param path: Path for file loading relative to calling file
:return: The loaded dataframe
"""
return pd.DataFrame(json.load(open_file(current_path=current_path, path=path, modes="r")))
def time_convert(sec):
"""
Gibt eine Zeitangabe hübsch aus. Format : Time Lapsed = hh:mm:ss
:param sec: Zeit zu zeigen
"""
mins = sec // 60
sec = sec % 60
hours = mins // 60
mins = mins % 60
print("Time Lapsed = {0}:{1}:{2}".format(int(hours), int(mins), sec))
def parallel_imap(function, packaged_args):
"""
Executes the given function in a parallel way. For list data.
:param function: Function to do in parallel.
:param packaged_args: Iterable of argumentpairs for each run to be done.
:return: Result of the parallel work
"""
if settings.server:
pool_obj = multiprocessing.Pool(maxtasksperchild=pool_maxtask)
result = pool_obj.imap(function, packaged_args, chunksize=pool_chunksize)
else:
pool_obj = multiprocessing.Pool(processes=pool_processes)
result = pool_obj.imap(function, packaged_args)
pool_obj.close()
pool_obj.join()
return result
def get_step_subset_raw(steps, path_to_dest_dataframe, source_data, dest_data, call_path):
"""
Method for stepwise work on datasets. Reads in the already present data and starts
where last time ended. Used for raw pickle-files in destination
:param steps: How many rows should be selcted now
:param path_to_dest_dataframe: Path on where to load the destination data
:param source_data: Source dataframe to select the rows
:param dest_data: empty dataframe to load the data into
:param call_path: path from which the method was called, for server path
:return: the subset of the source data an the loaded destintion data (source, dest)
"""
if steps > 0:
try:
try:
var = df_from_pickle(current_path=call_path, path=path_to_dest_dataframe)
except Exception:
var = df_from_json(current_path=call_path, path=path_to_dest_dataframe)
dest_data = pd.concat([dest_data, var], ignore_index=True)
start = dest_data.shape[0]
except OSError as _:
start = 0
finally:
end = start + steps
try: # case source is a dataframe
if end >= source_data.shape[0]:
return source_data.iloc[start:], dest_data # subset
else:
return source_data.iloc[start:end], dest_data # subset
except Exception:
if end >= len(source_data):
return source_data[start:], dest_data # subset
else:
return source_data[start:end], dest_data # subset
def remove_spaces_before_sentence_marks(text):
"""
Removes unneccessary spaces before '.' etc.
:param text: Text to replace in
:return: The cleaned text
"""
for sentence_mark in sentence_marks:
while ' ' + sentence_mark in text:
text = text.replace(' ' + sentence_mark, sentence_mark)
return text
def remove_brackets(text):
"""
Removes all matching round bracktet pairs () with their content. Always takes the first brackets that
appear in the text, so could also be an enumeration like a)
:param text: Text to remove the brackets from.
:return: Resulting text
"""
startindex = text.find('(')
res = ''
while startindex > -1:
endindex = startindex + text[startindex:].find(')')
if endindex > -1:
# in case there is a ' ' in front or after the brackets, remove one space
if startindex > 0 and text[startindex - 1] == ' ':
startindex -= 1
# if endindex < len(text) - 1 and text[endindex + 1] == ' ':
# endindex += 1
res += text[:startindex]
text = text[endindex + 1:]
else:
break
startindex = text.find('(')
res += text
return res
def remove_leading_keywords_and_listing_sentences(sentences):
"""
Method intended for Leitsätze. Some of them start with a single keyword in the first line.
This is removed. Additionally, Sentences which are only a listin ('1.') will also be removed.
:param sentences: List of sentences in the original order to remove these things from
:return: the list of sentences after removing
"""
# remove leading keywords and sentences which are only enumerations
sentences_var = list()
sentence_var = ''
for i in range(len(sentences)):
sentence = sentences[i].strip()
if len(sentence) > 1 and sentence[-1] == '.' and ' ' not in sentence: # at least two chars
if any(char.isdigit() for char in sentence) and sentence[0].isdigit(): # most likely enumeration like '1.'
continue
if i > 0 or (i == 0 and len(sentence) > 20):
# most likely not a short keyword at the beginning
if sentence[-1] == '.' or sentence[-1] == ',' or sentence[-1] == ':' or \
sentence[-1] == ';' or sentence[-1] == '!' or sentence[-1] == '?':
# sentence end
sentence_var += sentence
sentences_var.append(remove_spaces_before_sentence_marks(sentence_var))
sentence_var = ''
else:
# continuing sentence
sentence_var += sentence + ' '
return sentences_var
def prepare_leitsatz(l_satz):
"""
Does the preparation for Leitsätze: First splits into sentences, removes leading keywords and
single listing sentences and leading listings of sentences
:param l_satz: Original Leitsatz as one string
:return: prepared Leitsatz as a list of String
"""
sentences = split_into_sentences(l_satz)
sentences = remove_leading_keywords_and_listing_sentences(sentences)
sentences = [remove_leading_listing(sentence) for sentence in sentences]
return sentences
def select_list_subset(list_of_string, start_strings, end_string=None):
"""
Selects a subset of a list of strings. If the start_string is not in the list,
the whole original list is returned. (case-sensitive)
If more start strings are given, then it will be copied from the first occuring start string.
sometimes entscheidungsgruende II. is started not with II. but B. Use start_String_2 here
:param list_of_string: List to get subset from
:param start_strings: List of Strings to start to copy
:param end_string: First string where one shouldn't copy anymore, if none is given, then till the end
:return: Selected subset
"""
result_list = []
copy = False
for i in range(len(list_of_string)):
string = list_of_string[i]
if string in start_strings:
copy = True
if end_string is not None and string == end_string:
copy = False
if copy:
result_list.append(string)
# if nothing was found or very little was found
if len(result_list) == 0 or len(result_list) / len(list_of_string) < 0.2:
return list_of_string
return result_list
def abbreviation_ending(text):
"""
Checks for an input text whether it ends with a known legal abbreviation.
Known issues: numbers and roman numbering with following dots arent matched
:param text: Input Text
:return: True, if it does and with such an abbreviation, False otherwise
"""
abbrev_list = ['A.', ' a.', 'a.A.', 'a.a.O.', 'ABl.', ' abl.', 'Abs.', ' abs.', 'Abschn.', 'Abse.',
' abzgl.', 'a.D.', 'a.E.', ' a.F.', ' ähnl.', 'a.l.i.c.', ' allg.', ' allgem.',
'Alt.', 'AmtsBl.', ' and.', ' angef.', 'Anh.', 'Anl.', 'Anm.', ' Art.', '(Art.', ' aufgeh.',
'Aufl.', ' ausf.', 'Ausn.', 'BAnz.', 'BArbBl.', 'BayJMBl.', 'Bd.', 'Bde.', 'Bdg.',
'Bearb.', ' begr.', 'Beil.', 'Bek.', ' ber.', ' bes.', 'Beschl.', ' best.', ' bestr.',
'Betr.', ' betr.', 'Bf.', 'BGBl.', ' bish.', ' Bl.', 'BPräs.', 'BReg.', 'Bsp.', 'Bst.',
'BStBl.', 'BT-Drucks.', 'Buchst.', 'bzgl.', 'bzw.', 'c.i.c.', 'Co.', 'c.p.c.',
'c.s.q.n.', 'Ct.', ' dar.', 'Darst.', ' ders.', 'd.h.', 'Diss.', ' div.', 'Dr.',
'Drucks.', ' dto.', 'DVBl.', ' ebd.', ' Ed.', 'E.G.', ' eingef.', 'Einf.', 'Einl.',
' einschl.', 'Erg.', ' erk.Sen.', ' erk.', ' Erl.', 'etc.', 'E.U.', ' e.V.',
'EVertr.', ' evtl.', 'E.W.G.', ' F.', ' f.', ' Fa.', ' Festschr.', ' ff.', ' Fn.',
' form.', ' fr.', ' fr.Rspr.', ' Fz.', 'GBl.', ' geänd.', 'Gedschr.', ' geg.',
' gem.', 'Ges.', ' gg.', ' ggf.', ' ggü.', ' ggüb.', ' Gl.', ' GMBl.', 'G.o.A.',
'Grds.', ' grdsl.', 'Großkomm.', 'Großkomm.z.', 'GVBl.', 'GVOBl.', ' h.A.', 'Halbs.',
' h.c.', 'Hdlg.', 'Hess.', ' heut.', ' heut.Rspr.', ' hins.', ' h.L.', ' h.Lit.',
' h.M.', 'Hrsg.', ' h.Rspr.', 'HS.', 'Hs.', ' i.A.', ' ib.', ' ibd.', ' ibid.',
'i.d.', 'i.d.F.', 'i.d.R.', 'i.d.S.', 'i.E.', 'i.e.', 'i.e.S.', 'i.H.d.', 'i.H.v.',
'i.K.', ' incl.', ' inkl.', 'inkl.MwSt.', ' insb.', ' insbes.', 'Int.', ' i.O.',
' i.R.', ' i.R.d.', 'i.S.', 'i.S.d.', 'i.S.e.', 'i.S.v.', 'i.ü.', ' iur.', 'i.V.',
'i.V.m.', 'i.W.', 'i.Wes.', 'i.w.S.', 'i.Zw.', 'Jahrb.', ' jew.', ' Jh.', 'JMBl.',
' jur.', ' Kap.', ' Ko.', ' krit.', ' kzfr.', 'Lb.', 'Lfg.', 'lgfr.', ' Lief.',
'Lit.', ' lit.', ' lt.', 'Ltd.', 'M.A.', 'm.Änd.', 'MABl.', 'mat.', 'm.a.W.', 'm.E.',
' med.', 'mgl.', 'Mglkt.', 'MinBl.', 'Mio.', ' Mot.', 'M.M.', 'm.N.', 'Mod.',
' mögl.', 'Mot.', 'MünchKomm.', 'm.w.', 'm.w.N.', 'MwSt.', 'Mwst.', 'm.W.v.',
'm.zust.Anm.', 'Nachw.', 'Nachw.b.', ' nat.', 'Nds.', 'Neubearb.', 'Neuf.',
' neugef.', 'n.F.', 'Nr.', 'Nrn.', ' o.', 'o.Ä.', ' od.', ' oec.', ' öff.',
' o.g.', ' österr.', 'p.F.V.', ' pharm.', ' phil.', ' pol.', 'Postf.', ' pp.',
' ppA.', ' ppa.', 'Prof.', 'Prot.', ' publ.', ' p.V.', 'p.V.V.', 'q.e.d.',
'RdErl.', 'Rdn.', 'Rdnr.', 'RdSchr.', ' rel.', ' rer.', 'RGBl.', 'Rn.', 'Rspr.',
'Rz.', 'S.', ' s.', 's.a.', 'Schr.', ' scil.', 'Sen.', ' sinngem.', 'SiZess.',
'Slg.', 's.o.', ' sog.', 'Sonderbeil.', 'Stpfl.', ' str.', ' st.', 'st.Rspr.',
' st. Rspr.', 'stud.iur.', 's.u.', ' teilw.', ' theol.', 'Thür.', ' TO.', ' tw.',
'Tz.', ' u.', 'u.a.', 'UAbs.', 'u.a.m.', ' umstr.', ' unmgl.', 'Unmglkt.', ' unmögl.',
'Urt.', ' usw.', ' u.U.', ' V.', ' v.', 'Var.', 'Ver.', ' vgl.', 'V.m.', 'VOBl.',
'Vor.', 'Vorbem.', 'Warn.', ' weg.', ' wg.', 'W.G.G.', 'w.z.b.w.', 'z.B.', 'z.Hd.',
'Ziff.', 'z.T.', ' zust.', 'zust.Anm.', ' zw.' 'z.Z.', ' zzgl.', ';', 'II.1.a.', '(s.',
]
for abbrev in abbrev_list:
if text.endswith(abbrev):
return True
if len(text) >= 3 and re.search(" .\\.", text[-3:]):
return True
return False
def remove_leading_listing(sentence):
"""
removes leading listings / enumerations like 1. or a)
:param sentence: Sentence to remove from
:return: Processed sentence
"""
return split_leading_listing(sentence)[1]
def split_leading_listing(sentence):
"""
Splits the sentence from a possible listing (1. or a) ) at the start.
:param sentence: Sentence to split
:return: (start, rest) with start being the listing or None, if there is no listing and
rest being the rest of the sentence or the original sentence if there was no listing
"""
first_word = sentence.split()
if first_word is None or len(first_word) == 0:
first_word = ''
else:
first_word = first_word[0]
rest = sentence[len(first_word) + 1:]
# could be a name like M. Leicht
if (first_word.endswith('.') or first_word.endswith(')')) and len(rest.split()) > 1 and first_word != 'Art.':
# Enumeration!
return first_word, rest
else:
return None, sentence
def split_into_sentences(input_text):
"""
Splits text into sentences. Uses spacy sentences but fixes broken sentences on \n or Abbreviations
:param input_text: Text to split into sentences
:return: A list of sentences which where split
"""
paragraphs = input_text.split('\n')
sentences = list()
sentence_var = ''
# roughly split original leitsatz into sentences
for paragraph in paragraphs:
nlp_paragraph = settings.nlp(paragraph)
sentences_paragraph = []
for sent in nlp_paragraph.sents:
sent = sent.text.strip()
# some leading listings aren't detected
a, b = split_leading_listing(sent)
if a is not None:
sentences_paragraph.append(a)
sentences_paragraph.append(b)
for i in range(0, len(sentences_paragraph)):
# add a space before next token if it isn't a sentence mark
if not (sentences_paragraph[i].startswith('.') or sentences_paragraph[i].startswith(':')
or sentences_paragraph[i].startswith('?') or sentences_paragraph[i].startswith('!')):
sentence_var += ' '
sentence_var += sentences_paragraph[i]
# if not sentence_var.count('(') > sentence_var.count(
# ')') and not sentence_var.strip() == '': # no unclosed brackets
if (sentences_paragraph[i].endswith('.') or sentences_paragraph[i].endswith(':')
or sentences_paragraph[i].endswith('?') or sentences_paragraph[i].endswith('!')) \
and not abbreviation_ending(sentence_var) \
and not sentence_var.strip() == '':
# Satz sehr wahrscheinlich wirklich zuende
sentences.append(sentence_var.strip())
sentence_var = ''
if not sentence_var.strip() == '':
# if not sentence_var.count('(') > sentence_var.count(
# ')') and not sentence_var.strip() == '': # no unclosed brackets
sentences.append(sentence_var.strip()) # am Ende des Paragraphen soll auch fertig sein
sentence_var = ''
# end of whole text
if sentence_var.strip() != '':
sentences.append(sentence_var.strip())
return sentences
def preprocess_text(text, options):
"""
Allows simple preprocessing like lemmatization on strings.
:param text: Text to preprocess
:param options: Options specifying on what preprocessing is to be done, if None, text will be returned
:return: the preprocessed text, if text is None, the result will also be ''
"""
if text is None:
return ''
if options is None:
return text
text_spacy = settings.nlp(text)
result_text = ''
for token in text_spacy:
# stop-words removing: no stopwords or stopwords shouldn't be removed
if not token.is_stop or pp_option_stopwords not in options or token.lemma_ in no_stopword_list:
# lemmatization if wanted
if pp_option_lemmatize in options and token.text not in sentence_marks:
to_append = token.lemma_
else:
to_append = token.text
if pp_option_remove_qout_marks_sing in options and to_append[0] == '"' and to_append[-1] == '"':
to_append = to_append.replace('"', '')
result_text += to_append + ' '
result_text = result_text.strip()
# case-normlaization, all to lower
if pp_option_case_normalize in options:
return result_text.lower()
else:
return result_text
def create_dir(current_path, directory_name, delete=True):
"""
Creates a directory if it doesn't exist
:param current_path: path of the calling file
:param directory_name: name / path to create
:param delete: if True, than an old directory with same name will be delted
"""
if delete and file_exists(current_path=current_path, path=directory_name):
shutil.rmtree(server_path(current_path=current_path, path=directory_name))
if not file_exists(current_path=current_path, path=directory_name):
os.makedirs(server_path(current_path=current_path, path=directory_name))