Getting Session information using the Python Module for VMware Horizon with filtering

Since Horizon 8 it is possible to retrieve session information using the REST api’s. In the python module for Horizon I have translated that functionality to the get_sessions function.

import requests, getpass, urllib, json

import vmware_horizon

requests.packages.urllib3.disable_warnings()
url = input("URL\n")

username = input("Username\n")

domain = input("Domain\n")

pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()
print("connected")

inventory=vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
sessions = inventory.get_sessions()
for i in sessions:
    for ii in i:
        print(ii, '=', i[ii] )


end=hvconnectionobj.hv_disconnect()
print(end)

You see a lot of the information is returned using an id but there are plenty of functions that you can use to get the readable information for those.

Since Horizon 2103 VMware has added an option use filtering when getting session information. In the example above you can see that I have a connected and a disconnected session, let’s get the connected one. First I create the filter.

filter = {}
filter["type"] = "And"
filter["filters"] = []
filter1={}

filter1["type"] = "Equals"
filter1["name"] = "session_state"
filter1["value"] = "CONNECTED"

filter["filters"].append(filter1)

Next I run the action with the filter argument.

sessions = inventory.get_sessions(filter=filter)

And now you see only the connected one is returned

And when I switch to disconnected

The new python module has been pushed to github together with example scripts for retrieving all sessions and with filtering.

Creating a RDS farm using the Python module for VMware Horizon

One of the goals and hopes I had with my 100DaysOfCode (I am writing this on day 100!) was that the Horizon REST api’s to create desktop pools and RDS farms would have been available at the end. Only half of that came out and with Horizon 8 2103 we can finally create a RDS farm using those rest api’s. I have decided to add this to the Python module based on a dictionary that the user sends to the new_farm method. I could still add a fully fetched function but that would require a lot of arguments and using **kwargs is an option but than the user would still need to find out what to use.

First I will need to know what json data I actually need, let’s have a look at the api explorer page to get a grip on this

{
  "access_group_id": "6fd4638a-381f-4518-aed6-042aa3d9f14c",
  "automated_farm_settings": {
    "customization_settings": {
      "ad_container_rdn": "CN=Computers",
      "cloneprep_customization_settings": {
        "post_synchronization_script_name": "cloneprep_postsync_script",
        "post_synchronization_script_parameters": "p1 p2 p3",
        "power_off_script_name": "cloneprep_poweroff_script",
        "power_off_script_parameters": "p1 p2 p3",
        "priming_computer_account": "a219420d-4799-4517-8f78-39c74c7c4efc"
      },
      "instant_clone_domain_account_id": "6f85b3a5-e7d0-4ad6-a1e3-37168dd1ed51",
      "reuse_pre_existing_accounts": false
    },
    "enable_provisioning": true,
    "max_session_type": "LIMITED",
    "max_sessions": 50,
    "min_ready_vms": 0,
    "nics": [
      {
        "network_interface_card_id": "c9896e51-48a2-4d82-ae9e-a0246981b473",
        "network_label_assignment_specs": [
          {
            "enabled": true,
            "max_label": 1,
            "max_label_type": "LIMITED",
            "network_label_name": "vm-network"
          }
        ]
      }
    ],
    "pattern_naming_settings": {
      "max_number_of_rds_servers": 5,
      "naming_pattern": "vm-{n}-sales"
    },
    "provisioning_settings": {
      "base_snapshot_id": "snapshot-1",
      "datacenter_id": "datacenter-1",
      "host_or_cluster_id": "domain-s425",
      "im_stream_id": "6f85b3a5-e7d0-4ad6-a1e3-37168dd1ed51",
      "im_tag_id": "3d45b3a5-e7d0-4ad6-a1e3-37168dd1ed51",
      "parent_vm_id": "vm-2",
      "resource_pool_id": "resgroup-1",
      "vm_folder_id": "group-v1"
    },
    "stop_provisioning_on_error": true,
    "storage_settings": {
      "datastores": [
        {
          "datastore_id": "datastore-1"
        }
      ],
      "replica_disk_datastore_id": "datastore-1",
      "use_separate_datastores_replica_and_os_disks": false,
      "use_view_storage_accelerator": false,
      "use_vsan": false
    },
    "transparent_page_sharing_scope": "VM",
    "vcenter_id": "f148f3e8-db0e-4abb-9c33-7e5205ccd360"
  },
  "description": "Farm Description",
  "display_name": "ManualFarm",
  "display_protocol_settings": {
    "allow_users_to_choose_protocol": true,
    "default_display_protocol": "PCOIP",
    "grid_vgpus_enabled": true,
    "session_collaboration_enabled": false
  },
  "enabled": true,
  "load_balancer_settings": {
    "cpu_threshold": 10,
    "disk_queue_length_threshold": 15,
    "disk_read_latency_threshold": 10,
    "disk_write_latency_threshold": 15,
    "include_session_count": true,
    "memory_threshold": 10
  },
  "name": "ManualFarm",
  "rds_server_ids": [
    "5134796a-322g-5fe5-343f-4daa5d25ebfe",
    "2a43f96c-102b-4ed3-953f-35deg43d43b0ge"
  ],
  "server_error_threshold": 0,
  "session_settings": {
    "disconnected_session_timeout_minutes": 5,
    "disconnected_session_timeout_policy": "NEVER",
    "empty_session_timeout_minutes": 5,
    "empty_session_timeout_policy": "AFTER",
    "logoff_after_timeout": false,
    "pre_launch_session_timeout_minutes": 10,
    "pre_launch_session_timeout_policy": "AFTER"
  },
  "type": "MANUAL",
  "use_custom_script_for_load_balancing": false
}

