# -*- coding: utf-8 -*-
#
#   cms.py - simple WSGI/Python based CMS script
#
#   Copyright (C) 2011-2016 Michael Buesch
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 2 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
# Compare the version tuple as a whole. Checking major and minor
# separately would also reject any X.0 ... X.2 release with X > 3.
if sys.version_info < (3, 3):
    raise Exception("Need Python 3.3 or later")

import os
from stat import S_ISDIR
from datetime import datetime
import re
import PIL.Image as Image
from io import BytesIO
import urllib.request, urllib.parse, urllib.error
try:
    # The cgi module was removed in Python 3.13. Keep the name bound
    # for code that may still reference it, but do not fail the import
    # of this whole module if it is gone.
    import cgi
except ImportError:
    cgi = None
from html import escape as _html_escape
from functools import reduce
import random
import importlib.machinery


UPPERCASE = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
LOWERCASE = 'abcdefghijklmnopqrstuvwxyz'
NUMBERS   = '0123456789'

# Find the index in 'string' that is _not_ in 'template'.
# Start search at 'idx'.
# Returns -1 on failure to find.
def findNot(string, template, idx=0):
    for pos in range(idx, len(string)):
        if string[pos] not in template:
            return pos
    return -1

# Find the index in 'string' that matches _any_ character in 'template'.
# Start search at 'idx'.
# Returns -1 on failure to find.
def findAny(string, template, idx=0):
    for pos in range(idx, len(string)):
        if string[pos] in template:
            return pos
    return -1

# Escape 'string' for use in HTML text and attribute values.
# The characters &, <, > and both quote characters are replaced
# by their HTML entities.
def htmlEscape(string):
    # cgi.escape() was removed in Python 3.8; html.escape() is its
    # replacement. Unlike cgi.escape(), it escapes the single quote, too.
    return _html_escape(string, True)

# Convert a string to a boolean.
# Understands true/yes/on and false/no/off (case insensitive)
# and base-10 integers (nonzero = True).
# Returns 'default', if the string is none of these.
def stringBool(string, default=False):
    word = string.lower()
    if word in ("true", "yes", "on"):
        return True
    if word in ("false", "no", "off"):
        return False
    try:
        return bool(int(word, 10))
    except ValueError:
        return default

# Create a path string from path element strings.
def mkpath(*path_elements):
    # Do not use os.path.join, because it discards elements, if
    # one element begins with a separator (= is absolute).
    return os.path.sep.join(path_elements)

# Returns True, if the path built from 'path_elements' can be stat()ed.
def f_exists(*path_elements):
    try:
        os.stat(mkpath(*path_elements))
    except OSError:
        return False
    return True

# Returns True, if the file exists and contains more than whitespace.
def f_exists_nonempty(*path_elements):
    if not f_exists(*path_elements):
        return False
    return bool(f_read(*path_elements).strip())

# Read a file and decode it as UTF-8.
# Returns an empty string, if the file cannot be opened or read.
# Raises CMSException(500), if the content is not valid UTF-8.
def f_read(*path_elements):
    try:
        with open(mkpath(*path_elements), "rb") as fd:
            return fd.read().decode("UTF-8")
    except IOError:
        return ""
    except UnicodeError:
        raise CMSException(500, "Unicode decode error")

# Read a file and parse its stripped content as base-10 integer.
# Returns None, if the content is not an integer (or the file is unreadable).
def f_read_int(*path_elements):
    text = f_read(*path_elements).strip()
    try:
        return int(text, 10)
    except ValueError:
        return None

# Get the modification time of a file as naive UTC datetime.
# Raises CMSException(404), if the file cannot be stat()ed.
def f_mtime(*path_elements):
    try:
        st = os.stat(mkpath(*path_elements))
    except OSError:
        raise CMSException(404)
    return datetime.utcfromtimestamp(st.st_mtime)

# Same as f_mtime, but returns the current UTC time instead of failing.
def f_mtime_nofail(*path_elements):
    try:
        return f_mtime(*path_elements)
    except CMSException:
        return datetime.utcnow()

