使用 Raspberry Pi 板进行批量更新
此示例演示如何使用运行 Python 2.7 的 Wi-Fi 连接的 Raspberry Pi 板收集数据。您可以每 15 秒连续收集一次 CPU 温度和 CPU 利用率,并每 2 分钟批量更新 ThingSpeak通道。此示例使用 Bulk-Write JSON Data
API 批量收集数据并将其发送到 ThingSpeak 通道。通过使用批量更新,您可以降低设备的功耗。由于 Raspberry Pi 板没有配备实时时钟,因此您可以使用相对时间戳来进行批量更新消息。
设置
创建一个通道,如 Collect Data in a New Channel 所示。
代码
1)导入脚本所需的库。
import json import time import os import psutil import requests
2) 定义跟踪上次连接时间和上次更新时间的全局变量。定义更新数据的时间间隔,并将数据发布到 ThingSpeak。
last_connection_time = time.time() # Track the last connection time last_update_time = time.time() # Track the last update time posting_interval = 120 # Post data once every 2 minutes update_interval = 15 # Update once every 15 seconds
3) 定义您的 ThingSpeak 写入 API 密钥和通道ID 设置,以及 ThingSpeak 服务器设置。
write_api_key = "YOUR-CHANNEL-WRITEAPIKEY" # Replace YOUR-CHANNEL-write_api_key with your channel write API key channel_ID = "YOUR-CHANNELID" # Replace YOUR-channel_ID with your channel ID url = "https://api.thingspeak.com/channels/" + channel_ID + "/bulk_update.json" # ThingSpeak server settings message_buffer = []
4) 定义函数 httpRequest
,将数据发送到 ThingSpeak 并打印服务器的响应代码。响应代码202
表示服务器已接受请求并将对其进行处理。
def httpRequest(): # Function to send the POST request to ThingSpeak channel for bulk update. global message_buffer bulk_data = json.dumps({'write_api_key':write_api_key,'updates':message_buffer}) # Format the json data buffer request_headers = {"User-Agent":"mw.doc.bulk-update (Raspberry Pi)","Content-Type":"application/json","Content-Length":str(len(bulk_data))} # Make the request to ThingSpeak try: print(request_headers) response = requests.post(url,headers=request_headers,data=bulk_data) print (response) # A 202 indicates that the server has accepted the request except e: print(e.code) # Print the error code message_buffer = [] # Reinitialize the message buffer global last_connection_time last_connection_time = time.time() # Update the connection time
5) 定义函数 getData
,该函数返回以摄氏度为单位的 CPU 温度以及以百分比表示的 CPU 利用率。
def getData(): # Function that returns the CPU temperature and percentage of CPU utilization cmd = '/opt/vc/bin/vcgencmd measure_temp' process = os.popen(cmd).readline().strip() cpu_temp = process.split('=')[1].split("'")[0] cpu_usage = psutil.cpu_percent(interval=2) return cpu_temp,cpu_usage
6) 定义函数updatesJson
,每15秒持续更新消息缓冲区。
def updatesJson(): # Function to update the message buffer every 15 seconds with data. # And then call the httpRequest function every 2 minutes. # This examples uses the relative timestamp as it uses the "delta_t" parameter. # If your device has a real-time clock, you can also provide the absolute timestamp # using the "created_at" parameter. global last_update_time message = {} message['delta_t'] = int(round(time.time() - last_update_time)) Temp,Usage = getData() message['field1'] = Temp message['field2'] = Usage global message_buffer message_buffer.append(message) # If posting interval time has crossed 2 minutes update the ThingSpeak channel with your data if time.time() - last_connection_time >= posting_interval: httpRequest() last_update_time = time.time()
7) 运行无限循环,每15秒连续调用函数updatesJson
。
if __name__ == "__main__": # To ensure that this is run directly and does not run when imported while True: # If update interval time has crossed 15 seconds update the message buffer with data if time.time() - last_update_time >= update_interval: updatesJson()