関数ディレクトリの設定
Advanced I/O関数を設定して、Lead Managerアプリケーションのコーディングを開始します。
関数ディレクトリ functions/lead_manager_functionには以下が含まれています:
- main.pyメイン関数ファイル
- catalyst-config.json設定ファイル
- 追加する外部ライブラリを記述するためのrequirements.txtファイル
main.pyにコードを追加します。関数の設定には任意のIDEを使用できます。
Note: このセクションのコードを十分に理解するために、必ず目を通してください。関数とクライアントのコードについては、クライアントの設定後に説明します。
以下のコードをコピーして、functions/lead_manager_functionディレクトリにあるmain.pyに貼り付けてファイルを保存してください。
main.py
copy
import json
import random
import zcatalyst_sdk
import logging
from flask import Request, make_response, jsonify,redirect
import requests
import os
apiUrl = "https://www.zohoapis.com/crm/v2/Leads"
url = "https://accounts.zoho.com/oauth/v2/token"
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
CLIENT_ID = "1000.CEE5336HDV5JE3UHERR2BRITBI4NCN"
CLIENT_SECRET = "5292351d5c5960d7cba13ee1684c646ed8df146530"
logger = logging.getLogger()
def handler(request: Request):
try:
app = zcatalyst_sdk.initialize()
Datastore = app.datastore()
id = os.path.basename(request.path)
if request.path == "/generateToken" and request.method == GET:
try:
code = request.args.get('code')
user_management_service = app.authentication()
is_local = os.getenv('X_ZOHO_CATALYST_IS_LOCAL') == 'true'
protocol = 'http' if is_local else 'https'
host = request.headers['host'] if is_local else request.headers['host'].split(':')[0]
domain = f"{protocol}://{host}"
userDetails = user_management_service.get_current_user()
user_id = userDetails["user_id"]
tab = Datastore.table(12130000003381913)
row = {"refresh_token":getRefreshToken(code,domain),"userId":user_id}
tab.insert_row(row)
redirect_uri = f"{domain}/app/index.html"
return redirect(redirect_uri)
except Exception as err:
logger.error(err)
response = make_response(({
"message": "Internal Server Error. Please try again after sometime.",
"error": err,
}), 500)
elif request.path == "/getUserDetails" and request.method == GET:
try:
app = zcatalyst_sdk.initialize()
userDetails = getUserId(app)
if len(userDetails) != 0:
response = make_response({
"userId":userDetails[0]["Token"]["userId"]
},200)
else:
response = make_response({
"userId":None
},200)
return response
except Exception as err:
response = make_response({
"message":"Internal Server Error in Getting User Details. Please try again after sometime.",
"error":err
},500)
return response
elif request.path == "/crmData" and request.method == GET:
try:
app = zcatalyst_sdk.initialize()
userDetails = getUserId(app)
accesstoken = getAccessToken(userDetails,app)
headers = {
'Authorization':f'Zoho-oauthtoken {accesstoken}',
"Content-Type": "application/json"
}
response1 = requests.get(apiUrl,headers=headers)
response = make_response(jsonify(response1.json()),200)
return response
except Exception as err:
logger.error(err)
response = make_response({
"message":"Internal Server Error.Please try again after sometime"
},500)
return response
elif request.path == "/crmData" and request.method == POST:
try:
app = zcatalyst_sdk.initialize()
data = {"data": [json.loads(request.data)]}
userDetails = getUserId(app)
accesstoken = getAccessToken(userDetails,app)
headers = {
'Authorization':f'Zoho-oauthtoken {accesstoken}',
"Content-Type": "application/json"
}
response1 = requests.post(apiUrl,headers=headers,json=data)
if response1.status_code == 200 or response1.status_code == 201:
response = make_response((response1.json()),200)
return response
else:
logger.error(response1.json())
response = make_response((response1.json()),500)
return response
except Exception as err:
logger.error(err)
response = make_response({
"message":"Internal Server Error.Please try again after sometime"
},500)
return response
elif request.path == f"/crmData/{id}" and request.method == GET:
try:
app = zcatalyst_sdk.initialize()
userDetails = getUserId(app)
accesstoken = getAccessToken(userDetails,app)
headers = {
'Authorization':f'Zoho-oauthtoken {accesstoken}',
"Content-Type": "application/json"
}
path = f"/{id}"
response1 = requests.get(apiUrl+path,headers=headers)
response = make_response((response1.json()),200)
return response
except Exception as err:
logger.error(err)
response = make_response({
"message":"Internal Server Errro.Please try again after sometime"
},500)
return response
elif request.path == f"/crmData/{id}" and request.method == PUT:
try:
app = zcatalyst_sdk.initialize()
reqData = request.get_json()
data = {
"data":[reqData]
}
if reqData is None:
response = make_response({
"message":"Update Data not found"
},400)
userDetails = getUserId(app)
accesstoken = getAccessToken(userDetails,app)
headers = {
'Authorization':f'Zoho-oauthtoken {accesstoken}',
"Content-Type": "application/json"
}
path = f"/{id}"
response1 = requests.put(apiUrl+path,headers=headers,json=data)
response = make_response((response1.json()),200)
return response
except Exception as err:
logger.error(err)
response = make_response({
"message":"Internal Server Error.Please try again after sometime"
},500)
return response
elif request.path ==f"/crmData/{id}" and request.method == DELETE:
try:
app = zcatalyst_sdk.initialize()
userDetails = getUserId(app)
accesstoken = getAccessToken(userDetails,app)
headers = {
'Authorization':f'Zoho-oauthtoken {accesstoken}',
"Content-Type": "application/json"
}
path = f"/{id}"
response1 = requests.delete(apiUrl+path,headers=headers)
response = make_response((response1.json()),200)
return response
except Exception as err:
logger.error(err)
response = make_response({
"message":"Internal Server Errro.Please try again after sometime"
},500)
return response
else:
response = make_response('Unknown path')
response.status_code = 400
return response
except Exception as err:
logger.error(f"Exception in CRM function {err}")
response = make_response(500)
return response
def getRefreshToken(code,domain):
params = {
'code':code,
'client_id':CLIENT_ID,
'client_secret':CLIENT_SECRET,
'grant_type':'authorization_code',
'redirect_uri':f"{domain}/server/lead_manager_function/generateToken"
}
response = requests.post(url,params = params)
data = response.json()
print(f'\n\nrefresh_token:{data}\n\n')
return data['refresh_token']
def getAccessToken(userdetails,app):
userId = userdetails[0]['Token']['userId']
Zcql = app.zcql()
query = f"Select refresh_token FROM Token where userId={userId}"
response = Zcql.execute_query(query)
refresh_token = response[0]['Token']['refresh_token']
args = {}
args["client_id"] = CLIENT_ID
args["client_secret"] = CLIENT_SECRET
args["auth_url"] = url
args["refresh_url"] = url
args["refresh_token"] = refresh_token
connector_service = app.connection({userId:args})
access_token = connector_service.get_connector(userId).get_access_token()
return access_token
def getUserId(app):
try:
user = app.authentication().get_current_user()
user_id = user["user_id"]
response = app.zcql().execute_query(f"Select * from Token where userId = {user_id}")
return response
except Exception as err:
response = make_response({
"message":"Internal Server error",
"error":err
},500)
return response
Note: このコードを関数ファイルにコピー&ペーストした後、以下の値を必ず置き換えてください:
- 14行目のClient ID
- 15行目のClient Secret
- 32行目のTable ID
関数ディレクトリの設定が完了しました。関数とクライアントのコードについては、クライアントの設定後に説明します。
最終更新日 2026-03-05 11:43:24 +0530 IST