# Files
# tradon/modules/inbound_email/inbound_email.py
# 2026-02-02 08:42:48 +01:00
#
# 325 lines
# 11 KiB
# Python
# Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import base64
import email
import json
import re
import urllib
import uuid
from email.utils import getaddresses, parseaddr
from functools import partial
from trytond.config import config
from trytond.model import ModelSQL, ModelView, fields, sequence_ordered
from trytond.pool import Pool
from trytond.pyson import Eval
from trytond.transaction import Transaction
from trytond.url import http_host
import logging
logger = logging.getLogger(__name__)

# Storage options handed to the ``Email.data`` binary field below.
# Both stay None unless the [inbound_email] "filestore" option is enabled
# (it defaults to enabled); in that case 'data_id' names the companion
# column and the optional "store_prefix" option is forwarded as-is.
file_id = store_prefix = None
if config.getboolean('inbound_email', 'filestore', default=True):
    file_id = 'data_id'
    store_prefix = config.get('inbound_email', 'store_prefix', default=None)
class Inbox(ModelSQL, ModelView):
    "Inbound Email Inbox"
    __name__ = 'inbound.email.inbox'

    name = fields.Char("Name", required=True)
    # Random hex token (see new_identifier) used as the last path segment
    # of the endpoint URL; empty while the inbox has no endpoint.
    identifier = fields.Char("Identifier", readonly=True)
    endpoint = fields.Function(
        fields.Char(
            "Endpoint",
            help="The URL where the emails must be posted."),
        'on_change_with_endpoint')
    rules = fields.One2Many(
        'inbound.email.rule', 'inbox', "Rules",
        help="The action of the first matching line is run.")

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._buttons.update(
            new_identifier={
                'icon': 'tryton-refresh',
            },
        )

    @fields.depends('identifier')
    def on_change_with_endpoint(self, name=None):
        """Return the URL emails must be posted to.

        The URL is built from the HTTP host, the current database name and
        the inbox identifier. Returns None while no identifier is set.
        """
        # Imported here because the module only does ``import urllib``,
        # which does not guarantee that the ``parse`` submodule is loaded.
        from urllib.parse import quote

        if not self.identifier:
            return None
        url_part = {
            'identifier': self.identifier,
            'database_name': Transaction().database.name,
        }
        path = (
            '/%(database_name)s/inbound_email/inbox/%(identifier)s'
            % url_part)
        return http_host() + quote(path)

    @classmethod
    @ModelView.button
    def new_identifier(cls, inboxes):
        """Toggle the identifier of each inbox and save.

        An inbox that already has an identifier gets it cleared; an inbox
        without one receives a fresh random hexadecimal UUID.
        """
        for inbox in inboxes:
            if inbox.identifier:
                inbox.identifier = None
            else:
                inbox.identifier = uuid.uuid4().hex
        cls.save(inboxes)

    def process(self, email_):
        """Run the first rule of this inbox that matches *email_*.

        The matching rule is stored on ``email_.rule`` and then run.
        Nothing happens when no rule matches.
        """
        assert email_.inbox == self
        rules = self.rules
        if not rules:
            # Nothing to match against: skip parsing the email entirely.
            return
        # Parse the email once; every rule is matched against the same dict
        # instead of re-parsing the raw data for each rule.
        values = email_.as_dict()
        for rule in rules:
            if rule.match(values):
                logger.info("RULE_MATCHED:%s", rule)
                email_.rule = rule
                rule.run(email_)
                return
def _payload_as_text(part):
    """Return the payload of *part* as text, undoing its transfer encoding.

    Only base64 and quoted-printable payloads are decoded; any other
    payload is returned exactly as ``get_payload()`` gives it.
    """
    encoding = str(part.get('Content-Transfer-Encoding', '')).strip().lower()
    if encoding not in {'base64', 'quoted-printable'}:
        return part.get_payload()
    data = part.get_payload(decode=True)
    if data is None:
        # get_payload(decode=True) yields None for container parts.
        return part.get_payload()
    charset = part.get_content_charset() or 'utf-8'
    try:
        return data.decode(charset, errors='replace')
    except LookupError:
        # Unknown charset label: fall back to UTF-8 rather than fail.
        return data.decode('utf-8', errors='replace')


def _email_text(message, type_='plain'):
    """Return the body of *message* for the ``text/<type_>`` content type.

    For a non-multipart message the single payload is returned whatever
    its content type. For a multipart message the first part whose content
    type is ``text/<type_>`` is used; None is returned when there is none.
    Base64 and quoted-printable bodies are decoded to text so callers do
    not receive the still-encoded payload.
    """
    if message.get_content_maintype() != 'multipart':
        return _payload_as_text(message)
    wanted = f'text/{type_}'
    for part in message.walk():
        if part.get_content_type() == wanted:
            return _payload_as_text(part)
    return None
