# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import heapq
import mimetypes
import re
from email.message import EmailMessage
from email.utils import getaddresses

try:
    import html2text
except ImportError:
    # html2text is optional: without it no plain-text alternative is built
    html2text = None
from genshi.template import TextTemplate

from trytond.config import config
from trytond.i18n import gettext
from trytond.model import EvalEnvironment, ModelSQL, ModelView, fields
from trytond.model.exceptions import AccessError, ValidationError
from trytond.pool import Pool
from trytond.pyson import Bool, Eval, PYSONDecoder
from trytond.report import Report
from trytond.rpc import RPC
from trytond.sendmail import send_message_transactional
from trytond.tools import escape_wildcard
from trytond.tools.email_ import (
    convert_ascii_email, format_address, set_from_header)
from trytond.tools.string_ import StringMatcher
from trytond.transaction import Transaction

from .resource import ResourceAccessMixin

# %-format template of the message body; expects the keys "subject", "body"
# and "signature".
# NOTE(review): this template is used as the HTML part of the message but
# carries no markup around the placeholders; confirm that no tags were lost.
HTML_EMAIL = """ %(subject)s %(body)s


%(signature)s """

# Characters that force the display name to be quoted
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Characters that must be backslash-escaped inside the display name
escapesre = re.compile(r'[\\"]')


class EmailTemplateError(ValidationError):
    pass


def _formataddr(pair):
    """Format address without encoding

    *pair* is a ``(name, address)`` tuple.  Return ``name <address>`` when
    the name is set (the name is quoted if it contains special characters
    and its backslashes and double quotes are escaped), otherwise return
    the bare address.

    Raise UnicodeEncodeError if the address, once passed through
    convert_ascii_email, can not be encoded as ASCII.
    """
    name, address = pair
    # The result is discarded: the call only validates the address
    convert_ascii_email(address).encode('ascii')
    if name:
        quotes = ''
        if specialsre.search(name):
            quotes = '"'
        name = escapesre.sub(r'\\\g<0>', name)
        return '%s%s%s <%s>' % (quotes, name, quotes, address)
    return address


