"""DingTalk client: request signing, access-token handling and messaging.

NOTE(review): this module reached review collapsed onto three lines and with
several tokens missing (two import paths, the receiver of most HTTP calls and
both logging calls). Every restored spot is marked ``NOTE(review)`` below and
must be checked against version control before merging.
"""
import base64
import hashlib
import hmac
import time

from django.conf import settings

# NOTE(review): the module paths of the next two imports were missing in the
# reviewed text; ``common.sdk.im.*`` is an assumption -- confirm.
from common.sdk.im.mixin import BaseRequest
from common.sdk.im.utils import digest, as_request
from common.utils import get_logger
from users.utils import construct_user_email, flatten_dict, map_attributes

logger = get_logger(__file__)


def sign(secret, data):
    """Return the base64-encoded HMAC-SHA256 of *data* keyed with *secret*.

    Both arguments are text and are encoded as UTF-8 before hashing. The
    result is standard base64 text; it is not URL-quoted here.
    """
    # ``hashlib.sha256`` is the public spelling of what used to be reached
    # through the private ``hmac._hashlib`` attribute. The local is not called
    # ``digest`` so the imported ``digest`` helper is not shadowed.
    mac = hmac.new(
        key=secret.encode('utf8'),
        msg=data.encode('utf8'),
        digestmod=hashlib.sha256,
    )
    return base64.standard_b64encode(mac.digest()).decode('utf8')


def _params_of(kwargs: dict) -> dict:
    """Return ``kwargs['params']``, installing an empty dict if absent or None."""
    params = kwargs.get('params')
    if params is None:
        params = {}
        kwargs['params'] = params
    return params


def _join_ids(ids) -> str:
    """Render ids as the comma-separated string used in the request bodies.

    ``None`` yields an empty string and non-string ids are converted with
    ``str`` so that numeric ids do not make ``str.join`` raise.
    """
    return ','.join(str(item) for item in ids or ())


class ErrorCode:
    """Error codes this module reacts to."""

    INVALID_TOKEN = 88


class URL:
    """Endpoint constants used by the client.

    NOTE(review): every value is an empty string in the reviewed text, so no
    request built from these constants has a target -- restore the real
    endpoints from version control / the DingTalk documentation.
    """

    QR_CONNECT = ''
    OAUTH_CONNECT = ''
    GET_USER_ACCESSTOKEN = ''
    GET_USER_INFO = ''
    GET_TOKEN = ''
    SEND_MESSAGE_BY_TEMPLATE = ''
    SEND_MESSAGE = ''
    GET_SEND_MSG_PROGRESS = ''
    GET_USERID_BY_UNIONID = ''
    GET_USER_INFO_BY_USER_ID = ''


class DingTalkRequests(BaseRequest):
    """HTTP layer that adds the access token and/or a signature to requests.

    ``invalid_token_errcodes`` and ``msg_key`` are presumably consumed by
    ``BaseRequest`` (not visible here) -- confirm against that class.
    """

    invalid_token_errcodes = (ErrorCode.INVALID_TOKEN,)
    msg_key = 'errmsg'

    def __init__(self, appid, appsecret, agentid, timeout=None):
        """Store the credentials (falsy values become ``''``) and init the base."""
        self._appid = appid or ''
        self._appsecret = appsecret or ''
        self._agentid = agentid or ''

        super().__init__(timeout=timeout)

    def get_access_token_cache_key(self):
        """Return ``digest(appid, appsecret)`` as the token cache key."""
        return digest(self._appid, self._appsecret)

    def request_access_token(self):
        """Request a token from ``URL.GET_TOKEN``.

        Returns:
            The ``(access_token, expires_in)`` pair read from the response.

        Raises:
            KeyError: if the response lacks either field.
        """
        params = {'appkey': self._appid, 'appsecret': self._appsecret}
        data = self.raw_request('get', url=URL.GET_TOKEN, params=params)
        return data['access_token'], data['expires_in']

    def add_token(self, kwargs: dict):
        """Put ``self.access_token`` into the query params of *kwargs* in place."""
        _params_of(kwargs)['access_token'] = self.access_token

    @as_request
    def get(self, url, params=None,
            with_token=False, with_sign=False,
            check_errcode_is_0=True,
            **kwargs) -> dict:
        """Signature-only stub; ``as_request`` supplies the behaviour.

        NOTE(review): presumably forwards to ``self.request('get', ...)`` --
        confirm against ``as_request``.
        """

    @as_request
    def post(self, url, json=None, params=None,
             with_token=False, with_sign=False,
             check_errcode_is_0=True,
             **kwargs) -> dict:
        """Signature-only stub; ``as_request`` supplies the behaviour.

        NOTE(review): presumably forwards to ``self.request('post', ...)`` --
        confirm against ``as_request``.
        """

    def _add_sign(self, kwargs: dict):
        """Add ``timestamp``, ``signature`` and ``accessKey`` to the query params.

        The timestamp is the current time in milliseconds, as text, and is the
        message that gets signed with the app secret.
        """
        params = _params_of(kwargs)
        timestamp = str(int(time.time() * 1000))
        params['timestamp'] = timestamp
        params['signature'] = sign(self._appsecret, timestamp)
        params['accessKey'] = self._appid

    def request(self, method, url,
                with_token=False, with_sign=False,
                check_errcode_is_0=True,
                **kwargs):
        """Optionally sign the request, then delegate to ``BaseRequest.request``."""
        if with_sign:
            self._add_sign(kwargs)
        return super().request(
            method, url, with_token=with_token,
            check_errcode_is_0=check_errcode_is_0, **kwargs
        )


