import imaplib import time import requests import os import email from email import policy from email.message import EmailMessage from datetime import datetime from email.header import decode_header, make_header IMAP_SERVER = "imap.gmail.com" IMAP_PORT = 993 EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT") EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD") TRYTON_ENDPOINT = os.getenv("TRYTON_ENDPOINT") INTERVAL = int(os.getenv("INTERVAL", "30")) # secondes LOG_FILE = "/app/logs/app.log" def log(msg): timestamp = datetime.now().isoformat() line = f"[{timestamp}] {msg}" print(line, flush=True) with open(LOG_FILE, "a") as f: f.write(line + "\n") def connect_imap(): log("Connexion à Gmail via IMAP...") mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) mail.select("INBOX") log("Connexion IMAP réussie.") return mail def fetch_unread_messages(mail): status, data = mail.search(None, '(UNSEEN)') if status != "OK": log("Erreur lors de la recherche de mails non lus.") return [] msg_ids = data[0].split() log(f"{len(msg_ids)} mails non lus récupérés.") return msg_ids def get_raw_message(mail, msg_id): status, data = mail.fetch(msg_id, '(RFC822)') if status != "OK": log(f"Erreur récupération message {msg_id}") return None raw = data[0][1] log(f"Mail {msg_id.decode()} récupéré ({len(raw)} bytes)") return raw def post_to_tryton(raw_message): headers = {"Content-Type": "message/rfc822"} log(f"Envoi du mail vers Tryton : {TRYTON_ENDPOINT}") try: r = requests.post(TRYTON_ENDPOINT, data=raw_message, headers=headers) except Exception as e: log(f"❌ Erreur réseau lors de l'envoi à Tryton : {e}") return False log(f"Réponse HTTP Tryton : {r.status_code}") if r.status_code != 200: log(f"Corps de la réponse Tryton : {r.text[:500]}") return False log("✔ Envoi réussi vers Tryton.") return True def decode_clean_header(value): if not value: return "" try: value = str(make_header(decode_header(value))) except Exception: value = str(value) return value.replace("\r", " ").replace("\n", " ").strip() def split_and_post_attachments(raw_message): msg = email.message_from_bytes(raw_message, policy=policy.default) original_from = decode_clean_header(msg.get("From")) original_to = decode_clean_header(msg.get("To")) original_subject = decode_clean_header(msg.get("Subject")) attachments = [] for part in msg.walk(): if part.get_content_disposition() == "attachment": filename = part.get_filename() content = part.get_payload(decode=True) content_type = part.get_content_type() if filename and content: attachments.append((filename, content, content_type)) if not attachments: log("Aucune pièce jointe trouvée, envoi du mail complet.") return post_to_tryton(raw_message) log(f"{len(attachments)} pièce(s) jointe(s) trouvée(s).") all_success = True for filename, content, content_type in attachments: try: maintype, subtype = content_type.split("/", 1) except ValueError: maintype, subtype = "application", "octet-stream" new_msg = EmailMessage(policy=policy.SMTP) new_msg["Subject"] = original_subject or "Document" new_msg["From"] = original_from new_msg["To"] = original_to # Corps ultra simple new_msg.set_content("Document attached.") new_msg.add_attachment( content, maintype=maintype, subtype=subtype, filename=filename.replace("'", "").replace('"', "").replace("\n", " ").replace("\r", " ") ) log(f"Envoi vers Tryton de la pièce jointe : {filename}") if not post_to_tryton(new_msg.as_bytes(policy=policy.SMTP)): all_success = False return all_success def main_loop(): log("🚀 Worker Gmail → Tryton démarré.") while True: try: mail = connect_imap() unread_ids = fetch_unread_messages(mail) if not unread_ids: log("Aucun nouvel email à traiter.") for msg_id in unread_ids: raw = get_raw_message(mail, msg_id) if not raw: continue success = split_and_post_attachments(raw) if success: mail.store(msg_id, '+FLAGS', '\\Seen') log(f"Mail {msg_id.decode()} marqué comme lu.") else: log(f"Mail {msg_id.decode()} NON marqué comme lu (échec partiel ou total).") mail.logout() except Exception as e: log(f"❌ Erreur générale : {e}") log(f"⏳ Pause {INTERVAL} secondes avant la prochaine vérification...") time.sleep(INTERVAL) if __name__ == "__main__": main_loop()