# Record of an email sent from a resource (see Email.send).
# The class docstring is kept as is on purpose.
class Email(ResourceAccessMixin, ModelSQL, ModelView):
    "Email"
    __name__ = 'ir.email'

    user = fields.Function(fields.Char("User"), 'get_user')
    at = fields.Function(fields.DateTime("At"), 'get_at')
    recipients = fields.Char("Recipients", readonly=True)
    recipients_secondary = fields.Char("Secondary Recipients", readonly=True)
    recipients_hidden = fields.Char("Hidden Recipients", readonly=True)
    addresses = fields.One2Many(
        'ir.email.address', 'email', "Addresses", readonly=True)
    subject = fields.Char("Subject", readonly=True)
    body = fields.Text("Body", readonly=True)

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._order.insert(0, ('create_date', 'DESC'))
        cls.__rpc__.update({
                'send': RPC(readonly=False, result=int),
                'complete': RPC(check_access=False),
                })
        # Records are only created through send
        del cls.__rpc__['create']

    def get_user(self, name):
        "Return the record name of the user who created the email"
        return self.create_uid.rec_name

    def get_at(self, name):
        "Return the creation date with the microseconds dropped"
        return self.create_date.replace(microsecond=0)

    @classmethod
    def send(cls, to='', cc='', bcc='', subject='', body='',
            files=None, record=None, reports=None, attachments=None):
        """Send an email about a record and store it

        :param to: the string of main recipients
        :param cc: the string of secondary recipients
        :param bcc: the string of hidden recipients
        :param subject: the subject of the message
        :param body: the body inserted in the HTML_EMAIL template
        :param files: an optional iterable of (name, data) to attach
        :param record: a (model name, id) pair; it is subscripted
            unconditionally so it must be provided despite the default
        :param reports: optional ids of 'ir.action.report' executed for
            the record and attached to the message
        :param attachments: optional ids of 'ir.attachment' to attach
        :return: the saved email
        """
        pool = Pool()
        User = pool.get('res.user')
        ActionReport = pool.get('ir.action.report')
        Attachment = pool.get('ir.attachment')
        transaction = Transaction()
        user = User(transaction.user)

        Model = pool.get(record[0])
        record = Model(record[1])

        msg = EmailMessage()
        body_html = HTML_EMAIL % {
            'subject': subject,
            'body': body,
            'signature': user.signature or '',
            }
        if html2text:
            # The signature is converted apart to be put after the
            # '-- ' separator line
            body_text = HTML_EMAIL % {
                'subject': subject,
                'body': body,
                'signature': '',
                }
            converter = html2text.HTML2Text()
            body_text = converter.handle(body_text)
            if user.signature:
                body_text += '\n-- \n' + converter.handle(user.signature)
            msg.add_alternative(body_text, subtype='plain')
        if msg.is_multipart():
            msg.add_alternative(body_html, subtype='html')
        else:
            msg.set_content(body_html, subtype='html')
        if files or reports or attachments:
            if files is None:
                files = []
            else:
                # Copy to not modify the argument of the caller
                files = list(files)

            for report_id in (reports or []):
                report = ActionReport(report_id)
                Report = pool.get(report.report_name, type='report')
                ext, content, _, title = Report.execute(
                    [record.id], {
                        'action_id': report.id,
                        })
                name = '%s.%s' % (title, ext)
                if isinstance(content, str):
                    content = content.encode('utf-8')
                files.append((name, content))
            if attachments:
                files += [
                    (a.name, a.data) for a in Attachment.browse(attachments)]
            for name, data in files:
                ctype, _ = mimetypes.guess_type(name)
                if not ctype:
                    ctype = 'application/octet-stream'
                maintype, subtype = ctype.split('/', 1)
                msg.add_attachment(
                    data,
                    maintype=maintype,
                    subtype=subtype,
                    filename=('utf-8', '', name))
        from_ = config.get('email', 'from')
        set_from_header(msg, from_, user.email or from_)
        msg['To'] = [format_address(a, n) for n, a in getaddresses([to])]
        msg['Cc'] = [format_address(a, n) for n, a in getaddresses([cc])]
        msg['Bcc'] = [format_address(a, n) for n, a in getaddresses([bcc])]
        msg['Subject'] = subject

        send_message_transactional(msg, strict=True)

        email = cls.from_message(msg, body=body, resource=record)
        email.save()
        if files:
            # Keep a copy of what was attached on the stored email
            attachments_ = []
            for name, data in files:
                attachments_.append(
                    Attachment(resource=email, name=name, data=data))
            Attachment.save(attachments_)
        return email

    @classmethod
    def complete(cls, text, limit):
        """Return completions for the last address of text

        Each result is a tuple (ratio, address, completed text) where the
        completed text is the previous addresses of *text* followed by the
        proposed address.  At most *limit* results are returned, the
        largest first.

        Raise ValueError if limit is not strictly positive.
        """
        limit = int(limit)
        if not limit > 0:
            raise ValueError('limit must be > 0: %r' % (limit,))
        emails = getaddresses([text])
        if not emails:
            return []
        name, email = map(str.strip, emails[-1])
        if not name and not email:
            return []
        s = StringMatcher()
        try:
            s.set_seq2(_formataddr((name, email)))
        except UnicodeEncodeError:
            return []

        def generate(name, email):
            for match_name, match_email in cls._match(name, email):
                try:
                    address = _formataddr((match_name, match_email))
                except UnicodeEncodeError:
                    # Skip the candidates that can not be formatted
                    continue
                s.set_seq1(address)
                yield (
                    s.ratio(), address,
                    ', '.join(map(
                            _formataddr,
                            emails[:-1] + [(match_name, match_email)])))
        return heapq.nlargest(limit, generate(name, email))

    @classmethod
    def _match(cls, name, email):
        """Yield the (name, email) of the users matching name or email

        Only the values of at least 3 characters are added to the search
        domain.
        """
        pool = Pool()
        User = pool.get('res.user')
        domain = ['OR']
        for field in ['name', 'login', 'email']:
            for value in [name, email]:
                if value and len(value) >= 3:
                    domain.append(
                        (field, 'ilike', '%' + escape_wildcard(value) + '%'))
        for user in User.search([
                    ('email', '!=', ''),
                    domain,
                    ], order=[]):
            yield user.name, user.email

    @classmethod
    def from_message(cls, msg, **values):
        """Return an unsaved email filled from the headers of msg

        The extra values are passed to the constructor.
        """
        to_addrs = [e for _, e in getaddresses(
                filter(None, (msg['To'], msg['Cc'], msg['Bcc'])))]
        return cls(
            recipients=msg['To'],
            recipients_secondary=msg['Cc'],
            recipients_hidden=msg['Bcc'],
            addresses=[{'address': a} for a in to_addrs],
            subject=msg['Subject'],
            **values)


