美文网首页Django
在线消息推送和离线消息推送(1)

在线消息推送和离线消息推送(1)

作者: 独步江雪 | 来源:发表于2020-02-20 18:05 被阅读0次

消息在线推送

接下去的(2)会给消息监听器加上界面并改进服务器代码的逻辑

服务器代码

#         coding : utf-8
#         author : ['Wang Suyin', ]
#           data : 2020/2/20 16:48
#       software : PyCharm
# python_version : '3.5.3 64bit'
#           file : main.py
"""
说明文档:

"""
from queue import Queue

import json
from flask import Flask, render_template, request,jsonify
from flask_sockets import Sockets

app = Flask(__name__)
sockets = Sockets(app)

ws_pool = []  # 推送目标池

message_queue = Queue()

def get_new_message():
    """
    从队列拉取新消息
    """
    m = message_queue.get()
    message_queue.task_done()
    return m

@app.route('/add_message',methods=['POST'])
def add_message():
    print(1111)
    data = request.json
    # data['token']

    message_queue.put(data['text'])
    return jsonify({'code':200,'msg':'提交成功'})



#  ws://
@sockets.route('/echo')
def echo_socket(ws):

    r_data = ws.receive()
    r_data = json.loads(r_data)
    if not r_data['type'] == 'init' or not r_data['data']['token']:
        ws.send(json.dumps({'code': 400, 'msg': '连接失败'}))
        return
    ws.send(json.dumps({'code': 200, 'msg': '连接成功'}))
    # token = r_data['data']['token']
    ws_pool.append(ws)

    while not ws.closed:
        ws.receive() #心跳

        data = get_new_message()

        for e in ws_pool:
            try:
                e.send(data)
            except:
                ws_pool.remove(e)


    ws_pool.remove(ws)


if __name__ == '__main__':
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    from gevent import monkey
    monkey.patch_all()

    server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
    print('web server start ... ')
    server.serve_forever()

消息监听器(客户端)

import json
import asyncio
import websockets


async def auth_token(websocket):
    """
    验证token
    """
    cred_text = json.dumps({'type':'init', 'data':{'token':'123456qwe'}})
    await websocket.send(cred_text)
    response_str = await websocket.recv()
    print(json.loads(response_str)['code'])
    if json.loads(response_str)['code'] == 200:
        print('连接成功')
        return True
    else:
        print('连接失败')
        return False
# 监听消息
async def listen_msg(websocket):
    while True:
        await websocket.send('心跳')
        recv_text = await websocket.recv()
        print(recv_text)


async def main():
    async with websockets.connect('ws://127.0.0.1:5003/echo') as websocket:
        await auth_token(websocket)
        await listen_msg(websocket)

asyncio.get_event_loop().run_until_complete(main())

消息发布器

import requests
r = requests.post('http://127.0.0.1:5003/add_message',json={'text':'测试'})
print(r.text)

相关文章

网友评论

    本文标题:在线消息推送和离线消息推送(1)

    本文链接:https://www.haomeiwen.com/subject/hlxfqhtx.html