class DingTalk:
    """High-level DingTalk operations: user lookup and message sending.

    NOTE(review): in the reviewed text the receiver of most HTTP calls below
    (``self._request.post(URL.<NAME>``) was missing. The verb and endpoint
    constants were restored by matching method names and request bodies --
    confirm each one.
    """

    # Read once, when the class body is executed.
    attributes = settings.DINGTALK_RENAME_ATTRIBUTES

    def __init__(self, appid, appsecret, agentid, timeout=None):
        """Store the credentials and build the underlying request object."""
        self._appid = appid or ''
        self._appsecret = appsecret or ''
        self._agentid = agentid or ''

        self._request = DingTalkRequests(
            appid=appid, appsecret=appsecret, agentid=agentid,
            timeout=timeout
        )

    def get_userinfo_bycode(self, code):
        """Exchange an authorization *code* for the user's profile.

        First posts the credentials and code to obtain ``accessToken``, then
        fetches ``URL.GET_USER_INFO`` with that token in the
        ``x-acs-dingtalk-access-token`` header. Both calls skip the
        ``errcode == 0`` check.
        """
        body = {
            'clientId': self._appid,
            'clientSecret': self._appsecret,
            'code': code,
            'grantType': 'authorization_code'
        }
        data = self._request.post(URL.GET_USER_ACCESSTOKEN, json=body, check_errcode_is_0=False)
        token = data['accessToken']

        user = self._request.get(URL.GET_USER_INFO,
                                 headers={'x-acs-dingtalk-access-token': token}, check_errcode_is_0=False)
        return user

    def get_user_id_by_code(self, code):
        """Return ``(userid, None)`` for the user identified by *code*."""
        user_info = self.get_userinfo_bycode(code)
        userid = self.get_userid_by_unionid(user_info['unionId'])
        return userid, None

    def get_userid_by_unionid(self, unionid):
        """Return ``result.userid`` from the union-id lookup response."""
        body = {
            'unionid': unionid
        }
        data = self._request.post(URL.GET_USERID_BY_UNIONID, json=body, with_token=True)
        return data['result']['userid']

    def send_by_template(self, template_id, user_ids, dept_ids, data):
        """Send a templated message to the given users and departments."""
        body = {
            'agent_id': self._agentid,
            'template_id': template_id,
            'userid_list': _join_ids(user_ids),
            'dept_id_list': _join_ids(dept_ids),
            'data': data
        }
        return self._request.post(URL.SEND_MESSAGE_BY_TEMPLATE, json=body, with_token=True)

    def send_markdown(self, user_ids, title, msg):
        """Send a markdown message with *title* and text *msg* to *user_ids*."""
        body = {
            'agent_id': self._agentid,
            'userid_list': _join_ids(user_ids),
            'to_all_user': False,
            'msg': {
                'msgtype': 'markdown',
                'markdown': {
                    'title': title,
                    'text': msg
                }
            }
        }
        # NOTE(review): the call prefix was missing; ``logger.info(f`` assumed.
        logger.info(f'Dingtalk send markdown to user {user_ids}: {msg}')
        return self._request.post(URL.SEND_MESSAGE, json=body, with_token=True)

    def send_text(self, user_ids, msg):
        """Send a plain-text message *msg* to *user_ids*."""
        body = {
            'agent_id': self._agentid,
            'userid_list': _join_ids(user_ids),
            'to_all_user': False,
            'msg': {
                'msgtype': 'text',
                'text': {
                    'content': msg
                }
            }
        }
        # NOTE(review): the call prefix was missing; ``logger.info(f`` assumed.
        logger.info(f'Dingtalk send msg to user {user_ids}: {msg}')
        return self._request.post(URL.SEND_MESSAGE, json=body, with_token=True)

    def get_send_msg_progress(self, task_id):
        """Return the response of the send-progress query for *task_id*."""
        body = {
            'agent_id': self._agentid,
            'task_id': task_id
        }
        return self._request.post(URL.GET_SEND_MSG_PROGRESS, json=body, with_token=True)

    @staticmethod
    def default_user_detail(data):
        """Build the default ``username`` / ``name`` / ``email`` mapping.

        ``name`` falls back to the user id; the email is the first truthy of
        ``email`` and ``org_email``, passed through ``construct_user_email``.

        Raises:
            KeyError: if *data* has no ``user_id`` entry.
        """
        username = data['user_id']
        name = data.get('name', username)
        email = data.get('email') or data.get('org_email')
        email = construct_user_email(username, email)
        return {
            'username': username, 'name': name, 'email': email
        }

    def get_user_detail(self, user_id, **kwargs):
        """Fetch a user's record and map it through ``self.attributes``.

        The ``result`` object of the response gets ``user_id`` added, is
        flattened with ``flatten_dict`` and combined with the default detail by
        ``map_attributes``. Extra keyword arguments are accepted and ignored.
        """
        data = self._request.post(
            URL.GET_USER_INFO_BY_USER_ID, json={'userid': user_id}, with_token=True
        )
        data = data['result']
        data['user_id'] = user_id
        info = flatten_dict(data)
        default_detail = self.default_user_detail(data)
        return map_attributes(default_detail, info, self.attributes)