お知らせ:

当社は、お客様により充実したサポート情報を迅速に提供するため、本ページのコンテンツは機械翻訳を用いて日本語に翻訳しています。正確かつ最新のサポート情報をご覧いただくには、本内容の英語版を参照してください。

関数ディレクトリの設定

Advanced I/O関数を設定して、Lead Managerアプリケーションのコーディングを開始します。

関数ディレクトリ functions/lead_manager_functionには以下が含まれています:

  • main.pyメイン関数ファイル
  • catalyst-config.json設定ファイル
  • 追加する外部ライブラリを記述するためのrequirements.txtファイル

main.pyにコードを追加します。関数の設定には任意のIDEを使用できます。

Note: このセクションのコードを十分に理解するために、必ず目を通してください。関数とクライアントのコードについては、クライアントの設定後に説明します。

以下のコードをコピーして、functions/lead_manager_functionディレクトリにあるmain.pyに貼り付けてファイルを保存してください。

main.py
copy
import json
import random
import zcatalyst_sdk
import logging
from flask import Request, make_response, jsonify,redirect
import requests
import os
apiUrl = "https://www.zohoapis.com/crm/v2/Leads"
url = "https://accounts.zoho.com/oauth/v2/token"
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
CLIENT_ID = "1000.CEE5336HDV5JE3UHERR2BRITBI4NCN"
CLIENT_SECRET = "5292351d5c5960d7cba13ee1684c646ed8df146530" 
logger = logging.getLogger()
def handler(request: Request):
    try:
        app = zcatalyst_sdk.initialize()
        Datastore = app.datastore()
        id = os.path.basename(request.path)
        if request.path == "/generateToken" and request.method == GET:
            try:
                code = request.args.get('code')
                user_management_service = app.authentication()
                is_local = os.getenv('X_ZOHO_CATALYST_IS_LOCAL') == 'true'
                protocol = 'http' if is_local else 'https'
                host = request.headers['host'] if is_local else request.headers['host'].split(':')[0]
                domain = f"{protocol}://{host}"
                userDetails = user_management_service.get_current_user()
                user_id = userDetails["user_id"]
                tab = Datastore.table(12130000003381913)
                row = {"refresh_token":getRefreshToken(code,domain),"userId":user_id}
                tab.insert_row(row)
                redirect_uri = f"{domain}/app/index.html"
                return redirect(redirect_uri)
            except Exception as err:
                logger.error(err)
                response = make_response(({
                    "message": "Internal Server Error. Please try again after sometime.",
                    "error": err,
                }), 500)
        elif request.path == "/getUserDetails" and request.method == GET:
            try:
                app = zcatalyst_sdk.initialize()
                userDetails = getUserId(app)
                if len(userDetails) != 0:
                    response = make_response({
                        "userId":userDetails[0]["Token"]["userId"]
                    },200)
                else:
                    response = make_response({
                        "userId":None
                    },200)
                return response
            except Exception as err:
                response = make_response({
                    "message":"Internal Server Error in Getting User Details. Please try again after sometime.",
                    "error":err
                },500)
                return response
        elif request.path == "/crmData" and request.method == GET:
            try:
                app = zcatalyst_sdk.initialize()
                userDetails = getUserId(app)
                accesstoken = getAccessToken(userDetails,app)
                headers = {
                    'Authorization':f'Zoho-oauthtoken {accesstoken}',
                    "Content-Type": "application/json"
                }
                response1 = requests.get(apiUrl,headers=headers)
                response = make_response(jsonify(response1.json()),200)
                return response
            except Exception as err:
                logger.error(err)
                response = make_response({
                    "message":"Internal Server Error.Please try again after sometime"
                },500)
                return response
        elif request.path == "/crmData" and request.method == POST:
            try:
                app = zcatalyst_sdk.initialize()
                data = {"data": [json.loads(request.data)]}
                userDetails = getUserId(app)
                accesstoken = getAccessToken(userDetails,app)
                headers = {
                    'Authorization':f'Zoho-oauthtoken {accesstoken}',
                    "Content-Type": "application/json"
                }
                response1 = requests.post(apiUrl,headers=headers,json=data)
                if response1.status_code == 200 or response1.status_code == 201:
                    response = make_response((response1.json()),200)
                    return response
                else:
                    logger.error(response1.json())
                    response = make_response((response1.json()),500)
                    return response
            except Exception as err:
                logger.error(err)
                response = make_response({
                    "message":"Internal Server Error.Please try again after sometime"
                },500)
                return response
        elif request.path == f"/crmData/{id}" and request.method == GET:
            try:
                app = zcatalyst_sdk.initialize()
                userDetails = getUserId(app)
                accesstoken = getAccessToken(userDetails,app)
                headers = {
                    'Authorization':f'Zoho-oauthtoken {accesstoken}',
                    "Content-Type": "application/json"
                }
                path = f"/{id}"
                response1 = requests.get(apiUrl+path,headers=headers)
                response = make_response((response1.json()),200)
                return response
            except Exception as err:
                logger.error(err)
                response = make_response({
                    "message":"Internal Server Errro.Please try again after sometime"
                },500)
                return response
        elif request.path == f"/crmData/{id}" and request.method == PUT:
            try:
                app = zcatalyst_sdk.initialize()
                reqData = request.get_json()
                data = {
                    "data":[reqData]
                }
                if reqData is None:
                    response = make_response({
                        "message":"Update Data not found"
                    },400)
                userDetails = getUserId(app)
                accesstoken = getAccessToken(userDetails,app)
                headers = {
                    'Authorization':f'Zoho-oauthtoken {accesstoken}',
                    "Content-Type": "application/json"
                }
                path = f"/{id}"
                response1 = requests.put(apiUrl+path,headers=headers,json=data)
                response = make_response((response1.json()),200)
                return response
            except Exception as err:
                logger.error(err)
                response = make_response({
                    "message":"Internal Server Error.Please try again after sometime"
                },500)
                return response
        elif request.path ==f"/crmData/{id}" and request.method == DELETE:
            try:
                app = zcatalyst_sdk.initialize()
                userDetails = getUserId(app)
                accesstoken = getAccessToken(userDetails,app)
                headers = {
                    'Authorization':f'Zoho-oauthtoken {accesstoken}',
                    "Content-Type": "application/json"
                }
                path = f"/{id}"
                response1 = requests.delete(apiUrl+path,headers=headers)
                response = make_response((response1.json()),200)
                return response
            except Exception as err:
                logger.error(err)
                response = make_response({
                    "message":"Internal Server Errro.Please try again after sometime"
                },500)
                return response
        else:
            response = make_response('Unknown path')
            response.status_code = 400
            return response
    except Exception as err:
        logger.error(f"Exception in CRM function {err}")
        response = make_response(500)
        return response
