欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Django+Vue实现WebSocket连接的示例代码

程序员文章站 2023-12-01 21:11:10
近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 websocket 最适合做这件事。 效果 测试 ping www.baidu.com 效果...

近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 websocket 最适合做这件事。

效果

测试 ping www.baidu.com 效果

点击连接建立ws连接

Django+Vue实现WebSocket连接的示例代码

后端实现

所需软件包

后端主要借助django channels 实现socket连接,

这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis

pip

channels==2.2.0
channels-redis==2.4.0

引入

settings.py

installed_apps = [
  'django.contrib.admin',
  'django.contrib.auth',
  'django.contrib.contenttypes',
  'django.contrib.sessions',
  'django.contrib.messages',
  'django.contrib.staticfiles',
  'rest_framework.authtoken',
  'rest_framework',
        ...
  'channels',
]

# redis配置
redis_host = env_dict.get('redis_host', '127.0.0.1')
redis_port = env_dict.get('redis_port', 6379)
channel_layers = {
  "default": {
    "backend": "channels_redis.core.redischannellayer",
    "config": {
      "hosts": [(redis_host, redis_port)],
    },
  },
}

代码

apps/consumers.py

新建一个消费处理

实现: 默认连接加入组,发送信息时的处理。

from channels.generic.websocket import websocketconsumer

class myconsumer(websocketconsumer):
  def connect(self):
    """
    每个任务作为一个频道
    默认进入对应任务执行频道
    """
    self.job_name = self.scope['url_route']['kwargs']['job_name']
    self.job_group_name = 'job_%s' % self.job_name
    async_to_sync(self.channel_layer.group_add)(
      self.job_group_name,
      self.channel_name
    )
    self.accept()

  def disconnect(self, close_code):
    async_to_sync(self.channel_layer.group_discard)(
      self.job_group_name,
      self.channel_name
    )

  # job.message类型处理
  def job_message(self, event):

    # 默认发送收到信息
    self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由 myconsumer 去处理。

from . import consumers
from django.urls import path
from channels.routing import protocoltyperouter, urlrouter
from channels.sessions import sessionmiddlewarestack

application = protocoltyperouter({
  'websocket': sessionmiddlewarestack(
    urlrouter(
     [
       path('ws/job/<str:job_name>', consumers.myconsumer)
     ]
    )
  ),
})

apps/views.py

在执行命令中获取 websocket 消费通道,进行异步推送

  • 使用异步推送async_to_sync是因为在连接的时候采用的异步连接,所以推送必须采用异步推送。
  • 因为执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。
from subprocess import popen,pipe
import threading

def runpopen(job):
  """
  执行命令,返回popen
  """
  path = os.path
  path = path.abspath(path.join(base_dir, path.pardir))
  script_path = path.abspath(path.join(path,'run.sh'))
  cmd = "sh %s %s" % (script_path, job)
  return popen(cmd, shell=true, stdout=pipe, stderr=pipe)

def runscript(job):
  channel_layer = get_channel_layer()
  group_name = "job_%s" % job

  popen = runpopen(job)
  while true:
    output = popen.stdout.readline()
    if output == '' and popen.poll() is not none:
      break

    if output:
      output_text = str(output.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name, 
          {"type": "job.message", "text": output_text}
        )
    else:
      err = popen.stderr.readline()
      err_text = str(err.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name,
          {"type": "job.message", "text": err_text}
        )
      break

class startjob(apiview): 
  def get(self, request, job=none):
    run = threading.thread(target=runscript, args=(job,))
    run.start()
    return httpresponse('ok')

apps/urls.py

get请求就能启动任务

urlpatterns = [
        ...
  path('start_job/<str:job>', startjob.as_view())
]

前端实现

所需软件包

vue-native-websocket 

代码实现

plugins/vuenativewebsocket.js

import vue from 'vue'
import vuenativesock from '../utils/socket/main.js'