# Get the names of all sub-directories of a directory.
# Hidden entries (leading ".") and system entries (leading "__") are omitted.
# Returns an empty list, if the directory cannot be listed.
def f_subdirList(*path_elements):
    path = mkpath(*path_elements)

    def dirfilter(dentry):
        if dentry.startswith("."):
            return False # Omit ".", ".." and hidden entries
        if dentry.startswith("__"):
            return False # Omit system folders/files.
        try:
            return bool(S_ISDIR(os.stat(mkpath(path, dentry)).st_mode))
        except OSError:
            return False

    try:
        return [ dentry for dentry in os.listdir(path)
                 if dirfilter(dentry) ]
    except OSError:
        return []

class CMSPageIdent(list):
    # Page identifier.
    # This is a list of the page path components.

    __pageFileName_re = re.compile(
        r'^(.*)((?:\.html?)|(?:\.py)|(?:\.php))$', re.DOTALL)
    __indexPages = {"", "index"}

    # Parse a page identifier from a string.
    # Raises CMSException(400), if the path is longer than maxPathLen
    # or has more than maxIdentDepth components.
    @classmethod
    def parse(cls, path, maxPathLen = 512, maxIdentDepth = 32):
        if len(path) > maxPathLen:
            raise CMSException(400, "Invalid URL")

        pageIdent = cls()

        # Strip whitespace and slashes
        path = path.strip(' \t/')
        # Remove page file extensions like .html and such.
        m = cls.__pageFileName_re.match(path)
        if m:
            path = m.group(1)
        # Use the ident elements, if this is not the root page.
        if path not in cls.__indexPages:
            pageIdent.extend(path.split("/"))
            if len(pageIdent) > maxIdentDepth:
                raise CMSException(400, "Invalid URL")
        return pageIdent

    __pathSep = os.path.sep
    __validPathChars = LOWERCASE + UPPERCASE + NUMBERS + "-_."

    # Validate a path component. Avoid any directory change.
    # Raises CMSException on failure.
    @classmethod
    def validateSafePathComponent(cls, pcomp):
        if pcomp.startswith('.'):
            # No ".", ".." and hidden files.
            raise CMSException(404, "Invalid page path")
        # Stop at the first invalid character instead of collecting all.
        if any(c not in cls.__validPathChars for c in pcomp):
            raise CMSException(404, "Invalid page path")
        return pcomp

    # Validate a path. Avoid going back in the hierarchy (. and ..)
    # Raises CMSException on failure.
    @classmethod
    def validateSafePath(cls, path):
        for pcomp in path.split(cls.__pathSep):
            cls.validateSafePathComponent(pcomp)
        return path

    # Validate a page name.
    # Raises CMSException on failure.
    # If allowSysNames is True, system names starting with "__" are allowed.
    @classmethod
    def validateName(cls, name, allowSysNames = False):
        if name.startswith("__") and not allowSysNames:
            # Page names with __ are system folders.
            raise CMSException(404, "Invalid page name")
        return cls.validateSafePathComponent(name)

    def __init__(self, *args):
        list.__init__(self, *args)
        self.__allValidated = False

    # Validate all page identifier name components.
    # (Do not allow system name components)
    def __validateAll(self):
        if self.__allValidated:
            return
        for pcomp in self:
            self.validateName(pcomp)
        # Remember that we validated.
        # Note that this assumes no components are added later!
        self.__allValidated = True

    # Get one page identifier component by index.
    # Returns 'default', if there is no component at 'index'.
    def get(self, index, default = None, allowSysNames = False):
        try:
            return self.validateName(self[index], allowSysNames)
        except IndexError:
            return default

    # Get the page identifier as URL.
    def getUrl(self, protocol = None, domain = None,
               urlBase = None, pageSuffix = ".html"):
        self.__validateAll()
        parts = []
        if protocol:
            # The join below adds the second slash of "proto://".
            parts.append(protocol + ":/")
        if domain:
            parts.append(domain)
        if urlBase:
            parts.append(urlBase.strip("/"))
        parts.extend(self)
        if not protocol and not domain:
            # Site-relative URL: Start with a slash.
            parts.insert(0, "")
        url = "/".join(parts)
        if self and pageSuffix:
            url += pageSuffix
        return url

    # Get the page identifier as filesystem path.
    # 'rstrip' is the number of trailing components to leave out.
    def getFilesystemPath(self, rstrip = 0):
        self.__validateAll()
        pcomps = self[ : 0 - rstrip] if rstrip else self
        return mkpath(*pcomps) if pcomps else ""

    # Test if this identifier starts with the same elements
    # as another one.
    def startswith(self, other):
        if other is None:
            return False
        return len(self) >= len(other) and\
               self[ : len(other)] == other

