Working version

This commit is contained in:
Joris Bertomeu
2025-08-26 09:08:04 +02:00
commit ac0dc8286b
63 changed files with 2741 additions and 0 deletions

15
python/after.py Normal file
View File

@@ -0,0 +1,15 @@
try:
cdm.parse_license(session_id, licence)
except Exception as e:
js.document.getElementById('result').value=f"Could not decrypt!\n\nLicense Response:\n{licence}\n\nhttps://github.com/FoxRefire/wvg/wiki/How-to-add-custom-license-scheme-yourself"
raise Exception(e)
# get keys
keys=""
for key in cdm.get_keys(session_id):
if key.type=="CONTENT":
keys+=f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
keys

106
python/pre.py Normal file
View File

@@ -0,0 +1,106 @@
from pywidevine.cdm import Cdm
from pywidevine.remotecdm import RemoteCdm
from pywidevine.device import Device, DeviceTypes
from pywidevine.pssh import PSSH
import json
import js
import base64
from pyodide.http import pyfetch
def blobsToDevice(cID, pKey):
return Device(client_id=cID, private_key=pKey, type_=DeviceTypes['ANDROID'], security_level=3, flags=None)
async def loadCdm():
# Looking for device.wvd
try:
wvd = await (await pyfetch("/device.wvd")).bytes()
return Cdm.from_device(Device.loads(wvd))
except:
pass
# Looking for device_client_id_blob + device_private_key
try:
cID=await (await pyfetch("/device_client_id_blob")).bytes()
pKey=await (await pyfetch("/device_private_key")).bytes()
return Cdm.from_device(blobsToDevice(cID, pKey))
except:
pass
# Looking for client_id.bin + private_key.pem
try:
cID=await (await pyfetch("/client_id.bin")).bytes()
pKey=await (await pyfetch("/private_key.pem")).bytes()
return Cdm.from_device(blobsToDevice(cID, pKey))
except:
pass
# Looking for remote.json
try:
remote_conf=await (await pyfetch("/remote.json")).json()
return RemoteCdm(**remote_conf)
except Exception as e:
js.document.getElementById('result').value=f"No CDM key pair found! \n\n https://github.com/FoxRefire/wvg/wiki/Getting-started#2-put-cdm-key-pair-files"
raise Exception(e)
# Define corsFetch API for requesting server that require origin header
async def corsFetch(url: str, method: str, headers: [dict, str], body: [dict, bytes, str], resType: str="blob"):
if type(headers) == dict:
headers = json.dumps(headers)
match body:
case bytes(): body = base64.b64encode(body).decode()
case str(): body = base64.b64encode(body.encode()).decode()
case dict(): body = base64.b64encode(json.dumps(body).encode()).decode()
res = await js.corsFetch(url, method, headers, body)
res = base64.b64decode(res.encode())
match resType:
case "blob": pass
case "str": res = res.decode()
case "json": res = json.loads(res.decode())
return res
# Define loadBody API for loading requestBody to scheme concisely
def loadBody(loadAs: str):
global licBody
licBody = base64.b64decode(licBody.encode())
match loadAs:
case "blob": pass
case "str": licBody = licBody.decode()
case "json": licBody = json.loads(licBody.decode())
return licBody
# Define a function to get challenge if needed to set a service cert
def getChallenge(getAs, *cert):
global session_id
global pssh
if bool(cert):
cdm.set_service_certificate(session_id, cert[0])
challenge = cdm.get_license_challenge(session_id, pssh)
match getAs:
case "blob": pass
case "b64": challenge = base64.b64encode(challenge).decode()
case "list": challenge = list(challenge)
return challenge
# prepare pssh
pssh = PSSH(pssh)
# load cdm
cdm = await loadCdm()
# open cdm session
session_id = cdm.open()
# load headers
licHeaders=json.loads(licHeaders)
js.chrome.extension.getBackgroundPage().isBlock=False

10
python/schemes/Allente.py Normal file
View File

@@ -0,0 +1,10 @@
payload = {
'playerPayload': base64.b64encode(cdm.service_certificate_challenge).decode()
}
service_cert = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
service_cert = service_cert['license']
payload = {
'playerPayload': getChallenge('b64', service_cert)
}
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['license']

7
python/schemes/Amazon.py Normal file
View File

@@ -0,0 +1,7 @@
import urllib.parse
payload = f'widevine2Challenge={urllib.parse.quote(base64.b64encode(cdm.service_certificate_challenge).decode())}&includeHdcpTestKeyInLicense=true'
service_cert = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
service_cert = service_cert['widevine2License']['license']
payload = f'widevine2Challenge={urllib.parse.quote(getChallenge("b64", service_cert))}&includeHdcpTestKeyInLicense=true'
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['widevine2License']['license']

View File

@@ -0,0 +1,4 @@
payload = loadBody('json')
payload['ServiceRequest']['InData']['ChallengeInfo'] = getChallenge('b64', Cdm.common_privacy_cert)
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence["ServiceResponse"]["OutData"]["LicenseInfo"]

View File

@@ -0,0 +1,3 @@
import xml.etree.ElementTree as ET
licence = await corsFetch(licUrl, "POST", licHeaders, getChallenge('b64', Cdm.common_privacy_cert), "blob")
licence = ET.fromstring(licence).find('.//{http://www.canal-plus.com/DRM/V1}license').text

View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload['licenseRequest'] = getChallenge("b64")
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['license']

View File

