2023年12月2日发(作者:冼涵柳)
苹果开发者 apple_developer_api
之前研究了苹果超级签,为了方便地注册 udid 并且下载描述文件,特意写了此 API,方便程序调用。
2022.1.8 更新
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project: 4月
# author: NinEveN
# date: 2020/4/17
# pip install pyjwt
import base64
import logging
import os
import time
from collections import namedtuple
from functools import wraps

import jwt
import requests
# NOTE(review): the package path of this import was lost in extraction
# ("from import settings"); presumably django.conf - confirm against upstream.
from django.conf import settings

logger = logging.getLogger(__name__)

# https://developer.apple.com/documentation/appstoreconnectapi/creating_api_keys_for_app_store_connect_api
# https://appstoreconnect.apple.com/access/api
# (Apply for an API key on the App Store Connect page referenced above.)
#
# Module-wide defaults handed to every HTTP call made by the API classes.
# NOTE(review): in the extracted text a stray '#' sits directly before
# "proxies"; it is treated as an empty comment line because
# AppStoreConnectApi.__init__ reads ``proxies`` - confirm against upstream.
proxies = {}
timeout = 120


def request_format_log(req):
    """Log the URL, headers, status code and body of an HTTP response.

    Logging is best effort: any failure while reading the response is itself
    logged and never propagated.

    :param req: the response object returned by ``requests``
    :return: ``req`` unchanged, so the call can wrap a request expression
    """
    try:
        logger.info(f"url:{req.url} header:{req.headers} code:{req.status_code} body:{req.content}")
    except Exception as e:
        # Deliberately broad: a logging problem must not break the API call.
        logger.error(e)
    return req
# NOTE(review): this section was rebuilt from a damaged extraction in which
# dotted names (``self.x``, ``requests.get``, ``req.status_code`` ...) and URL
# hosts had been stripped and a few fragments were duplicated.  Attribute
# names follow the surviving suffixes; HTTP verbs are inferred from the
# status codes listed in the original docstrings (200 list/read -> GET,
# 201 -> POST, 200 modify -> PATCH, 204 -> DELETE).  Confirm against upstream.

# Must stay in sync with the capability levels defined on the model side.
capability_info = [
    [],
    ["PUSH_NOTIFICATIONS"],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
    ],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
        "WALLET",
        "ICLOUD",
        "INTER_APP_AUDIO",
        "ASSOCIATED_DOMAINS",
        "APP_GROUPS",
        "HEALTHKIT",
        "HOMEKIT",
        "WIRELESS_ACCESSORY_CONFIGURATION",
        "APPLE_PAY",
        "DATA_PROTECTION",
        "SIRIKIT",
        "MULTIPATH",
        "HOT_SPOT",
        "NFC_TAG_READING",
        "CLASSKIT",
        "AUTOFILL_CREDENTIAL_PROVIDER",
        "ACCESS_WIFI_INFORMATION",
        "COREMEDIA_HLS_LOW_LATENCY",
    ],
]


def get_capability(s_type):
    """Return the capability names for level ``s_type`` (an index into ``capability_info``)."""
    return capability_info[s_type]


def _send(api, method, url, **kwargs):
    """Issue one HTTP request with ``api``'s headers, proxies and timeout, then log the response."""
    return request_format_log(
        requests.request(method, url, headers=api.headers, proxies=api.proxies, timeout=api.timeout, **kwargs))


class DevicesAPI(object):
    """Raw ``/devices`` endpoints; every method returns the HTTP response.

    ``self.proxies`` and ``self.timeout`` are expected to be set by the
    composing class (``AppStoreConnectApi`` does so).
    """

    # https://developer.apple.com/documentation/appstoreconnectapi/devices
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.devices_url = '%s/devices' % base_uri

    def list_devices(self, query_parameters=None):
        """List devices (platform IOS, limit 200 unless overridden).

        :param query_parameters: extra query parameters; they override the defaults
        :return: 200 DevicesResponse; 400 / 403 ErrorResponse
        """
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
            "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return _send(self, 'GET', self.devices_url, params=params)

    def list_enabled_devices(self):
        """List devices whose status is ENABLED."""
        return self.list_devices({"filter[status]": "ENABLED"})

    def list_disabled_devices(self):
        """List devices whose status is DISABLED."""
        return self.list_devices({"filter[status]": "DISABLED"})

    def list_device_by_device_id(self, device_id):
        """List the device(s) matching the given resource id."""
        return self.list_devices({"filter[id]": device_id})

    def register_device(self, device_name, device_udid, platform="IOS"):
        """Register a new device.

        :param device_name: display name of the device
        :param device_udid: UDID of the device
        :param platform: IOS or MAC_OS
        :return: 201 DeviceResponse; 400 / 403 / 409 ErrorResponse
        """
        json = {
            'data': {
                'type': 'devices',
                'attributes': {
                    'name': device_name,
                    'udid': device_udid,
                    'platform': platform  # IOS or MAC_OS
                }
            }
        }
        return _send(self, 'POST', self.devices_url, json=json)

    def read_device_information(self, device_id):
        """Read a single device.

        :param device_id: resource id of the device
        :return: 200 DeviceResponse; 400 / 403 / 404 ErrorResponse
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
        }
        return _send(self, 'GET', base_url, params=params)

    def enabled_device(self, device_id, device_name):
        """Set the device status to ENABLED."""
        return self.modify_registered_device(device_id, device_name, 'ENABLED')

    def disabled_device(self, device_id, device_name):
        """Set the device status to DISABLED."""
        return self.modify_registered_device(device_id, device_name, 'DISABLED')

    def modify_registered_device(self, device_id, device_name, status):
        """Update the name and status of a registered device.

        :param device_id: resource id of the device
        :param device_name: new display name
        :param status: 'ENABLED' or 'DISABLED'
        :return: 200 DeviceResponse; 400 / 403 / 404 / 409 ErrorResponse
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        json = {
            'data': {
                'type': 'devices',
                'id': device_id,
                'attributes': {
                    'name': device_name,
                    'status': status
                }
            }
        }
        return _send(self, 'PATCH', base_url, json=json)


class BundleIDsAPI(object):
    """Raw ``/bundleIds`` endpoints; every method returns the HTTP response."""

    # https://developer.apple.com/documentation/appstoreconnectapi/bundle_ids
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_url = '%s/bundleIds' % base_uri

    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        """Register a new bundle id.

        :return: 201 BundleIdResponse; 400 / 403 / 409 ErrorResponse
        """
        json = {
            'data': {
                'type': 'bundleIds',
                'attributes': {
                    'name': bundle_id_name,
                    'identifier': bundle_id_identifier,
                    'platform': platform,
                    'seedId': seed_id
                }
            }
        }
        return _send(self, 'POST', self.bundle_ids_url, json=json)

    def delete_bundle_id_by_id(self, bundle_id):
        """Delete a bundle id.

        :return: 204 No Content; 400 / 403 / 404 / 409 ErrorResponse
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        return _send(self, 'DELETE', base_url, json={})

    def list_bundle_ids(self, query_parameters=None):
        """List bundle ids (limit 200 unless overridden).

        :param query_parameters: extra query parameters; they override the defaults
        :return: 200 BundleIdsResponse; 400 / 403 ErrorResponse
        """
        params = {
            "fields[bundleIds]": "identifier, name, platform, profiles, seedId",
            # "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return _send(self, 'GET', self.bundle_ids_url, params=params)

    def list_bundle_id_by_identifier(self, identifier):
        """List bundle ids filtered by identifier."""
        return self.list_bundle_ids({"filter[identifier]": identifier})

    def list_bundle_id_by_id(self, bundle_id):
        """List bundle ids filtered by resource id."""
        return self.list_bundle_ids({"filter[id]": bundle_id})

    def modify_bundle_id(self, bundle_id, bundle_name):
        """Rename a bundle id.

        :return: 200 BundleIdResponse; 400 / 403 / 404 / 409 ErrorResponse
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        json = {
            'data': {
                'type': 'bundleIds',
                'id': bundle_id,
                'attributes': {
                    'name': bundle_name,
                }
            }
        }
        return _send(self, 'PATCH', base_url, json=json)


