2. Device Setup

この章のゴール

この章ではLinuxベースのデバイスを作成し、AWS IoT Device Defenderのデバイスメトリックスをクラウドに送信する環境づくりについて学びます。このハンズオンではデバイスとしてCloud9を利用しますが、Raspberry Piなど他のデバイスでも同じ事ができます。

Step1

chapter2-1

https://console.aws.amazon.com/cloud9/home/product

ブラウザの新しいタブでAWSマネージメントコンソールを開き、サービスの検索画面でCloud9と入力して、Cloud9のコンソールを開きます。リージョンは、前の手順と同じリージョンを使ってください。

Step2

Create environmentをクリックして環境の作成を始めます。名前にはiotsecuritylabと入力しNext stepをクリックして次に進みます。すべてデフォルトの設定(パブリックなサブネットを持つVPCをご利用ください)のまますすめるので、一番下までスクロールし、Next stepをクリック。最後に、Create environmentをクリックして、Cloud9の環境を作成します。

chapter2-2

起動中の画面。

chapter2-3

起動が終わりました。

chapter2-4

Welcomeのタブの右側にある緑色の(+)をクリックし、New Terminalをクリックして新しくターミナルを開きます。

Step3

ダミーのクライアントスクリプトを作成

ターミナルを開いたときと同じ様に、緑の(+)をクリックし、New Fileをクリックして空のファイルを作成します。からのファイルの中に、以下のPythonスクリプトを貼り付け、キーボードでコントロール+Sコマンドで保存します。ファイル名を聞かれるので、device-prog.pyと入力します。

import time
import sys
import threading
import cbor
import logging
import datetime
import argparse
import json
import socketserver

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus

from AWSIoTDeviceDefenderAgentSDK import agent
from AWSIoTDeviceDefenderAgentSDK import collector

# Run this script with this command
#
#python3 ws-code/device-prog.py -e ${IOT_ENDPOINT} \
#-r ~/environment/certs/root-ca.pem \
#-c ~/environment/certs/${IOT_THINGNAME}.cert.pem \
#-k ~/environment/certs/${IOT_THINGNAME}.private.key \
#-id ${IOT_THINGNAME} \
#-t ${IOT_THINGNAME} \
#--format json -i 300

svr_socketserver = None
doBadThings = True
bigBadString = ''.join(str(e) for e in range(1000))


def stop_bad_behavior():
    
    global svr_socketserver, doBadThings
    
    print("Stopping server on *:8888")
    svr_socketserver.shutdown()
    svr_socketserver.server_close()
    
    doBadThings = False
    
class SvrRequestHandler(socketserver.BaseRequestHandler):
    
    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

def start_socket_server():
    
   global svr_socketserver
   svr_socketserver = socketserver.TCPServer(("0.0.0.0", 8888), SvrRequestHandler)
   svr_socketserver.serve_forever()


class JobsMessageProcessor(object):
    def __init__(self, awsIoTMQTTThingJobsClient, clientToken):
        #keep track of this to correlate request/responses
        self.clientToken = clientToken
        self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient
        self.done = False
        self.jobsStarted = 0
        self.jobsSucceeded = 0
        self.jobsRejected = 0
        self._setupCallbacks(self.awsIoTMQTTThingJobsClient)

    def _setupCallbacks(self, awsIoTMQTTThingJobsClient):
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.newJobReceived, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC)
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextJobSuccessfullyInProgress, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE)
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextRejected, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE)

        # '+' indicates a wildcard for jobId in the following subscriptions
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobSuccessful, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+')
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejected, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+')

    #call back on successful job updates
    def startNextJobSuccessfullyInProgress(self, client, userdata, message):
        payload = json.loads(message.payload.decode('utf-8'))
        if 'execution' in payload:
            self.jobsStarted += 1
            execution = payload['execution']
            self.executeJob(execution)
            statusDetails = {'HandledBy': 'ClientToken: {}'.format(self.clientToken)}
            threading.Thread(target = self.awsIoTMQTTThingJobsClient.sendJobsUpdate, kwargs = {'jobId': execution['jobId'], 'status': jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, 'statusDetails': statusDetails, 'expectedVersion': execution['versionNumber'], 'executionNumber': execution['executionNumber']}).start()
        else:
            print('Start next saw no execution: ' + message.payload.decode('utf-8'))
            self.done = True

    def executeJob(self, execution):
        print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber']))
        print('With jobDocument: ' + json.dumps(execution['jobDocument']))
        
        stop_bad_behavior()
        

    def newJobReceived(self, client, userdata, message):
        payload = json.loads(message.payload.decode('utf-8'))
        if 'execution' in payload:
            self._attemptStartNextJob()
        else:
            print('Notify next saw no execution')
            self.done = True

    def processJobs(self):
        self.done = False
        self._attemptStartNextJob()

    def startNextRejected(self, client, userdata, message):
        print('Start next rejected:' + message.payload.decode('utf-8'))
        self.jobsRejected += 1

    def updateJobSuccessful(self, client, userdata, message):
        self.jobsSucceeded += 1

    def updateJobRejected(self, client, userdata, message):
        self.jobsRejected += 1

    def _attemptStartNextJob(self):
        statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())}
        threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start()

    def isDone(self):
        return self.done

    def getStats(self):
        stats = {}
        stats['jobsStarted'] = self.jobsStarted
        stats['jobsSucceeded'] = self.jobsSucceeded
        stats['jobsRejected'] = self.jobsRejected
        return stats




