この章ではLinuxベースのデバイスを作成し、AWS IoT Device Defenderのデバイスメトリックスをクラウドに送信する環境づくりについて学びます。このハンズオンではデバイスとしてCloud9を利用しますが、Raspberry Piなど他のデバイスでも同じ事ができます。
https://console.aws.amazon.com/cloud9/home/product
ブラウザの新しいタブでAWSマネージメントコンソールを開き、サービスの検索画面でCloud9
と入力して、Cloud9のコンソールを開きます。リージョンは、前の手順と同じリージョンを使ってください。
Create environment
をクリックして環境の作成を始めます。名前にはiotsecuritylab
と入力しNext step
をクリックして次に進みます。すべてデフォルトの設定(パブリックなサブネットを持つVPCをご利用ください)のまますすめるので、一番下までスクロールし、Next step
をクリック。最後に、Create environment
をクリックして、Cloud9の環境を作成します。
起動中の画面。
起動が終わりました。
Welcomeのタブの右側にある緑色の(+)
をクリックし、New Terminal
をクリックして新しくターミナルを開きます。
ターミナルを開いたときと同じ様に、緑の(+)をクリックし、New File
をクリックして空のファイルを作成します。からのファイルの中に、以下のPythonスクリプトを貼り付け、キーボードでコントロール+S
コマンドで保存します。ファイル名を聞かれるので、device-prog.py
と入力します。
import time
import sys
import threading
import cbor
import logging
import datetime
import argparse
import json
import socketserver
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
from AWSIoTDeviceDefenderAgentSDK import agent
from AWSIoTDeviceDefenderAgentSDK import collector
# Run this script with this command
#
#python3 ws-code/device-prog.py -e ${IOT_ENDPOINT} \
#-r ~/environment/certs/root-ca.pem \
#-c ~/environment/certs/${IOT_THINGNAME}.cert.pem \
#-k ~/environment/certs/${IOT_THINGNAME}.private.key \
#-id ${IOT_THINGNAME} \
#-t ${IOT_THINGNAME} \
#--format json -i 300
svr_socketserver = None
doBadThings = True
bigBadString = ''.join(str(e) for e in range(1000))
def stop_bad_behavior():
global svr_socketserver, doBadThings
print("Stopping server on *:8888")
svr_socketserver.shutdown()
svr_socketserver.server_close()
doBadThings = False
class SvrRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("{} wrote:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
def start_socket_server():
global svr_socketserver
svr_socketserver = socketserver.TCPServer(("0.0.0.0", 8888), SvrRequestHandler)
svr_socketserver.serve_forever()
class JobsMessageProcessor(object):
def __init__(self, awsIoTMQTTThingJobsClient, clientToken):
#keep track of this to correlate request/responses
self.clientToken = clientToken
self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient
self.done = False
self.jobsStarted = 0
self.jobsSucceeded = 0
self.jobsRejected = 0
self._setupCallbacks(self.awsIoTMQTTThingJobsClient)
def _setupCallbacks(self, awsIoTMQTTThingJobsClient):
self.awsIoTMQTTThingJobsClient.createJobSubscription(self.newJobReceived, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC)
self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextJobSuccessfullyInProgress, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE)
self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextRejected, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE)
# '+' indicates a wildcard for jobId in the following subscriptions
self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobSuccessful, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+')
self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejected, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+')
#call back on successful job updates
def startNextJobSuccessfullyInProgress(self, client, userdata, message):
payload = json.loads(message.payload.decode('utf-8'))
if 'execution' in payload:
self.jobsStarted += 1
execution = payload['execution']
self.executeJob(execution)
statusDetails = {'HandledBy': 'ClientToken: {}'.format(self.clientToken)}
threading.Thread(target = self.awsIoTMQTTThingJobsClient.sendJobsUpdate, kwargs = {'jobId': execution['jobId'], 'status': jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, 'statusDetails': statusDetails, 'expectedVersion': execution['versionNumber'], 'executionNumber': execution['executionNumber']}).start()
else:
print('Start next saw no execution: ' + message.payload.decode('utf-8'))
self.done = True
def executeJob(self, execution):
print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber']))
print('With jobDocument: ' + json.dumps(execution['jobDocument']))
stop_bad_behavior()
def newJobReceived(self, client, userdata, message):
payload = json.loads(message.payload.decode('utf-8'))
if 'execution' in payload:
self._attemptStartNextJob()
else:
print('Notify next saw no execution')
self.done = True
def processJobs(self):
self.done = False
self._attemptStartNextJob()
def startNextRejected(self, client, userdata, message):
print('Start next rejected:' + message.payload.decode('utf-8'))
self.jobsRejected += 1
def updateJobSuccessful(self, client, userdata, message):
self.jobsSucceeded += 1
def updateJobRejected(self, client, userdata, message):
self.jobsRejected += 1
def _attemptStartNextJob(self):
statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())}
threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start()
def isDone(self):
return self.done
def getStats(self):
stats = {}
stats['jobsStarted'] = self.jobsStarted
stats['jobsSucceeded'] = self.jobsSucceeded
stats['jobsRejected'] = self.jobsRejected
return stats
def dd_custom_callback(self, userdata, message):
print("Received a new message: ")
if 'json' in message.topic:
print((message.payload))
else:
print(cbor.loads(message.payload))
print("from topic: ")
print((message.topic))
print("--------------\n\n")
def dd_agent_proc(args, iot_client):
global doBadThings
# client_id must match a registered thing name in your account
topicstring = "$aws/things/" + args.thing_name + "/defender/metrics/" + args.format
# Subscribe to the accepted/rejected topics to indicate status of published metrics reports
iot_client.subscribe(topicstring + "/accepted", dd_custom_callback)
iot_client.subscribe(topicstring + "/rejected", dd_custom_callback)
sample_rate = args.upload_interval
# Collector samples metrics from the system, it can track the previous metric to generate deltas
coll = collector.Collector(args.short_tags)
metric = None
first_sample = True # don't publish first sample, so we can accurately report delta metrics
while True:
if doBadThings:
iot_client.publish("badstringtopic", bigBadString)
else:
iot_client.publish("badstringtopic", "Nice small message")
metric = coll.collect_metrics()
if args.dry_run:
print(metric.to_json_string(pretty_print=True))
if args.format == 'cbor':
with open("cbor_metrics", "w+b") as outfile:
outfile.write(bytearray(metric.to_cbor()))
else:
if first_sample:
first_sample = False
elif args.format == "cbor":
iot_client.publish(topicstring, bytearray(metric.to_cbor()))
else:
iot_client.publish(topicstring, metric.to_json_string())
time.sleep(float(args.upload_interval))
def jobs_proc(jobsClient, clientId):
jobsMsgProc = JobsMessageProcessor(jobsClient, clientId)
print('Starting to process jobs...')
jobsMsgProc.processJobs()
def main():
global doBadThings
args = agent.parse_args()
if doBadThings:
threading.Thread(target=start_socket_server).start()
iot_client = agent.IoTClientWrapper(args.endpoint,args.root_ca_path,args.certificate_path,args.private_key_path,args.client_id)
iot_client.connect()
jobsClient = AWSIoTMQTTThingJobsClient(args.client_id, args.thing_name, QoS=1, awsIoTMQTTClient=iot_client.iot_client)
jobs_proc(jobsClient, args.client_id)
dd_agent_proc(args,iot_client)
if __name__ == '__main__':
main()
次に、もう一つ空のファイルを作成し、以下の内容を貼り付けて、env-setup.sh
の名前で保存します。
#!/bin/bash
# Change to the ~/environment directory.
cd ~/environment
# Make directory for device program
mkdir ws-code
mv device-prog.py ws-code/device-prog.py
# 本ハンズオンではDevice SDK及びDefender agentではpython v1を利用していますが、新規プロジェクトでは最新のv2を推奨します
pip3 install --user AWSIoTPythonSDK==1.4.9
pip3 install --user AWSIoTDeviceDefenderAgentSDK==1.1.1
## DO NOT UPGRADE PIP REGARDLESS OF THE PLEAS OF THE MESSAGE ##
# Create a directory to hold certificate files
mkdir certs
# Download the AWS ATS Root CA
wget -O certs/root-ca.pem https://www.amazontrust.com/repository/AmazonRootCA1.pem
echo "Done!"
正しくファイルが作成されると、以下のようになっているはずです。
device-prog.py
とenv-setup.sh
のタブを閉じて、先程開いたターミナルのタブを選択し、以下のコマンドを1行ずつ実行します。
cd ~/environment
sudo service nfslock stop
sudo service rpcbind stop
bash -xe env-setup.sh
正しく実行されると、以下のようにいくつかディレクトリが増えているのが確認できます。
Cloud9はデフォルトで、30分以上利用されないと停止するようになっているため、この設定を変更します。
左上のAWS Cloud9
をクリックし、Preferences
をクリックします。
EC2 Instance
をクリックし、Stop my environment
を探します。見つけたらドロップダウンから、After 4 hours
を選びます。これで設定が変更されました。Preferences
は閉じてください。
ここからは、IoT Coreのコンソールに戻って作業をします。
コンソールを閉じてしまった人は、Cloud9の画面で左上のAWS Cloud9
をクリックし、Go To Your Dashboard
をクリックして、一度Cloud9のダッシュボードに戻り、左上のServices
から検索(または、HistoryにIoT Core)して、IoT Coreの画面に戻ります。
IoT Coreのコンソールで、左側のメニューからOnboard
をクリックします。
Onboard a device
の中のGet Started
をクリックします。次のConnect to AWS IoT
のGet Started
をクリックしさらに進みます。
プラットフォームではLinux
を選び、IoT Device SDKではPython
を選んでNext
をクリックします。
Register a thing
の画面でNameにThingOne
と名前を入力して、Next Step
をクリックします。
最後に、Linux/OSX
ボタンをクリックすると、コネクトキットのダウンロードが行われます。
ファイルを保存したら、右下のNext step
をクリックして、次の画面でDone
、更に次の画面でもDone
をクリックして、Thingの登録を終了します。
Thingが登録されているのが確認できます。(Createボタンの下のドロップダウンで、表示をリスト/カードに切り替えることが出来ます)
Thing名をクリックし、
Thingの詳細画面の左側のSecurity
のメニューをクリックします。
Thingに紐付いている証明書が確認できます。この証明書をクリックします。
証明書の詳細画面の左側のPolicies
のメニューをクリックします。
証明書に紐付いているPolicyが確認できます。このPolicyをクリックします。
以下のようなPolicyが設定されているのが確認できます。
このワークショップではこのポリシーをまるごと置き換えますので、Edit Policy Document
をクリックして編集をします。
編集モードになりますので、以下のポリシーで置き換えます。置き換えたら、Save as new version
をクリックして保存します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish",
"iot:Receive"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"iot:Subscribe"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"iot:Connect"
],
"Resource": "*"
}
]
}
正しく変更されると以下のようなPolicyが設定されたのが確認できます。
注意)すべてのResourceを”*“にすることは、セキュリティー的に正しくありませんが、このワークショプではわざとそのようにしています。本番環境ではこの様な設定を使わないようにしてください。
Cloud9の画面に戻り、certs
フォルダを選択し、メニューのFile
> Upload Local Files
をクリックします。
ダイアログがポップアップされるので、このダイアログに先ほどダウンロードしたconnect_device_package.zip
をドラッグ&ドロップします。
アップロードが終わったら、ダイアログの右上のX
をクリックしてダイアログを閉じます。
アップロードされたzipファイルを展開するので、以下のコマンドで展開します。
cd ~/environment/certs
unzip connect_device_package.zip
スクリプトが使う環境変数を設定するので、以下のコマンドで、AWS IoTのエンドポイントを指定します。 IOT_THINGNAMEには、AWS IoTの画面で指定したThing名を入力します。手順と同じ名前をつけている場合は、“ThingOne"となります。
IOT_ENDPOINT=`aws iot describe-endpoint --endpoint-type iot:Data-ATS --output text`
echo $IOT_ENDPOINT
IOT_THINGNAME="ThingOne"
echo $IOT_THINGNAME
以上でスクリプトを実行する準備ができました。以下のコマンドをターミナルに入力して、スクリプトを実行します。
cd ~/environment
python3 ws-code/device-prog.py -e ${IOT_ENDPOINT} -r ~/environment/certs/root-ca.pem -c ~/environment/certs/${IOT_THINGNAME}.cert.pem -k ~/environment/certs/${IOT_THINGNAME}.private.key -id ${IOT_THINGNAME} -t ${IOT_THINGNAME} --format json -i 300
これで、スクリプトがIoT ThingとしてデータをAWS IoTに送り始めました。このターミナルではスクリプトを実行したままにしたいので、新しく(+)でターミナルを開いておきましょう。
この章の手順は以上です。