This also includes some that are not required so for my own farm I settled with this json. This is for an Instant Clone farm.

{
    "access_group_id": "6fd4638a-381f-4518-aed6-042aa3d9f14c",
    "automated_farm_settings": {
        "customization_settings": {
            "ad_container_rdn": "OU=Pod1,OU=RDS,OU=VMware,OU=EUC",
            "instant_clone_domain_account_id": "6f85b3a5-e7d0-4ad6-a1e3-37168dd1ed51",
            "reuse_pre_existing_accounts": true
        },
        "enable_provisioning": false,
        "max_session_type": "LIMITED",
        "max_sessions": 50,
        "min_ready_vms": 1,
        "pattern_naming_settings": {
            "max_number_of_rds_servers": 2,
            "naming_pattern": "vm-{n}-sales"
        },
        "provisioning_settings": {
            "base_snapshot_id": "snapshot-1",
            "datacenter_id": "datacenter-1",
            "host_or_cluster_id": "domain-s425",
            "parent_vm_id": "vm-2",
            "resource_pool_id": "resgroup-1",
            "vm_folder_id": "group-v1"
        },
        "stop_provisioning_on_error": true,
        "storage_settings": {
            "datastores": [
                {
                    "datastore_id": "datastore-1"
                }
            ],
            "use_separate_datastores_replica_and_os_disks": false,
            "use_view_storage_accelerator": false,
            "use_vsan": false
        },
        "transparent_page_sharing_scope": "VM",
        "vcenter_id": "f148f3e8-db0e-4abb-9c33-7e5205ccd360"
    },
    "description": "demo_farm",
    "display_name": "demo_farm",
    "display_protocol_settings": {
        "allow_users_to_choose_protocol": true,
        "default_display_protocol": "BLAST",
        "grid_vgpus_enabled": false,
        "session_collaboration_enabled": true
    },
    "enabled": false,
    "load_balancer_settings": {
        "cpu_threshold": 10,
        "disk_queue_length_threshold": 15,
        "disk_read_latency_threshold": 10,
        "disk_write_latency_threshold": 15,
        "include_session_count": true,
        "memory_threshold": 10
    },
    "name": "demo_farm",
    "server_error_threshold": 0,
    "session_settings": {
        "disconnected_session_timeout_minutes": 5,
        "disconnected_session_timeout_policy": "NEVER",
        "empty_session_timeout_minutes": 5,
        "empty_session_timeout_policy": "AFTER",
        "logoff_after_timeout": false,
        "pre_launch_session_timeout_minutes": 10,
        "pre_launch_session_timeout_policy": "AFTER"
    },
    "type": "AUTOMATED",
    "use_custom_script_for_load_balancing": false
}

As said I send a dictionary to the method so let’s import data into a dict called data and I will print it to screen. The dictionary needs to follow this specific order of lines so that’s why a json is very useful to start with.

with open('/mnt/d/homelab/farm.json') as f:
    data = json.load(f)

As you can see in both the json and the output there’s a lot of things we can change and some things that we need to change lik id’s for all the components like vCenter, base vm, base snapshot and more. First I need the access_group_id this can be retreived using the get_local_access_groups method. For all of these I will also set the variable in the dictionary that we need.

local_access_group = next(item for item in (config.get_local_access_groups()) if item["name"] == "Root")
data["access_group_id"] = local_access_group["id"]

Than it’s time for the Instant Clone Admin id

ic_domain_account = next(item for item in (config.get_ic_domain_accounts()) if item["username"] == "administrator")
data["automated_farm_settings"]["customization_settings"]["instant_clone_domain_account_id"] = ic_domain_account["id"]

For the basevm and snapshot id’s I used the same method but a bit differently as I had already used this method in another script

vcenters = monitor.virtual_centers()
vcid = vcenters[0]["id"]
dcs = external.get_datacenters(vcenter_id=vcid)
dcid = dcs[0]["id"]

base_vms = external.get_base_vms(vcenter_id=vcid,datacenter_id=dcid,filter_incompatible_vms=True)

base_vm = next(item for item in base_vms if item["name"] == "srv2019-p1-2020-10-13-08-44")
basevmid=base_vm["id"]

base_snapshots = external.get_base_snapshots(vcenter_id=vcid, base_vm_id=base_vm["id"])

base_snapshot = next(item for item in base_snapshots if item["name"] == "Created by Packer")

snapid=base_snapshot["id"]
data["automated_farm_settings"]["provisioning_settings"]["base_snapshot_id"] = snapid
data["automated_farm_settings"]["provisioning_settings"]["parent_vm_id"] = basevmid

Host or cluster id

