328 lines
11 KiB
Python
Executable File
328 lines
11 KiB
Python
Executable File
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
|
|
import locale
|
|
from io import BytesIO
|
|
from itertools import zip_longest
|
|
|
|
from lxml import etree
|
|
from pypdf import PdfReader, PdfWriter
|
|
from zeep.exceptions import Fault
|
|
|
|
from trytond.i18n import gettext
|
|
from trytond.model import fields
|
|
from trytond.model.exceptions import AccessError
|
|
from trytond.modules.stock_package_shipping.exceptions import (
|
|
PackingValidationError)
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.transaction import Transaction
|
|
from trytond.wizard import StateAction, StateTransition, Wizard
|
|
|
|
from .configuration import SHIPMENT_SERVICE, get_client
|
|
from .exceptions import DPDError
|
|
|
|
TRACKING_URL = 'https://tracking.dpd.de/status/%(code)s/parcel/%(reference)s'
|
|
|
|
|
|
def iter_pdf_pages(document):
|
|
if hasattr(document, 'pages'):
|
|
yield from document.pages
|
|
else:
|
|
for i in range(document.getNumPages()):
|
|
yield document.getPage(i)
|
|
|
|
|
|
class Package(metaclass=PoolMeta):
|
|
__name__ = 'stock.package'
|
|
|
|
def get_shipping_tracking_url(self, name):
|
|
pool = Pool()
|
|
ShipmentOut = pool.get('stock.shipment.out')
|
|
ShipmentInReturn = pool.get('stock.shipment.in.return')
|
|
url = super().get_shipping_tracking_url(name)
|
|
if (self.shipping_reference
|
|
and self.shipment
|
|
and self.shipment.id >= 0
|
|
and self.shipment.carrier
|
|
and self.shipment.carrier.shipping_service == 'dpd'):
|
|
party = address = None
|
|
if isinstance(self.shipment, ShipmentOut):
|
|
party = self.shipment.customer
|
|
address = self.shipment.delivery_address
|
|
elif isinstance(self.shipment, ShipmentInReturn):
|
|
party = self.shipment.supplier
|
|
address = self.shipment.delivery_address
|
|
if party and party.lang:
|
|
lang_code = party.lang.code
|
|
else:
|
|
lang_code = Transaction().language
|
|
if address and address.country:
|
|
code = '_'.join(
|
|
(lang_code.split('_')[0], address.country.code))
|
|
else:
|
|
code = lang_code
|
|
url = TRACKING_URL % {
|
|
'code': code,
|
|
'reference': self.shipping_reference,
|
|
}
|
|
return url
|
|
|
|
|
|
class ShippingDPDMixin:
|
|
__slots__ = ()
|
|
|
|
def validate_packing_dpd(self):
|
|
warehouse = self.shipping_warehouse
|
|
if not warehouse.address:
|
|
raise PackingValidationError(
|
|
gettext('stock_package_shipping_dpd'
|
|
'.msg_warehouse_address_required',
|
|
shipment=self.rec_name,
|
|
warehouse=warehouse.rec_name))
|
|
|
|
|
|
class ShipmentOut(ShippingDPDMixin, metaclass=PoolMeta):
|
|
__name__ = 'stock.shipment.out'
|
|
|
|
|
|
class ShipmentInReturn(ShippingDPDMixin, metaclass=PoolMeta):
|
|
__name__ = 'stock.shipment.in.return'
|
|
|
|
|
|
class CreateShipping(metaclass=PoolMeta):
|
|
__name__ = 'stock.shipment.create_shipping'
|
|
|
|
dpd = StateAction(
|
|
'stock_package_shipping_dpd.act_create_shipping_dpd_wizard')
|
|
|
|
def transition_start(self):
|
|
next_state = super(CreateShipping, self).transition_start()
|
|
if self.record.carrier.shipping_service == 'dpd':
|
|
next_state = 'dpd'
|
|
return next_state
|
|
|
|
def do_dpd(self, action):
|
|
ctx = Transaction().context
|
|
return action, {
|
|
'model': ctx['active_model'],
|
|
'id': ctx['active_id'],
|
|
'ids': [ctx['active_id']],
|
|
}
|
|
|
|
|
|
class CreateDPDShipping(Wizard):
|
|
'Create DPD Shipping'
|
|
__name__ = 'stock.shipment.create_shipping.dpd'
|
|
|
|
start = StateTransition()
|
|
|
|
def transition_start(self):
|
|
pool = Pool()
|
|
Package = pool.get('stock.package')
|
|
|
|
shipment = self.record
|
|
if shipment.shipping_reference:
|
|
raise AccessError(
|
|
gettext('stock_package_shipping_dpd'
|
|
'.msg_shipment_has_reference_number',
|
|
shipment=shipment.rec_name))
|
|
|
|
credential = self.get_credential(shipment)
|
|
if not credential.depot or not credential.token:
|
|
credential.update_token()
|
|
|
|
carrier = shipment.carrier
|
|
shipping_client = get_client(credential.server, SHIPMENT_SERVICE)
|
|
print_options = self.get_print_options(shipment)
|
|
packages = shipment.root_packages
|
|
shipment_data = self.get_shipment_data(credential, shipment, packages)
|
|
|
|
count = 0
|
|
while count < 2:
|
|
lang = (credential.company.party.lang.code
|
|
if credential.company.party.lang else 'en')
|
|
lang = locale.normalize(lang)[:5]
|
|
authentication = {
|
|
'delisId': credential.user_id,
|
|
'authToken': credential.token,
|
|
'messageLanguage': lang,
|
|
}
|
|
try:
|
|
shipment_response = shipping_client.service.storeOrders(
|
|
print_options, shipment_data, _soapheaders={
|
|
'authentication': authentication,
|
|
})
|
|
break
|
|
except Fault as e:
|
|
if e.detail:
|
|
tag = etree.QName(e.detail[0].tag)
|
|
if tag.localname == 'authenticationFault':
|
|
count += 1
|
|
credential.update_token()
|
|
continue
|
|
raise DPDError(gettext(
|
|
'stock_package_shipping_dpd.'
|
|
'msg_dpd_webservice_error',
|
|
message=e.message)) from e
|
|
else:
|
|
raise DPDError(
|
|
gettext('stock_package_shipping_dpd.msg_dpd_login_error',
|
|
credential=credential.rec_name))
|
|
|
|
response, = shipment_response.shipmentResponses
|
|
if response.faults:
|
|
message = '\n'.join(f.message for f in response.faults)
|
|
raise DPDError(
|
|
gettext('stock_package_shipping_dpd.msg_dpd_webservice_error',
|
|
message=message))
|
|
|
|
labels = []
|
|
labels_pdf = BytesIO(shipment_response.parcellabelsPDF)
|
|
reader = PdfReader(labels_pdf)
|
|
for page in iter_pdf_pages(reader):
|
|
new_pdf = PdfWriter()
|
|
new_label = BytesIO()
|
|
new_pdf.add_page(page)
|
|
new_pdf.write(new_label)
|
|
labels.append(new_label)
|
|
|
|
shipment.shipping_reference = response.mpsId
|
|
parcels = response.parcelInformation
|
|
for package, label, parcel in zip_longest(packages, labels, parcels):
|
|
package.shipping_label = fields.Binary.cast(label.getvalue())
|
|
package.shipping_label_mimetype = (
|
|
carrier.shipping_label_mimetype)
|
|
package.shipping_reference = parcel.parcelLabelNumber
|
|
Package.save(packages)
|
|
shipment.save()
|
|
|
|
return 'end'
|
|
|
|
def get_credential_pattern(self, shipment):
|
|
return {
|
|
'company': shipment.company.id,
|
|
}
|
|
|
|
def get_credential(self, shipment):
|
|
pool = Pool()
|
|
DPDCredential = pool.get('carrier.credential.dpd')
|
|
|
|
credential_pattern = self.get_credential_pattern(shipment)
|
|
for credential in DPDCredential.search([]):
|
|
if credential.match(credential_pattern):
|
|
return credential
|
|
|
|
def get_print_options(self, shipment):
|
|
return {
|
|
'printerLanguage': shipment.carrier.dpd_printer_language,
|
|
'paperFormat': shipment.carrier.dpd_paper_format,
|
|
}
|
|
|
|
def shipping_party(self, party, address, usage=None):
|
|
shipping_party = {
|
|
'name1': address.party_full_name[:35],
|
|
'name2': '',
|
|
'street': ' '.join((address.street or '').splitlines())[:35],
|
|
'country': address.country.code if address.country else '',
|
|
'zipCode': address.postal_code[:9],
|
|
'city': address.city[:35],
|
|
}
|
|
if party.full_name != address.party_full_name:
|
|
shipping_party['name2'] = party.full_name[:35]
|
|
|
|
phone = address.contact_mechanism_get({'phone', 'mobile'}, usage=usage)
|
|
if phone and len(phone.value) <= 30:
|
|
shipping_party['phone'] = phone.value
|
|
email = address.contact_mechanism_get('email', usage=usage)
|
|
if email and len(email.value) <= 50:
|
|
shipping_party['email'] = email.value
|
|
|
|
return shipping_party
|
|
|
|
def get_parcel(self, package):
|
|
pool = Pool()
|
|
UoM = pool.get('product.uom')
|
|
ModelData = pool.get('ir.model.data')
|
|
|
|
cm = UoM(ModelData.get_id('product', 'uom_centimeter'))
|
|
|
|
parcel = {}
|
|
|
|
if package.total_weight:
|
|
# in grams rounded in 10 gram units
|
|
weight = int(package.total_weight * 10) * 10
|
|
if weight < 1000000000:
|
|
parcel['weight'] = weight
|
|
|
|
if (package.length is not None
|
|
and package.width is not None
|
|
and package.height is not None):
|
|
length = UoM.compute_qty(
|
|
package.length_uom, package.length, cm)
|
|
width = UoM.compute_qty(
|
|
package.width_uom, package.width, cm)
|
|
height = UoM.compute_qty(
|
|
package.height_uom, package.height, cm)
|
|
if length < 1000 and width < 1000 and height < 1000:
|
|
parcel['volume'] = int(
|
|
'%03i%03i%03i' % (length, width, height))
|
|
|
|
return parcel
|
|
|
|
def get_shipment_data(self, credential, shipment, packages):
|
|
return {
|
|
'generalShipmentData': {
|
|
'identificationNumber': shipment.number,
|
|
'sendingDepot': credential.depot,
|
|
'product': shipment.carrier.dpd_product,
|
|
'sender': self.shipping_party(
|
|
shipment.company.party,
|
|
shipment.shipping_warehouse.address),
|
|
'recipient': self.shipping_party(
|
|
shipment.shipping_to, shipment.shipping_to_address),
|
|
},
|
|
'parcels': [self.get_parcel(p) for p in packages],
|
|
'productAndServiceData': {
|
|
'orderType': 'consignment',
|
|
**self.get_notification(shipment),
|
|
},
|
|
}
|
|
|
|
def get_notification(self, shipment, usage=None):
|
|
carrier = shipment.carrier
|
|
if not carrier.dpd_notification:
|
|
return {}
|
|
party = shipment.shipping_to
|
|
if party and party.lang:
|
|
lang_code = party.lang.code
|
|
else:
|
|
lang_code = Transaction().language
|
|
lang_code = lang_code.upper()
|
|
channel2type = {
|
|
'sms': {'mobile'},
|
|
}
|
|
|
|
channels = [
|
|
(1, 'email'),
|
|
(3, 'sms'),
|
|
]
|
|
if carrier.dpd_notification == 'sms':
|
|
channels = reversed(channels)
|
|
|
|
for channel_id, channel in channels:
|
|
mechanism = party.contact_mechanism_get(
|
|
channel2type.get(channel, channel), usage=usage)
|
|
if not mechanism:
|
|
continue
|
|
value = mechanism.value
|
|
if len(value) > 50:
|
|
continue
|
|
return {
|
|
'predict': {
|
|
'channel': channel_id,
|
|
'value': value,
|
|
'language': lang_code,
|
|
},
|
|
}
|
|
return {}
|