class CMSException(Exception):
    # Exception that carries a HTTP status.
    # Unknown status codes are mapped to 500.

    __stats = {
        301 : "Moved Permanently",
        400 : "Bad Request",
        404 : "Not Found",
        405 : "Method Not Allowed",
        409 : "Conflict",
        500 : "Internal Server Error",
    }

    def __init__(self, httpStatusCode=500, message=""):
        # Initialize the base class, so that str(exc) and exc.args
        # carry the message (e.g. in tracebacks and logs).
        Exception.__init__(self, message)
        if httpStatusCode not in self.__stats:
            httpStatusCode = 500
        self.httpStatusCode = httpStatusCode
        self.httpStatus = "%d %s" % (httpStatusCode,
                                     self.__stats[httpStatusCode])
        self.message = message

    # Get additional HTTP headers as tuple of (name, value) tuples.
    def getHttpHeaders(self, resolveCallback):
        return ()

    # Get additional HTML header data.
    def getHtmlHeader(self, db):
        return ""

    # Get the HTML body of the error page.
    # The database string 'http-error-page' overrides the default body.
    def getHtmlBody(self, db):
        # NOTE(review): This default looks like it lost its HTML markup
        # in this copy of the file. Confirm against the upstream source.
        return db.getString('http-error-page',
                            '\n\n%s\n\n' %\
                            self.httpStatus)

class CMSException301(CMSException):
    # "Moved Permanently" exception

    def __init__(self, newUrl):
        CMSException.__init__(self, 301, newUrl)

    # Get the redirect target.
    def url(self):
        return self.message

    def getHttpHeaders(self, resolveCallback):
        return ( ('Location', resolveCallback(self.url())), )

    def getHtmlHeader(self, db):
        # The format string had no placeholder for the URL, which
        # raised a TypeError. Emit a meta refresh to the new URL.
        # NOTE(review): Markup reconstructed. Confirm against upstream.
        return '<meta http-equiv="refresh" content="0; URL=%s" />' %\
            self.url()

    def getHtmlBody(self, db):
        # The format string had one placeholder for two arguments, which
        # raised a TypeError. Use the URL as link target and link text.
        # NOTE(review): Markup reconstructed. Confirm against upstream.
        return '\n\n' \
               'Moved permanently to ' \
               '<a href="%s">%s</a>' \
               '\n\n' %\
               (self.url(), self.url())