class BundleIDsCapabilityAPI(object):
    """Raw ``/bundleIdCapabilities`` endpoints; every method returns the HTTP response."""

    # https://developer.apple.com/documentation/appstoreconnectapi/bundle_id_capabilities
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_capability_url = '%s/bundleIdCapabilities' % base_uri

    def disable_capability(self, bundle_id, capability_type):
        """Disable one capability of a bundle id.

        :return: 204 No Content; 400 / 403 / 404 / 409 ErrorResponse
        """
        # The capability resource id is "<bundle id>_<capability type>".
        base_url = '%s/%s_%s' % (self.bundle_ids_capability_url, bundle_id, capability_type)
        return _send(self, 'DELETE', base_url, json={})

    def enable_capability(self, bundle_id, capability_type):
        """Enable one capability (e.g. PUSH_NOTIFICATIONS, PERSONAL_VPN) on a bundle id.

        :return: 201 BundleIdCapabilityResponse; 400 / 403 / 409 ErrorResponse
        """
        json = {
            'data': {
                'type': 'bundleIdCapabilities',
                'attributes': {
                    'capabilityType': capability_type,
                    'settings': []
                },
                'relationships': {
                    'bundleId': {
                        'data': {
                            'id': bundle_id,
                            'type': 'bundleIds',
                        }
                    }
                }
            }
        }
        return _send(self, 'POST', self.bundle_ids_capability_url, json=json)


