Advanced I/Oファンクションの設定
次に、ファンクションコンポーネントを設定してToDoリストアプリケーションのコーディングを開始します。
ファンクションのディレクトリ、functions/to_do_list_functionには以下が含まれます:
-
main.py メインファンクションファイル
-
catalyst-config.json 設定ファイル
-
requirements.txt 追加する外部ライブラリを記述するファイル
main.pyファイルにコードを追加します。
サーバーとData Store間のルーティングを処理するAdvanced I/Oファンクションの3つのAPIは以下の通りです:
-
GET /todo: Data StoreのTodoItemsテーブルからToDoリストアイテムを取得します
-
POST /todo: 新しいリストアイテムを作成してData Storeに保存します
-
DELETE /todo: Data Storeからリストアイテムを削除します
次に、ファンクションファイルにコードを追加しましょう。以下のPythonコードをコピーして、プロジェクトのfunctions/to_do_list_functionディレクトリにあるmain.pyにペーストし、ファイルを保存してください。任意のIDEを使用してアプリケーションのファイルを操作できます。
Note: このセクションのコードを十分に理解するため、必ず目を通してください。ファンクションとクライアントのアーキテクチャについては、次のセクションで説明します。
main.py
copy
import json
import zcatalyst_sdk
import logging
from flask import Request, make_response, jsonify
logger = logging.getLogger()
def handler(request: Request):
try:
app = zcatalyst_sdk.initialize()
logger = logging.getLogger()
if request.path == "/add" and request.method == 'POST':
req_data = request.get_json()
notes = req_data.get("notes")
table = app.datastore().table('TodoItems')
row = table.insert_row({
'Notes': notes
})
todo_item = {'id': row['ROWID'], 'notes': notes}
response_data = {
'status': 'success',
'data': {
'todoItem': todo_item
}
}
return make_response(jsonify(response_data), 200)
elif request.path == "/all" and request.method == 'GET':
page = request.args.get('page')
per_page = request.args.get('perPage')
page = int(page)
logger.info(page)
per_page = int(per_page)
logger.info(per_page)
zcql_service = app.zcql()
row_count = zcql_service.execute_query('SELECT COUNT(ROWID) FROM TodoItems')
row_id = int(row_count[0]['TodoItems']['COUNT(ROWID)'])
has_more = int(row_id) > (page) * (per_page)
logger.info(has_more)
query_result = zcql_service.execute_query(f'SELECT ROWID, Notes FROM TodoItems LIMIT {(page - 1) * per_page + 1},{per_page}')
todo_items = [{'id': item['TodoItems']['ROWID'], 'notes': item['TodoItems']['Notes']} for item in query_result]
get_resp = {
'status': 'success',
'data': {
'todoItems': todo_items,
'hasMore': has_more
}
}
return make_response(jsonify(get_resp), 200)
elif request.method == 'DELETE':
row_id = request.path[1:]
if row_id:
datastore_service = app.datastore()
table_service = datastore_service.table("ToDoItems")
table_service.delete_row(row_id)
row_response = {
'status': 'success',
'data': {
'todoItem': {
'id': row_id
}
}}
logging.info(row_response)
return make_response(jsonify(row_response), 200)
else:
message = {
'status': 'failure',
'error': 'Please provide a valid rowid in the request path'
}
return make_response(jsonify(message), 400)
else:
print('working')
except Exception as err:
logger.error(f"Exception in to_do_list_function :{err}")
response = make_response(jsonify({
"error": "Internal server error occurred. Please try again in some time."
}), 500)
return response
ファンクションディレクトリの設定が完了しました。
最終更新日 2026-03-05 11:43:24 +0530 IST