Aviso:

Para brindarle información de soporte completa de manera más rápida, el contenido de esta página ha sido traducido al español mediante traducción automática. Para consultar la información de soporte más precisa, consulte la versión en inglés de este contenido.

Configurar el directorio de funciones

Ahora comenzaremos a codificar la aplicación Lead Manager configurando la función Advanced I/O.

El directorio de funciones functions/lead_manager_function contiene:

  • El archivo principal de la función main.py
  • El archivo de configuración catalyst-config.json
  • El archivo requirements.txt, donde se declaran las bibliotecas externas que agregues.

Agregarás código en main.py y puedes usar cualquier IDE para configurar la función.

Nota: Revisa el código de esta sección y asegúrate de comprenderlo por completo. Analizaremos el código de la función y del cliente después de que configures el cliente.

Puedes copiar directamente el código a continuación y pegarlo en main.py ubicado en el directorio functions/lead_manager_function y guardar el archivo.

main.py
copy
import json
import random
import zcatalyst_sdk
import logging
from flask import Request, make_response, jsonify,redirect
import requests
import os
# Zoho endpoints: the CRM Leads collection and the OAuth token service.
apiUrl = "https://www.zohoapis.com/crm/v2/Leads"
url = "https://accounts.zoho.com/oauth/v2/token"

# HTTP method names compared against request.method when routing.
GET, POST, PUT, DELETE = "GET", "POST", "PUT", "DELETE"

# Sample OAuth client credentials; the accompanying tutorial text says to
# replace them with the values of your own registered client.
CLIENT_ID = "1000.CEE5336HDV5JE3UHERR2BRITBI4NCN"
CLIENT_SECRET = "5292351d5c5960d7cba13ee1684c646ed8df146530"

# Root logger shared by every function in this module.
logger = logging.getLogger()
def _error_response(message, err=None, status=500):
    """Build a JSON error response.

    The exception, when given, is converted to text because an exception
    object cannot be serialized into the JSON body.
    """
    body = {"message": message}
    if err is not None:
        body["error"] = str(err)
    return make_response(body, status)


def _crm_failure(err):
    """Log a failed CRM route and return its generic 500 response."""
    logger.error(err)
    return _error_response("Internal Server Error.Please try again after sometime")


def _crm_headers(app):
    """Return the request headers carrying the current user's CRM access token."""
    user_details = getUserId(app)
    access_token = getAccessToken(user_details, app)
    return {
        'Authorization': f'Zoho-oauthtoken {access_token}',
        "Content-Type": "application/json",
    }


def handler(request: Request):
    """Route an incoming request of the Lead Manager function.

    Routes:
        GET    /generateToken   store the user's refresh token, then redirect
                                to the client application.
        GET    /getUserDetails  report the userId stored for the current user
                                (None when no row exists).
        GET    /crmData         list leads.
        POST   /crmData         create a lead from the JSON request body.
        GET    /crmData/<id>    fetch one lead.
        PUT    /crmData/<id>    update one lead from the JSON request body.
        DELETE /crmData/<id>    delete one lead.

    Any other path/method combination yields a 400 "Unknown path" response.
    Every failure is logged and answered with a 500 response; this function
    always returns a response object.
    """
    try:
        app = zcatalyst_sdk.initialize()
        path = request.path
        method = request.method
        # The last path segment is the lead id for the /crmData/<id> routes.
        lead_id = os.path.basename(path)
        is_lead_route = path == f"/crmData/{lead_id}"
        lead_url = f"{apiUrl}/{lead_id}"

        if path == "/generateToken" and method == GET:
            try:
                code = request.args.get('code')
                is_local = os.getenv('X_ZOHO_CATALYST_IS_LOCAL') == 'true'
                protocol = 'http' if is_local else 'https'
                host = request.headers['host']
                if not is_local:
                    # Outside local runs the port is dropped from the host.
                    host = host.split(':')[0]
                domain = f"{protocol}://{host}"
                user_id = app.authentication().get_current_user()["user_id"]
                # Table id of the sample project; replace with your own.
                table = app.datastore().table(12130000003381913)
                table.insert_row({
                    "refresh_token": getRefreshToken(code, domain),
                    "userId": user_id,
                })
                return redirect(f"{domain}/app/index.html")
            except Exception as err:
                logger.error(err)
                return _error_response(
                    "Internal Server Error. Please try again after sometime.",
                    err,
                )
        elif path == "/getUserDetails" and method == GET:
            try:
                user_details = getUserId(app)
                stored_id = user_details[0]["Token"]["userId"] if user_details else None
                return make_response({"userId": stored_id}, 200)
            except Exception as err:
                logger.error(err)
                return _error_response(
                    "Internal Server Error in Getting User Details. Please try again after sometime.",
                    err,
                )
        elif path == "/crmData" and method == GET:
            try:
                crm_response = requests.get(apiUrl, headers=_crm_headers(app))
                return make_response(jsonify(crm_response.json()), 200)
            except Exception as err:
                return _crm_failure(err)
        elif path == "/crmData" and method == POST:
            try:
                payload = {"data": [json.loads(request.data)]}
                crm_response = requests.post(
                    apiUrl, headers=_crm_headers(app), json=payload
                )
                if crm_response.status_code in (200, 201):
                    return make_response(crm_response.json(), 200)
                logger.error(crm_response.json())
                return make_response(crm_response.json(), 500)
            except Exception as err:
                return _crm_failure(err)
        elif is_lead_route and method == GET:
            try:
                crm_response = requests.get(lead_url, headers=_crm_headers(app))
                return make_response(crm_response.json(), 200)
            except Exception as err:
                return _crm_failure(err)
        elif is_lead_route and method == PUT:
            try:
                update = request.get_json()
                if update is None:
                    # Reject before contacting the CRM: there is nothing to send.
                    return make_response({"message": "Update Data not found"}, 400)
                crm_response = requests.put(
                    lead_url, headers=_crm_headers(app), json={"data": [update]}
                )
                return make_response(crm_response.json(), 200)
            except Exception as err:
                return _crm_failure(err)
        elif is_lead_route and method == DELETE:
            try:
                crm_response = requests.delete(lead_url, headers=_crm_headers(app))
                return make_response(crm_response.json(), 200)
            except Exception as err:
                return _crm_failure(err)
        else:
            return make_response('Unknown path', 400)
    except Exception as err:
        logger.error(f"Exception in CRM function {err}")
        return _error_response("Internal Server Error. Please try again after sometime.")
def getRefreshToken(code,domain):
    """Exchange an OAuth authorization code for a refresh token.

    Posts the code, the client credentials and the redirect URI
    (``{domain}/server/lead_manager_function/generateToken``) to the token
    endpoint held in ``url`` and returns the ``refresh_token`` field of the
    JSON reply.

    Args:
        code: Authorization code received on the redirect.
        domain: Scheme and host of this deployment, without a trailing slash.

    Returns:
        The refresh token string.

    Raises:
        KeyError: If the reply carries no refresh token.
    """
    params = {
        'code': code,
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'grant_type': 'authorization_code',
        'redirect_uri': f"{domain}/server/lead_manager_function/generateToken",
    }
    response = requests.post(url, params=params)
    data = response.json()
    # The reply is deliberately not printed or logged: it contains live tokens.
    refresh_token = data.get('refresh_token')
    if not refresh_token:
        # Presumably the body carries an 'error' field on failure; report only
        # that field so no credential material reaches the logs.
        raise KeyError(
            f"refresh_token missing from token response (error: {data.get('error')})"
        )
    return refresh_token
def getAccessToken(userdetails,app):
    """Return an access token for the user described by *userdetails*.

    Reads the user's id from the first row of *userdetails*, looks up that
    user's refresh token in the Token table through ZCQL, registers a
    connector keyed by the user id and returns the token that connector
    hands out.

    Args:
        userdetails: Rows shaped like ``[{"Token": {"userId": ...}}]``.
        app: Initialized Catalyst app object.
    """
    owner_id = userdetails[0]['Token']['userId']
    token_rows = app.zcql().execute_query(
        f"Select refresh_token FROM Token where userId={owner_id}"
    )
    stored_refresh_token = token_rows[0]['Token']['refresh_token']
    # The same endpoint serves both the auth and the refresh URL.
    connector_config = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "auth_url": url,
        "refresh_url": url,
        "refresh_token": stored_refresh_token,
    }
    connectors = app.connection({owner_id: connector_config})
    return connectors.get_connector(owner_id).get_access_token()
def getUserId(app):
    """Return the Token table rows stored for the currently authenticated user.

    Args:
        app: Initialized Catalyst app object.

    Returns:
        The result of the ZCQL query ``Select * from Token where userId = <id>``;
        callers treat it as a list of rows and index it.

    Raises:
        Exception: Any failure while resolving the user or running the query
            is logged and re-raised. Callers index the result as rows, so
            returning an HTTP response object here would only fail later and
            less clearly.
    """
    try:
        user_id = app.authentication().get_current_user()["user_id"]
        return app.zcql().execute_query(
            f"Select * from Token where userId = {user_id}"
        )
    except Exception:
        logging.getLogger().exception("Failed to load the Token row of the current user")
        raise
View more
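Nota: Después de copiar y pegar este código en tu archivo de función, asegúrate de reemplazar los siguientes valores por los de tu propio proyecto:

  • CLIENT_ID: el ID de cliente de tu aplicación registrada en Zoho.
  • CLIENT_SECRET: el secreto de cliente de esa misma aplicación.
  • El ID de tabla 12130000003381913 que recibe la llamada a table(...): usa el ID de la tabla de tu proyecto donde se guardan los tokens.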

El directorio de funciones está ahora configurado. Discutiremos el código de la función y del cliente, después de que configures el cliente.

Última actualización 2026-03-20 21:51:56 +0530 IST