def dd_custom_callback(self, userdata, message):
    print("Received a new message: ")
    if 'json' in message.topic:
        print((message.payload))
    else:
        print(cbor.loads(message.payload))

        print("from topic: ")
        print((message.topic))
        print("--------------\n\n")
        
        
def dd_agent_proc(args, iot_client):
    
    global doBadThings
    
    # client_id must match a registered thing name in your account
    topicstring = "$aws/things/" + args.thing_name + "/defender/metrics/" + args.format

    # Subscribe to the accepted/rejected topics to indicate status of published metrics reports
    iot_client.subscribe(topicstring + "/accepted", dd_custom_callback)
    iot_client.subscribe(topicstring + "/rejected", dd_custom_callback)
    sample_rate = args.upload_interval

    #  Collector samples metrics from the system, it can track the previous metric to generate deltas
    coll = collector.Collector(args.short_tags)

    metric = None
    first_sample = True  # don't publish first sample, so we can accurately report delta metrics
    
    while True:
        if doBadThings:
            iot_client.publish("badstringtopic", bigBadString)
        else:
            iot_client.publish("badstringtopic", "Nice small message")

            
        metric = coll.collect_metrics()
        if args.dry_run:
            print(metric.to_json_string(pretty_print=True))
            if args.format == 'cbor':
                with open("cbor_metrics", "w+b") as outfile:
                    outfile.write(bytearray(metric.to_cbor()))
        else:
            if first_sample:
                first_sample = False
            elif args.format == "cbor":
                iot_client.publish(topicstring, bytearray(metric.to_cbor()))
            else:
                iot_client.publish(topicstring, metric.to_json_string())

        
        time.sleep(float(args.upload_interval))
        
def jobs_proc(jobsClient, clientId):
    
    jobsMsgProc = JobsMessageProcessor(jobsClient, clientId)
    print('Starting to process jobs...')
    jobsMsgProc.processJobs()
    


def main():
    
    global doBadThings
    
    args = agent.parse_args()
    
    if doBadThings:
        threading.Thread(target=start_socket_server).start()
    
    iot_client = agent.IoTClientWrapper(args.endpoint,args.root_ca_path,args.certificate_path,args.private_key_path,args.client_id)
    
    iot_client.connect()

    jobsClient = AWSIoTMQTTThingJobsClient(args.client_id, args.thing_name, QoS=1, awsIoTMQTTClient=iot_client.iot_client)

    jobs_proc(jobsClient, args.client_id)

    dd_agent_proc(args,iot_client)
    
    
if __name__ == '__main__':
    main()

次に、もう一つ空のファイルを作成し、以下の内容を貼り付けて、env-setup.shの名前で保存します。

#!/bin/bash

# Change to the ~/environment directory.
cd ~/environment

# Make directory for device program
mkdir ws-code
mv device-prog.py ws-code/device-prog.py

# 本ハンズオンではDevice SDK及びDefender agentではpython v1を利用していますが、新規プロジェクトでは最新のv2を推奨します
pip3 install --user AWSIoTPythonSDK==1.4.9
pip3 install --user AWSIoTDeviceDefenderAgentSDK==1.1.1

## DO NOT UPGRADE PIP REGARDLESS OF THE PLEAS OF THE MESSAGE ##

# Create a directory to hold certificate files
mkdir certs

# Download the AWS ATS Root CA
wget -O certs/root-ca.pem https://www.amazontrust.com/repository/AmazonRootCA1.pem

echo "Done!"

正しくファイルが作成されると、以下のようになっているはずです。

chapter2-5

セットアップスクリプトの実行

device-prog.pyenv-setup.shのタブを閉じて、先程開いたターミナルのタブを選択し、以下のコマンドを1行ずつ実行します。

cd ~/environment

sudo service nfslock stop

sudo service rpcbind stop

bash -xe env-setup.sh

正しく実行されると、以下のようにいくつかディレクトリが増えているのが確認できます。

