# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import werkzeug
import itertools
import pytz
import babel.dates
from collections import OrderedDict

from odoo import http, fields
from odoo.addons.http_routing.models.ir_http import slug, unslug
from odoo.addons.website.controllers.main import QueryURL
from odoo.addons.portal.controllers.portal import _build_url_w_params
from odoo.http import request
from odoo.osv import expression
from odoo.tools import html2plaintext
from odoo.tools.misc import get_lang
from odoo.tools import sql


class WebsiteBlog(http.Controller):
    """HTTP controller serving the website blog pages, feed and post helpers.

    NOTE(review): in this copy of the file the URL converters (the
    ``<...>`` placeholders) appear to have been stripped from the route
    strings below (e.g. ``'/blog/page/'``). The strings are reproduced
    exactly as found; restore them from the upstream file before use.
    """

    _blog_post_per_page = 12  # multiple of 2,3,4
    _post_comment_per_page = 10

    def tags_list(self, tag_ids, current_tag):
        """Toggle ``current_tag`` in ``tag_ids`` and return the slugs.

        :param tag_ids: iterable of blog.tag ids (it is copied, not mutated)
        :param current_tag: id removed if present in ``tag_ids``, else added
        :return: comma-separated slugs of the resulting tags
        """
        tag_ids = list(tag_ids)  # required to avoid using the same list
        if current_tag in tag_ids:
            tag_ids.remove(current_tag)
        else:
            tag_ids.append(current_tag)
        tag_ids = request.env['blog.tag'].browse(tag_ids)
        return ','.join(slug(tag) for tag in tag_ids)

    def nav_list(self, blog=None):
        """Build the date navigation: post groups keyed by year label.

        :param blog: optional blog record used to restrict the posts
        :return: OrderedDict mapping a formatted year to the list of group
                 dicts of that year; each group gets ``date_begin``,
                 ``date_end``, ``month`` and ``year`` keys added
        """
        dom = [('blog_id', '=', blog.id)] if blog else []
        if not request.env.user.has_group('website.group_website_designer'):
            dom += [('post_date', '<=', fields.Datetime.now())]
        groups = request.env['blog.post']._read_group_raw(
            dom,
            ['name', 'post_date'],
            groupby=["post_date"], orderby="post_date desc")

        # Neither value depends on the group, so compute them once.
        locale = get_lang(request.env).code
        tzinfo = pytz.timezone(request.context.get('tz', 'utc') or 'utc')

        for group in groups:
            # The raw group value is a (range, label) pair, the range being
            # formatted as "<start>/<end>".
            (r, label) = group['post_date']
            start, end = r.split('/')
            group['post_date'] = label
            group['date_begin'] = start
            group['date_end'] = end

            start = pytz.UTC.localize(fields.Datetime.from_string(start))

            group['month'] = babel.dates.format_datetime(start, format='MMMM', tzinfo=tzinfo, locale=locale)
            group['year'] = babel.dates.format_datetime(start, format='yyyy', tzinfo=tzinfo, locale=locale)

        # groupby only merges consecutive items; the groups were requested
        # ordered by post_date desc.
        return OrderedDict(
            (year, list(months))
            for year, months in itertools.groupby(groups, lambda g: g['year'])
        )

    def _prepare_blog_values(self, blogs, blog=False, date_begin=False, date_end=False, tags=False, state=False, page=False, search=None):
        """ Prepare all values to display the blogs index page or one specific blog

        :param blogs: blog records available on the current website
        :param blog: the blog being displayed, falsy for the index page
        :param date_begin: lower bound on ``post_date`` (needs ``date_end``)
        :param date_end: upper bound on ``post_date`` (needs ``date_begin``)
        :param tags: comma-separated tag slugs
        :param state: "published" / "unpublished" filter, only applied for
                      members of ``website.group_website_designer``
        :param page: 1-based page number
        :param search: optional search string
        :return: the rendering values as a dict, or a redirect response when
                 the tag part of the URL has to be fixed
        """
        BlogPost = request.env['blog.post']
        BlogTag = request.env['blog.tag']

        # prepare domain
        domain = request.website.website_domain()

        if blog:
            domain += [('blog_id', '=', blog.id)]

        if date_begin and date_end:
            domain += [("post_date", ">=", date_begin), ("post_date", "<=", date_end)]

        active_tag_ids = [unslug(tag)[1] for tag in tags.split(',')] if tags else []
        active_tags = BlogTag
        if active_tag_ids:
            active_tags = BlogTag.browse(active_tag_ids).exists()
            fixed_tag_slug = ",".join(slug(t) for t in active_tags)
            if fixed_tag_slug != tags:
                new_url = request.httprequest.full_path.replace("/tag/%s" % tags, "/tag/%s" % fixed_tag_slug, 1)
                if new_url != request.httprequest.full_path:  # check that really replaced and avoid loop
                    return request.redirect(new_url, 301)
            domain += [('tag_ids', 'in', active_tags.ids)]

        # The counters only exist for website designers. Previously any
        # other user passing a truthy ``state`` reached the returned dict
        # with the counters unbound and crashed; they now get a falsy
        # ``state_info`` instead.
        state_info = False
        if request.env.user.has_group('website.group_website_designer'):
            count_domain = domain + [("website_published", "=", True), ("post_date", "<=", fields.Datetime.now())]
            published_count = BlogPost.search_count(count_domain)
            unpublished_count = BlogPost.search_count(domain) - published_count
            state_info = state and {"state": state, "published": published_count, "unpublished": unpublished_count}

            if state == "published":
                domain += [("website_published", "=", True), ("post_date", "<=", fields.Datetime.now())]
            elif state == "unpublished":
                domain += ['|', ("website_published", "=", False), ("post_date", ">", fields.Datetime.now())]
        else:
            # NOTE(review): indentation was lost in this copy; this ``else``
            # is attached to the group check (nav_list applies the same date
            # filter to non-designers) -- confirm against upstream.
            domain += [("post_date", "<=", fields.Datetime.now())]

        use_cover = request.website.is_view_active('website_blog.opt_blog_cover_post')
        fullwidth_cover = request.website.is_view_active('website_blog.opt_blog_cover_post_fullwidth_design')

        # if blog, we show blog title, if use_cover and not fullwidth_cover we need pager + latest always
        # ``page`` may be 0 or the False default: never compute a negative offset.
        offset = (max(page, 1) - 1) * self._blog_post_per_page
        first_post = BlogPost
        if not blog:
            first_post = BlogPost.search(domain + [('website_published', '=', True)], order="post_date desc, id asc", limit=1)
            if use_cover and not fullwidth_cover:
                offset += 1

        if search:
            tags_like_search = BlogTag.search([('name', 'ilike', search)])
            domain += ['|', '|', '|', ('author_name', 'ilike', search), ('name', 'ilike', search), ('content', 'ilike', search), ('tag_ids', 'in', tags_like_search.ids)]

        posts = BlogPost.search(domain, offset=offset, limit=self._blog_post_per_page, order="is_published desc, post_date desc, id asc")
        total = BlogPost.search_count(domain)

        pager = request.website.pager(
            url=request.httprequest.path.partition('/page/')[0],
            total=total,
            page=page,
            step=self._blog_post_per_page,
        )

        if not blogs:
            all_tags = BlogTag
        elif not blog:
            all_tags = blogs.all_tags(join=True)
        else:
            all_tags = blogs.all_tags().get(blog.id, BlogTag)
        tag_category = sorted(all_tags.mapped('category_id'), key=lambda category: category.name.upper())
        other_tags = sorted(all_tags.filtered(lambda x: not x.category_id), key=lambda tag: tag.name.upper())

        # for performance prefetch the first post with the others
        post_ids = (first_post | posts).ids

        return {
            'date_begin': date_begin,
            'date_end': date_end,
            'first_post': first_post.with_prefetch(post_ids),
            'other_tags': other_tags,
            'tag_category': tag_category,
            'nav_list': self.nav_list(),
            'tags_list': self.tags_list,
            'pager': pager,
            'posts': posts.with_prefetch(post_ids),
            'tag': tags,
            'active_tag_ids': active_tags.ids,
            'domain': domain,
            'state_info': state_info,
            'blogs': blogs,
            'blog': blog,
            'search': search,
            'search_count': total,
        }

    @http.route([
        '/blog',
        '/blog/page/',
        '/blog/tag/',
        '/blog/tag//page/',
        '''/blog/''',
        '''/blog//page/''',
        '''/blog//tag/''',
        '''/blog//tag//page/''',
    ], type='http', auth="public", website=True, sitemap=True)
    def blog(self, blog=None, tag=None, page=1, search=None, **opt):
        """Render the blog index, or the post list of one blog.

        :param blog: optional blog record; NotFound is raised when it cannot
                     be accessed from the current website
        :param tag: comma-separated tag slugs
        :param page: 1-based page number
        :param search: optional search string
        :param opt: may carry ``date_begin``, ``date_end`` and ``state``
        :return: the rendered ``website_blog.blog_post_short`` template, or
                 a redirect
        """
        Blog = request.env['blog.blog']
        if blog and not blog.can_access_from_current_website():
            raise werkzeug.exceptions.NotFound()

        blogs = Blog.search(request.website.website_domain(), order="create_date asc, id asc")

        # A single blog on the website: skip the index page.
        if not blog and len(blogs) == 1:
            return werkzeug.utils.redirect('/blog/%s' % slug(blogs[0]), code=302)

        date_begin, date_end, state = opt.get('date_begin'), opt.get('date_end'), opt.get('state')

        if tag and request.httprequest.method == 'GET':
            # redirect get tag-1,tag-2 -> get tag-1
            tags = tag.split(',')
            if len(tags) > 1:
                url = QueryURL('' if blog else '/blog', ['blog', 'tag'], blog=blog, tag=tags[0], date_begin=date_begin, date_end=date_end, search=search)()
                return request.redirect(url, code=302)

        values = self._prepare_blog_values(blogs=blogs, blog=blog, date_begin=date_begin, date_end=date_end, tags=tag, state=state, page=page, search=search)

        # in case of a redirection need by `_prepare_blog_values` we follow it
        if isinstance(values, werkzeug.wrappers.Response):
            return values

        if blog:
            values['main_object'] = blog
            values['edit_in_backend'] = True
            values['blog_url'] = QueryURL('', ['blog', 'tag'], blog=blog, tag=tag, date_begin=date_begin, date_end=date_end, search=search)
        else:
            values['blog_url'] = QueryURL('/blog', ['tag'], date_begin=date_begin, date_end=date_end, search=search)

        return request.render("website_blog.blog_post_short", values)

    @http.route(['''/blog//feed'''], type='http', auth="public", website=True, sitemap=True)
    def blog_feed(self, blog, limit='15', **kwargs):
        """Render the Atom feed of ``blog``.

        :param blog: blog record whose posts are listed
        :param limit: requested number of posts (string or int); anything
                      that is not an integer falls back to 15 and the value
                      is clamped to the 1..50 range
        :return: the rendered ``website_blog.blog_feed`` template served as
                 ``application/atom+xml``
        """
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = 15
        # Keep the limit strictly positive: a negative value is invalid and
        # a falsy one is presumably treated as "no limit" by the ORM, which
        # would defeat the cap of 50 -- TODO confirm against the ORM docs.
        limit = max(1, min(limit, 50))

        v = {}
        v['blog'] = blog
        v['base_url'] = blog.get_base_url()
        v['posts'] = request.env['blog.post'].search([('blog_id', '=', blog.id)], limit=limit, order="post_date DESC")
        v['html2plaintext'] = html2plaintext
        r = request.render("website_blog.blog_feed", v, headers=[('Content-Type', 'application/atom+xml')])
        return r

    # NOTE(review): this decorator arrived truncated in this copy as
    # ``@http.route([ '''/blog//post///post/new', ...)`` -- an unclosed list
    # and an unterminated triple-quoted string, with content between the two
    # halves apparently missing. It is repaired to the minimal parseable form
    # only; the path below is the residue found in the file and does not
    # declare the ``blog_id`` argument. Restore the route (and anything lost
    # before it) from upstream.
    @http.route('/blog//post///post/new', type='http', auth="user", website=True)
    def blog_post_create(self, blog_id, **post):
        """Create an unpublished post in blog ``blog_id`` and open its editor.

        :param blog_id: id of the blog receiving the new post
        :raise werkzeug.exceptions.NotFound: when the blog cannot be accessed
                                             from the current website
        :return: redirect to the new post with ``enable_editor=1``
        """
        # Use sudo so this line prevents both editor and admin to access blog from another website
        # as browse() will return the record even if forbidden by security rules but editor won't
        # be able to access it
        if not request.env['blog.blog'].browse(blog_id).sudo().can_access_from_current_website():
            raise werkzeug.exceptions.NotFound()

        new_blog_post = request.env['blog.post'].create({
            'blog_id': blog_id,
            'is_published': False,
        })
        return werkzeug.utils.redirect("/blog/%s/%s?enable_editor=1" % (slug(new_blog_post.blog_id), slug(new_blog_post)))

    @http.route('/blog/post_duplicate', type='http', auth="user", website=True, methods=['POST'])
    def blog_post_copy(self, blog_post_id, **post):
        """ Duplicate a blog.

        :param blog_post_id: id of the blog post currently browsed.

        :return redirect to the new blog created
        """
        new_blog_post = request.env['blog.post'].with_context(mail_create_nosubscribe=True).browse(int(blog_post_id)).copy()
        return werkzeug.utils.redirect("/blog/%s/%s?enable_editor=1" % (slug(new_blog_post.blog_id), slug(new_blog_post)))

    @http.route(['/blog/render_latest_posts'], type='json', auth='public', website=True)
    def render_latest_posts(self, template, domain, limit=None, order='published_date desc'):
        """Render ``template`` with the published posts matching ``domain``.

        The search is always restricted to published posts whose post date is
        not in the future, on the current website; ``domain`` can only narrow
        that selection further.

        :param template: reference of the view to render
        :param domain: optional extra search domain
        :param limit: optional maximum number of posts
        :param order: order passed to the search
        :return: the rendered view
        """
        dom = expression.AND([
            [('website_published', '=', True), ('post_date', '<=', fields.Datetime.now())],
            request.website.website_domain()
        ])
        if domain:
            dom = expression.AND([dom, domain])
        posts = request.env['blog.post'].search(dom, limit=limit, order=order)
        return request.website.viewref(template)._render({'posts': posts})