def getRefreshToken(code,domain):
    params = {
        'code':code,
        'client_id':CLIENT_ID,
        'client_secret':CLIENT_SECRET,
        'grant_type':'authorization_code',
        'redirect_uri':f"{domain}/server/lead_manager_function/generateToken"
        }
    response = requests.post(url,params = params)
    data = response.json()
    print(f'\n\nrefresh_token:{data}\n\n')
    return data['refresh_token']
def getAccessToken(userdetails,app):
    userId = userdetails[0]['Token']['userId'] 
    Zcql = app.zcql()
    query = f"Select refresh_token FROM Token where userId={userId}"
    response = Zcql.execute_query(query)
    refresh_token = response[0]['Token']['refresh_token']
    args = {}
    args["client_id"] = CLIENT_ID
    args["client_secret"] = CLIENT_SECRET
    args["auth_url"] = url
    args["refresh_url"] = url
    args["refresh_token"] = refresh_token
    connector_service = app.connection({userId:args})
    access_token = connector_service.get_connector(userId).get_access_token()
    return access_token
def getUserId(app):
    try:
        user = app.authentication().get_current_user()
        user_id = user["user_id"]
        response = app.zcql().execute_query(f"Select * from Token where userId = {user_id}")
        return response
    except Exception as err:
        response = make_response({
            "message":"Internal Server error",
            "error":err
        },500)
        return response
View more
Note: このコードを関数ファイルにコピー&ペーストした後、以下の値を必ず置き換えてください:

関数ディレクトリの設定が完了しました。関数とクライアントのコードについては、クライアントの設定後に説明します。

最終更新日 2026-03-05 11:43:24 +0530 IST