host_or_clusters = external.get_hosts_or_clusters(vcenter_id=vcid, datacenter_id=dcid)
for i in host_or_clusters:
    if (i["details"]["name"]) == "Cluster_Pod1":
        host_or_cluster = i
data["automated_farm_settings"]["provisioning_settings"]["host_or_cluster_id"] = host_or_cluster["id"]

Resource Pool

resource_pools = external.get_resource_pools(vcenter_id=vcid, host_or_cluster_id=host_or_cluster["id"])
for i in resource_pools:
    # print(i)
    if (i["type"] == "CLUSTER"):
        resource_pool = i
data["automated_farm_settings"]["provisioning_settings"]["resource_pool_id"] = resource_pool["id"]

VM folder again is a bit different as I have to get the id from one of the children objects

vm_folders = external.get_vm_folders(vcenter_id=vcid, datacenter_id=dcid)
for i in vm_folders:
    children=(i["children"])
    for ii in children:
        # print(ii["name"])
        if (ii["name"]) == "Pod1":
            vm_folder = i
data["automated_farm_settings"]["provisioning_settings"]["vm_folder_id"] = vm_folder["id"]

Datacenter and vcenter id’s I already had to grab for the base vm and base snapshot so I can just add them

data["automated_farm_settings"]["provisioning_settings"]["datacenter_id"] = dcid
data["automated_farm_settings"]["vcenter_id"] = vcid

Datastores is a bit more funky as there can be multiple so I needed to create a list first and than populate that based on the name of the datastores I have.

datastore_list = []
datastores = external.get_datastores(vcenter_id=vcid, host_or_cluster_id=host_or_cluster["id"])
for i in datastores:
    # print(i)
    if (i["name"] == "VDI-500") or i["name"] == "VDI-200":
        ds = {}
        ds["datastore_id"] = i["id"]
        datastore_list.append(ds)
data["automated_farm_settings"]["storage_settings"]["datastores"] = datastore_list

For my final script I put them in a bit different order and I decided to change a whole lot more options but if you have your json perfected this shouldn’t always be required. Also take note that for true/false in the json that I use the True/False from python.

import requests, getpass, urllib, json

import vmware_horizon

requests.packages.urllib3.disable_warnings()
url = input("URL\n")

username = input("Username\n")

domain = input("Domain\n")

pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()
print("connected")