# One address of an email (filled by Email.from_message).
# The class docstring is kept as is on purpose.
class EmailAddress(ModelSQL):
    "Email Address"
    __name__ = 'ir.email.address'

    email = fields.Many2One(
        'ir.email', "E-mail", required=True, ondelete='CASCADE')
    address = fields.Char("Address", required=True)


# Template providing the default values of an email for a record of a model.
# Each kind of recipients is defined either by a field of the model or by a
# PYSON expression, the other one being hidden.
# The class docstring is kept as is on purpose.
class EmailTemplate(ModelSQL, ModelView):
    "Email Template"
    __name__ = 'ir.email.template'

    model = fields.Many2One('ir.model', "Model", required=True)
    name = fields.Char("Name", required=True, translate=True)
    recipients = fields.Many2One(
        'ir.model.field', "Recipients",
        states={
            'invisible': Bool(Eval('recipients_pyson')),
            },
        depends=['recipients_pyson'],
        help="The field that contains the recipient(s).")
    recipients_pyson = fields.Char(
        "Recipients",
        states={
            'invisible': Bool(Eval('recipients')),
            },
        depends=['recipients'],
        help="A PYSON expression that generates a list of recipients "
        'with the record represented by "self".')
    recipients_secondary = fields.Many2One(
        'ir.model.field', "Secondary Recipients",
        states={
            'invisible': Bool(Eval('recipients_secondary_pyson')),
            },
        depends=['recipients_secondary_pyson'],
        help="The field that contains the secondary recipient(s).")
    recipients_secondary_pyson = fields.Char(
        "Secondary Recipients",
        states={
            'invisible': Bool(Eval('recipients_secondary')),
            },
        depends=['recipients_secondary'],
        help="A PYSON expression that generates a list "
        'of secondary recipients with the record represented by "self".')
    recipients_hidden = fields.Many2One(
        'ir.model.field', "Hidden Recipients",
        states={
            'invisible': Bool(Eval('recipients_hidden_pyson')),
            },
        depends=['recipients_hidden_pyson'],
        help="The field that contains the hidden recipient(s).")
    recipients_hidden_pyson = fields.Char(
        "Hidden Recipients",
        states={
            'invisible': Bool(Eval('recipients_hidden')),
            },
        depends=['recipients_hidden'],
        help="A PYSON expression that generates a list of hidden recipients "
        'with the record represented by "self".')
    subject = fields.Char("Subject", translate=True)
    body = fields.Text("Body", translate=True)
    reports = fields.Many2Many(
        'ir.email.template-ir.action.report', 'template', 'report', "Reports",
        domain=[
            ('model', '=', Eval('model_name')),
            ],
        depends=['model_name'])

    model_name = fields.Function(
        fields.Char("Model Name"), 'on_change_with_model_name')

    @classmethod
    def __setup__(cls):
        super().__setup__()
        # The recipient fields must belong to the model of the template and
        # either target one of the email models or be the "id" field of an
        # email model.
        for field in [
                'recipients',
                'recipients_secondary',
                'recipients_hidden',
                ]:
            field = getattr(cls, field)
            field.domain = [
                ('model_ref.id', '=', Eval('model', -1)),
                ['OR',
                    ('relation', 'in', cls.email_models()),
                    [
                        ('model', 'in', cls.email_models()),
                        ('name', '=', 'id'),
                        ],
                    ]
                ]
            field.depends.add('model')
        cls.__rpc__.update({
                'get': RPC(instantiate=0),
                'get_default': RPC(),
                })

    @fields.depends('model')
    def on_change_with_model_name(self, name=None):
        "Return the name of the model or None if no model is set"
        if self.model:
            return self.model.model

    @classmethod
    def validate_fields(cls, templates, field_names):
        super().validate_fields(templates, field_names)
        cls.check_subject(templates, field_names)
        cls.check_body(templates, field_names)
        cls.check_fields_pyson(templates, field_names)

    @classmethod
    def check_subject(cls, templates, field_names=None):
        """Check that the subject of the templates is a valid TextTemplate

        Nothing is checked when field_names is set without 'subject'.
        Raise EmailTemplateError for the first invalid subject.
        """
        if field_names and 'subject' not in field_names:
            return
        for template in templates:
            if not template.subject:
                continue
            try:
                TextTemplate(template.subject)
            except Exception as exception:
                raise EmailTemplateError(gettext(
                        'ir.msg_email_template_invalid_subject',
                        template=template.rec_name,
                        exception=exception)) from exception

    @classmethod
    def check_body(cls, templates, field_names=None):
        """Check that the body of the templates is a valid TextTemplate

        Nothing is checked when field_names is set without 'body'.
        Raise EmailTemplateError for the first invalid body.
        """
        if field_names and 'body' not in field_names:
            return
        for template in templates:
            if not template.body:
                continue
            try:
                TextTemplate(template.body)
            except Exception as exception:
                raise EmailTemplateError(gettext(
                        'ir.msg_email_template_invalid_body',
                        template=template.rec_name,
                        exception=exception)) from exception

    @classmethod
    def check_fields_pyson(cls, templates, field_names=None):
        """Check that the PYSON recipients of the templates are valid

        The expression must be decodable and result in a list.
        Only the fields in field_names are checked when it is set.
        Raise EmailTemplateError for the first invalid expression.
        """
        pyson_fields = {
            'recipients_pyson',
            'recipients_secondary_pyson',
            'recipients_hidden_pyson',
            }
        if field_names:
            pyson_fields &= field_names
            if not pyson_fields:
                return
        # Decode without evaluating as there is no record to evaluate with
        decoder = PYSONDecoder(noeval=True)
        for template in templates:
            for field in pyson_fields:
                value = getattr(template, field)
                if not value:
                    continue
                try:
                    pyson = decoder.decode(value)
                except Exception as exception:
                    raise EmailTemplateError(
                        gettext('ir.msg_email_template_invalid_field_pyson',
                            template=template.rec_name,
                            field=cls.__names__(field)['field'],
                            exception=exception)) from exception
                if not isinstance(pyson, list) and pyson.types() != {list}:
                    raise EmailTemplateError(gettext(
                            'ir.msg_email_template_invalid_field_pyson_type',
                            template=template.rec_name,
                            field=cls.__names__(field)['field'],
                            ))

    def get(self, record):
        """Return the values of the email for the record

        *record* is the id of a record of the model of the template.
        The result may contain the keys 'to', 'cc', 'bcc', 'subject',
        'body' and 'reports'.  A value that raises an AccessError while
        being computed is left out.
        """
        pool = Pool()
        Model = pool.get(self.model.model)
        record = Model(int(record))

        values = {}
        for attr, key in [
                ('recipients', 'to'),
                ('recipients_secondary', 'cc'),
                ('recipients_hidden', 'bcc'),
                ]:
            field = getattr(self, attr)
            try:
                if field:
                    # The "id" field stands for the record itself
                    if field.name == 'id':
                        value = record
                    else:
                        value = getattr(record, field.name, None)
                    if value:
                        values[key] = self.get_addresses(value)
                else:
                    value = getattr(self, attr + '_pyson')
                    if value:
                        value = self.eval(record, value)
                        if value:
                            values[key] = self.get_addresses(value)
            except AccessError:
                continue

        if self.subject:
            try:
                values['subject'] = (TextTemplate(self.subject)
                    .generate(**self.get_context(record))
                    .render())
            except AccessError:
                pass
        if self.body:
            try:
                values['body'] = (TextTemplate(self.body)
                    .generate(**self.get_context(record))
                    .render())
            except AccessError:
                pass
        if self.reports:
            values['reports'] = [r.id for r in self.reports]
        return values

    def get_context(self, record):
        "Return the context used to render the subject and the body"
        pool = Pool()
        User = pool.get('res.user')
        return {
            'context': Transaction().context,
            'user': User(Transaction().user),
            'record': record,
            'format_date': Report.format_date,
            'format_datetime': Report.format_datetime,
            'format_timedelta': Report.format_timedelta,
            'format_currency': Report.format_currency,
            'format_number': Report.format_number,
            }

    def eval(self, record, pyson, _env=None):
        'Evaluate the pyson with the record'
        # Work on a copy to not modify the environment of the caller
        if _env is None:
            env = {}
        else:
            env = _env.copy()
        env['context'] = Transaction().context
        env['self'] = EvalEnvironment(record, record.__class__)
        return PYSONDecoder(env).decode(pyson)

    @classmethod
    def _get_default_exclude(cls, record):
        "Return the names of the fields ignored by get_default"
        return ['create_uid', 'write_uid']

    @classmethod
    def get_default(cls, model, record):
        """Return the default values of an email without template

        *model* is the name of the model and *record* the id of the record.
        'to' contains the addresses found in the fields of the model that
        target an email model (and in the record itself if its model is an
        email model).  'subject' is composed of the name of the model and
        of the record; it is left out if it raises an AccessError.
        """
        pool = Pool()
        Field = pool.get('ir.model.field')
        Model = pool.get(model)
        record = Model(int(record))
        values = {}

        fields = Field.search([
                ('model.model', '=', model),
                ('name', 'not in', cls._get_default_exclude(record)),
                ['OR',
                    ('relation', 'in', cls.email_models()),
                    [
                        ('model.model', 'in', cls.email_models()),
                        ('name', '=', 'id'),
                        ],
                    ],
                ])
        addresses = set()
        for field in fields:
            try:
                # The "id" field stands for the record itself
                if field.name == 'id':
                    value = record
                else:
                    value = getattr(record, field.name)
                addresses.update(cls.get_addresses(value))
            except AccessError:
                pass
        values['to'] = list(addresses)

        try:
            values['subject'] = '%s: %s' % (
                Model.__names__()['model'], record.rec_name)
        except AccessError:
            pass
        return values

    @classmethod
    def email_models(cls):
        "Return the names of the models that provide email addresses"
        return ['res.user']

    @classmethod
    def get_addresses(cls, value):
        """Return the list of formatted addresses of value

        *value* is a single item or a list/tuple of items accepted by
        _get_address.  The items without address are skipped.
        """
        if isinstance(value, (list, tuple)):
            addresses = (cls._get_address(v) for v in value)
        else:
            addresses = [cls._get_address(value)]
        return [
            _formataddr((name, email))
            for name, email in filter(None, addresses)
            if email]

    @classmethod
    def _get_address(cls, record):
        """Return the (name, email) of record

        A string is used as the email with no name.  None is returned
        implicitly for a user without email and for any other value.
        """
        pool = Pool()
        User = pool.get('res.user')
        if isinstance(record, str):
            return (None, record)
        elif isinstance(record, User) and record.email:
            return (record.name, record.email)

    @classmethod
    def get_languages(cls, value):
        """Return the list of languages of value

        *value* is a single item or a list/tuple of items accepted by
        _get_language.  If no language is found, the result is the search
        of the language of the configuration.
        """
        pool = Pool()
        Configuration = pool.get('ir.configuration')
        Lang = pool.get('ir.lang')
        if isinstance(value, (list, tuple)):
            languages = {cls._get_language(v) for v in value}
        else:
            languages = {cls._get_language(value)}
        languages = list(filter(None, languages))
        if not languages:
            return Lang.search([
                    ('code', '=', Configuration.get_language()),
                    ], limit=1)
        return languages

    @classmethod
    def _get_language(cls, record):
        "Return the language of record if it is a user with a language"
        pool = Pool()
        User = pool.get('res.user')
        if isinstance(record, User) and record.language:
            return record.language

    @classmethod
    def create(cls, vlist):
        ModelView._view_toolbar_get_cache.clear()
        return super().create(vlist)

    @classmethod
    def write(cls, *args):
        # args alternates records and values; the toolbar cache is cleared
        # only when the name or the model of a template is written
        if any({'name', 'model'} & v.keys() for v in args[1:None:2]):
            ModelView._view_toolbar_get_cache.clear()
        super().write(*args)

    @classmethod
    def delete(cls, records):
        ModelView._view_toolbar_get_cache.clear()
        super().delete(records)


# Relation between the email templates and the reports they attach.
# The class docstring is kept as is on purpose.
class EmailTemplate_Report(ModelSQL):
    "Email Template - Report"
    __name__ = 'ir.email.template-ir.action.report'

    template = fields.Many2One(
        'ir.email.template', "Template", required=True, ondelete='CASCADE')
    report = fields.Many2One(
        'ir.action.report', "Report", required=True, ondelete='CASCADE')