def _email_attachments(message):
    """Yield one dict per attachment found in a multipart *message*.

    Each dict has the keys ``filename`` (None when the part declares no
    file name), ``type`` (the part content type) and ``data`` (the decoded
    payload). Nothing is yielded for a non-multipart message.
    """
    if message.get_content_maintype() != 'multipart':
        return
    for part in message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        # str() is needed because the legacy email policy returns a Header
        # object (which has no .lower()) when the raw header value contains
        # non-ASCII bytes.
        disposition = str(part.get('Content-Disposition', ''))
        if 'attachment' not in disposition.lower():
            continue
        yield {
            'filename': part.get_filename(),
            'type': part.get_content_type(),
            'data': part.get_payload(decode=True),
        }
class Email(ModelSQL, ModelView):
    "Inbound Email"
    __name__ = 'inbound.email'

    inbox = fields.Many2One(
        'inbound.email.inbox', "Inbox",
        required=True, readonly=True, ondelete='CASCADE')
    # Payload exactly as received; how to read it depends on data_type.
    data = fields.Binary(
        "Data", file_id=file_id, store_prefix=store_prefix,
        required=True, readonly=True)
    data_id = fields.Char("Data ID", readonly=True)
    data_type = fields.Selection(
        [
            ('mailchimp', "Mailchimp"),
            ('mailpace', "MailPace"),
            ('postmark', "Postmark"),
            ('raw', "Raw"),
            ('sendgrid', "SendGrid"),
        ],
        "Data Type", required=True, readonly=True, translate=False)
    # Rule that matched this email, set by Inbox.process.
    rule = fields.Many2One('inbound.email.rule', "Rule", readonly=True)
    # Whatever the rule action returned, set by Rule.run.
    result = fields.Reference("Result", selection='get_models', readonly=True)

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._order = [('id', 'DESC')]
        # The "process" button is read-only once a rule has been assigned.
        cls._buttons.update({
            'process': {
                'readonly': Eval('rule'),
                'depends': ['rule'],
            },
        })

    @classmethod
    def get_models(cls):
        "Return the selection for the result field: empty entry plus models"
        Model = Pool().get('ir.model')
        empty = [(None, "")]
        return empty + Model.get_name_items()

    @classmethod
    def from_webhook(cls, inbox, data, data_type):
        """Build (without saving) the emails contained in a webhook body.

        Mailchimp bodies yield one email per event whose type is 'inbound';
        the other known data types yield a single email holding *data*
        unchanged. An unknown data type yields an empty list.
        """
        if data_type == 'mailchimp':
            events = json.loads(data)['mandrill_events']
            return [
                cls(inbox=inbox, data=json.dumps(event), data_type=data_type)
                for event in events
                if event['event'] == 'inbound']
        if data_type in {'raw', 'mailpace', 'sendgrid', 'postmark'}:
            return [cls(inbox=inbox, data=data, data_type=data_type)]
        return []

    @classmethod
    @ModelView.button
    def process(cls, emails):
        "Let the inbox process every email without a rule, then save all"
        unmatched = (record for record in emails if not record.rule)
        for record in unmatched:
            record.inbox.process(record)
        cls.save(emails)

    def as_dict(self):
        """Return the email as a plain dictionary.

        The stored data is read according to data_type; an unrecognised
        data type gives an empty dictionary.
        """
        result = {}
        kind = self.data_type
        if kind == 'raw':
            result.update(self._as_dict(self.data))
            return result
        if kind == 'postmark':
            result.update(self._as_dict_postmark(json.loads(self.data)))
            return result
        # These providers wrap the raw message in JSON under a given key.
        raw_key = {
            'mailchimp': 'raw_msg',
            'mailpace': 'raw',
            'sendgrid': 'email',
        }.get(kind)
        if raw_key is not None:
            payload = json.loads(self.data)
            result.update(self._as_dict(payload[raw_key]))
        return result

    def _as_dict(self, raw):
        "Parse a raw message (str or bytes) into the dictionary form"
        if isinstance(raw, str):
            parse = email.message_from_string
        else:
            parse = email.message_from_bytes
        message = parse(raw)

        value = {}
        if 'From' in message:
            _, sender = parseaddr(message.get('From'))
            value['from'] = sender
        for key in ('To', 'Cc', 'Bcc'):
            if key not in message:
                continue
            pairs = getaddresses(message.get_all(key))
            value[key.lower()] = [address for _, address in pairs]
        if 'Subject' in message:
            value['subject'] = message.get('Subject')
        for target, subtype in (('text', 'plain'), ('html', 'html')):
            body = _email_text(message, subtype)
            if body is not None:
                value[target] = body
        value['attachments'] = list(_email_attachments(message))
        value['headers'] = dict(message.items())
        return value

    def _as_dict_postmark(self, payload):
        "Convert a decoded Postmark JSON payload into the dictionary form"
        value = {}
        if 'FromFull' in payload:
            value['from'] = payload['FromFull']['Email']
        for key in ('To', 'Cc', 'Bcc'):
            full_key = f'{key}Full'
            if full_key in payload:
                value[key.lower()] = [
                    entry['Email'] for entry in payload[full_key]]
        # Simple one-to-one copies, only for keys present in the payload.
        for source, target in (
                ('Subject', 'subject'),
                ('TextBody', 'text'),
                ('HtmlBody', 'html')):
            if source in payload:
                value[target] = payload[source]
        if 'Attachments' in payload:
            attachments = []
            for item in payload['Attachments']:
                attachments.append({
                    'filename': item['Name'],
                    'type': item['ContentType'],
                    'data': base64.b64decode(item['Content']),
                })
            value['attachments'] = attachments
        if 'Headers' in payload:
            value['headers'] = {
                item['Name']: item['Value'] for item in payload['Headers']}
        return value