monitor = obj=vmware_horizon.Monitor(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
external=vmware_horizon.External(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
inventory=vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
config=vmware_horizon.Config(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

with open('/mnt/d/homelab/farm.json') as f:
    data = json.load(f)

vcenters = monitor.virtual_centers()
vcid = vcenters[0]["id"]
dcs = external.get_datacenters(vcenter_id=vcid)
dcid = dcs[0]["id"]

base_vms = external.get_base_vms(vcenter_id=vcid,datacenter_id=dcid,filter_incompatible_vms=True)

base_vm = next(item for item in base_vms if item["name"] == "srv2019-p1-2020-10-13-08-44")
basevmid=base_vm["id"]

base_snapshots = external.get_base_snapshots(vcenter_id=vcid, base_vm_id=base_vm["id"])

base_snapshot = next(item for item in base_snapshots if item["name"] == "Created by Packer")

snapid=base_snapshot["id"]

host_or_clusters = external.get_hosts_or_clusters(vcenter_id=vcid, datacenter_id=dcid)
for i in host_or_clusters:
    if (i["details"]["name"]) == "Cluster_Pod1":
        host_or_cluster = i

resource_pools = external.get_resource_pools(vcenter_id=vcid, host_or_cluster_id=host_or_cluster["id"])
for i in resource_pools:
    # print(i)
    if (i["type"] == "CLUSTER"):
        resource_pool = i

vm_folders = external.get_vm_folders(vcenter_id=vcid, datacenter_id=dcid)
for i in vm_folders:
    children=(i["children"])
    for ii in children:
        # print(ii["name"])
        if (ii["name"]) == "Pod1":
            vm_folder = i

datastore_list = []
datastores = external.get_datastores(vcenter_id=vcid, host_or_cluster_id=host_or_cluster["id"])
for i in datastores:
    # print(i)
    if (i["name"] == "VDI-500") or i["name"] == "VDI-200":
        ds = {}
        ds["datastore_id"] = i["id"]
        datastore_list.append(ds)

local_access_group = next(item for item in (config.get_local_access_groups()) if item["name"] == "Root")
ic_domain_account = next(item for item in (config.get_ic_domain_accounts()) if item["username"] == "administrator")

data["access_group_id"] = local_access_group["id"]
data["automated_farm_settings"]["customization_settings"]["ad_container_rdn"] = "OU=Pod1,OU=RDS,OU=VMware,OU=EUC"
data["automated_farm_settings"]["customization_settings"]["reuse_pre_existing_accounts"] = True
data["automated_farm_settings"]["customization_settings"]["instant_clone_domain_account_id"] = ic_domain_account["id"]
data["automated_farm_settings"]["enable_provisioning"] = False
data["automated_farm_settings"]["max_sessions"] = 50
data["automated_farm_settings"]["min_ready_vms"] = 3
data["automated_farm_settings"]["pattern_naming_settings"]["max_number_of_rds_servers"] = 4
data["automated_farm_settings"]["pattern_naming_settings"]["naming_pattern"] = "farmdemo-{n:fixed=3}"
data["automated_farm_settings"]["provisioning_settings"]["base_snapshot_id"] = snapid
data["automated_farm_settings"]["provisioning_settings"]["parent_vm_id"] = basevmid
data["automated_farm_settings"]["provisioning_settings"]["host_or_cluster_id"] = host_or_cluster["id"]
data["automated_farm_settings"]["provisioning_settings"]["resource_pool_id"] = resource_pool["id"]
data["automated_farm_settings"]["provisioning_settings"]["vm_folder_id"] = vm_folder["id"]
data["automated_farm_settings"]["provisioning_settings"]["datacenter_id"] = dcid
data["automated_farm_settings"]["stop_provisioning_on_error"] = True
data["automated_farm_settings"]["storage_settings"]["datastores"] = datastore_list
data["automated_farm_settings"]["transparent_page_sharing_scope"] = "GLOBAL"
data["automated_farm_settings"]["vcenter_id"] = vcid
data["description"] = "Python_demo_farm"
data["display_name"] = "Python_demo_farm"
data["display_protocol_settings"]["allow_users_to_choose_protocol"] = True
data["display_protocol_settings"]["default_display_protocol"] = "BLAST"
data["display_protocol_settings"]["session_collaboration_enabled"] = True
data["enabled"] = False
data["load_balancer_settings"]["cpu_threshold"] = 12
data["load_balancer_settings"]["disk_queue_length_threshold"] = 16
data["load_balancer_settings"]["disk_read_latency_threshold"] = 12
data["load_balancer_settings"]["disk_write_latency_threshold"] = 16
data["load_balancer_settings"]["include_session_count"] = True
data["load_balancer_settings"]["memory_threshold"] = 12
data["name"] = "Python_demo_farm"
data["session_settings"]["disconnected_session_timeout_minutes"] = 5
data["session_settings"]["disconnected_session_timeout_policy"] = "NEVER"
data["session_settings"]["empty_session_timeout_minutes"] = 6
data["session_settings"]["empty_session_timeout_policy"] = "AFTER"
data["session_settings"]["logoff_after_timeout"] = False
data["session_settings"]["pre_launch_session_timeout_minutes"] = 12
data["session_settings"]["pre_launch_session_timeout_policy"] = "AFTER"
data["type"] = "AUTOMATED"

inventory.new_farm(farm_data=data)

end=hvconnectionobj.hv_disconnect()
print(end)

How does this look? Actually you don’t see a lot happening but the farm will have been created

As always the script can be found on my github in the examples folder together with the json file.

With this I am closing my 100DaysOfCode challenge but I pledge to keep maintaining the python module and I will extend it when new REST api calls arrive for VMware Horizon.

Pushing a new image using the VMware Horizon Python Module

One of the REST api calls that where added for Horizon 8 2012 was the ability to push images to Desktop Pools (sadly not for farms yet). This week I added that functionality to the VMware Horizon Python Module. Looking at the swagger UI these are the needed arguments:

So the source can be either the streams from Horizon Cloud or a regular vm/snapshot combo. For the time you will need to use some moment in epoch. The optional items for adding the virtual tpm, stop on error I have set the default for what they are listed. As logoff policy I have chosen to set a default in WAIT_FOR_LOGOFF.

For this blog posts I have to go with the vm/snapshot combo as I don’t have streams setup at the moment. First I need to connect:

import requests, getpass, urllib, json, operator, numpy, time
import vmware_horizon


requests.packages.urllib3.disable_warnings()
url="https://pod2cbr1.loft.lab"
username = input("Username\n")
domain = input("Domain\n")
pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()
print("connected")

Than I open the ports for the classes I will be using

monitor = obj=vmware_horizon.Monitor(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
external=vmware_horizon.External(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
inventory=vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

Now let’s look at what the desktop_pool_push_image method needs

First I will grab the correct desktop pool, I will use Pod02-Pool02 this time. There are several ways to get the correct pool but I have chosen to use this one.

desktop_pools=inventory.get_desktop_pools()
desktop_pool = next(item for item in desktop_pools if item["name"] == "Pod02-Pool02")
poolid=desktop_pool["id"]

To get the VM and Snapshots I first need to get the vCenter and datacenter id’s

vcenters = monitor.virtual_centers()
vcid = vcenters[0]["id"]
dcs = external.get_datacenters(vcenter_id=vcid)
dcid = dcs[0]["id"]

I created a new golden image last Friday and it has this name: W10-L-2021-03-19-17-27 so I need to get the compatible base vm’s and get the id for this one

base_vms = external.get_base_vms(vcenter_id=vcid,datacenter_id=dcid,filter_incompatible_vms=True)
base_vm = next(item for item in base_vms if item["name"] == "W10-L-2021-03-19-17-27")
basevmid=base_vm["id"]

I had Packer create a snapshot and I can get that in a similar way

base_snapshots = external.get_base_snapshots(vcenter_id=vcid, base_vm_id=base_vm["id"])
base_snapshot = next(item for item in base_snapshots if item["name"] == "Created by Packer")
snapid=base_snapshot["id"]

I get the current time in epoch using the time module (google is your best friend to define a moment in the future in epoch)

current_time = time.time()

For this example I add all the arguments but if you don’t change fromt he defaults that’s not needed

inventory.desktop_pool_push_image(desktop_pool_id=poolid,parent_vm_id=basevmid,snapshot_id=snapid, start_time=current_time, add_virtual_tpm=False, stop_on_first_error=False, logoff_policy="FORCE_LOGOFF")

And closing the connection

end=hvconnectionobj.hv_disconnect()
print(end)

and when I now look at my desktop pool it’s pushing the new image

I have created a new folder on Github for examples and the script to deploy new images is the first example. I did move a couple of the names to variables so make ie better usable. You can find it here. Or see the code below this.

import requests, getpass, urllib, time
import vmware_horizon

requests.packages.urllib3.disable_warnings()

url                     = "https://pod2cbr1.loft.lab"
desktop_pool_name       = "Pod02-Pool01"
base_vm_name            = "W10-L-2021-03-19-17-27"
snapshot_name           = "Snap_2"

username = input("Username\n")
domain = input("Domain\n")
pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()
print("connected")
monitor = obj=vmware_horizon.Monitor(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
external=vmware_horizon.External(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
inventory=vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

desktop_pools=inventory.get_desktop_pools()
desktop_pool = next(item for item in desktop_pools if item["name"] == desktop_pool_name)
poolid=desktop_pool["id"]

vcenters = monitor.virtual_centers()
vcid = vcenters[0]["id"]
dcs = external.get_datacenters(vcenter_id=vcid)
dcid = dcs[0]["id"]

base_vms = external.get_base_vms(vcenter_id=vcid,datacenter_id=dcid,filter_incompatible_vms=True)
base_vm = next(item for item in base_vms if item["name"] == base_vm_name)
basevmid=base_vm["id"]

base_snapshots = external.get_base_snapshots(vcenter_id=vcid, base_vm_id=base_vm["id"])
base_snapshot = next(item for item in base_snapshots if item["name"] == snapshot_name)
snapid=base_snapshot["id"]

current_time = time.time()
inventory.desktop_pool_push_image(desktop_pool_id=poolid,parent_vm_id=basevmid,snapshot_id=snapid, start_time=current_time)

end=hvconnectionobj.hv_disconnect()
print(end)











 

 

Managing application pools using the VMware Horizon Python Module

Earlier this week I added several methods to the VMware Horizon Python Module that are centered about application pools and I promised a blog post so here it is 🙂 In the module we have the following methods in the Inventory about Application Pools:

Preparation

In order to use the methods I am using this as standard configuration in my script

import requests, getpass, urllib, json, operator
import vmware_horizon
requests.packages.urllib3.disable_warnings()

url="https://loftcbr01.loft.lab"
username = "m_wouter"
domain = "loft.lab"
pw = getpass.getpass()


hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()
print("connected")
monitor = obj=vmware_horizon.Monitor(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
external=vmware_horizon.External(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
inventory=vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)
entitlements=vmware_horizon.Entitlements(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

All of the connects at the bottom is so I don’t need to think to do those if I need them when testing.

I end with

end=hvconnectionobj.hv_disconnect()
print(end)

Both the connected and end prints aren’t required at all but give me feedback about the status of the connection.

[sta_anchor id=”get_application_pools” /]

get_application_pools

This is the easiest method to use as it doesn’t require anything. It does allow for setting page sizes and filtering if needed. See this article if you want to know more about filtering: https://www.retouw.nl/2021/02/14/filtering-searching-and-pagination-with-the-python-module-for-vmware-horizon/ The method will return a list of dicts, for the first example I will show only the names of the items.

ap = inventory.get_application_pools(maxpagesize=100)
for i in ap:
    print(i["name"])

Or just with the entire list returned

ap = inventory.get_application_pools(maxpagesize=100)
print(ap)

[sta_anchor id=”get_application_pool” /]

get_application_pool

To get a single application pool you can use get_application_pool and it requires an application_pool_id, I will use the first one of the list of application to show it.

ap = inventory.get_application_pools(maxpagesize=100)
firstap=ap[0]
print(inventory.get_application_pool(application_pool_id=firstap["id"]))

[sta_anchor id=”delete_application_pool” /]

delete_application_pool

To delete an application pool we again only need the application_pool_id I will combine both the get methods to show all application pools before and after the deletion. (with some prints not relevant for the code so I won’t show them below)

ap = inventory.get_application_pools(maxpagesize=100)
for i in ap:
    print(i["name"])
firstap=ap[0]

print(inventory.get_application_pool(application_pool_id=firstap["id"]))

inventory.delete_application_pool(application_pool_id=firstap["id"])

ap = inventory.get_application_pools(maxpagesize=100)
for i in ap:
    print(i["name"])

[sta_anchor id=”new_application_pool” /]

new_application_pool

Since I just deleted my firefox pool I will need to recreate it. The new_application_pool method requires a dict with quite a lof of values. This is the standard list that the swagger-ui gives you

{
  "anti_affinity_data": {
    "anti_affinity_count": 10,
    "anti_affinity_patterns": [
      "*pad.exe",
      "*notepad.???"
    ]
  },
  "category_folder_name": "dir1\\dir2\\dir3\\dir4",
  "cs_restriction_tags": [
    "Internal",
    "External"
  ],
  "description": "string",
  "desktop_pool_id": "0103796c-102b-4ed3-953f-3dfe3d23e0fe",
  "display_name": "Firefox",
  "enable_client_restrictions": false,
  "enable_pre_launch": false,
  "enabled": true,
  "executable_path": "C:\\ProgramData\\Microsoft\\Windows\\Start Menu\\Programs\\Firefox.lnk",
  "farm_id": "855ea6c5-720a-41e1-96f4-958c90e6e424",
  "max_multi_sessions": 5,
  "multi_session_mode": "DISABLED",
  "name": "Firefox",
  "parameters": "-p myprofile",
  "publisher": "Mozilla Corporation",
  "shortcut_locations": [
    "START_MENU"
  ],
  "start_folder": "string",
  "supported_file_types_data": {
    "enable_auto_update_file_types": true,
    "enable_auto_update_other_file_types": true,
    "file_types": [
      {
        "description": "Firefox Document",
        "type": ".html"
      }
    ],
    "other_file_types": [
      {
        "description": "Firefox URL",
        "name": "https",
        "type": "URL"
      }
    ]
  },
  "version": "72.0.2"
}

This does not say that all of these are required, what I have found to be an easy way to find what the minimums are is to  create an application pool with a single key value pair. display_name is always required so I will use that one. Experience has learned that this might require several tries so let’s go.

new_app_pool = {}
new_app_pool["display_name"] = "Firefox"

inventory.new_application_pool(application_pool_data=new_app_pool)

So the first hard requirements are display_name, executable_path and name, let’s add these and see what happens

new_app_pool = {}
new_app_pool["display_name"] = "Firefox"
new_app_pool["name"] = "Firefox"
new_app_pool["executable_path"] = "C:\\ProgramData\\Microsoft\\Windows\\Start Menu\\Programs\\Firefox.lnk"

inventory.new_application_pool(application_pool_data=new_app_pool)

It looks like we actually need some more: at least desktop_pool_id or farm_id since I am doing this against a connection server with no farms I’ll use a desktop pool.

desktop_pools = inventory.get_desktop_pools()
firstpool = desktop_pools[0]

new_app_pool = {}
new_app_pool["display_name"] = "Firefox"
new_app_pool["name"] = "Firefox"
new_app_pool["executable_path"] = "C:\\ProgramData\\Microsoft\\Windows\\Start Menu\\Programs\\Firefox.lnk"
new_app_pool["desktop_pool_id"] = firstpool["id"]

inventory.new_application_pool(application_pool_data=new_app_pool)

No errors and a peak in the admin console shows me that I again have a firefox application

[sta_anchor id=”update_application_pool” /]

update_application_pool

To update the pools we need the application_pool_id and again a dict, this time the dict needs things we want to update. Experience again learned me there are a few required key value pairs while the example in the swagger-ui shows lots, so let’s find those. I am going to use my new firefox app as the source for this. What I actually am going to try to change is the display_name so I will use that as the first key value pair.

filter = {}
filter["type"] = "And"
filter["filters"] = []
filter1={}

filter1["type"] = "Equals"
filter1["name"] = "name"
filter1["value"] = "Firefox"
filter["filters"].append(filter1)
ap = (inventory.get_application_pools(filter=filter))[0]
appid = ap["id"]
update_app = {}
update_app["display_name"] = "FF2"
inventory.update_application_pool(application_pool_id=appid, application_pool_data=update_app)

So here different key value pairs are required than when creating a new application pool, strange but there is nothing I can do about it! I will add these from the ap object I retrieve earlier in the script.

aps = inventory.get_application_pools(maxpagesize=100)
for i in aps:
    print(i["display_name"])
filter = {}
filter["type"] = "And"
filter["filters"] = []
filter1={}

filter1["type"] = "Equals"
filter1["name"] = "name"
filter1["value"] = "Firefox"
filter["filters"].append(filter1)
ap = (inventory.get_application_pools(filter=filter))[0]
appid = ap["id"]
update_app = {}
update_app["display_name"] = "FF2"
update_app["executable_path"] = ap["executable_path"]
update_app["multi_session_mode"] = ap["multi_session_mode"]
update_app["enable_pre_launch"] = ap["enable_pre_launch"]

inventory.update_application_pool(application_pool_id=appid, application_pool_data=update_app)

aps = inventory.get_application_pools(maxpagesize=100)
for i in aps:
    print(i["display_name"])

So with that you have the basics to retrieve, create, update and delete application pools using python

Filtering/Searching and pagination with the Python module for VMware Horizon

Yesterday I added the first method to the VMware Horizon Python module that makes use of filtering while the day before that I added pagination. VMware{Code} has a document describing available options for both but let me give some explanation.

Pagination

Pagination is where you perform a query but only get an x amount of objects returned by default. The rest of the objects are available on the next page or pages. This is exactly what I ran into with the vmware.hv.helper Powershell module a long time ago. With the REST api’s this is rather easy to add since if there are more pages/objects left the headers will contain a key named HAS_MORE_RECORDS. For all the methods that I add where pagination is supported you don’t need to handle this though as I have added it to the method itself. What I did add was the option the change the maximum page size. I default to 100 and the maximum is 1000, if you supply an interrupt higher than 1000 this will be corrected to 1000.

Filtering

Filtering needs some more work from the user of the module to be able to use it.

What options are there for filtering?

For the type we have: And, Or and Not

For the filters themselves there are: Equals, NotEquals, Contains, StartsWith and Between.

The formula is you pick one from the first row and combine that with one or more from the second row.

To apply these the document describes the base schema like this:

{
    “type”: ”And”,
    “filter”: <filter object>
}

and a filter object looks like this:

{
    "type":"Equals",
    "name":"domain",
    "value":"ad-example0"
}

or this for a range:

{
    "type":"Between",
    "name":"assignedUsers",
    "fromValue":"10",
    "toValue":"20"
}

Combining both into a single object looks like this:

{
    "type":"Not",
    "filter": {
        "type":"Equals",
        "name":"domain",
        "value":"ad-example0"
    }
}

This all looks like a dictionary with a nested dictionary when translating it to Python but when you have multiple filters it suddenly looks like this:

{
    "type":"And",
  "filters": [
        {
            "type":"Equals", 
            "name":"domain",
            "value":"ad-example0"
        },
        {
            "type":"StartsWith", 
            "name":"name",
            "value":"test"
        }
    ]
}

otherwise know as a dictionary with a list of dictionaries in it and since the latter also works with a single dict inside the list I have taken that route. The document also describes encoding and minifying the code to it works for a REST api call but I have done all of that for you so no need to worry about it, just build the dictionary and you are good!

Now let’s actually perform a search

First I create my base object with the type AND and a list for the filters key

filter_dict = {}
filter_dict["type"] = "And"
filter_dict["filters"] = []

Next I create the filters object where the type is contains and I filter on the field name with the value LP-00

filter1={}
filter1["type"] = "Contains"
filter1["name"] = "name"
filter1["value"] = "LP-00"

And now I add the filters1 object to the filter_dict filters list

filter["filters"].append(filter1)

and I get the machines with a pagesize of 1 to show the pagination (the pool with these machines only has 2 😉 )

machines = obj.get_machines(maxpagesize=1, filter = filter_dict)

And this would be the entire python script

import requests, getpass, urllib, json
import vmware_horizon

requests.packages.urllib3.disable_warnings()

url="https://loftcbr01.loft.lab"
username = "m_wouter"
domain = "loft.lab"
pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()

obj = vmware_horizon.Inventory(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

filter_dict = {}
filter_dict["type"] = "And"
filter_dict["filters"] = []
filter1={}
filter1["type"] = "Contains"
filter1["name"] = "name"
filter1["value"] = "LP-00"

filter["filters"].append(filter1)

machines = obj.get_machines(maxpagesize=1, filter = filter_dict)

for i in machines:
    print(i["name"])

hvconnectionobj.hv_disconnect()

And it shows this in python:

My #100DaysOfCode #Python Challenge == VMware_Horizon Module

So after 5 weeks of following the #Python training for my 100DaysOfCode challenge I have decided that my main goal for the challenge itself will be to work on the Horizon Python Module. With the course some things I find really boring and I need a real target to really learn things instead of just repeating someone else is doing as well.

I will still do some of the fun parts of it in time like databases and such when I need it but for now I will focus on the module. This weekend I added handling of the Instant Clone domain accounts to the module and also added documentation both in the module and the github repository. I know I will still learn heaps because almost all of it is still rather new and repetition works best for me.

Added Methods to the module

  • External Class
    • get_ad_domains
  • Settings class
    • get_ic_domain_accounts
    • get_ic_domain_account
    • new_ic_domain_account
    • update_ic_domain_account
    • delete_ic_domain_account

Updates to the VMware Horizon Python Module

I have just pushed some changes to the Horizon Python module. With these changes I am more complying with the Python coding standards by initiating an object before being able to use the functions inside a class. Also I added a bunch of the api calls available in the monitor parts.

To connect you now start like this:

import requests, getpass
import vmware_horizon

requests.packages.urllib3.disable_warnings()
url = input("URL\n")
username = input("Username\n")
domain = input("Domain\n")
pw = getpass.getpass()

hvconnectionobj = vmware_horizon.Connection(username = username,domain = domain,password = pw,url = url)
hvconnectionobj.hv_connect()

so technically you first initiate a Connection class object and than you use the hv_connect function inside that class after which the access token is stored inside the object itself.

Now to use the monitors for example you create an object for this.

monitor = vmware_horizon.Monitor(url=hvconnectionobj.url, access_token=hvconnectionobj.access_token)

To see what functions are available you can combine print with dir.

print(dir(monitor))

and the full list, the ones with (id) require an id:

  • ad_domain
  • connection_servers
  • connection_server(id)
  • event_database
  • farms
  • farm(id)
  • gateways
  • gateway(id)
  • rds_servers
  • rds_server(id)
  • saml_authenticators
  • saml_authenticator(id)
  • view_composers
  • view_composer(vcId)
  • virtual_centers
  • virtual_center(id)
  • remote_pods
  • remote_pod(id)
  • true_sso

As you can see I had to work with underscores instead of hyphens as python doesn’t like those in the names of functions

As said some of these might require an id but connection_servers works without one for example
print(monitor.connection_servers())

Todo: Error handling for wrong passwords, documentation

My #100DaysOfCode challenge: week 3

This week I learned about Object Orientated Programming, classes, modules, tuples and other things. I decided on skipping some days of the course because my goal of the 100 days is to learn new techniques to be used with Python. Some of the days are more about how to think in solving challenges than new techniques so I did look at the videos where it was clear that new stuff is being thought bit I do my own thing for the rest. I have also create a first version of a module to use the VMware Horizon REST API’s and blogged about it yesterday. On the positive side I learned that even though I was making long evenings with ControlUp’s yearly SKO I was still able to take in new information during my morning 100DaysOfCode ritual.

Learning points:

  • Even when making long days I can still take in new information during the mornings
  • I can’t be arsed to create code that I don’t care about
  • I still think this is a fun challenge
  • Python is cool

 

Using the Horizon REST API’s with Python

As you probably have seen from my tweets the last three weeks I have been doing the 100DaysOfCode challenge specifically for Python. Today I was actually a bit bored with the task we got (sorry, I hate creating games) so I decided on checking if I was actually able to consume the Horizon api’s from Python. This was something entirely new for me so it was a boatload of trial & error until I got it working with this script:

import requests,json, getpass

requests.packages.urllib3.disable_warnings()

pw = getpass.getpass()
domain = input("Domain")
username = input("Username")
url = input("URL")



headers = {
    'accept': '*/*',
    'Content-Type': 'application/json',
}

data = {"domain": domain, "password": pw, "username": username}
json_data = json.dumps(data)

response = requests.post(f'{url}/rest/login', verify=False, headers=headers, data=json_data)
data = response.json()

access_token = {
    'accept': '*/*',
    'Authorization': 'Bearer ' + data['access_token']
}

response = requests.get(f'{url}/rest/inventory/v1/desktop-pools', verify=False,  headers=access_token)
data = response.json()
for i in data:
    print(i['name'])

First I import the requests json and getpass modules. The requests module does the webrequests, the json is used to transform the data to be usable and getpass is used to get my password without showing it. After this I add a line to get rid of the warnings that my certificates aren’t to be trusted (it’s a homelab, duh!).

The most important part is that for the authentication I send username,password and domain as json data in the data while the headers contain the content type. The response gets converted to json data and I use that json data to build the access token. For future requests I only need to pass the access token for authentication.

Now this looks fun but wouldn’t it be better if I create a module for it? Yes it does and that’s what I have done and I have even added a simple function to list desktop pools.

import json, requests, ssl

class Connection:
    def hv_connect(username, password, domain, url):
        headers = {
            'accept': '*/*',
            'Content-Type': 'application/json',
        }

        data = {"domain": domain, "password": password, "username": username}
        json_data = json.dumps(data)

        response = requests.post(f'{url}/rest/login', verify=False, headers=headers, data=json_data)
        data = response.json()

        access_token = {
            'accept': '*/*',
            'Authorization': 'Bearer ' + data['access_token']
        }
        return access_token

    def hv_disconnect(url, access_token):
        requests.post(f'{url}/rest/logout', verify=False, headers=access_token)

class Pools:
    def list_hvpools(url,access_token):
        response = requests.get(f'{url}/rest/inventory/v1/desktop-pools', verify=False,  headers=access_token)
        return response.json()



And with a simple script I consume this module to show the display name of the first pool.

import requests, getpass
import vmware_horizon

requests.packages.urllib3.disable_warnings()
url = input("URL\n")
username = input("Username\n")
domain = input("Domain\n")
pw = getpass.getpass()


at = vmware_horizon.Connection.hv_connect(username=username,password=pw,url=url,domain=domain)


pools = vmware_horizon.Pools.list_hvpools(url=url, access_token=at)
print(f'The first Desktop pool is {pools[0]["display_name"]}')

vmware_horizon.Connection.hv_disconnect(url=url, access_token=at)

The module is from from ready and I need to find a better way to make it optional to ignore the certificate erros but if you want to follow the progress of the module it can be found on my Github.

 

 

Week 2 of my #100DaysOfCode challenge

And with a higher or lower game I finished the second week of my 100DaysofCode #Python challenge. Sometimes when I don’t see the solution for something it takes ma ages to get on the right path but when I see it I finish the project pdq! The debugging that I have been applying for years in my powershell code also seems to apply to python and google is a coders best friend 🙂 Besides the python course I actually didn’t do a whole lot of coding in Powershell, maybe a couple of hours spread over the week and my “big project” is finally finished and ready to be published.

Good things:

  • focus
  • when it works it works

Bad things:

  • when my I start thinking in the wrong direction it will keep going wrong
  • I still sometimes think too difficult