class ProfilesAPI(object):
    """Raw ``/profiles`` endpoints; every method returns the HTTP response."""

    # https://developer.apple.com/documentation/appstoreconnectapi/profiles
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.profiles_url = '%s/profiles' % base_uri

    def create_profile(self, bundle_id, certificate_id_list, profile_name, device_id_list,
                       profile_type='IOS_APP_ADHOC'):
        """Create a provisioning profile.

        :param bundle_id: resource id of the bundle id
        :param certificate_id_list: certificate resource ids to include
        :param profile_name: name of the new profile
        :param device_id_list: device resource ids to include
        :param profile_type: e.g. IOS_APP_DEVELOPMENT, IOS_APP_STORE, IOS_APP_ADHOC, IOS_APP_INHOUSE,
            MAC_APP_DEVELOPMENT, MAC_APP_STORE, MAC_APP_DIRECT, TVOS_APP_DEVELOPMENT, TVOS_APP_STORE,
            TVOS_APP_ADHOC, TVOS_APP_INHOUSE, MAC_CATALYST_APP_DEVELOPMENT, MAC_CATALYST_APP_STORE,
            MAC_CATALYST_APP_DIRECT
        :return: 201 ProfileResponse; 400 / 403 / 409 ErrorResponse
        """
        json = {
            'data': {
                'type': 'profiles',
                'attributes': {
                    'name': profile_name,
                    'profileType': profile_type,
                },
                'relationships': {
                    'bundleId': {
                        'data': {'id': bundle_id, 'type': 'bundleIds'}
                    },
                    'certificates': {
                        'data': [
                            {'id': certificate_id, 'type': 'certificates'}
                            for certificate_id in certificate_id_list
                        ]
                    },
                    'devices': {
                        'data': [
                            {'id': device_id, 'type': 'devices'}
                            for device_id in device_id_list
                        ]
                    },
                }
            }
        }
        return _send(self, 'POST', self.profiles_url, json=json)

    def delete_profile(self, profile_id):
        """Delete a profile.

        :return: 204 No Content; 400 / 403 / 404 / 409 ErrorResponse
        """
        base_url = '%s/%s' % (self.profiles_url, profile_id)
        return _send(self, 'DELETE', base_url, json={})

    def download_profile(self, profile_id):
        """Placeholder; profile content is written by ``Profiles.download_profile``."""
        pass

    def list_profiles(self, query_parameters=None):
        """List profiles (limit 200 unless overridden).

        :param query_parameters: extra query parameters; they override the defaults
        :return: 200 ProfilesResponse; 400 / 403 ErrorResponse
        """
        params = {
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return _send(self, 'GET', self.profiles_url, params=params)

    def list_profile_by_profile_id(self, profile_id):
        """List profiles filtered by resource id."""
        return self.list_profiles({"filter[id]": profile_id, "include": ""})

    def list_profile_by_profile_name(self, profile_name):
        """List profiles filtered by name."""
        return self.list_profiles({"filter[name]": profile_name, "include": ""})


class CertificatesAPI(object):
    """Raw ``/certificates`` endpoints; every method returns the HTTP response."""

    # https://developer.apple.com/documentation/appstoreconnectapi/certificates
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.certificates_url = '%s/certificates' % base_uri

    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        """Create a certificate from a CSR.

        :param csr_content: certificate signing request content
        :param certificate_type: see the API's CertificateType enumeration
        :return: 201 CertificateResponse; 400 / 403 / 409 ErrorResponse
        """
        json = {
            'data': {
                'type': 'certificates',
                'attributes': {
                    'csrContent': csr_content,
                    'certificateType': certificate_type,
                }
            }
        }
        return _send(self, 'POST', self.certificates_url, json=json)

    def download_certificate(self, certificate_id):
        """Placeholder; certificate content is written by ``Certificates.download_certificate``."""
        pass

    def list_certificate(self, query_parameters=None):
        """List certificates.

        :param query_parameters: extra query parameters; they override the defaults
        :return: 200 CertificatesResponse; 400 / 403 ErrorResponse
        """
        params = {
            "fields[certificates]": "certificateContent, certificateType, csrContent, displayName, expirationDate, "
                                    "name, platform, serialNumber",
        }
        if query_parameters:
            params.update(query_parameters)
        return _send(self, 'GET', self.certificates_url, params=params)

    def list_certificate_by_certificate_id(self, certificate_id):
        """List certificates filtered by resource id."""
        return self.list_certificate({"filter[id]": certificate_id, })

    def revoke_certificate(self, certificate_id):
        """Revoke a certificate.

        :return: 204 No Content; 400 / 403 / 404 / 409 ErrorResponse
        """
        base_url = '%s/%s' % (self.certificates_url, certificate_id)
        return _send(self, 'DELETE', base_url, json={})


class BaseInfoObj(object):
    """List helpers for the record types below (anything exposing ``.id``)."""

    @staticmethod
    def filter(obj_lists, query_parameters=None):
        """Return the objects whose attributes equal every value in ``query_parameters``.

        A single (non-list) object is wrapped into a one-element list first;
        with no ``query_parameters`` the (wrapped) input is returned as is.
        """
        if not isinstance(obj_lists, list):
            obj_lists = [obj_lists]
        if query_parameters:
            return [obj for obj in obj_lists
                    if all(getattr(obj, k) == v for k, v in query_parameters.items())]
        return obj_lists

    @staticmethod
    def update(obj_lists, up_obj_list):
        """Merge ``up_obj_list`` into ``obj_lists``, later objects replacing earlier ones with the same id.

        Objects with a unique id keep their order; replacements are appended at the end.
        """
        if not isinstance(up_obj_list, list):
            up_obj_list = [up_obj_list]
        conn_obj = list(obj_lists) + list(up_obj_list)
        repeat_id = []
        repeat_obj = []
        # Pairwise scan kept from the original so the output order is unchanged.
        for i in range(len(conn_obj) - 1):
            for j in range(i + 1, len(conn_obj)):
                if conn_obj[i].id == conn_obj[j].id:
                    repeat_obj.append(conn_obj[j])
                    repeat_id.append(conn_obj[i].id)
        new_list = [ob for ob in conn_obj if ob.id not in repeat_id]
        new_list.extend(repeat_obj)
        return new_list

    @staticmethod
    def delete(obj_lists, up_obj_list):
        """Return ``obj_lists`` without the objects whose id appears in ``up_obj_list``."""
        removed_ids = [up_obj.id for up_obj in up_obj_list]
        return [obj for obj in obj_lists if obj.id not in removed_ids]


def _write_base64_file(encoded, filepath):
    """Decode base64 ``encoded`` into ``filepath``, creating the parent directory if needed."""
    dirname = os.path.dirname(filepath)
    if dirname:  # a bare filename has no directory to create
        os.makedirs(dirname, exist_ok=True)
    with open(filepath, 'wb') as f:
        f.write(base64.b64decode(encoded))
    return filepath


class _JsonRecordMixin(object):
    """Shared JSON constructors for the namedtuple records below."""
    __slots__ = ()

    @classmethod
    def from_json_list(cls, json_list):
        """Build one record per resource dict in ``json_list``."""
        return [cls.from_json(item) for item in json_list]

    @classmethod
    def from_json(cls, json):
        """Build a record from a resource dict carrying ``id`` and ``attributes``.

        Attributes the record does not declare are ignored and declared
        fields missing from the payload become ``None``, so a payload with
        more or fewer attributes than requested no longer raises TypeError.
        """
        attributes = json.get("attributes", {})
        values = {field: attributes.get(field) for field in cls._fields}
        values['id'] = json.get('id', '')
        return cls(**values)

    def copy_and_replace(self, **kwargs):
        """Return a copy of the record with the given fields replaced."""
        return self._replace(**kwargs)


class Devices(_JsonRecordMixin,
              namedtuple("Devices",
                         ["id", "addedDate", "name", "deviceClass", "model", "udid", "platform", "status"])):
    """One ``devices`` resource."""
    __slots__ = ()


class BundleIds(_JsonRecordMixin,
                namedtuple("BundleIds", ["id", "name", "identifier", "platform", "seedId", ])):
    """One ``bundleIds`` resource."""
    __slots__ = ()


class Profiles(_JsonRecordMixin,
               namedtuple("Profiles",
                          ["id", "name", "profileState", "createdDate", "profileType", "profileContent", "uuid",
                           "platform", "expirationDate"])):
    """One ``profiles`` resource."""
    __slots__ = ()

    def download_profile(self, filepath):
        """Write the base64-decoded ``profileContent`` to ``filepath`` and return the path."""
        return _write_base64_file(self.profileContent, filepath)


class Certificates(_JsonRecordMixin,
                   namedtuple("Certificates",
                              ["id", "serialNumber", "certificateContent", "displayName", "name", "csrContent",
                               "platform", "expirationDate", "certificateType"])):
    """One ``certificates`` resource."""
    __slots__ = ()

    def download_certificate(self, filepath):
        """Write the base64-decoded ``certificateContent`` to ``filepath`` and return the path."""
        return _write_base64_file(self.certificateContent, filepath)


def call_function_try_attempts(try_attempts=3, sleep_time=3):
    """Decorator factory: retry the wrapped call up to ``try_attempts`` times.

    :param try_attempts: maximum number of calls
    :param sleep_time: seconds to wait between two attempts
    :raises Exception: carrying the last error text once every attempt failed,
        or immediately when the error mentions a missing/expired agreement
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            flag = False
            res = ''
            for i in range(try_attempts):
                try:
                    res = func(*args, **kwargs)
                    flag = True
                    break
                except Exception as e:
                    res = str(e)
                    if 'Cannot connect to proxy' in res or 'Read timed out' in res \
                            or 'Max retries exceeded with' in res:
                        logger.error('access apple api failed . change proxy ip again')
                    # NOTE(review): the tail of this message was damaged in extraction.
                    logger.warning(
                        f'exec {func} failed. Failed:{e} {try_attempts} times in total. now {sleep_time} later try '
                        f'again...{i}')
                    # Retrying cannot fix a missing agreement, so give up at once.
                    if 'ED_AGREEMENTS_MISSING_OR_EXPIRED' in res:
                        raise Exception(res)
                    if i + 1 < try_attempts:  # no point sleeping after the final attempt
                        time.sleep(sleep_time)
            logger.info(f"exec {func} finished. time:{time.time() - start_time}")
            if not flag:
                logger.error(f'exec {func} failed after the maximum number of attempts. Failed:{res}')
                raise Exception(res)
            return res

        return wrapper

    return decorator


class AppStoreConnectApi(DevicesAPI, BundleIDsAPI, BundleIDsCapabilityAPI, ProfilesAPI, CertificatesAPI):
    """High-level client: signs requests with a JWT and turns responses into record objects."""

    # NOTE(review): the host was stripped in extraction (only '/v1' survived);
    # restored from Apple's API documentation.
    BASE_URI = 'https://api.appstoreconnect.apple.com/v1'
    JWT_AUD = 'appstoreconnect-v1'
    JWT_ALG = 'ES256'

    def __init__(self, issuer_id, private_key_id, p8_private_key, exp_seconds=1200):
        """
        Per Apple's documentation a token may live for at most 20 minutes:
        https://developer.apple.com/documentation/appstoreconnectapi/generating_tokens_for_api_requests

        :param issuer_id: issuer id of the API key
        :param private_key_id: key id of the API key
        :param p8_private_key: content of the .p8 private key
        :param exp_seconds: token lifetime in seconds, max 20*60
        """
        self.issuer_id = issuer_id
        self.private_key_id = private_key_id
        self.p8_private_key = p8_private_key
        self.exp_seconds = exp_seconds
        self.proxies = proxies
        self.timeout = timeout
        self.__make_jwt_headers()
        DevicesAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsCapabilityAPI.__init__(self, self.BASE_URI, self.headers)
        ProfilesAPI.__init__(self, self.BASE_URI, self.headers)
        CertificatesAPI.__init__(self, self.BASE_URI, self.headers)
        self.rate_limit_info = {}

    def __set_rate_limit_info(self, req_headers):
        """Parse the ``X-Rate-Limit`` header and rebuild the JWT when the hourly budget runs low.

        Currently not called (the call in ``__base_format`` is disabled).
        """
        for par in (req_headers.get('X-Rate-Limit') or '').split(";"):
            if par:
                limit_info_list = par.split(":")
                self.rate_limit_info[limit_info_list[0]] = limit_info_list[1]
        user_rem_info = self.rate_limit_info
        remaining = user_rem_info.get('user-hour-rem')
        if remaining is not None and int(remaining) < 3595:
            logger.warning(f"user-hour-rem over limit. so get jwt headers")
            self.__init__(self.issuer_id, self.private_key_id, self.p8_private_key)
            self.rate_limit_info = user_rem_info
        logger.info(f"rate_limit_info:{self.rate_limit_info}")

    def __make_jwt_headers(self):
        """Sign a fresh JWT and store the resulting request headers in ``self.headers``."""
        now = int(time.time())
        data = {
            "iss": self.issuer_id,
            "iat": now,
            "exp": now + self.exp_seconds,
            "aud": self.JWT_AUD,
        }
        jwt_headers = {
            "alg": self.JWT_ALG,
            "kid": self.private_key_id,
            "typ": "JWT"
        }
        jwt_encoded = jwt.encode(data, self.p8_private_key, algorithm=self.JWT_ALG, headers=jwt_headers)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.39 (KHTML, like Gecko) '
                          'Chrome/72.0.3626.109 Safari/537.39',
            'Authorization': 'Bearer %s' % jwt_encoded
        }

    def __base_format(self, s_type, req, success_code):
        """Convert a response into record object(s) or raise on any non-success status.

        :param s_type: 'devices', 'bundleIds', 'profiles' or 'certificates'
        :param req: the HTTP response
        :param success_code: status code that counts as success
        :return: a single record when exactly one was returned, otherwise a list
            (``None`` for an unknown ``s_type``)
        :raises Exception: with the response text for any other status or payload
        """
        # self.__set_rate_limit_info(req.headers)
        if req.status_code == success_code:
            data = req.json().get('data')
            if not isinstance(data, (list, dict)):
                raise Exception(f'error instance: {req.text}')
            if isinstance(data, dict):
                data = [data]
            parser = {
                'devices': Devices.from_json_list,
                'bundleIds': BundleIds.from_json_list,
                'profiles': Profiles.from_json_list,
                'certificates': Certificates.from_json_list,
            }.get(s_type)
            if parser is None:
                return None
            obj = parser(data)
            if len(obj) == 1:
                return obj[0]
            return obj
        elif req.status_code == 401:
            # authorization problem
            raise Exception(req.text)
        elif req.status_code == 429:
            # hourly request limit exceeded, e.g. {'user-hour-lim': '3600', 'user-hour-rem': '3586'}
            raise Exception(req.text)
        elif req.status_code == 500:
            # Back off before the caller's retry logic tries again.
            time.sleep(60)
            raise Exception(req.text)
        else:
            raise Exception('unknown error: %s code:%s' % (req.text, req.status_code))

    def __device_store(self, req, success_code=200):
        return self.__base_format('devices', req, success_code)

    def __profile_store(self, req, success_code=200):
        return self.__base_format('profiles', req, success_code)

    def __certificates_store(self, req, success_code=200):
        return self.__base_format('certificates', req, success_code)

    def __bundle_ids_store(self, req, success_code=200):
        return self.__base_format('bundleIds', req, success_code)

    def __do_success(self, req, status=200):
        """Return True when the response carries the expected status code."""
        return req.status_code == status

    @call_function_try_attempts()
    def get_all_devices(self):
        """Return every device as ``Devices`` record(s)."""
        return self.__device_store(self.list_devices())

    def list_enabled_devices(self):
        """Return the enabled devices as ``Devices`` record(s)."""
        return self.__device_store(super().list_enabled_devices())

    def get_all_bundle_ids(self):
        """Return every bundle id as ``BundleIds`` record(s)."""
        return self.__bundle_ids_store(self.list_bundle_ids())

    def get_all_profiles(self):
        """Return every profile as ``Profiles`` record(s)."""
        return self.__profile_store(self.list_profiles())

    @call_function_try_attempts(try_attempts=2)
    def get_all_certificates(self):
        """Return every certificate as ``Certificates`` record(s)."""
        return self.__certificates_store(self.list_certificate())

    @call_function_try_attempts()
    def get_certificate_by_cid(self, certificate_id):
        """Return the certificate with the given resource id."""
        return self.__certificates_store(self.list_certificate_by_certificate_id(certificate_id))

    def list_device_by_udid(self, udid):
        """Return the list of devices registered under ``udid``.

        :raises Exception: when no device matches, or the matches disagree
        """
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": udid})
        if not device_obj_list:
            raise Exception('Device obj is None')
        # NOTE(review): the attribute compared here was lost in extraction; presumably ``udid``.
        if len(device_obj_list) != 1 and len({device_obj.udid for device_obj in device_obj_list}) != 1:
            raise Exception('more than one Device obj')
        return device_obj_list

    @call_function_try_attempts()
    def register_device(self, device_name, device_udid, platform="IOS"):
        """Register the device, or re-enable (and rename) it when the UDID already exists."""
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": device_udid})
        # One developer account was observed to hold two entries with the same udid.
        # NOTE(review): the attribute compared here was lost in extraction; presumably ``udid``.
        if device_obj_list and (
                len(device_obj_list) == 1 or len({device_obj.udid for device_obj in device_obj_list}) == 1):
            device_obj = device_obj_list[0]
            req = self.modify_registered_device(device_obj.id, device_name, 'ENABLED')
            return self.__device_store(req)
        req = super().register_device(device_name, device_udid, platform)
        return self.__device_store(req, 201)

    def __switch_device(self, status, device_id, device_name, udid):
        """Set ``status`` by resource id first, falling back to a lookup by ``udid``."""
        if device_id and device_name:
            req = self.modify_registered_device(device_id, device_name, status)
            if req.status_code == 200:
                return self.__device_store(req)
        if udid:
            # Only the first matching device is modified, as in the original.
            for device_obj in self.list_device_by_udid(udid):
                req = self.modify_registered_device(device_obj.id, device_obj.name, status)
                return self.__device_store(req)
        return None

    @call_function_try_attempts()
    def enabled_device(self, device_id, device_name, udid):
        """Enable a device by id/name, falling back to its ``udid``."""
        return self.__switch_device('ENABLED', device_id, device_name, udid)

    @call_function_try_attempts()
    def disabled_device(self, device_id, device_name, udid):
        """Disable a device by id/name, falling back to its ``udid``."""
        return self.__switch_device('DISABLED', device_id, device_name, udid)

    def list_bundle_ids_by_identifier(self, identifier):
        """Return the bundle id record(s) matching ``identifier``."""
        return self.__bundle_ids_store(super().list_bundle_id_by_identifier(identifier))

    @call_function_try_attempts()
    def enable_capability_by_s_type(self, bundle_id, s_type):
        """Enable every capability of level ``s_type``; failures are logged, not raised."""
        for capability in get_capability(s_type) or []:
            req = super().enable_capability(bundle_id, capability)
            if self.__do_success(req, 201):
                logger.info(f"{bundle_id} enable_capability {capability} success")
            else:
                logger.warning(f"{bundle_id} enable_capability {capability} failed {req.content}")
        return True

    @call_function_try_attempts()
    def disable_capability_by_s_type(self, bundle_id, s_type=len(capability_info) - 1):
        """Disable every capability of level ``s_type``; failures are logged, not raised."""
        for capability in get_capability(s_type) or []:
            req = super().disable_capability(bundle_id, capability)
            if self.__do_success(req, 204):
                logger.info(f"{bundle_id} disable_capability {capability} success")
            else:
                logger.warning(f"{bundle_id} disable_capability {capability} failed {req.content}")
        return True

    def enable_push_vpn_capability(self, bundle_id):
        """Enable PUSH_NOTIFICATIONS then PERSONAL_VPN; True only when both succeed."""
        req = super().enable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 201):
            req = super().enable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 201):
                return True
        return False

    def disable_push_vpn_capability(self, bundle_id):
        """Disable PUSH_NOTIFICATIONS then PERSONAL_VPN; True only when both succeed."""
        req = super().disable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 204):
            req = super().disable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 204):
                return True
        return False

    @call_function_try_attempts()
    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        """Register the bundle id, or rename it when the identifier already exists."""
        identifier_obj = self.list_bundle_ids_by_identifier(bundle_id_identifier)
        if isinstance(identifier_obj, BundleIds):
            req = self.modify_bundle_id(identifier_obj.id, bundle_id_name)
            return self.__bundle_ids_store(req)
        req = super().register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
        return self.__bundle_ids_store(req, 201)

    @call_function_try_attempts()
    def register_bundle_id_enable_capability(self, bundle_id_name, bundle_id_identifier, s_type, platform="IOS",
                                             seed_id=''):
        """Register a bundle id and enable the capabilities of level ``s_type``.

        :return: the ``BundleIds`` record, or ``None`` when registration did not yield one
        """
        bundle_ids = self.register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
        if isinstance(bundle_ids, BundleIds):
            if self.enable_capability_by_s_type(bundle_ids.id, s_type):
                return bundle_ids
        return None

    @call_function_try_attempts()
    def delete_bundle_by_identifier(self, identifier_id, identifier_name):
        """Delete a bundle id by resource id, falling back to a lookup by identifier.

        :return: True on success, otherwise ``None``
        """
        if identifier_id:
            req = self.delete_bundle_id_by_id(identifier_id)
            if req.status_code == 204:
                return True
        identifier_obj = self.list_bundle_ids_by_identifier(identifier_name)
        if isinstance(identifier_obj, BundleIds):
            req = self.delete_bundle_id_by_id(identifier_obj.id)
            if req.status_code == 204:
                return True
        return None

    @call_function_try_attempts()
    def create_profile(self, profile_id, bundle_id, certificate_id, profile_name, device_id_list=None,
                       profile_type='IOS_APP_ADHOC'):
        """Recreate a profile: delete any previous one, then create it for the given devices.

        :param profile_id: resource id of the profile to replace (may be empty)
        :param device_id_list: device resource ids; defaults to every enabled device
        :return: the new ``Profiles`` record
        :raises KeyError: when the API does not answer 201
        """
        if not device_id_list:
            # list_enabled_devices() yields a bare record when exactly one device
            # exists; BaseInfoObj.filter normalises that to a list.
            device_id_list = [device.id for device in BaseInfoObj.filter(self.list_enabled_devices())]
        self.delete_profile_by_id(profile_id, profile_name)
        # profile_type was previously accepted but never forwarded.
        req = super().create_profile(bundle_id, [certificate_id], profile_name, device_id_list, profile_type)
        if req.status_code == 201:
            self.__profile_store(req, 201)
            return Profiles.from_json(req.json().get("data"))
        raise KeyError(req.text)

    def list_profile_by_profile_name(self, profile_name):
        """Return the profile record(s) with the given name."""
        return self.__profile_store(super().list_profile_by_profile_name(profile_name))

    @call_function_try_attempts()
    def delete_profile_by_id(self, profile_id, profile_name):
        """Delete a profile by resource id, falling back to a lookup by name.

        :return: True on success, otherwise ``None``
        """
        if profile_id:
            req = super().delete_profile(profile_id)
            if self.__do_success(req, 204):
                return True
        profile_obj = self.list_profile_by_profile_name(profile_name)
        if profile_obj:
            req = super().delete_profile(profile_obj.id)
            if self.__do_success(req, 204):
                return True
        return None

    @call_function_try_attempts()
    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        """Create a certificate and return its ``Certificates`` record.

        :raises KeyError: when the API does not answer 201
        """
        req = super().create_certificate(csr_content, certificate_type)
        if req.status_code == 201:
            return self.__certificates_store(req, 201)
        raise KeyError(req.text)

    @call_function_try_attempts()
    def revoke_certificate(self, certificate_id):
        """Revoke a certificate; True when the API answers 204."""
        req = super().revoke_certificate(certificate_id)
        return req.status_code == 204
2023年12月2日发(作者:冼涵柳)
苹果开发者apple_developer_api之前研究了苹果超级签,为了方便的注册udid并且下载描述文件,特意写此API,方便程序调用2022.1.8 更新#!/usr/bin/env python# -*- coding:utf-8 -*-# project: 4月
# author: NinEveN# date: 2020/4/17# pip install pyjwtimport base64import loggingimport osimport timefrom collections import namedtuplefrom functools import wrapsimport jwtimport requestsfrom import settingslogger = ger(__name__)# /documentation/appstoreconnectapi/creating_api_keys_for_app_store_connect_api# /access/api
去申请秘钥#proxies = {}timeout = 120def request_format_log(req): try: (f"url:{} header:{s} code:{_code} body:{t}") except Exception as e: (e) return req#
# The index into this table is the "s_type" used across the project; per the
# original author's note it must be kept in step with the corresponding model.
capability_info = [
    [],
    ["PUSH_NOTIFICATIONS"],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
    ],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
        "WALLET",
        "ICLOUD",
        "INTER_APP_AUDIO",
        "ASSOCIATED_DOMAINS",
        "APP_GROUPS",
        "HEALTHKIT",
        "HOMEKIT",
        "WIRELESS_ACCESSORY_CONFIGURATION",
        "APPLE_PAY",
        "DATA_PROTECTION",
        "SIRIKIT",
        "MULTIPATH",
        "HOT_SPOT",
        "NFC_TAG_READING",
        "CLASSKIT",
        "AUTOFILL_CREDENTIAL_PROVIDER",
        "ACCESS_WIFI_INFORMATION",
        "COREMEDIA_HLS_LOW_LATENCY",
    ],
]


def get_capability(s_type):
    """Return the list of capability names stored at index ``s_type``.

    :param s_type: integer index into ``capability_info``.
    :raises IndexError: if ``s_type`` is outside the table.
    """
    return capability_info[s_type]


class DevicesAPI(object):
    """Raw HTTP calls for the ``/devices`` resource.

    Every method returns the ``requests`` response (after it has been passed
    through ``request_format_log``). ``self.proxies`` and ``self.timeout`` are
    read but not set here; ``AppStoreConnectApi.__init__`` provides them.
    """

    # /documentation/appstoreconnectapi/devices
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.devices_url = '%s/devices' % base_uri

    def list_devices(self, query_parameters=None):
        """GET the device list.

        :param query_parameters: optional dict merged over the default query.
        :return: response; 200 DevicesResponse, 400/403 ErrorResponse.
        """
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
            "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return request_format_log(
            requests.get(self.devices_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_enabled_devices(self):
        """List devices whose status is ENABLED."""
        return self.list_devices({"filter[status]": "ENABLED"})

    def list_disabled_devices(self):
        """List devices whose status is DISABLED."""
        return self.list_devices({"filter[status]": "DISABLED"})

    def list_device_by_device_id(self, device_id):
        """List devices filtered by resource id."""
        return self.list_devices({"filter[id]": device_id})

    def register_device(self, device_name, device_udid, platform="IOS"):
        """POST a new device.

        :param device_name: display name.
        :param device_udid: device UDID.
        :param platform: ``IOS`` or ``MAC_OS``.
        :return: response; 201 DeviceResponse, 400/403/409 ErrorResponse.
        """
        json = {
            'data': {
                'type': 'devices',
                'attributes': {
                    'name': device_name,
                    'udid': device_udid,
                    'platform': platform  # IOS or MAC_OS
                }
            }
        }
        return request_format_log(
            requests.post(self.devices_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def read_device_information(self, device_id):
        """GET a single device.

        :param device_id: resource id of the device.
        :return: response; 200 DeviceResponse, 400/403/404 ErrorResponse.
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
        }
        return request_format_log(
            requests.get(base_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def enabled_device(self, device_id, device_name):
        """Set the device status to ENABLED."""
        return self.modify_registered_device(device_id, device_name, 'ENABLED')

    def disabled_device(self, device_id, device_name):
        """Set the device status to DISABLED."""
        return self.modify_registered_device(device_id, device_name, 'DISABLED')

    def modify_registered_device(self, device_id, device_name, status):
        """PATCH a device's name and status.

        :param device_id: resource id of the device.
        :param device_name: new display name.
        :param status: ``ENABLED`` or ``DISABLED``.
        :return: response; 200 DeviceResponse, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        json = {
            'data': {
                'type': 'devices',
                'id': device_id,
                'attributes': {
                    'name': device_name,
                    'status': status
                }
            }
        }
        return request_format_log(
            requests.patch(base_url, json=json, headers=self.headers, proxies=self.proxies,
                           timeout=self.timeout))


class BundleIDsAPI(object):
    """Raw HTTP calls for the ``/bundleIds`` resource."""

    # /documentation/appstoreconnectapi/bundle_ids
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_url = '%s/bundleIds' % base_uri

    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        """POST a new bundle id.

        :param bundle_id_name: display name.
        :param bundle_id_identifier: reverse-DNS identifier.
        :param platform: platform string sent as-is.
        :param seed_id: seed id sent as-is.
        :return: response; 201 BundleIdResponse, 400/403/409 ErrorResponse.
        """
        json = {
            'data': {
                'type': 'bundleIds',
                'attributes': {
                    'name': bundle_id_name,
                    'identifier': bundle_id_identifier,
                    'platform': platform,
                    'seedId': seed_id
                }
            }
        }
        return request_format_log(
            requests.post(self.bundle_ids_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def delete_bundle_id_by_id(self, bundle_id):
        """DELETE a bundle id.

        :param bundle_id: resource id of the bundle id.
        :return: response; 204 No Content, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies,
                            timeout=self.timeout))

    def list_bundle_ids(self, query_parameters=None):
        """GET the bundle id list.

        :param query_parameters: optional dict merged over the default query.
        :return: response; 200 BundleIdsResponse, 400/403 ErrorResponse.
        """
        params = {
            "fields[bundleIds]": "identifier, name, platform, profiles, seedId",
            # "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return request_format_log(
            requests.get(self.bundle_ids_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_bundle_id_by_identifier(self, identifier):
        """List bundle ids filtered by identifier."""
        return self.list_bundle_ids({"filter[identifier]": identifier})

    def list_bundle_id_by_id(self, bundle_id):
        """List bundle ids filtered by resource id."""
        return self.list_bundle_ids({"filter[id]": bundle_id})

    def modify_bundle_id(self, bundle_id, bundle_name):
        """PATCH a bundle id's display name.

        :param bundle_id: resource id of the bundle id.
        :param bundle_name: new display name.
        :return: response; 200 BundleIdResponse, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        json = {
            'data': {
                'type': 'bundleIds',
                'id': bundle_id,
                'attributes': {
                    'name': bundle_name,
                }
            }
        }
        return request_format_log(
            requests.patch(base_url, json=json, headers=self.headers, proxies=self.proxies,
                           timeout=self.timeout))


class BundleIDsCapabilityAPI(object):
    """Raw HTTP calls for the ``/bundleIdCapabilities`` resource."""

    # /documentation/appstoreconnectapi/bundle_id_capabilities
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_capability_url = '%s/bundleIdCapabilities' % base_uri

    def disable_capability(self, bundle_id, capability_type):
        """DELETE one capability of a bundle id.

        The resource id is built as ``<bundle_id>_<capability_type>``.

        :return: response; 204 No Content, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s_%s' % (self.bundle_ids_capability_url, bundle_id, capability_type)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies,
                            timeout=self.timeout))

    def enable_capability(self, bundle_id, capability_type):
        """POST one capability for a bundle id.

        :param bundle_id: resource id of the bundle id.
        :param capability_type: e.g. ``PUSH_NOTIFICATIONS`` or ``PERSONAL_VPN``.
        :return: response; 201 BundleIdCapabilityResponse, 400/403/409 ErrorResponse.
        """
        json = {
            'data': {
                'type': 'bundleIdCapabilities',
                'attributes': {
                    'capabilityType': capability_type,  # 'PUSH_NOTIFICATIONS',#PERSONAL_VPN
                    'settings': []
                },
                'relationships': {
                    'bundleId': {
                        'data': {
                            'id': bundle_id,
                            'type': 'bundleIds',
                        }
                    }
                }
            }
        }
        return request_format_log(
            requests.post(self.bundle_ids_capability_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))


