消息在线推送
接下去的(2)会给消息监听器加上界面并改进服务器代码的逻辑
服务器代码
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/20 16:48
# software : PyCharm
# python_version : '3.5.3 64bit'
# file : main.py
"""
说明文档:
"""
from queue import Queue
import json
from flask import Flask, render_template, request,jsonify
from flask_sockets import Sockets
app = Flask(__name__)
sockets = Sockets(app)
ws_pool = [] # 推送目标池
message_queue = Queue()
def get_new_message():
"""
从队列拉取新消息
"""
m = message_queue.get()
message_queue.task_done()
return m
@app.route('/add_message',methods=['POST'])
def add_message():
print(1111)
data = request.json
# data['token']
message_queue.put(data['text'])
return jsonify({'code':200,'msg':'提交成功'})
# ws://
@sockets.route('/echo')
def echo_socket(ws):
r_data = ws.receive()
r_data = json.loads(r_data)
if not r_data['type'] == 'init' or not r_data['data']['token']:
ws.send(json.dumps({'code': 400, 'msg': '连接失败'}))
return
ws.send(json.dumps({'code': 200, 'msg': '连接成功'}))
# token = r_data['data']['token']
ws_pool.append(ws)
while not ws.closed:
ws.receive() #心跳
data = get_new_message()
for e in ws_pool:
try:
e.send(data)
except:
ws_pool.remove(e)
ws_pool.remove(ws)
if __name__ == '__main__':
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey
monkey.patch_all()
server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
print('web server start ... ')
server.serve_forever()
消息监听器(客户端)
import json
import asyncio
import websockets
async def auth_token(websocket):
"""
验证token
"""
cred_text = json.dumps({'type':'init', 'data':{'token':'123456qwe'}})
await websocket.send(cred_text)
response_str = await websocket.recv()
print(json.loads(response_str)['code'])
if json.loads(response_str)['code'] == 200:
print('连接成功')
return True
else:
print('连接失败')
return False
# 监听消息
async def listen_msg(websocket):
while True:
await websocket.send('心跳')
recv_text = await websocket.recv()
print(recv_text)
async def main():
async with websockets.connect('ws://127.0.0.1:5003/echo') as websocket:
await auth_token(websocket)
await listen_msg(websocket)
asyncio.get_event_loop().run_until_complete(main())
消息发布器
import requests
r = requests.post('http://127.0.0.1:5003/add_message',json={'text':'测试'})
print(r.text)
网友评论