export default function ({ store }) {
 vue.use(vuenativesock, 'http://localhost:8000/ws/job', {connectmanually: true,});
}

nuxt.config.js

配置文件引入, 这里我使用的是 nuxt 框架

 plugins: [ 
   { 
    src: '@/plugins/vuenativewebsocket.js', 
    ***: false 
   },
  ],

封装 socket

export default (connection_url, option) => {
  // 事件
  let event = ['message', 'close', 'error', 'open'];

  // 拷贝选项字典
  let opts = object.assign({}, option);

  // 定义实例字典
  let instance = {

   // socket实例
   socket: '',

   // 是否连接状态
   is_conncet: false,

   // 具体连接方法
   connect: function() {
    if(connection_url) {
     let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
     connection_url = scheme + '://' + connection_url.split('://')[1];
     this.socket = new websocket(connection_url);
     this.initevent();
    }else{
     console.log('wsurl為空');
    }
   },

   // 初始化事件
   initevent: function() {
    for(let i = 0; i < event.length; i++){
     this.addlistener(event[i]);
    }
   },

   // 判断事件
   addlistener: function(event) {
    this.socket.addeventlistener(event, (e) => {
     switch(event){
      case 'open':
       this.is_conncet = true;
       break;
      case 'close':
       this.is_conncet = false;
       break;
     }
     typeof opts[event] == 'function' && opts[event](e);
    });
   },

   // 发送方法,失败则回调
   send: function(data, closecallback) {
    console.log('socket ---> ' + data)
    if(this.socket.readystate >= 2) {
     console.log('ws已经关闭');
     closecallback && closecallback();
    }else{
     this.socket.send(data);
    }
   }

  };

  // 调用连接方法
  instance.connect();
  return instance;
 }

index.vue

具体代码

x2str 方法,因为后端返回的是bytes,格式 b'xxx' ,编写了方法对其进行转换。

<template>
    <div>

        <el-button type="primary" @click="runfunction" >执行</el-button>
        <el-button type="primary" @click="connectwebsock" >显示</el-button>

  <div class="socketview">
   <span v-for="i in socketmessage" :key="i">{{i}}</span>
  </div>
 </div>
</template>
<script>
 import r from '@/plugins/axios';
 import ws from '@/plugins/socket'
 export default {
  data() {
   return {
    websocket: '',
    socketmessage: [],
   }
  },

    methods: {
     // 打开连接的处理
   opensocket(e) {
    if (e.istrusted) {
     const h = this.$createelement;
     this.$notify({
      title: '提示',
      message: h('i', { style: 'color: teal'}, '已建立socket连接')
     });
    }
   },

  // 连接时的处理
  listensocket(e) {
   if (e.data){
    this.socketmessage.push(this.x2str(e.data))
   }
  },

  // 连接websocket
        connectwebsock() {
   let wsuri = process.env.backend_url + '/ws/job/' + this.selectfunctions
   this.websocket = ws(wsuri, {
    open: e => this.opensocket(e),
    message: e => this.listensocket(e),
    close: e => this.closesocket(e)
   })
  },

     // 转码
  x2str(str) {
   if (str) {
    let reg = new regexp("(?<=^b').*(?='$)")
    let result = str.replace(/(?:\\x[\da-fa-f]{2})+/g, m =>
     decodeuricomponent(m.replace(/\\x/g, '%'))
    )
    return reg.exec(result)[0]
   }
  },

  // 执行方法
  runfunction() {
   r.myrequest('get','api/start_job/' + this.selectfunctions, {}, {}).then((response) => {
    if (response.hasownproperty('response')){
      this.$message({
      type: 'error',
      message: '服务端返回错误,返回码:' + response.response.status 
      });
    }; 
    if (response.data == 'ok') {
      this.$message({
       type: 'success',
       message: '开始执行[' + this.selectfunctions + ']'
      });
    }
   });
  }   
  }
}
</script>

至此,实现前后端 websocket 通讯。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。