
Controlling an LED with a polling loop in Python: an example

程序员文章站 2022-07-04 18:35:23

Without further ado, here is the code.

# -*- coding:utf-8 -*-
# file: ceshitianqi
# Python 2 script (it uses urllib2); requires the pyserial package.

import urllib2
import json
import time
import datetime
import serial

apikey = 'zpdlyl***='  # replace with your own API key
ser = serial.Serial("/dev/ttyUSB2", 9600, timeout=1)
 
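def read(key):
    # Send a one-character command to the Arduino and return its reply
    ser.write(key)
    print("output:" + key)
    time.sleep(1)  # give the Arduino time to answer
    response = ser.readall()
    print(response)
    print(type(response))
    return response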
 
def http_put(key):
    val = read(key)  # reading returned by the Arduino
    curtime = datetime.datetime.now()
    url = 'http://api.heclouds.com/devices/**1/datapoints'
    print(type(val))
    # "a" requests the humidity reading, "b" the temperature reading
    if key == "a":
        stream_id = "humidity"
    elif key == "b":
        stream_id = "temperature"
    else:
        raise ValueError("unknown key: " + key)
    values = {'datastreams': [{"id": stream_id, "datapoints": [{"at": curtime.isoformat(), "value": val}]}]}
    jdata = json.dumps(values)  # serialize the payload as JSON
    print(jdata)  # show the JSON payload
    request = urllib2.Request(url, jdata)
    request.add_header('api-key', apikey)
    request.get_method = lambda: 'POST'  # set the HTTP method
    response = urllib2.urlopen(request)
    return response.read()
 
keys = ("a", "b")
while True:
    for key in keys:
        # Poll 1.txt: "1" sends the LED-on command, "0" the LED-off command
        with open('1.txt') as f:
            e = f.read()
        if e == "1\n":
            ser.write("c")
        if e == "0\n":
            ser.write("d")

        resp = http_put(key)
        time.sleep(2)

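The main loop polls 1.txt on every pass:

If the file contains 1, the LED is switched on.

If the file contains 0, the LED is switched off.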
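
The decision made on each poll can be separated from the serial port, which makes it easy to check without hardware. The following is a minimal sketch, assuming the same single-character commands as the script above ("c" for on, "d" for off). The helper names `command_for` and `changed_commands` are hypothetical, and two behaviours differ from the original script: surrounding whitespace is ignored, and a command is emitted only when it differs from the previous one.

```python
def command_for(content):
    """Map the text read from 1.txt to a serial command.

    Returns "c" (LED on) for "1", "d" (LED off) for "0", and None for
    anything else. Surrounding whitespace is ignored.
    """
    state = content.strip()
    if state == "1":
        return "c"
    if state == "0":
        return "d"
    return None


def changed_commands(contents):
    """Yield a command only when it differs from the previously sent one."""
    last = None
    for content in contents:
        command = command_for(content)
        if command is not None and command != last:
            last = command
            yield command


if __name__ == "__main__":
    # Simulated successive reads of 1.txt
    reads = ["1\n", "1\n", "0\n", "", "0", "1"]
    print(list(changed_commands(reads)))  # ['c', 'd', 'c']
```

Sending only on change avoids writing the same command to the serial port on every pass of the loop; whether that matters depends on the Arduino sketch, which the article does not show.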

Supplementary material: Python notes (polling and long polling)

1. Polling

views.py

from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

users = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}

@app.route('/user/list')
def user_list():
    return render_template('user_list.html', users=users)

@app.route('/vote', methods=['POST'])
def vote():
    uid = request.form.get('uid')
    users[uid]['count'] += 1
    return "Vote recorded"

@app.route('/get/vote', methods=['GET'])
def get_vote():
    # Return the current counts; the page requests this every 3 seconds
    return jsonify(users)


if __name__ == '__main__':
    app.run(threaded=True)

user_list.html

<!doctype html>
<html lang="zh-cn">
<head>
  <meta charset="utf-8">
  <title>title</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    li{
      cursor: pointer;
    }
  </style>
</head>
<body>
  <ul id="userlist">
    {% for key,val in users.items() %}
      <li uid="{{key}}">{{val.name}} ({{val.count}})</li>
    {% endfor %}
  </ul>

  <script src="https://cdn.bootcss.com/jquery/3.3.0/jquery.min.js"></script>
  <script>

    $(function () {
      $('#userlist').on('dblclick','li',function () {
        var uid = $(this).attr('uid');
        $.ajax({
          url:'/vote',
          type:'post',
          data:{uid:uid},
          success:function (arg) {
            console.log(arg);
          }
        })
      });

    });


    /*
    Fetch the current vote counts and rebuild the list
     */
    function get_vote() {
      $.ajax({
        url:'/get/vote',
        type:"get",
        dataType:'json',
        success:function (arg) {
          $('#userlist').empty();
          $.each(arg,function (k,v) {
            var li = document.createElement('li');
            li.setAttribute('uid',k);
            li.innerText = v.name + "(" + v.count + ')' ;
            $('#userlist').append(li);
          })

        }
      })
    }


    // Poll the server every 3 seconds, whether or not anything changed
    setInterval(get_vote,3000);

  </script>
</body>
</html>
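
The pattern used by this page, asking the server at a fixed interval regardless of whether the data changed, can also be sketched in Python. This is an illustrative sketch only: `poll`, `fetch`, and `on_change` are hypothetical names, and `fetch` stands in for the GET request to /get/vote so the example runs without a server.

```python
import time


def poll(fetch, on_change, interval=3.0, rounds=5, sleep=time.sleep):
    """Call fetch() every `interval` seconds, `rounds` times in total.

    on_change(data) is invoked only when the fetched data differs from the
    previous result. Returns the number of requests made, which always
    equals `rounds`: short polling pays for every request, changed or not.
    """
    previous = None
    requests_made = 0
    for _ in range(rounds):
        data = fetch()
        requests_made += 1
        if data != previous:
            on_change(data)
            previous = data
        sleep(interval)
    return requests_made


if __name__ == "__main__":
    # Stand-in for GET /get/vote: the count changes once during five polls.
    snapshots = iter([{"1": 1}, {"1": 1}, {"1": 2}, {"1": 2}, {"1": 2}])
    seen = []
    made = poll(lambda: next(snapshots), seen.append, interval=0.01)
    print(made, seen)  # 5 [{'1': 1}, {'1': 2}]
```

Five requests are made although the data changed only twice, which is the overhead that long polling tries to reduce.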

2. Long polling

views.py

from flask import Flask, render_template, request, jsonify, session
import uuid
import queue

app = Flask(__name__)
app.secret_key = 'asdfasdfasd'


users = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}

# One queue per visitor, keyed by the uuid stored in that visitor's session
queue_dict = {
}

@app.route('/user/list')
def user_list():
    user_uuid = str(uuid.uuid4())
    queue_dict[user_uuid] = queue.Queue()

    session['current_user_uuid'] = user_uuid
    return render_template('user_list.html', users=users)

@app.route('/vote', methods=['POST'])
def vote():
    uid = request.form.get('uid')
    users[uid]['count'] += 1
    # Push the new counts to every visitor's queue
    for q in queue_dict.values():
        q.put(users)
    return "Vote recorded"


@app.route('/get/vote', methods=['GET'])
def get_vote():
    user_uuid = session['current_user_uuid']
    q = queue_dict[user_uuid]

    ret = {'status': True, 'data': None}
    try:
        # Hold the request open for up to 5 seconds waiting for an update
        data = q.get(timeout=5)
        ret['data'] = data
    except queue.Empty:
        ret['status'] = False

    return jsonify(ret)



if __name__ == '__main__':
    # threaded=True lets other requests be served while get_vote is blocked
    app.run(threaded=True)

user_list.html

<!doctype html>
<html lang="zh-cn">
<head>
  <meta charset="utf-8">
  <title>title</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    li{
      cursor: pointer;
    }
  </style>
</head>
<body>
  <ul id="userlist">
    {% for key,val in users.items() %}
      <li uid="{{key}}">{{val.name}} ({{val.count}})</li>
    {% endfor %}
  </ul>

  <script src="https://cdn.bootcss.com/jquery/3.3.0/jquery.min.js"></script>
  <script>

    $(function () {
      $('#userlist').on('click','li',function () {
        var uid = $(this).attr('uid');
        $.ajax({
          url:'/vote',
          type:'post',
          data:{uid:uid},
          success:function (arg) {
            console.log(arg);
          }
        })
      });
      get_vote();
    });

    /*
    Fetch vote updates; the server holds each request until data arrives or it times out
     */
    function get_vote() {
      $.ajax({
        url:'/get/vote',
        type:"get",
        dataType:'json',
        success:function (arg) {
          if(arg.status){
            $('#userlist').empty();
            $.each(arg.data,function (k,v) {
              var li = document.createElement('li');
              li.setAttribute('uid',k);
              li.innerText = v.name + "(" + v.count + ')' ;
              $('#userlist').append(li);
            })
          }
          // Issue the next request as soon as this one returns
          get_vote();

        }
      })
    }

  </script>
</body>
</html>
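
The server side of long polling rests on a blocking queue read with a timeout. The sketch below isolates that step from Flask so it can be run on its own; `wait_for_update` and `broadcast` are hypothetical helper names, and the returned dictionary copies the status/data shape used by get_vote in the listing above.

```python
import queue
import threading


def wait_for_update(q, timeout=5.0):
    """Block until an item arrives on q or the timeout expires.

    Returns status True with the item when an update arrived, and
    status False with data None when the wait timed out.
    """
    try:
        return {"status": True, "data": q.get(timeout=timeout)}
    except queue.Empty:
        return {"status": False, "data": None}


def broadcast(queues, update):
    """Deliver one update to every waiting client's queue."""
    for q in queues.values():
        q.put(update)


if __name__ == "__main__":
    clients = {"client-a": queue.Queue(), "client-b": queue.Queue()}

    # Nothing has been published yet, so this wait times out.
    print(wait_for_update(clients["client-a"], timeout=0.1))

    # Publish from another thread shortly after the next wait begins.
    threading.Timer(0.1, broadcast, args=(clients, {"1": 2})).start()
    print(wait_for_update(clients["client-a"], timeout=2.0))
```

The first call returns after the timeout with status False; the second returns as soon as the timer thread publishes, well before its 2-second limit. That early return is what lets the page update almost immediately after a vote instead of waiting for the next fixed interval.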
