Main Content

本页采用了机器翻译。点击此处可查看英文原文。

使用 Raspberry Pi 板进行批量更新

此示例演示如何使用运行 Python 2.7 的 Wi-Fi 连接的 Raspberry Pi 板收集数据。您可以每 15 秒连续收集一次 CPU 温度和 CPU 利用率,并每 2 分钟批量更新 ThingSpeak通道。此示例使用 Bulk-Write JSON Data API 批量收集数据并将其发送到 ThingSpeak 通道。通过使用批量更新,您可以降低设备的功耗。由于 Raspberry Pi 板没有配备实时时钟,因此您可以使用相对时间戳来进行批量更新消息。

设置

创建一个通道,如 Collect Data in a New Channel 所示。

代码

1)导入脚本所需的库。

import json
import time
import os
import psutil
import requests

2) 定义跟踪上次连接时间和上次更新时间的全局变量。定义更新数据的时间间隔,并将数据发布到 ThingSpeak。

last_connection_time = time.time() # Track the last connection time
last_update_time = time.time()     # Track the last update time
posting_interval = 120             # Post data once every 2 minutes
update_interval = 15               # Update once every 15 seconds

3) 定义您的 ThingSpeak 写入 API 密钥和通道ID 设置,以及 ThingSpeak 服务器设置。

write_api_key = "YOUR-CHANNEL-WRITEAPIKEY" # Replace YOUR-CHANNEL-write_api_key with your channel write API key
channel_ID = "YOUR-CHANNELID"              # Replace YOUR-channel_ID with your channel ID
url = "https://api.thingspeak.com/channels/" + channel_ID + "/bulk_update.json" # ThingSpeak server settings
message_buffer = []

4) 定义函数 httpRequest,将数据发送到 ThingSpeak 并打印服务器的响应代码。响应代码202表示服务器已接受请求并将对其进行处理。

def httpRequest():
    # Function to send the POST request to ThingSpeak channel for bulk update.
        global message_buffer
        bulk_data = json.dumps({'write_api_key':write_api_key,'updates':message_buffer}) # Format the json data buffer
        request_headers = {"User-Agent":"mw.doc.bulk-update (Raspberry Pi)","Content-Type":"application/json","Content-Length":str(len(bulk_data))}
    # Make the request to ThingSpeak
        try:
            print(request_headers)
            response = requests.post(url,headers=request_headers,data=bulk_data)
            print (response) # A 202 indicates that the server has accepted the request
        except e:
            print(e.code) # Print the error code
        message_buffer = [] # Reinitialize the message buffer
        global last_connection_time
        last_connection_time = time.time() # Update the connection time

5) 定义函数 getData,该函数返回以摄氏度为单位的 CPU 温度以及以百分比表示的 CPU 利用率。

def getData():
    # Function that returns the CPU temperature and percentage of CPU utilization
        cmd = '/opt/vc/bin/vcgencmd measure_temp'
        process = os.popen(cmd).readline().strip()
        cpu_temp = process.split('=')[1].split("'")[0]
        cpu_usage = psutil.cpu_percent(interval=2)
        return cpu_temp,cpu_usage

6) 定义函数updatesJson,每15秒持续更新消息缓冲区。

def updatesJson():
    # Function to update the message buffer every 15 seconds with data. 
    # And then call the httpRequest function every 2 minutes. 
    # This examples uses the relative timestamp as it uses the "delta_t" parameter.
    # If your device has a real-time clock, you can also provide the absolute timestamp 
    # using the "created_at" parameter.

        global last_update_time
        message = {}
        message['delta_t'] = int(round(time.time() - last_update_time))
        Temp,Usage = getData()
        message['field1'] = Temp
        message['field2'] = Usage
        global message_buffer
        message_buffer.append(message)
    # If posting interval time has crossed 2 minutes update the ThingSpeak channel with your data
        if time.time() - last_connection_time >= posting_interval:
                httpRequest()
                last_update_time = time.time()

7) 运行无限循环,每15秒连续调用函数updatesJson

if __name__ == "__main__":  # To ensure that this is run directly and does not run when imported
        while True:
                # If update interval time has crossed 15 seconds update the message buffer with data
            if time.time() - last_update_time >= update_interval:
                updatesJson()

相关示例

详细信息