class ProfilesAPI(object):
    """Raw HTTP calls for the ``/profiles`` resource."""

    # /documentation/appstoreconnectapi/profiles
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.profiles_url = '%s/profiles' % base_uri

    def create_profile(self, bundle_id, certificate_id_list, profile_name, device_id_list,
                       profile_type='IOS_APP_ADHOC'):
        """POST a new provisioning profile.

        :param bundle_id: resource id of the bundle id.
        :param certificate_id_list: iterable of certificate resource ids.
        :param profile_name: profile display name.
        :param device_id_list: iterable of device resource ids.
        :param profile_type: profile type string, see the list below.
        :return: response; 201 ProfileResponse, 400/403/409 ErrorResponse.
        """
        json = {
            'data': {
                'type': 'profiles',
                'attributes': {
                    'name': profile_name,
                    'profileType': profile_type,
                    # Possible values: IOS_APP_DEVELOPMENT, IOS_APP_STORE, IOS_APP_ADHOC, IOS_APP_INHOUSE,
                    # MAC_APP_DEVELOPMENT, MAC_APP_STORE, MAC_APP_DIRECT, TVOS_APP_DEVELOPMENT, TVOS_APP_STORE,
                    # TVOS_APP_ADHOC, TVOS_APP_INHOUSE, MAC_CATALYST_APP_DEVELOPMENT, MAC_CATALYST_APP_STORE,
                    # MAC_CATALYST_APP_DIRECT
                },
                'relationships': {
                    'bundleId': {
                        'data': {'id': bundle_id, 'type': 'bundleIds'}
                    },
                    'certificates': {
                        'data': [
                            {'id': certificate_id, 'type': 'certificates'}
                            for certificate_id in certificate_id_list
                        ]
                    },
                    'devices': {
                        'data': [
                            {'id': device_id, 'type': 'devices'}
                            for device_id in device_id_list
                        ]
                    },
                }
            }
        }
        return request_format_log(
            requests.post(self.profiles_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def delete_profile(self, profile_id):
        """DELETE a provisioning profile.

        :param profile_id: resource id of the profile.
        :return: response; 204 No Content, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s' % (self.profiles_url, profile_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies,
                            timeout=self.timeout))

    def download_profile(self, profile_id):
        """Placeholder that does nothing; ``Profiles.download_profile`` writes the decoded content."""
        pass

    def list_profiles(self, query_parameters=None):
        """GET the profile list.

        :param query_parameters: optional dict merged over the default query.
        :return: response; 200 ProfilesResponse, 400/403 ErrorResponse.
        """
        params = {
            "limit": 200
        }
        if query_parameters:
            params.update(query_parameters)
        return request_format_log(
            requests.get(self.profiles_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_profile_by_profile_id(self, profile_id):
        """List profiles filtered by resource id."""
        return self.list_profiles({"filter[id]": profile_id, "include": ""})

    def list_profile_by_profile_name(self, profile_name):
        """List profiles filtered by name."""
        return self.list_profiles({"filter[name]": profile_name, "include": ""})


class CertificatesAPI(object):
    """Raw HTTP calls for the ``/certificates`` resource."""

    # /documentation/appstoreconnectapi/certificates
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.certificates_url = '%s/certificates' % base_uri

    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        """POST a certificate signing request.

        :param csr_content: CSR content sent as ``csrContent``.
        :param certificate_type: certificate type string.
        :return: response; 201 CertificateResponse, 400/403/409 ErrorResponse.
        """
        json = {
            'data': {
                'type': 'certificates',
                'attributes': {
                    'csrContent': csr_content,
                    'certificateType': certificate_type,
                    # /documentation/appstoreconnectapi/certificatetype
                }
            }
        }
        return request_format_log(
            requests.post(self.certificates_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def download_certificate(self, certificate_id):
        """Placeholder that does nothing; ``Certificates.download_certificate`` writes the decoded content."""
        pass

    def list_certificate(self, query_parameters=None):
        """GET the certificate list.

        :param query_parameters: optional dict merged over the default query.
        :return: response; 200 CertificatesResponse, 400/403 ErrorResponse.
        """
        params = {
            "fields[certificates]": "certificateContent, certificateType, csrContent, displayName, expirationDate, "
                                    "name, platform, serialNumber",
        }
        if query_parameters:
            params.update(query_parameters)
        return request_format_log(
            requests.get(self.certificates_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_certificate_by_certificate_id(self, certificate_id):
        """List certificates filtered by resource id."""
        return self.list_certificate({"filter[id]": certificate_id, })

    def revoke_certificate(self, certificate_id):
        """DELETE (revoke) a certificate.

        :param certificate_id: resource id of the certificate.
        :return: response; 204 No Content, 400/403/404/409 ErrorResponse.
        """
        base_url = '%s/%s' % (self.certificates_url, certificate_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies,
                            timeout=self.timeout))


class BaseInfoObj(object):
    """List helpers for the namedtuple models below (objects exposing ``.id``)."""

    @staticmethod
    def filter(obj_lists, query_parameters=None):
        """Return the objects whose attributes equal every entry of ``query_parameters``.

        A single object is wrapped into a one-element list first. Without
        ``query_parameters`` the (possibly wrapped) input is returned unchanged.
        """
        if not isinstance(obj_lists, list):
            obj_lists = [obj_lists]
        if query_parameters:
            return [
                obj for obj in obj_lists
                if all(getattr(obj, k) == v for k, v in query_parameters.items())
            ]
        return obj_lists

    @staticmethod
    def update(obj_lists, up_obj_list):
        """Merge ``up_obj_list`` into ``obj_lists`` by ``id``.

        Objects whose id occurs once keep their position; for an id that occurs
        more than once the later occurrence is kept and moved to the end.
        """
        conn_obj = []
        conn_obj.extend(obj_lists)
        if not isinstance(up_obj_list, list):
            up_obj_list = [up_obj_list]
        conn_obj.extend(up_obj_list)
        repeat_id = []
        repeat_obj = []
        for i in range(len(conn_obj) - 1):
            for j in range(i + 1, len(conn_obj)):
                if conn_obj[i].id == conn_obj[j].id:
                    repeat_obj.append(conn_obj[j])
                    repeat_id.append(conn_obj[i].id)
        new_list = [ob for ob in conn_obj if ob.id not in repeat_id]
        new_list.extend(repeat_obj)
        return new_list

    @staticmethod
    def delete(obj_lists, up_obj_list):
        """Return ``obj_lists`` without the objects whose id appears in ``up_obj_list``."""
        return [
            obj for obj in obj_lists
            if not any(obj.id == up_obj.id for up_obj in up_obj_list)
        ]


class Devices(namedtuple("Devices",
                         ["id", "addedDate", "name", "deviceClass", "model", "udid", "platform", "status"])):
    """Immutable view of one ``devices`` resource."""

    @classmethod
    def from_json_list(cls, json_list):
        """Build one instance per resource dict in ``json_list``."""
        return [cls.from_json(json) for json in json_list]

    @classmethod
    def from_json(cls, json):
        """Build an instance from a resource dict (``id`` plus its ``attributes``).

        :raises TypeError: if the attributes do not match the tuple's fields.
        """
        new_dict = {'id': json.get('id', '')}
        new_dict.update(json.get("attributes", {}))
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        """Return a copy with the given fields replaced."""
        return self._replace(**kwargs)


class BundleIds(namedtuple("BundleIds", ["id", "name", "identifier", "platform", "seedId", ]), ):
    """Immutable view of one ``bundleIds`` resource."""

    @classmethod
    def from_json_list(cls, json_list):
        """Build one instance per resource dict in ``json_list``."""
        return [cls.from_json(json) for json in json_list]

    @classmethod
    def from_json(cls, json):
        """Build an instance from a resource dict (``id`` plus its ``attributes``).

        :raises TypeError: if the attributes do not match the tuple's fields.
        """
        new_dict = {'id': json.get('id', '')}
        new_dict.update(json.get("attributes", {}))
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        """Return a copy with the given fields replaced."""
        return self._replace(**kwargs)


class Profiles(namedtuple("Profiles",
                          ["id", "name", "profileState", "createdDate", "profileType", "profileContent", "uuid",
                           "platform", "expirationDate"]), ):
    """Immutable view of one ``profiles`` resource."""

    @classmethod
    def from_json_list(cls, json_list):
        """Build one instance per resource dict in ``json_list``."""
        return [cls.from_json(json) for json in json_list]

    @classmethod
    def from_json(cls, json):
        """Build an instance from a resource dict (``id`` plus its ``attributes``).

        :raises TypeError: if the attributes do not match the tuple's fields.
        """
        new_dict = {'id': json.get('id', '')}
        new_dict.update(json.get("attributes", {}))
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        """Return a copy with the given fields replaced."""
        return self._replace(**kwargs)

    def download_profile(self, filepath):
        """Base64-decode ``profileContent`` and write it to ``filepath``.

        Missing parent directories are created.

        :return: ``filepath``.
        """
        dirname = os.path.dirname(filepath)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        n = base64.b64decode(self.profileContent)
        with open(filepath, 'wb') as f:
            f.write(n)
        return filepath


class Certificates(namedtuple("Certificates",
                              ["id", "serialNumber", "certificateContent", "displayName", "name", "csrContent",
                               "platform", "expirationDate", "certificateType"]), ):
    """Immutable view of one ``certificates`` resource."""

    @classmethod
    def from_json_list(cls, json_list):
        """Build one instance per resource dict in ``json_list``."""
        return [cls.from_json(json) for json in json_list]

    @classmethod
    def from_json(cls, json):
        """Build an instance from a resource dict (``id`` plus its ``attributes``).

        :raises TypeError: if the attributes do not match the tuple's fields.
        """
        new_dict = {'id': json.get('id', '')}
        new_dict.update(json.get("attributes", {}))
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        """Return a copy with the given fields replaced."""
        return self._replace(**kwargs)

    def download_certificate(self, filepath):
        """Base64-decode ``certificateContent`` and write it to ``filepath``.

        Missing parent directories are created.

        :return: ``filepath``.
        """
        dirname = os.path.dirname(filepath)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        n = base64.b64decode(self.certificateContent)
        with open(filepath, 'wb') as f:
            f.write(n)
        return filepath


def call_function_try_attempts(try_attempts=3, sleep_time=3):
    """Decorator factory: retry the wrapped callable when it raises.

    :param try_attempts: maximum number of calls.
    :param sleep_time: seconds slept after each failed call.
    :raises Exception: carrying the last error text once every attempt failed,
        or immediately when the error text reports missing/expired agreements.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            flag = False
            res = ''
            for i in range(try_attempts):
                try:
                    res = func(*args, **kwargs)
                    flag = True
                    break
                except Exception as e:
                    if 'Cannot connect to proxy' in str(e) or 'Read timed out' in str(
                            e) or 'Max retries exceeded with' in str(e):
                        logger.error('access apple api failed . change proxy ip again')
                    logger.warning(
                        f'exec {func} failed. Failed:{e} {try_attempts} times in total. now {sleep_time} later try '
                        f'again...{i}')
                    res = str(e)
                    # 'Authentication credentials are missing or invalid' in str(e) or
                    # Retrying cannot help with this error, so give up at once.
                    if 'REQUIRED_AGREEMENTS_MISSING_OR_EXPIRED' in str(e):
                        raise Exception(res)
                    time.sleep(sleep_time)
            logger.info(f"exec {func} finished. time:{time.time() - start_time}")
            if not flag:
                logger.error(f'exec {func} failed after the maximum number of attempts. Failed:{res}')
                raise Exception(res)
            return res

        return wrapper

    return decorator


class AppStoreConnectApi(DevicesAPI, BundleIDsAPI, BundleIDsCapabilityAPI, ProfilesAPI, CertificatesAPI):
    """High-level client: signs requests with a JWT and converts responses to model objects.

    Methods that convert a response return a single model object when the
    response holds exactly one resource and a list otherwise.
    """
    BASE_URI = 'https://api.appstoreconnect.apple.com/v1'
    JWT_AUD = 'appstoreconnect-v1'
    JWT_ALG = 'ES256'

    def __init__(self, issuer_id, private_key_id, p8_private_key, exp_seconds=1200):
        """
        Per Apple's documentation (generating_tokens_for_api_requests) a token
        may live for at most 20 minutes.

        :param issuer_id: JWT ``iss`` claim.
        :param private_key_id: JWT ``kid`` header.
        :param p8_private_key: private key used to sign the token.
        :param exp_seconds: token lifetime in seconds, max 20*60.
        """
        self.issuer_id = issuer_id
        self.private_key_id = private_key_id
        self.p8_private_key = p8_private_key
        self.exp_seconds = exp_seconds
        self.proxies = proxies
        self.timeout = timeout
        self.__make_jwt_headers()
        DevicesAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsCapabilityAPI.__init__(self, self.BASE_URI, self.headers)
        ProfilesAPI.__init__(self, self.BASE_URI, self.headers)
        CertificatesAPI.__init__(self, self.BASE_URI, self.headers)
        self.rate_limit_info = {}

    def __set_rate_limit_info(self, req_headers):
        """Parse the ``X-Rate-Limit`` header and re-create the JWT when the remaining quota is low.

        Currently unused: the only call site in ``__base_format`` is commented out.
        """
        for par in req_headers.get('X-Rate-Limit').split(";"):
            if par:
                limit_info_list = par.split(":")
                self.rate_limit_info[limit_info_list[0]] = limit_info_list[1]
        user_rem_info = self.rate_limit_info
        if int(user_rem_info.get('user-hour-rem')) < 3595:
            logger.warning(f"user-hour-rem over limit. so get jwt headers")
            # Re-initialise to mint a fresh token, keeping the configured lifetime.
            self.__init__(self.issuer_id, self.private_key_id, self.p8_private_key, self.exp_seconds)
            self.rate_limit_info = user_rem_info
        logger.info(f"rate_limit_info:{self.rate_limit_info}")

    def __make_jwt_headers(self):
        """Sign a fresh JWT and store the resulting request headers in ``self.headers``."""
        data = {
            "iss": self.issuer_id,
            "iat": int(time.time()),
            "exp": int(time.time()) + self.exp_seconds,
            "aud": self.JWT_AUD,
        }
        jwt_headers = {
            "alg": self.JWT_ALG,
            "kid": self.private_key_id,
            "typ": "JWT"
        }
        jwt_encoded = jwt.encode(data, self.p8_private_key, algorithm=self.JWT_ALG, headers=jwt_headers)
        # PyJWT 1.x returns bytes; formatting bytes with %s would yield "b'...'".
        if isinstance(jwt_encoded, bytes):
            jwt_encoded = jwt_encoded.decode('utf-8')
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.39 (KHTML, like Gecko) '
                          'Chrome/72.0.3626.109 Safari/537.39',
            'Authorization': 'Bearer %s' % jwt_encoded
        }
        self.headers = headers

    def __base_format(self, s_type, req, success_code):
        """Convert a response into model objects.

        :param s_type: ``devices``, ``bundleIds``, ``profiles`` or ``certificates``.
        :param req: the HTTP response.
        :param success_code: status code that counts as success.
        :return: one model object if the response holds exactly one resource, else a list.
        :raises Exception: on any other status code or an unexpected payload.
        """
        # self.__set_rate_limit_info(req.headers)
        if req.status_code == success_code:
            req_data = req.json()
            data = req_data.get('data')
            if isinstance(data, (list, dict)):
                obj = None
                if isinstance(data, dict):
                    data = [data]
                if s_type == 'devices':
                    obj = Devices.from_json_list(data)
                elif s_type == 'bundleIds':
                    obj = BundleIds.from_json_list(data)
                elif s_type == 'profiles':
                    obj = Profiles.from_json_list(data)
                elif s_type == 'certificates':
                    obj = Certificates.from_json_list(data)
                if len(obj) == 1:
                    return obj[0]
                return obj
            else:
                # self.__init_jwt_headers()
                raise Exception(f'error instance: {req.text}')
        elif req.status_code == 401:
            # Authorization problem.
            raise Exception(req.text)
        elif req.status_code == 429:
            # Hourly request limit exceeded, e.g. {'user-hour-lim': '3600', 'user-hour-rem': '3586'}
            raise Exception(req.text)
        elif req.status_code == 500:
            time.sleep(60)
            raise Exception(req.text)
        else:
            raise Exception('unknown error: %s code:%s' % (req.text, req.status_code))

    def __device_store(self, req, success_code=200):
        return self.__base_format('devices', req, success_code)

    def __profile_store(self, req, success_code=200):
        return self.__base_format('profiles', req, success_code)

    def __certificates_store(self, req, success_code=200):
        return self.__base_format('certificates', req, success_code)

    def __bundle_ids_store(self, req, success_code=200):
        return self.__base_format('bundleIds', req, success_code)

    @call_function_try_attempts()
    def get_all_devices(self):
        """Return the devices as ``Devices`` object(s)."""
        req = self.list_devices()
        return self.__device_store(req)

    def list_enabled_devices(self):
        """Return the enabled devices as ``Devices`` object(s)."""
        req = super().list_enabled_devices()
        return self.__device_store(req)

    def get_all_bundle_ids(self):
        """Return the bundle ids as ``BundleIds`` object(s)."""
        req = self.list_bundle_ids()
        return self.__bundle_ids_store(req)

    def get_all_profiles(self):
        """Return the profiles as ``Profiles`` object(s)."""
        req = self.list_profiles()
        return self.__profile_store(req)

    @call_function_try_attempts(try_attempts=2)
    def get_all_certificates(self):
        """Return the certificates as ``Certificates`` object(s)."""
        req = self.list_certificate()
        return self.__certificates_store(req)

    @call_function_try_attempts()
    def get_certificate_by_cid(self, certificate_id):
        """Return the certificate(s) matching ``certificate_id``."""
        req = self.list_certificate_by_certificate_id(certificate_id)
        return self.__certificates_store(req)

    def list_device_by_udid(self, udid):
        """Return the list of devices whose UDID equals ``udid``.

        :raises Exception: when no device matches, or the matches disagree on UDID.
        """
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": udid})
        if not device_obj_list:
            raise Exception('Device obj is None')
        if len(device_obj_list) != 1 and len({device_obj.udid for device_obj in device_obj_list}) != 1:
            raise Exception('more than one Device obj')
        return device_obj_list

    @call_function_try_attempts()
    def register_device(self, device_name, device_udid, platform="IOS"):
        """Enable (and rename) the device if its UDID is already registered, otherwise register it."""
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": device_udid})
        # One developer account has been observed to hold two devices with the same UDID.
        if device_obj_list and (
                len(device_obj_list) == 1 or len({device_obj.udid for device_obj in device_obj_list}) == 1):
            device_obj = device_obj_list[0]
            req = self.modify_registered_device(device_obj.id, device_name, 'ENABLED')
            return self.__device_store(req)
        else:
            req = super().register_device(device_name, device_udid, platform)
            return self.__device_store(req, 201)

    @call_function_try_attempts()
    def enabled_device(self, device_id, device_name, udid):
        """Enable a device by id, falling back to a lookup by ``udid`` when that does not return 200."""
        if device_id and device_name:
            req = super().enabled_device(device_id, device_name)
            if req.status_code == 200:
                return self.__device_store(req)
        if udid:
            # list_device_by_udid raises when nothing matches, so req is always bound below.
            device_obj_list = self.list_device_by_udid(udid)
            for device_obj in device_obj_list:
                req = self.modify_registered_device(device_obj.id, device_obj.name, 'ENABLED')
            return self.__device_store(req)

    @call_function_try_attempts()
    def disabled_device(self, device_id, device_name, udid):
        """Disable a device by id, falling back to a lookup by ``udid`` when that does not return 200."""
        if device_id and device_name:
            req = super().disabled_device(device_id, device_name)
            if req.status_code == 200:
                return self.__device_store(req)
        if udid:
            # list_device_by_udid raises when nothing matches, so req is always bound below.
            device_obj_list = self.list_device_by_udid(udid)
            for device_obj in device_obj_list:
                req = self.modify_registered_device(device_obj.id, device_obj.name, 'DISABLED')
            return self.__device_store(req)

    def list_bundle_ids_by_identifier(self, identifier):
        """Return the bundle id(s) matching ``identifier`` as ``BundleIds`` object(s)."""
        req = super().list_bundle_id_by_identifier(identifier)
        return self.__bundle_ids_store(req)

    def __do_success(self, req, status=200):
        return req.status_code == status

    @call_function_try_attempts()
    def enable_capability_by_s_type(self, bundle_id, s_type):
        """Enable every capability listed at ``capability_info[s_type]``; failures are only logged.

        :return: always ``True``.
        """
        capability_list = get_capability(s_type)
        if capability_list:
            for capability in capability_list:
                req = super().enable_capability(bundle_id, capability)
                if self.__do_success(req, 201):
                    logger.info(f"{bundle_id} enable_capability {capability} success")
                else:
                    logger.warning(f"{bundle_id} enable_capability {capability} failed {req.text}")
        return True

    @call_function_try_attempts()
    def disable_capability_by_s_type(self, bundle_id, s_type=len(capability_info) - 1):
        """Disable every capability listed at ``capability_info[s_type]``; failures are only logged.

        :return: always ``True``.
        """
        capability_list = get_capability(s_type)
        if capability_list:
            for capability in capability_list:
                req = super().disable_capability(bundle_id, capability)
                if self.__do_success(req, 204):
                    logger.info(f"{bundle_id} disable_capability {capability} success")
                else:
                    logger.warning(f"{bundle_id} disable_capability {capability} failed {req.text}")
        return True

    def enable_push_vpn_capability(self, bundle_id):
        """Enable PUSH_NOTIFICATIONS then PERSONAL_VPN; ``True`` only if both return 201."""
        req = super().enable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 201):
            req = super().enable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 201):
                return True
        return False

    def disable_push_vpn_capability(self, bundle_id):
        """Disable PUSH_NOTIFICATIONS then PERSONAL_VPN; ``True`` only if both return 204."""
        req = super().disable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 204):
            req = super().disable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 204):
                return True
        return False

    @call_function_try_attempts()
    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        """Rename the bundle id if the identifier already exists, otherwise register it."""
        identifier_obj = self.list_bundle_ids_by_identifier(bundle_id_identifier)
        if isinstance(identifier_obj, BundleIds):
            req = self.modify_bundle_id(identifier_obj.id, bundle_id_name)
            return self.__bundle_ids_store(req)
        else:
            req = super().register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
            return self.__bundle_ids_store(req, 201)

    @call_function_try_attempts()
    def register_bundle_id_enable_capability(self, bundle_id_name, bundle_id_identifier, s_type, platform="IOS",
                                             seed_id=''):
        """Register (or rename) a bundle id, then enable the capabilities for ``s_type``.

        :return: the ``BundleIds`` object, or ``None`` if registration did not yield one.
        """
        bundle_ids = self.register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
        if isinstance(bundle_ids, BundleIds):
            if self.enable_capability_by_s_type(bundle_ids.id, s_type):
                return bundle_ids

    @call_function_try_attempts()
    def delete_bundle_by_identifier(self, identifier_id, identifier_name):
        """Delete a bundle id by resource id, falling back to a lookup by identifier.

        :return: ``True`` on a 204 response, otherwise ``None``.
        """
        if identifier_id:
            req = self.delete_bundle_id_by_id(identifier_id)
            if req.status_code == 204:
                return True
        identifier_obj = self.list_bundle_ids_by_identifier(identifier_name)
        if isinstance(identifier_obj, BundleIds):
            req = self.delete_bundle_id_by_id(identifier_obj.id)
            if req.status_code == 204:
                return True

    @call_function_try_attempts()
    def create_profile(self, profile_id, bundle_id, certificate_id, profile_name, device_id_list=None,
                       profile_type='IOS_APP_ADHOC'):
        """Delete any existing profile (by id, then by name) and create a new one.

        :param device_id_list: device resource ids; when empty, all enabled devices are used.
        :return: the created ``Profiles`` object.
        :raises KeyError: when the API does not answer 201.
        """
        if not device_id_list:
            # list_enabled_devices yields a bare object for a single device; filter() wraps it in a list.
            device_id_list = [device.id for device in BaseInfoObj.filter(self.list_enabled_devices())]
        self.delete_profile_by_id(profile_id, profile_name)
        # profile_obj = self.list_profile_by_profile_name(profile_name)
        # if isinstance(profile_obj, Profiles):
        #     self.delete_profile_by_id(profile_obj.id, profile_name)
        req = super().create_profile(bundle_id, [certificate_id], profile_name, device_id_list, profile_type)
        if req.status_code == 201:
            return self.__profile_store(req, 201)
        raise KeyError(req.text)

    def list_profile_by_profile_name(self, profile_name):
        """Return the profile(s) matching ``profile_name`` as ``Profiles`` object(s)."""
        req = super().list_profile_by_profile_name(profile_name)
        return self.__profile_store(req)

    @call_function_try_attempts()
    def delete_profile_by_id(self, profile_id, profile_name):
        """Delete a profile by resource id, falling back to a lookup by name.

        :return: ``True`` on a 204 response, otherwise ``None``.
        """
        if profile_id:
            req = super().delete_profile(profile_id)
            if self.__do_success(req, 204):
                return True
        profile_obj = self.list_profile_by_profile_name(profile_name)
        if profile_obj:
            req = super().delete_profile(profile_obj.id)
            if self.__do_success(req, 204):
                return True

    @call_function_try_attempts()
    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        """Create a certificate and return it as a ``Certificates`` object.

        :raises KeyError: when the API does not answer 201.
        """
        req = super().create_certificate(csr_content, certificate_type)
        if req.status_code == 201:
            return self.__certificates_store(req, 201)
        raise KeyError(req.text)

    @call_function_try_attempts()
    def revoke_certificate(self, certificate_id):
        """Revoke a certificate; ``True`` on a 204 response, else ``False``."""
        req = super().revoke_certificate(certificate_id)
        if req.status_code == 204:
            return True
        return False