在使用Django开发web应用的时候,很多场景都会有需要微信相关功能的介入,最近我们公司在使用python的Django框架配合国产数据库OceanBase数据库进行开发互联网应急指挥系统的时候,就用到了微信通知,在发生舆情事件的时候通过微信公众号,通知对应人员有新的事件发生,或者提醒相关人员对应事件的进度情况
想要使用微信的信息推送,就需要提前做好一些准备,因为发送微信模板消息,您需要使用微信公众平台 API。
需要获取微信公众平台的 API 密钥,并且配置好相应的回调 URL。
需要根据微信公众平台的要求,选择合适的模板,并填写好模板内容。
根据微信公众平台的 API 文档,构造合适的请求参数,发送请求。
对于推送消息的响应结果,需要进行适当的处理和解析,以便于判断推送消息是否成功。
废话直接不多说,直接贴代码
import copy import json import redis import requests from django.conf import settings from api.models import DvadminSystemUsers class WeChat: def __init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None): self.app_id = app_id self.secret = secret self.template = { "touser": "", "template_id": "", "url": "", "data": { "first": { "value": "", }, "keyword1": { "value": "", }, "keyword2": { "value": "", }, "remark": { "value": "", }, } } if template: self.template = template self.access_token = None self.req_list = list() self.user_list = list() self.data_list = list() def _get_token(self, force_update=False): key_name = 'wechat_token_{}'.format(self.app_id) if force_update: self.access_token = None else: self.access_token = get_data(key_name) if not self.access_token: url = "https://api.weixin.qq.com/cgi-bin/token?" payload = { 'grant_type': 'client_credential', 'appid': self.app_id, 'secret': self.secret, } response = requests.get(url, params=payload, timeout=50) access_token = response.json().get("access_token") if access_token: set_data(key_name, access_token, ex=7100) self.access_token = get_data(key_name) def make_data_list(self): user_openid_list = DvadminSystemUsers.objects.filter( id__in=self.user_list ).exclude( openId='' ).exclude( openId__isnull=True ).values_list('openid', flat=True) for openid in user_openid_list: user_template = copy.deepcopy(self.template) user_template['touser'] = openid self.data_list.append(user_template) def post_data(self): # 获取 token self._get_token() url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token) # 准备数据 if self.user_list: self.make_data_list() # 发送请求 for data in self.data_list: json_template = json.dumps(data) res = requests.post(url, data=json_template) print('post_data', res.text) def get_redis(): """ 连接redis """ redis = redis.Redis(host='localhost', port=6379, db=0) return redis def set_data(name, value, **kwargs): # 将数据存入redis缓存 redis = get_redis() value = json.dumps(value) redis.set(name, value, **kwargs) def get_data(name): # 取出数据 redis = get_redis() value = redis.get(name) if value: value = json.loads(value) return value
调用微信推送类