Good morning,
I am trying to create subscriptions for sets of coordinates. It works for some of them, but not for others.
Here is an example of coordinates for which it doesn't work:
{"id":361,"fechaInicio":"2022-01-01","poligono":[[-4.7955738,37.2924826],[-4.7953807,37.2812663],[-4.7828065,37.2813347],[-4.783064,37.2924997],[-4.7955738,37.2924826]]}
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import datetime
#"2021-04-25T00:00:00.000Z"
#"2022-09-09T00:00:00.000Z"
class Suscripcion:
    """Planet data-import subscription on Sentinel Hub, keyed by parcel id.

    Instances are built from a mapping holding a ``name`` plus an ``input``
    section with the time range and the polygon bounds (the shape read by
    ``__init__``).  The listing fetched by ``listar`` is cached on the class
    in the ``suscripciones`` attribute, which only exists after the first
    successful listing.
    """

    def __init__(self, js):
        """Extract the parcel id, start date and polygon ring from *js*.

        Args:
            js: Mapping providing ``name``,
                ``input.data[0].dataFilter.timeRange.from`` and
                ``input.bounds.geometry.coordinates``.

        Raises:
            KeyError, IndexError: if *js* lacks one of those entries.
            ValueError: if the start date does not begin with ``YYYY-MM-DD``.
        """
        self.idParcela = js["name"]
        desde = js["input"]["data"][0]["dataFilter"]["timeRange"]["from"]
        # Only the leading date part is parsed; any time suffix is ignored.
        self.fechaIni = datetime.datetime.strptime(desde[0:10], "%Y-%m-%d")
        # Keep the first ring of the polygon coordinates.
        self.poligono = js["input"]["bounds"]["geometry"]["coordinates"][0]

    @classmethod
    def autentificarse(cls):
        """Open an OAuth2 client-credentials session against Sentinel Hub.

        Returns:
            Tuple ``(oauth, resp)``: the session (which adds the access token
            to every request) and the response of the token-info request.
        """
        # Your client credentials
        client_id = ''
        client_secret = ''
        # Create a session
        client = BackendApplicationClient(client_id=client_id)
        oauth = OAuth2Session(client=client)
        # Get token for the session; the session stores it for later requests.
        oauth.fetch_token(
            token_url='https://services.sentinel-hub.com/oauth/token',
            client_secret=client_secret,
        )
        resp = oauth.get("https://services.sentinel-hub.com/oauth/tokeninfo")
        return (oauth, resp)

    def crear(self):
        """Placeholder; does nothing."""
        pass

    @classmethod
    def listar(cls):
        """Return the subscriptions as a dict keyed by ``idParcela``.

        The first call fetches the listing from the service and caches it on
        the class; later calls return the cached dict.

        Raises:
            requests.HTTPError: if the listing request fails.
            KeyError, IndexError, ValueError: if a listed subscription does
                not have the shape ``__init__`` expects.
        """
        try:
            return cls.suscripciones
        except AttributeError:
            pass  # Not listed yet: fetch below.

        oauth, _ = cls.autentificarse()
        url = "https://services.sentinel-hub.com/api/v1/dataimport/subscriptions"
        response = oauth.get(url)
        response.raise_for_status()
        # Build the dict locally and publish it only once complete, so a
        # failure while parsing one entry cannot leave a partial cache behind.
        encontradas = {}
        for json_suscripcion in response.json()["data"]:
            suscripcion = cls(json_suscripcion)
            encontradas[suscripcion.idParcela] = suscripcion
        cls.suscripciones = encontradas
        return encontradas

    @classmethod
    def existe(cls, idParcela):
        """Return True if a subscription named *idParcela* is in the listing."""
        return idParcela in cls.listar()

    def _construir_consulta(self):
        """Build the search/subscription query for this parcel."""
        return {
            "provider": "PLANET",
            "bounds": {
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [self.poligono],
                }
            },
            "data": [
                {
                    "itemType": "PSScene",
                    "productBundle": "analytic_8b_sr_udm2",
                    "dataFilter": {
                        "timeRange": {
                            # Open-ended range: no "to" bound is sent.
                            "from": self.fechaIni.strftime("%Y-%m-%dT00:00:00.000Z")
                        },
                        "maxCloudCoverage": 0,
                        "nativeFilter": {
                            "type": "StringInFilter",
                            "field_name": "quality_category",
                            "config": [
                                "standard"
                            ],
                        },
                    },
                }
            ],
        }

    def crearSuscripcion(self):
        """Search, create and confirm the subscription, then list all of them.

        Returns:
            The decoded JSON of the subscriptions listing requested after the
            new subscription has been confirmed.

        Raises:
            requests.HTTPError: if any of the HTTP requests fails.
        """
        oauth, _ = Suscripcion.autentificarse()

        # Run the search first; a failing search aborts before anything is
        # created.  Its result is not used further.
        url = "https://services.sentinel-hub.com/api/v1/dataimport/search"
        query = self._construir_consulta()
        response = oauth.post(url, json=query)
        response.raise_for_status()

        # Create the subscription from the same query plus the Planet extras.
        url = "https://services.sentinel-hub.com/api/v1/dataimport/subscriptions"
        query['planetApiKey'] = "<PlanetAPIKey>"  # Insert your Planet api key
        query['data'][0]["harmonizeTo"] = "NONE"
        payload = {
            "name": self.idParcela,
            # collectionId is optional. Remove it to create a new collection.
            "collectionId": "<CollectionID>",
            "input": query
        }
        # NOTE(review): this prints the Planet API key contained in the payload.
        print(payload)
        response = oauth.post(url, json=payload)
        print(":::::::::::::::::::::::::::::::::::::::::::::::::::")
        # Printed before raise_for_status so the error body is visible.
        print(response.text)
        response.raise_for_status()
        subscription = response.json()
        subscription_id = subscription['id']
        sqkm = subscription['sqkm']
        print(sqkm)

        # Keep an already-fetched listing in sync so that existe() does not
        # report the new subscription as missing in the same process.
        cache = getattr(Suscripcion, "suscripciones", None)
        if cache is not None:
            cache[self.idParcela] = self

        # Confirm the subscription
        url = f"https://services.sentinel-hub.com/api/v1/dataimport/subscriptions/{subscription_id}/confirm"
        response = oauth.post(url)
        response.raise_for_status()

        # Fetch the subscription details
        url = f"https://services.sentinel-hub.com/api/v1/dataimport/subscriptions/{subscription_id}"
        response = oauth.get(url)
        response.raise_for_status()
        subscription = response.json()
        print(subscription)
        status = subscription['status']
        print(status)

        url = "https://services.sentinel-hub.com/api/v1/dataimport/subscriptions"
        response = oauth.get(url)
        response.raise_for_status()
        return response.json()