class CMSDatabase(object):
    # File based database below 'basePath' with the
    # sub-directories "pages", "macros" and "strings".

    validate = CMSPageIdent.validateName

    def __init__(self, basePath):
        self.pageBase = mkpath(basePath, "pages")
        self.macroBase = mkpath(basePath, "macros")
        self.stringBase = mkpath(basePath, "strings")

    def __redirect(self, redirectString):
        raise CMSException301(redirectString)

    # Get the page title. Falls back to the navigation label.
    def __getPageTitle(self, pagePath):
        title = f_read(pagePath, "title").strip()
        if title:
            return title
        return f_read(pagePath, "nav_label").strip()

    # Get the filesystem directory of a page.
    def __pagePath(self, pageIdent):
        return mkpath(self.pageBase, pageIdent.getFilesystemPath())

    def getNavStop(self, pageIdent):
        return bool(f_read_int(self.__pagePath(pageIdent), "nav_stop"))

    def getHeader(self, pageIdent):
        return f_read(self.__pagePath(pageIdent), "header.html")

    # Get a page.
    # Returns a tuple of (title, content data, modification time).
    # Raises CMSException301, if the page has a non-empty redirect file.
    def getPage(self, pageIdent):
        path = self.__pagePath(pageIdent)
        redirect = f_read(path, "redirect").strip()
        if redirect:
            return self.__redirect(redirect)
        title = self.__getPageTitle(path)
        data = f_read(path, "content.html")
        stamp = f_mtime_nofail(path, "content.html")
        return (title, data, stamp)

    def getPageTitle(self, pageIdent):
        return self.__getPageTitle(self.__pagePath(pageIdent))

    # Get a list of sub-pages.
    # Returns list of (pagename, navlabel, prio)
    def getSubPages(self, pageIdent, sortByPrio = True):
        res = []
        gpath = mkpath(self.pageBase, pageIdent.getFilesystemPath())
        for pagename in f_subdirList(gpath):
            path = mkpath(gpath, pagename)
            if f_exists(path, "hidden") or \
               f_exists_nonempty(path, "redirect"):
                continue
            navlabel = f_read(path, "nav_label").strip()
            prio = f_read_int(path, "priority")
            if prio is None:
                prio = 500
            res.append( (pagename, navlabel, prio) )
        if sortByPrio:
            # Sort by numeric priority, then by label. A zero padded
            # string key orders negative or very large priorities wrong.
            res.sort(key = lambda e: (e[2], e[1]))
        return res

    # Get the text of a macro with all empty lines removed.
    # If pageIdent is given, the "__macros" directories of the page and
    # of its parent pages are searched first. Then the "__macros" directory
    # of the page base and finally the global macro directory are used.
    def getMacro(self, macroname, pageIdent = None):
        data = None
        macroname = self.validate(macroname)
        if pageIdent:
            rstrip = 0
            while not data:
                path = pageIdent.getFilesystemPath(rstrip)
                if not path:
                    break
                data = f_read(self.pageBase, path,
                              "__macros", macroname)
                rstrip += 1
        if not data:
            data = f_read(self.pageBase, "__macros", macroname)
        if not data:
            data = f_read(self.macroBase, macroname)
        return '\n'.join( l for l in data.splitlines() if l )

    # Get a named string from the database.
    # Returns 'default' (or an empty string), if the string is empty.
    def getString(self, name, default=None):
        name = self.validate(name)
        string = f_read(self.stringBase, name).strip()
        if string:
            return string
        return default or ""

    # Load the "post.py" handler module of a page.
    # Returns None, if the page has no such file, if loading fails with
    # an OSError or if the module does not have a "post" attribute.
    def getPostHandler(self, pageIdent):
        path = mkpath(self.pageBase, pageIdent.getFilesystemPath())
        handlerModFile = mkpath(path, "post.py")

        if not f_exists(handlerModFile):
            return None

        try:
            # The module name is derived from the file path.
            loader = importlib.machinery.SourceFileLoader(
                re.sub(r"[^A-Za-z]", "_", handlerModFile),
                handlerModFile)
            mod = loader.load_module()
        except OSError:
            return None

        if not hasattr(mod, "post"):
            return None
        return mod

class CMSStatementResolver(object):
    """Resolver for $(statement ARGS...) expressions and $VARIABLES."""

    # Macro argument expansion: $1, $2, $3...
