# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import io
import mimetypes
import os
import re
import unicodedata
import warnings
from functools import wraps
from hashlib import md5
from time import time
from zlib import adler32
from flask.helpers import _PackageBoundObject
from flask.helpers import flash as _flash # noqa
from flask.helpers import (
get_flashed_messages, locked_cached_property, safe_join)
from flask.helpers import url_for as flask_url_for
from flask_login import login_required # noqa
from speaklater import is_lazy_string
from werkzeug.datastructures import Headers
from werkzeug.exceptions import (
BadRequest, NotFound, RequestedRangeNotSatisfiable, abort)
from werkzeug.urls import url_quote
from werkzeug.utils import redirect
from werkzeug.wsgi import wrap_file
import trytond.modules
from trytond.tools.misc import slugify as _slugify
from trytond.config import config
from trytond.transaction import Transaction
from .globals import ( # noqa
current_app, current_locale, current_user, current_website, request)
DATABASE_PATH = config.get('database', 'path')
[docs]def url_for(endpoint, **values):
"""
Generates a URL to the given endpoint with the method provided.
The endpoint is relative to the active module if modules are in use.
The functionality is documented in `flask.helpers.url_for`
In addition to the arguments provided by flask, nereid allows the locale
of the url to be generated to be specified using the locale attribute.
The default value of locale is the locale of the current request.
For example::
url_for('nereid.website.home', locale='us')
"""
if 'language' in values:
warnings.warn(
"language argument is deprecated in favor of locale",
DeprecationWarning, stacklevel=2
)
# 'static' is Flask's default endpoint for static files.
# There is no need to set language in URL for static files
if endpoint != 'static' and \
'locale' not in values and current_website.locales:
values['locale'] = current_locale.code
return flask_url_for(endpoint, **values)
def secure(function):
@wraps(function)
def decorated_function(*args, **kwargs):
if not request.is_secure:
return redirect(request.url.replace('http://', 'https://'))
else:
return function(*args, **kwargs)
return decorated_function
def send_from_directory(directory, filename, **options):
"""
Send a file from a given directory with :func:`send_file`. This
is a secure way to quickly expose static files from an upload folder
or something similar.
Example usage::
@app.route('/uploads/<path:filename>')
def download_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'],
filename, as_attachment=True)
.. admonition:: Sending files and Performance
It is strongly recommended to activate either ``X-Sendfile`` support in
your webserver or (if no authentication happens) to tell the webserver
to serve files for the given path on its own without calling into the
web application for improved performance.
.. versionadded:: 0.5
:param directory: the directory where all the files are stored.
:param filename: the filename relative to that directory to
download.
:param options: optional keyword arguments that are directly
forwarded to :func:`send_file`.
"""
filename = os.fspath(filename)
directory = os.fspath(directory)
filename = safe_join(directory, filename)
if not os.path.isabs(filename):
#filename = os.path.join(current_app.root_path, filename)
filename = os.path.join(DATABASE_PATH, current_app.database_name,
filename)
try:
if not os.path.isfile(filename):
raise NotFound()
except (TypeError, ValueError):
raise BadRequest()
options.setdefault("conditional", True)
return send_file(filename, **options)
def send_file(filename_or_fp, mimetype=None,
as_attachment=False, attachment_filename=None,
add_etags=True, cache_timeout=None, conditional=False,
last_modified=None):
"""Sends the contents of a file to the client. This will use the
most efficient method available and configured. By default it will
try to use the WSGI server's file_wrapper support. Alternatively
you can set the application's :attr:`~Flask.use_x_sendfile` attribute
to ``True`` to directly emit an ``X-Sendfile`` header. This however
requires support of the underlying webserver for ``X-Sendfile``.
By default it will try to guess the mimetype for you, but you can
also explicitly provide one. For extra security you probably want
to send certain files as attachment (HTML for instance). The mimetype
guessing requires a `filename` or an `attachment_filename` to be
provided.
ETags will also be attached automatically if a `filename` is provided. You
can turn this off by setting `add_etags=False`.
If `conditional=True` and `filename` is provided, this method will try to
upgrade the response stream to support range requests. This will allow
the request to be answered with partial content response.
Please never pass filenames to this function from user sources;
you should use :func:`send_from_directory` instead.
.. versionadded:: 0.2
.. versionadded:: 0.5
The `add_etags`, `cache_timeout` and `conditional` parameters were
added. The default behavior is now to attach etags.
.. versionchanged:: 0.7
mimetype guessing and etag support for file objects was
deprecated because it was unreliable. Pass a filename if you are
able to, otherwise attach an etag yourself. This functionality
will be removed in Flask 1.0
.. versionchanged:: 0.9
cache_timeout pulls its default from application config, when None.
.. versionchanged:: 0.12
The filename is no longer automatically inferred from file objects. If
you want to use automatic mimetype and etag support, pass a filepath via
`filename_or_fp` or `attachment_filename`.
.. versionchanged:: 0.12
The `attachment_filename` is preferred over `filename` for MIME-type
detection.
.. versionchanged:: 1.0
UTF-8 filenames, as specified in `RFC 2231`_, are supported.
.. _RFC 2231: https://tools.ietf.org/html/rfc2231#section-4
.. versionchanged:: 1.0.3
Filenames are encoded with ASCII instead of Latin-1 for broader
compatibility with WSGI servers.
.. versionchanged:: 1.1
Filename may be a :class:`~os.PathLike` object.
.. versionadded:: 1.1
Partial content supports :class:`~io.BytesIO`.
.. versionchanged:: 6.0.8
The slugify method from nereid went partly into Tryton 5.4
Nereid now uses this method and extends it to return lower
case on slugs.
:param filename_or_fp: the filename of the file to send.
This is relative to the :attr:`~nereid.root_path`
if a relative path is specified.
Alternatively a file object might be provided in
which case ``X-Sendfile`` might not work and fall
back to the traditional method. Make sure that the
file pointer is positioned at the start of data to
send before calling :func:`send_file`.
:param mimetype: the mimetype of the file if provided. If a file path is
given, auto detection happens as fallback, otherwise an
error will be raised.
:param as_attachment: set to ``True`` if you want to send this file with
a ``Content-Disposition: attachment`` header.
:param attachment_filename: the filename for the attachment if it
differs from the file's filename.
:param add_etags: set to ``False`` to disable attaching of etags.
:param conditional: set to ``True`` to enable conditional responses.
:param cache_timeout: the timeout in seconds for the headers. When ``None``
(default), this value is set by
:meth:`~Flask.get_send_file_max_age` of
:data:`~flask.current_app`.
:param last_modified: set the ``Last-Modified`` header to this value,
a :class:`~datetime.datetime` or timestamp.
If a file was passed, this overrides its mtime.
"""
mtime = None
fsize = None
if hasattr(filename_or_fp, "__fspath__"):
filename_or_fp = os.fspath(filename_or_fp)
if isinstance(filename_or_fp, str):
filename = filename_or_fp
if not os.path.isabs(filename):
#filename = os.path.join(current_app.root_path, filename)
filename = os.path.join(DATABASE_PATH, current_app.database_name,
filename)
file = None
if attachment_filename is None:
attachment_filename = os.path.basename(filename)
else:
file = filename_or_fp
filename = None
if mimetype is None:
if attachment_filename is not None:
mimetype = (
mimetypes.guess_type(attachment_filename)[0]
or "application/octet-stream"
)
if mimetype is None:
raise ValueError(
"Unable to infer MIME-type because no filename is available. "
"Please set either `attachment_filename`, pass a filepath to "
"`filename_or_fp` or set your own MIME-type via `mimetype`."
)
headers = Headers()
if as_attachment:
if attachment_filename is None:
raise TypeError(
"filename unavailable, required for sending as attachment")
if not isinstance(attachment_filename, str):
attachment_filename = attachment_filename.decode("utf-8")
try:
attachment_filename = attachment_filename.encode("ascii")
except UnicodeEncodeError:
filenames = {
"filename": unicodedata.normalize(
"NFKD", attachment_filename).encode(
"ascii", "ignore"
),
"filename*": "UTF-8''%s" % url_quote(
attachment_filename, safe=b""),
}
else:
filenames = {"filename": attachment_filename}
headers.add("Content-Disposition", "attachment", **filenames)
if current_app.use_x_sendfile and filename:
if file is not None:
file.close()
headers["X-Sendfile"] = filename
fsize = os.path.getsize(filename)
headers["Content-Length"] = fsize
data = None
else:
if file is None:
file = open(filename, "rb")
mtime = os.path.getmtime(filename)
fsize = os.path.getsize(filename)
headers["Content-Length"] = fsize
elif isinstance(file, io.BytesIO):
try:
fsize = file.getbuffer().nbytes
except AttributeError:
# Python 2 doesn't have getbuffer
fsize = len(file.getvalue())
headers["Content-Length"] = fsize
data = wrap_file(request.environ, file)
rv = current_app.response_class(
data, mimetype=mimetype, headers=headers, direct_passthrough=True
)
if last_modified is not None:
rv.last_modified = last_modified
elif mtime is not None:
rv.last_modified = mtime
rv.cache_control.public = True
if cache_timeout is None:
cache_timeout = current_app.get_send_file_max_age(filename)
if cache_timeout is not None:
rv.cache_control.max_age = cache_timeout
rv.expires = int(time() + cache_timeout)
if add_etags and filename is not None:
from warnings import warn
try:
rv.set_etag(
"nereid-%s-%s-%s"
% (
os.path.getmtime(filename),
os.path.getsize(filename),
adler32(
filename.encode("utf-8")
if isinstance(filename, str)
else filename
)
& 0xFFFFFFFF,
)
)
except OSError:
warn(
"Access %s failed, maybe it does not exist, so ignore etags in "
"headers" % filename,
stacklevel=2,
)
if conditional:
try:
rv = rv.make_conditional(request, accept_ranges=True,
complete_length=fsize)
except RequestedRangeNotSatisfiable:
if file is not None:
file.close()
raise
# make sure we don't send x-sendfile for servers that
# ignore the 304 status code for x-sendfile.
if rv.status_code == 304:
rv.headers.pop("x-sendfile", None)
return rv
def slugify(value, hyphenate='-'):
return _slugify(value, hyphenate=hyphenate).lower()
def _rst_to_html_filter(value):
"""
Converts RST text to HTML
~~~~~~~~~~~~~~~~~~~~~~~~~
This uses docutils, if the library is missing, then the
original text is returned
Loading to environment::
from jinja2 import Environment
env = Environment()
env.filters['rst'] = rst_to_html
template = env.from_string("Welcome {{name|rst}}")
template.render(name="**Sharoon**")
"""
try:
from docutils import core
parts = core.publish_parts(source=value, writer_name='html')
return parts['body_pre_docinfo'] + parts['fragment']
except Exception:
return value
def key_from_list(list_of_args):
"""
Builds a key from a list of arguments which could be used for caching
The key s constructed as an md5 hash
"""
hash = md5()
hash.update(repr(list_of_args).encode('utf-8'))
return hash.hexdigest()
def get_website_from_host(http_host):
"""Try to find the website name from the HTTP_HOST name"""
return http_host.split(':')[0]
def make_crumbs(browse_record, endpoint, add_home=True, max_depth=10,
field_map_changes=None, root_ids=None):
"""
Makes bread crumbs for a given browse record based on the field
parent of the browse record
:param browse_record: The browse record of the object from which upward
tracing of crumbs need to be done
:param endpoint: The endpoint against which the urls have to be generated
:param add_home: If provided will add home and home url as the first item
:param max_depth: Maximum depth of the crumbs
:param field_map_changes: A dictionary/list of key value pair (tuples) to
update the default field_map. Only the changing
entries need to be provided.
:param root_ids: IDs of root nodes where the recursion to a parent node
will need to be stopped. If not specified the recursion
continues upto the max_depth. Expects a list or tuple of
ids.
.. versionchanged:: 0.3
Added root_ids
"""
field_map = dict(
parent_field='parent',
uri_field='uri',
title_field='title',
)
if field_map_changes is not None:
field_map.update(field_map_changes)
if root_ids is None:
root_ids = tuple()
def recurse(node, level=1):
if level > max_depth or not node:
return []
data_pair = (
url_for(endpoint, uri=getattr(node, field_map['uri_field'])),
getattr(node, field_map['title_field'])
)
if node.id in root_ids:
return [data_pair]
else:
return [data_pair] + recurse(
getattr(node, field_map['parent_field']), level + 1
)
items = recurse(browse_record)
if add_home:
items.append((url_for('nereid.website.home'), 'Home'))
# The bread crumb is now in reverse order with home at end, reverse it
items.reverse()
return items
def root_transaction_if_required(function):
"""
Starts a root transaction if one is not there. This behavior is used when
run from tests cases which manage the transaction on its own.
"""
@wraps(function)
def decorated_function(self, *args, **kwargs):
transaction = None
if Transaction().connection is None:
# Start transaction since connection is None
transaction = Transaction().start(
self.database_name, 0, readonly=True
)
try:
return function(self, *args, **kwargs)
finally:
if transaction is not None:
# TODO: Find some better way close transaction
transaction.__exit__(None, None, None)
return decorated_function
def flash(message, category='message'):
"""
Lazy strings are no real strings so pickling them results in strange issues.
Pickling cannot be avoided because of the way sessions work. Hence, this
special flash function converts lazy strings to unicode content.
.. versionadded:: 3.0.4.1
:param message: the message to be flashed.
:param category: the category for the message. The following values
are recommended: ``'message'`` for any kind of message,
``'error'`` for errors, ``'info'`` for information
messages and ``'warning'`` for warnings. However any
kind of string can be used as category.
"""
if is_lazy_string(message):
message = str(message)
return _flash(message, category)
[docs]def route(rule, **options):
"""Like :meth:`Flask.route` but for nereid.
.. versionadded:: 3.0.7.0
Unlike the implementation in flask and flask.blueprint route decorator does
not require an existing nereid application or a blueprint instance. Instead
the decorator adds an attribute to the method called `_url_rules`.
.. code-block:: python
:emphasize-lines: 1,7
from nereid import route
class Product:
__name__ = 'product.product'
@classmethod
@route('/product/<uri>')
def render_product(cls, uri):
...
return 'Product Information'
"""
def decorator(f):
if not hasattr(f, '_url_rules'):
f._url_rules = []
f._url_rules.append((rule, options))
return f
return decorator
def get_version():
"""
Return the version of nereid by looking up the version as read by the
tryton module loader.
"""
return trytond.modules.get_module_info('nereid')['version']
[docs]def context_processor(name=None):
"""Makes method available in template context. By default method will be
registered by its name.
Decorator adds an attribute to the method called `_context_processor`.
.. code-block:: python
:emphasize-lines: 1,7
from nereid import context_processor
class Product:
__name__ = 'product.product'
@classmethod
@context_processor('get_sale_price')
def get_sale_price(cls):
...
return 'Product sale price'
"""
def decorator(f):
f._context_processor = True
if name is not None:
f.__name__ = name
return f
return decorator
[docs]def template_filter(name=None):
"""
If you want to register your own filters in Jinja2 you have two ways to do
that. You can either put them by hand into the jinja_env of the application
or use the template_filter() decorator.
The two following examples work the same and both reverse an object::
from nereid import template_filter
class MyModel:
__name__ = 'product.product'
@classmethod
@template_filter('reverse')
def reverse_filter(cls, s):
return s[::-1]
Alternatively you can inject it into the jinja environment in your
`application.py`::
def reverse_filter(s):
return s[::-1]
app.jinja_env.filters['reverse'] = reverse_filter
In case of the decorator the argument is optional if you want to use the
function name as name of the filter. Once registered, you can use the
filter in your templates in the same way as Jinja2’s builtin filters, for
example if you have a Python list in context called mylist::
{% for x in mylist | reverse %}
{% endfor %}
"""
def decorator(f):
f._template_filter = True
if name is not None:
f.__name__ = name
return f
return decorator