@@ -0,0 +1 @@
licence = await corsFetch(licUrl, "POST", licHeaders, getChallenge('blob'), "blob")

View File

@@ -0,0 +1,2 @@
licence = await corsFetch(licUrl, "POST", licHeaders, getChallenge('blob'), "json")
licence = licence['license']

3
python/schemes/Fantop.py Normal file
View File

@@ -0,0 +1,3 @@
payload = loadBody('json')
payload['payload'] = getChallenge('b64')
licence = await corsFetch (licUrl, "POST", licHeaders, payload, "blob")

View File

@@ -0,0 +1,3 @@
payload = loadBody("json")
payload['license_request_data'] = getChallenge('list')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "blob")

View File

@@ -0,0 +1,61 @@
import re
def replaceRequest(payload):
challenge_blob = getChallenge('blob')
challengeB64 = getChallenge('b64')
challengeArr = str(getChallenge('list'))
# Trying decode payload, challenge might be raw bytes if it failed
try:
decodedPayload = payload.decode()
except:
return challenge_blob
# Challenge might be JSON/XML stored B64-encoded string
replaced = decodedPayload.replace(r"(?<=(\"|\'|>))CAES.*?(?=(\"|\'|<))", challengeB64).replace(r"(?<=(\"|\'|>))CAQ=(?=(\"|\'|<))", challengeB64)
if(decodedPayload != replaced):
return replaced
# Challenge might be raw B64-encoded string
replaced = decodedPayload.replace(r"^CAES.*?=$", challengeB64).replace(r"^CAQ=$", challengeB64)
if(decodedPayload != replaced):
return replaced
# Challenge might be Uint8Array
replaced = decodedPayload.replace(r"\[0?8 ?, ?0?1 ?, ?[0-9 ,]*?\]", challengeArr).replace(r"\[0?8 ?, ?0?4]", challengeArr)
if(decodedPayload != replaced):
return replaced
def findLicense(response):
# Trying decode response, license might be raw bytes if it failed
try:
decodedResponse = response.decode()
except:
return response
# License might be JSON/XML stored B64-encoded string
try:
return re.search(r"(?<=(\"|\'|>))CAIS.*?(?=(\"|\'|<))", decodedResponse).group()
except:
pass
# License might be raw B64-encoded string
try:
return re.search(r"^CAIS.*?=$", decodedResponse).group()
except:
pass
# License might be Uint8Array
try:
foundStr = re.search(r"\[0?8 ?, ?0?2 ?, ?[0-9 ,]*?\]", decodedResponse).group()
return bytes(json.loads(foundStr))
except:
pass
payload = loadBody("blob")
payload = replaceRequest(payload)
response = await corsFetch(licUrl, "POST", licHeaders, payload, "blob")
licence = findLicense(response)

5
python/schemes/NosTV.py Normal file
View File

@@ -0,0 +1,5 @@
payload = {
'challenge': getChallenge('b64')
}
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence["license"][0]

View File

@@ -0,0 +1,4 @@
payload = loadBody('json')
payload['params']['object'] = getChallenge('b64', Cdm.common_privacy_cert)
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['result']['object']['license']

4
python/schemes/RedBee.py Normal file
View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload['message'] = getChallenge('b64')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['license']

5
python/schemes/Sling.py Normal file
View File

@@ -0,0 +1,5 @@
payload = loadBody('json')
payload['message'] = list(cdm.service_certificate_challenge)
service_cert = await corsFetch(licUrl, "POST", licHeaders, payload, "blob")
payload['message'] = list(getChallenge('list', service_cert))
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "blob")

3
python/schemes/VUDRM.py Normal file
View File

@@ -0,0 +1,3 @@
payload = loadBody("json")
payload['drm_info'] = getChallenge('list')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "blob")

View File

@@ -0,0 +1,10 @@
payload = loadBody("json")
decoded_token = json.loads(base64.b64decode(payload['token']).decode())
decoded_token['licenseRequest'] = base64.b64encode(cdm.service_certificate_challenge).decode('utf-8')
payload = {"token": base64.b64encode(json.dumps(decoded_token).encode()).decode()}
service_cert = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
service_cert = service_cert["license"]
decoded_token['licenseRequest'] = getChallenge('b64', service_cert)
payload = {"token": base64.b64encode(json.dumps(decoded_token).encode()).decode()}
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence["license"]

View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload['requests'][3]['params']['challenge'] = getChallenge('b64')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['license']

View File

@@ -0,0 +1,4 @@
payload = loadBody('json')
payload['licenseRequest'] = getChallenge('b64', Cdm.common_privacy_cert)
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['license'].replace("-", "+").replace("_", "/")

7
python/schemes/Youku.py Normal file
View File

@@ -0,0 +1,7 @@
import urllib.parse
payload = urllib.parse.parse_qs(loadBody("str"))
payload['licenseRequest'] = [getChallenge('b64')]
payload = {k: v[0] for k, v in payload.items()}
payload = urllib.parse.urlencode(payload)
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['data']

4
python/schemes/moTV.py Normal file
View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload['rawLicense'] = getChallenge('b64')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence['rawLicense']

4
python/schemes/oqee.py Normal file
View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload['licenseRequest'] = getChallenge('b64')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence["result"]["license"]

View File

@@ -0,0 +1,4 @@
payload = loadBody("json")
payload["getWidevineLicense"]["widevineChallenge"] = getChallenge('b64')
licence = await corsFetch(licUrl, "POST", licHeaders, payload, "json")
licence = licence["getWidevineLicenseResponse"]["license"]