macro_arg_re = re.compile(r'\$(\d+)', re.DOTALL) # Valid characters for variable names (without the leading $) VARNAME_CHARS = UPPERCASE + '_' __genericVars = { "DOMAIN" : lambda self, n: self.cms.domain, "CMS_BASE" : lambda self, n: self.cms.urlBase, "IMAGES_DIR" : lambda self, n: self.cms.imagesDir, "THUMBS_DIR" : lambda self, n: self.cms.urlBase + "/__thumbs", "DEBUG" : lambda self, n: "1" if self.cms.debug else "", "__DUMPVARS__" : lambda self, n: self.__dumpVars(), } class StackElem(object): # Call stack element def __init__(self, name): self.name = name self.lineno = 1 class IndexRef(object): # Index references def __init__(self, charOffset): self.charOffset = charOffset class Anchor(object): # Anchor def __init__(self, name, text, indent=-1, noIndex=False): self.name = name self.text = text self.indent = indent self.noIndex = noIndex def makeUrl(self, resolver): return "%s#%s" % ( CMSPageIdent(( resolver.expandVariable("GROUP"), resolver.expandVariable("PAGE"))).getUrl( urlBase = resolver.cms.urlBase), self.name) def __init__(self, cms): self.cms = cms self.__reset() def __reset(self, variables = {}, pageIdent = None): self.variables = variables.copy() self.variables.update(self.__genericVars) self.pageIdent = pageIdent self.callStack = [ self.StackElem("content.html") ] self.charCount = 0 self.indexRefs = [] self.anchors = [] def __stmtError(self, msg): pfx = "" if self.cms.debug: pfx = "%s:%d: " %\ (self.callStack[-1].name, self.callStack[-1].lineno) raise CMSException(500, pfx + msg) def expandVariable(self, name): try: value = self.variables[name] try: value = value(self, name) except (TypeError) as e: pass return str(value) except (KeyError, TypeError) as e: return "" def __dumpVars(self, force=False): if not force and not self.cms.debug: return "" ret = [] for name in sorted(self.variables.keys()): if name == "__DUMPVARS__": value = "-- variable dump --" else: value = self.expandVariable(name) sep = "\t" * (3 - len(name) // 8) ret.append("%s%s=> %s" % 
(name, sep, value)) return "\n".join(ret) __escapedChars = ('\\', ',', '@', '$', '(', ')') @classmethod def escape(cls, data): for c in cls.__escapedChars: data = data.replace(c, '\\' + c) return data @classmethod def unescape(cls, data): for c in cls.__escapedChars: data = data.replace('\\' + c, c) return data # Parse statement arguments. # Returns (consumed-characters-count, arguments) tuple. def __parseArguments(self, d, strip=False): arguments, cons = [], 0 while cons < len(d): c, arg = self.__expandRecStmts(d[cons:], ',)') cons += c arguments.append(arg.strip() if strip else arg) if cons <= 0 or d[cons - 1] == ')': break return cons, arguments # Statement: $(if CONDITION, THEN, ELSE) # Statement: $(if CONDITION, THEN) # Returns THEN if CONDITION is nonempty after stripping whitespace. # Returns ELSE otherwise. def __stmt_if(self, d): cons, args = self.__parseArguments(d) if len(args) != 2 and len(args) != 3: self.__stmtError("IF: invalid number of arguments (%d)" %\ len(args)) condition, b_then = args[0], args[1] b_else = args[2] if len(args) == 3 else "" result = b_then if condition.strip() else b_else return cons, result def __do_compare(self, d, invert): cons, args = self.__parseArguments(d, strip=True) result = reduce(lambda a, b: a and b == args[0], args[1:], True) result = not result if invert else result return cons, (args[-1] if result else "") # Statement: $(eq A, B, ...) # Returns the last argument, if all stripped arguments are equal. # Returns an empty string otherwise. def __stmt_eq(self, d): return self.__do_compare(d, False) # Statement: $(ne A, B, ...) # Returns the last argument, if not all stripped arguments are equal. # Returns an empty string otherwise. def __stmt_ne(self, d): return self.__do_compare(d, True) # Statement: $(and A, B, ...) # Returns A, if all stripped arguments are non-empty strings. # Returns an empty string otherwise. 
def __stmt_and(self, d): cons, args = self.__parseArguments(d, strip=True) return cons, (args[0] if all(args) else "") # Statement: $(or A, B, ...) # Returns the first stripped non-empty argument. # Returns an empty string, if there is no non-empty argument. def __stmt_or(self, d): cons, args = self.__parseArguments(d, strip=True) nonempty = [ a for a in args if a ] return cons, (nonempty[0] if nonempty else "") # Statement: $(not A) # Returns 1, if A is an empty string after stripping. # Returns an empty string, if A is a non-empty stripped string. def __stmt_not(self, d): cons, args = self.__parseArguments(d, strip=True) if len(args) != 1: self.__stmtError("NOT: invalid args") return cons, ("" if args[0] else "1") # Statement: $(assert A, ...) # Raises a 500-assertion-failed exception, if any argument # is empty after stripping. # Returns an empty string, otherwise. def __stmt_assert(self, d): cons, args = self.__parseArguments(d, strip=True) if not all(args): self.__stmtError("ASSERT: failed") return cons, "" # Statement: $(strip STRING) # Strip whitespace at the start and at the end of the string. def __stmt_strip(self, d): cons, args = self.__parseArguments(d, strip=True) return cons, "".join(args) # Statement: $(item STRING, N) # Statement: $(item STRING, N, SEPARATOR) # Split a string into tokens and return the N'th token. # SEPARATOR defaults to whitespace. def __stmt_item(self, d): cons, args = self.__parseArguments(d) if len(args) not in {2, 3}: self.__stmtError("ITEM: invalid args") string, n, sep = args[0], args[1], args[2].strip() if len(args) == 3 else "" tokens = string.split(sep) if sep else string.split() try: token = tokens[int(n)] except ValueError: self.__stmtError("ITEM: N is not an integer") except IndexError: token = "" return cons, token # Statement: $(substr STRING, START) # Statement: $(substr STRING, START, END) # Returns a sub-string of STRING. 
def __stmt_substr(self, d): cons, args = self.__parseArguments(d) if len(args) not in {2, 3}: self.__stmtError("SUBSTR: invalid args") string, start, end = args[0], args[1], args[2] if len(args) == 3 else "" try: if end.strip(): substr = string[int(start) : int(end)] else: substr = string[int(start)] except ValueError: self.__stmtError("SUBSTR: START or END is not an integer") except IndexError: substr = "" return cons, substr # Statement: $(sanitize STRING) # Sanitize a string. # Replaces all non-alphanumeric characters by an underscore. Forces lower-case. def __stmt_sanitize(self, d): cons, args = self.__parseArguments(d) string = "_".join(args) validChars = LOWERCASE + NUMBERS string = string.lower() string = "".join( c if c in validChars else '_' for c in string ) string = re.sub(r'_+', '_', string).strip('_') return cons, string # Statement: $(file_exists RELATIVE_PATH) # Statement: $(file_exists RELATIVE_PATH, DOES_NOT_EXIST) # Checks if a file exists relative to the wwwPath base. # Returns the path, if the file exists or an empty string if it doesn't. # If DOES_NOT_EXIST is specified, it returns this if the file doesn't exist. def __stmt_fileExists(self, d): cons, args = self.__parseArguments(d) if len(args) != 1 and len(args) != 2: self.__stmtError("FILE_EXISTS: invalid args") relpath, enoent = args[0], args[1] if len(args) == 2 else "" try: exists = f_exists(self.cms.wwwPath, CMSPageIdent.validateSafePath(relpath)) except (CMSException) as e: exists = False return cons, (relpath if exists else enoent) # Statement: $(file_mdatet RELATIVE_PATH) # Statement: $(file_mdatet RELATIVE_PATH, DOES_NOT_EXIST, FORMAT_STRING) # Returns the file modification time. # If the file does not exist, it returns DOES_NOT_EXIST or and empty string. # RELATIVE_PATH is relative to wwwPath. # FORMAT_STRING is an optional strftime format string. 
def __stmt_fileModDateTime(self, d): cons, args = self.__parseArguments(d) if len(args) not in {1, 2, 3}: self.__stmtError("FILE_MDATET: invalid args") relpath, enoent, fmtstr =\ args[0],\ args[1] if len(args) >= 2 else "",\ args[2] if len(args) >= 3 else "%d %B %Y %H:%M (UTC)" try: stamp = f_mtime(self.cms.wwwPath, CMSPageIdent.validateSafePath(relpath)) except (CMSException) as e: return cons, enoent return cons, stamp.strftime(fmtstr.strip()) # Statement: $(index) # Returns the site index. def __stmt_index(self, d): cons, args = self.__parseArguments(d) if len(args) != 1 or args[0]: self.__stmtError("INDEX: invalid args") self.indexRefs.append(self.IndexRef(self.charCount)) return cons, "" # Statement: $(anchor NAME, TEXT) # Statement: $(anchor NAME, TEXT, INDENT_LEVEL) # Statement: $(anchor NAME, TEXT, INDENT_LEVEL, NO_INDEX) # Sets an index-anchor def __stmt_anchor(self, d): cons, args = self.__parseArguments(d) if len(args) < 2 or len(args) > 4: self.__stmtError("ANCHOR: invalid args") name, text = args[0:2] indent, noIndex = -1, False if len(args) >= 3: indent = args[2].strip() try: indent = int(indent) if indent else -1 except ValueError: self.__stmtError("ANCHOR: indent level " "is not an integer") if len(args) >= 4: noIndex = bool(args[3].strip()) name, text = name.strip(), text.strip() anchor = self.Anchor(name, text, indent, noIndex) # Cache anchor for index creation self.anchors.append(anchor) # Create the anchor HTML return cons, '%s' %\ (name, anchor.makeUrl(self), text) # Statement: $(pagelist BASEPAGE, ...) # Returns an