def punto_entrada(request):
    """HTTP entry point: create a subscription for the posted parcel if needed.

    Args:
        request: Object whose ``get_json()`` returns a mapping with ``id``,
            ``fechaInicio`` and ``poligono`` (the polygon ring).

    Returns:
        The value returned by ``Suscripcion.crearSuscripcion()`` when a
        subscription is created, otherwise the string
        ``'Existe la suscripcion'``.

    Raises:
        KeyError: if the posted JSON lacks one of the expected keys.
    """
    print(request)
    datosJson = request.get_json()
    # The id may arrive as a JSON number, while the existence check compares
    # it against subscription names; normalise to str so both sides use the
    # same type.  NOTE(review): assumes the API returns names as strings.
    idParcela = str(datosJson["id"])
    poligono = datosJson["poligono"]
    fechaIni = datosJson["fechaInicio"]
    if not Suscripcion.existe(idParcela):
        # Create the subscription, using the same shape Suscripcion reads.
        js = {
            "name": idParcela,
            "input": {
                "data": [{"dataFilter": {"timeRange": {"from": fechaIni}}}],
                "bounds": {
                    "geometry": {
                        "type": "Polygon",
                        "coordinates": [poligono]
                    }
                }
            }
        }
        su = Suscripcion(js)
        resultado = su.crearSuscripcion()
    else:
        resultado = 'Existe la suscripcion'
    print(resultado)
    return resultado