class Rule(sequence_ordered(), ModelSQL, ModelView):
    "Inbound Email Rule"
    __name__ = 'inbound.email.rule'

    inbox = fields.Many2One(
        'inbound.email.inbox', "Inbox", required=True, ondelete='CASCADE')
    # Every criterion below is optional; an empty one is not checked.
    origin = fields.Char(
        "Origin",
        help="A regular expression to match the sender email address.")
    destination = fields.Char(
        "Destination",
        help="A regular expression to match any receiver email addresses.")
    subject = fields.Char(
        "Subject",
        help="A regular expression to match the subject.")
    attachment_name = fields.Char(
        "Attachment Name",
        help="A regular expression to match any attachment name.")
    headers = fields.One2Many('inbound.email.rule.header', 'rule', "Headers")
    # Stored as "<model name>|<method name>" (see run); only the empty
    # choice is declared here.
    action = fields.Selection([
        (None, ""),
    ], "Action")

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls.__access__.add('inbox')
        cls._order.insert(0, ('inbox.id', 'DESC'))

    def match(self, email_):
        """Tell whether the email dictionary satisfies every criterion.

        *email_* is the dictionary built by ``Email.as_dict``. All regular
        expressions are searched case-insensitively. Criteria left empty
        are skipped, and values missing from the email (or set to None,
        such as the file name of an unnamed attachment) are treated as the
        empty string instead of raising TypeError.
        """
        flags = re.IGNORECASE
        search = partial(re.search, flags=flags)
        compile_ = partial(re.compile, flags=flags)

        if self.origin:
            if not search(self.origin, email_.get('from') or ''):
                return False

        if self.destination:
            destinations = [
                *email_.get('to', []),
                *email_.get('cc', []),
                *email_.get('bcc', []),
            ]
            pattern = compile_(self.destination)
            if not any(pattern.search(d) for d in destinations):
                return False

        if self.subject:
            if not search(self.subject, email_.get('subject') or ''):
                return False

        if self.attachment_name:
            pattern = compile_(self.attachment_name)
            # The 'filename' key is present even for unnamed attachments,
            # with value None, so a .get() default alone is not enough.
            if not any(
                    pattern.search(a.get('filename') or '')
                    for a in email_.get('attachments', [])):
                return False

        email_headers = email_.get('headers') or {}
        for header in self.headers or ():
            if not header.value:
                # An empty pattern matches anything; skipping it also
                # avoids handing None to re.search.
                continue
            if not search(
                    header.value, email_headers.get(header.name) or ''):
                return False
        return True

    def run(self, email_):
        """Execute the action of the rule on *email_*.

        The action is split on '|' into a model name and a method name; the
        method is called with the email and this rule, and its return value
        is stored in ``email_.result``. Nothing is done without an action.
        """
        pool = Pool()
        if self.action:
            model, method = self.action.split('|')
            Model = pool.get(model)
            email_.result = getattr(Model, method)(email_, self)
class RuleHeader(ModelSQL, ModelView):
    "Inbound Email Rule Header"
    __name__ = 'inbound.email.rule.header'

    # Owning rule; header lines are removed together with it.
    rule = fields.Many2One(
        'inbound.email.rule', "Rule",
        ondelete='CASCADE', required=True)
    name = fields.Char(
        "Name",
        help="The name of the header.",
        required=True)
    value = fields.Char(
        "Value",
        help="A regular expression to match the header value.")

    @classmethod
    def __setup__(cls):
        super().__setup__()
        # NOTE(review): presumably makes access rights follow the parent
        # rule; confirm against the trytond __access__ documentation.
        cls.__access__.add('rule')