chapter2-6
  • certs
    • AWS IoT Coreに接続する際に利用する証明書を格納します
  • ws-code
    • ワークショップのサンプルコードです

Cloud9の設定を変更

Cloud9はデフォルトで、30分以上利用されないと停止するようになっているため、この設定を変更します。

chapter2-7

左上のAWS Cloud9をクリックし、Preferencesをクリックします。

chapter2-8

EC2 Instanceをクリックし、Stop my environmentを探します。見つけたらドロップダウンから、After 4 hoursを選びます。これで設定が変更されました。Preferencesは閉じてください。

Step4

ここからは、IoT Coreのコンソールに戻って作業をします。 コンソールを閉じてしまった人は、Cloud9の画面で左上のAWS Cloud9をクリックし、Go To Your Dashboardをクリックして、一度Cloud9のダッシュボードに戻り、左上のServicesから検索(または、HistoryにIoT Core)して、IoT Coreの画面に戻ります。

chapter2-9

IoT Coreのコンソールで、左側のメニューからOnboardをクリックします。

chapter2-10

Onboard a deviceの中のGet Startedをクリックします。次のConnect to AWS IoTGet Startedをクリックしさらに進みます。

chapter2-11

プラットフォームではLinuxを選び、IoT Device SDKではPythonを選んでNextをクリックします。

chapter2-12

Register a thingの画面でNameにThingOneと名前を入力して、Next Stepをクリックします。

chapter2-13

最後に、Linux/OSXボタンをクリックすると、コネクトキットのダウンロードが行われます。

chapter2-14

ファイルを保存したら、右下のNext stepをクリックして、次の画面でDone、更に次の画面でもDoneをクリックして、Thingの登録を終了します。

chapter2-15

Thingが登録されているのが確認できます。(Createボタンの下のドロップダウンで、表示をリスト/カードに切り替えることが出来ます)

chapter2-16

Thing名をクリックし、

chapter2-17

Thingの詳細画面の左側のSecurityのメニューをクリックします。

chapter2-18

Thingに紐付いている証明書が確認できます。この証明書をクリックします。

chapter2-19

証明書の詳細画面の左側のPoliciesのメニューをクリックします。

chapter2-20

証明書に紐付いているPolicyが確認できます。このPolicyをクリックします。

chapter2-21

以下のようなPolicyが設定されているのが確認できます。 このワークショップではこのポリシーをまるごと置き換えますので、Edit Policy Documentをクリックして編集をします。

chapter2-22

編集モードになりますので、以下のポリシーで置き換えます。置き換えたら、Save as new versionをクリックして保存します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish",
        "iot:Receive"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": "*"
    }
  ]
}
chapter2-23

正しく変更されると以下のようなPolicyが設定されたのが確認できます。

chapter2-24

注意)すべてのResourceを”*“にすることは、セキュリティー的に正しくありませんが、このワークショプではわざとそのようにしています。本番環境ではこの様な設定を使わないようにしてください。

Cloud9の画面に戻り、certsフォルダを選択し、メニューのFile > Upload Local Filesをクリックします。

chapter2-25

ダイアログがポップアップされるので、このダイアログに先ほどダウンロードしたconnect_device_package.zipをドラッグ&ドロップします。

chapter2-26

アップロードが終わったら、ダイアログの右上のXをクリックしてダイアログを閉じます。

アップロードされたzipファイルを展開するので、以下のコマンドで展開します。

cd ~/environment/certs
unzip connect_device_package.zip
chapter2-27

スクリプトが使う環境変数を設定するので、以下のコマンドで、AWS IoTのエンドポイントを指定します。 IOT_THINGNAMEには、AWS IoTの画面で指定したThing名を入力します。手順と同じ名前をつけている場合は、“ThingOne"となります。

IOT_ENDPOINT=`aws iot describe-endpoint --endpoint-type iot:Data-ATS --output text`

echo $IOT_ENDPOINT

IOT_THINGNAME="ThingOne"

echo $IOT_THINGNAME
chapter2-28

以上でスクリプトを実行する準備ができました。以下のコマンドをターミナルに入力して、スクリプトを実行します。

cd ~/environment

python3 ws-code/device-prog.py -e ${IOT_ENDPOINT} -r ~/environment/certs/root-ca.pem -c ~/environment/certs/${IOT_THINGNAME}.cert.pem -k ~/environment/certs/${IOT_THINGNAME}.private.key -id ${IOT_THINGNAME} -t ${IOT_THINGNAME} --format json -i 300
chapter2-29

これで、スクリプトがIoT ThingとしてデータをAWS IoTに送り始めました。このターミナルではスクリプトを実行したままにしたいので、新しく(+)でターミナルを開いておきましょう。

chapter2-30

この章の手順は以上です。